View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.constructs.blocking;
18  
19  import java.io.Serializable;
20  import java.util.Collection;
21  import java.util.Map;
22  import java.util.concurrent.atomic.AtomicReference;
23  
24  import net.sf.ehcache.CacheException;
25  import net.sf.ehcache.Ehcache;
26  import net.sf.ehcache.Element;
27  import net.sf.ehcache.concurrent.CacheLockProvider;
28  import net.sf.ehcache.concurrent.LockType;
29  import net.sf.ehcache.concurrent.StripedReadWriteLockSync;
30  import net.sf.ehcache.concurrent.Sync;
31  import net.sf.ehcache.constructs.EhcacheDecoratorAdapter;
32  import net.sf.ehcache.loader.CacheLoader;
33  
34  /***
35   * A blocking decorator for an Ehcache, backed by a {@link Ehcache}.
36   * <p/>
37   * It allows concurrent read access to elements already in the cache. If the element is null, other
38   * reads will block until an element with the same key is put into the cache.
39   * <p/>
40   * This is useful for constructing read-through or self-populating caches.
41   * <p/>
42   * This implementation uses the {@link net.sf.ehcache.concurrent.ReadWriteLockSync} class. If you wish to use
43   * this class, you will need the concurrent package in your class path.
44   * <p/>
45   * It features:
46   * <ul>
47   * <li>Excellent liveness.
48   * <li>Fine-grained locking on each element, rather than the cache as a whole.
49   * <li>Scalability to a large number of threads.
50   * </ul>
51   * <p/>
52   * "Hashtable / synchronizedMap uses the "one big fat lock" approach to guard the mutable state of the map.
53   * That works, but is a big concurrency bottleneck, as you've observed.  You went to the opposite extreme, one lock per key.
54   * That works (as long as you've got sufficient synchronization in the cache itself to protect its own data structures.)
55   * <p/>
56   * Lock striping is a middle ground, partitioning keys into a fixed number of subsets, like the trick used at large
57   * theaters for will-call ticket pickup -- there are separate lines for "A-F, G-M, N-R, and S-Z".
58   * This way, there are a fixed number of locks, each guarding (hopefully) 1/Nth of the keys."
59   * - Brian Goetz
60   * <p/>
61   * Further improvements to hashing suggested by Joe Bowbeer.
62   *
63   * @author Greg Luck
64   * @version $Id: BlockingCache.html 13146 2011-08-01 17:12:39Z oletizi $
65   */
66  public class BlockingCache extends EhcacheDecoratorAdapter {
67  
68      /***
69       * The amount of time to block a thread before a LockTimeoutException is thrown
70       */
71      protected volatile int timeoutMillis;
72  
73      private final int stripes;
74      private final AtomicReference<CacheLockProvider> cacheLockProviderReference;
75      
76      /***
77       * Creates a BlockingCache which decorates the supplied cache.
78       *
79       * @param cache           a backing ehcache.
80       * @param numberOfStripes how many stripes to has the keys against. Must be a non-zero even number. This is a trade-off between
81       *                        memory use and concurrency
82       * @throws CacheException shouldn't happen
83       * @since 1.2
84       */
85      public BlockingCache(final Ehcache cache, int numberOfStripes) throws CacheException {
86          super(cache);
87          this.stripes = numberOfStripes;
88          this.cacheLockProviderReference = new AtomicReference<CacheLockProvider>();
89      }
90  
91      /***
92       * Creates a BlockingCache which decorates the supplied cache.
93       *
94       * @param cache a backing ehcache.
95       * @throws CacheException shouldn't happen
96       * @since 1.6.1
97       */
98      public BlockingCache(final Ehcache cache) throws CacheException {
99          this(cache, StripedReadWriteLockSync.DEFAULT_NUMBER_OF_MUTEXES);
100     }
101 
102     private CacheLockProvider getCacheLockProvider() {
103         CacheLockProvider provider = cacheLockProviderReference.get();
104         while (provider == null) {
105             cacheLockProviderReference.compareAndSet(null, createCacheLockProvider());
106             provider = cacheLockProviderReference.get();
107         }
108         return provider;
109     }
110 
111     private CacheLockProvider createCacheLockProvider() {
112         Object context = underlyingCache.getInternalContext();
113         if (underlyingCache.getCacheConfiguration().isTerracottaClustered() && context != null) {
114             return (CacheLockProvider) context;
115         } else {
116             return new StripedReadWriteLockSync(stripes);
117         }
118     }
119 
120     /***
121      * Retrieve the EHCache backing cache
122      *
123      * @return the backing cache
124      */
125     protected Ehcache getCache() {
126         return underlyingCache;
127     }
128 
129     /***
130      * Looks up an entry.  Blocks if the entry is null until a call to {@link #put} is done
131      * to put an Element in.
132      * <p/>
133      * If a put is not done, the lock is never released.
134      * <p/>
135      * If this method throws an exception, it is the responsibility of the caller to catch that exception and call
136      * <code>put(new Element(key, null));</code> to release the lock acquired. See {@link net.sf.ehcache.constructs.blocking.SelfPopulatingCache}
137      * for an example.
138      * <p/>
139      * Note. If a LockTimeoutException is thrown while doing a <code>get</code> it means the lock was never acquired,
140      * therefore it is a threading error to call {@link #put}
141      *
142      * @throws LockTimeoutException if timeout millis is non zero and this method has been unable to
143      *                              acquire a lock in that time
144      * @throws RuntimeException     if thrown the lock will not released. Catch and call<code>put(new Element(key, null));</code>
145      *                              to release the lock acquired.
146      */
147     @Override
148     public Element get(final Object key) throws RuntimeException, LockTimeoutException {
149 
150         Sync lock = getLockForKey(key);
151         acquiredLockForKey(key, lock, LockType.READ);
152         Element element;
153         try {
154             element = underlyingCache.get(key);
155         } finally {
156             lock.unlock(LockType.READ);
157         }
158         if (element == null) {
159             acquiredLockForKey(key, lock, LockType.WRITE);
160             element = underlyingCache.getQuiet(key);
161             if (element != null) {
162                 if (underlyingCache.isStatisticsEnabled()) {
163                     element = underlyingCache.get(key);
164                 }
165                 lock.unlock(LockType.WRITE);
166             }
167         }
168         return element;
169     }
170 
171     private void acquiredLockForKey(final Object key, final Sync lock, final LockType lockType) {
172         if (timeoutMillis > 0) {
173             try {
174                 boolean acquired = lock.tryLock(lockType, timeoutMillis);
175                 if (!acquired) {
176                     StringBuilder message = new StringBuilder("Lock timeout. Waited more than ")
177                             .append(timeoutMillis)
178                             .append("ms to acquire lock for key ")
179                             .append(key).append(" on blocking cache ").append(underlyingCache.getName());
180                     throw new LockTimeoutException(message.toString());
181                 }
182             } catch (InterruptedException e) {
183                 throw new LockTimeoutException("Got interrupted while trying to acquire lock for key " + key, e);
184             }
185         } else {
186             lock.lock(lockType);
187         }
188     }
189 
190 
191     /***
192      * Gets the Sync to use for a given key.
193      *
194      * @param key the key
195      * @return one of a limited number of Sync's.
196      */
197     protected Sync getLockForKey(final Object key) {
198         return getCacheLockProvider().getSyncForKey(key);
199     }
200 
201     /***
202      * Adds an entry and unlocks it
203      */
204     @Override
205     public void put(Element element) {
206 
207         if (element == null) {
208             return;
209         }
210         Object key = element.getObjectKey();
211         Object value = element.getObjectValue();
212 
213         Sync lock = getLockForKey(key);
214         
215         if (!lock.isHeldByCurrentThread(LockType.WRITE)) {
216             lock.lock(LockType.WRITE);
217         }
218         try {
219             if (value != null) {
220                 underlyingCache.put(element);
221             } else {
222                 underlyingCache.remove(key);
223             }
224         } finally {
225             //Release the readlock here. This will have been acquired in the get, where the element was null
226             lock.unlock(LockType.WRITE);
227         }
228     }
229 
230     /***
231      * Gets an element from the cache. Updates Element Statistics
232      * <p/>
233      * Note that the Element's lastAccessTime is always the time of this get.
234      * Use {@link #getQuiet(Object)} to peak into the Element to see its last access time with get
235      *
236      * @param key a serializable value
237      * @return the element, or null, if it does not exist.
238      * @throws IllegalStateException if the cache is not {@link net.sf.ehcache.Status#STATUS_ALIVE}
239      * @see #isExpired
240      */
241     @Override
242     public Element get(Serializable key) throws IllegalStateException, CacheException {
243         return this.get((Object) key);
244     }
245 
246     /***
247      * Synchronized version of getName to test liveness of the object lock.
248      * <p/>
249      * The time taken for this method to return is a useful measure of runtime contention on the cache.
250      *
251      * @return the name of the cache.
252      */
253     public synchronized String liveness() {
254         return getName();
255     }
256 
257     /***
258      * Sets the time to wait to acquire a lock. This may be modified at any time.
259      * <p/>
260      * The consequences of setting a timeout are:
261      * <ol>
262      * <li>if a lock cannot be acquired in the given time a LockTimeoutException is thrown.
263      * <li>if there is a queue of threads waiting for the first thread to complete, but it does not complete within
264      * the time out period, the successive threads may find that they have exceeded their lock timeouts and fail. This
265      * is usually a good thing because it stops a build up of threads from overwhelming a busy resource, but it does
266      * need to be considered in the design of user interfaces. The timeout should be set no greater than the time a user
267      * would be expected to wait before considering the action will never return
268      * <li>it will be common to see a number of threads timeout trying to get the same lock. This is a normal and desired
269      * consequence.
270      * </ol>
271      * The consequences of not setting a timeout (or setting it to 0) are:
272      * <ol>
273      * <li>There are no partial failures in the system. But there is a greater possibility that a temporary overload
274      * in one part of the system can cause a back up that may take a long time to recover from.
275      * <li>A failing method that perhaps fails because a resource is overloaded will be hit by each thread in turn, no matter whether there is a still a user who
276      * cares about getting a response.
277      * </ol>
278      *
279      * @param timeoutMillis the time in ms. Must be a positive number. 0 means wait forever.
280      */
281     public void setTimeoutMillis(int timeoutMillis) {
282         if (timeoutMillis < 0) {
283             throw new CacheException("The lock timeout must be a positive number of ms. Value was " + timeoutMillis);
284         }
285         this.timeoutMillis = timeoutMillis;
286     }
287 
288     /***
289      * Gets the time to wait to acquire a lock.
290      *
291      * @return the time in ms.
292      */
293     public int getTimeoutMillis() {
294         return timeoutMillis;
295     }
296 
297     /***
298      * Register a {@link CacheLoader} with the cache. It will then be tied into the cache lifecycle.
299      * <p/>
300      * If the CacheLoader is not initialised, initialise it.
301      *
302      * @param cacheLoader A Cache Loader to register
303      */
304     @Override
305     public void registerCacheLoader(CacheLoader cacheLoader) {
306         throw new CacheException("This method is not appropriate for a blocking cache.");
307     }
308 
309     /***
310      * Unregister a {@link CacheLoader} with the cache. It will then be detached from the cache lifecycle.
311      *
312      * @param cacheLoader A Cache Loader to unregister
313      */
314     @Override
315     public void unregisterCacheLoader(CacheLoader cacheLoader) {
316         throw new CacheException("This method is not appropriate for a blocking cache.");
317     }
318 
319     /***
320      * This method is not appropriate to use with BlockingCache.
321      *
322      * @throws CacheException if this method is called
323      */
324     @Override
325     public Element getWithLoader(Object key, CacheLoader loader, Object loaderArgument) throws CacheException {
326         throw new CacheException("This method is not appropriate for a Blocking Cache");
327     }
328 
329     /***
330      * This method is not appropriate to use with BlockingCache.
331      *
332      * @throws CacheException if this method is called
333      */
334     @Override
335     public Map getAllWithLoader(Collection keys, Object loaderArgument) throws CacheException {
336         throw new CacheException("This method is not appropriate for a Blocking Cache");
337     }
338 
339     /***
340      * This method is not appropriate to use with BlockingCache.
341      *
342      * @throws CacheException if this method is called
343      */
344     @Override
345     public void load(Object key) throws CacheException {
346         throw new CacheException("This method is not appropriate for a Blocking Cache");
347     }
348 
349     /***
350      * This method is not appropriate to use with BlockingCache.
351      *
352      * @throws CacheException if this method is called
353      */
354     @Override
355     public void loadAll(Collection keys, Object argument) throws CacheException {
356         throw new CacheException("This method is not appropriate for a Blocking Cache");
357     }
358 }
359 
360 
361