View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.CacheException;
20  import net.sf.ehcache.Ehcache;
21  import net.sf.ehcache.Element;
22  import net.sf.ehcache.Status;
23  
24  import java.io.Serializable;
25  import java.rmi.UnmarshalException;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  
31  import org.slf4j.LoggerFactory;
32  import org.slf4j.Logger;
33  
34  /***
35   * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
36   * {@link CachePeer} peers of the Cache asynchronously.
37   * <p/>
38   * Updates are guaranteed to be replicated in the order in which they are received.
39   * <p/>
40   * While much faster in operation than {@link RMISynchronousCacheReplicator}, it does suffer from a number
41   * of problems. Elements, which may be being spooled to DiskStore may stay around in memory because references
42   * are being held to them from {@link EventMessage}s which are queued up. The replication thread runs once
43   * per second, limiting the build up. However a lot of elements can be put into a cache in that time. We do not want
44   * to get an {@link OutOfMemoryError} using distribution in circumstances when it would not happen if we were
45   * just using the DiskStore.
46   * <p/>
47   * Accordingly, the Element values in {@link EventMessage}s are held by {@link java.lang.ref.SoftReference} in the queue,
48   * so that they can be discarded if required by the GC to avoid an {@link OutOfMemoryError}. A log message
49   * will be issued on each flush of the queue if there were any forced discards. One problem with GC collection
50   * of SoftReferences is that the VM (JDK1.5 anyway) will do that rather than grow the heap size to the maximum.
51   * The workaround is to either set minimum heap size to the maximum heap size to force heap allocation at start
52   * up, or put up with a few lost messages while the heap grows.
53   *
54   * @author Greg Luck
55   * @version $Id: RMIAsynchronousCacheReplicator.html 13146 2011-08-01 17:12:39Z oletizi $
56   */
57  public class RMIAsynchronousCacheReplicator extends RMISynchronousCacheReplicator {
58  
59  
60      private static final Logger LOG = LoggerFactory.getLogger(RMIAsynchronousCacheReplicator.class.getName());
61      
62  
63      /***
64       * A thread which handles replication, so that replication can take place asynchronously and not hold up the cache
65       */
66      protected Thread replicationThread = new ReplicationThread();
67  
68      /***
69       * The amount of time the replication thread sleeps after it detects the replicationQueue is empty
70       * before checking again.
71       */
72      protected int asynchronousReplicationInterval;
73  
74      /***
75       * A queue of updates.
76       */
77      protected final Queue<CacheEventMessage> replicationQueue = new ConcurrentLinkedQueue<CacheEventMessage>();
78  
79      /***
80       * Constructor for internal and subclass use
81       */
82      public RMIAsynchronousCacheReplicator(
83              boolean replicatePuts,
84              boolean replicatePutsViaCopy,
85              boolean replicateUpdates,
86              boolean replicateUpdatesViaCopy,
87              boolean replicateRemovals,
88              int asynchronousReplicationInterval) {
89          super(replicatePuts,
90                  replicatePutsViaCopy,
91                  replicateUpdates,
92                  replicateUpdatesViaCopy,
93                  replicateRemovals);
94          this.asynchronousReplicationInterval = asynchronousReplicationInterval;
95          status = Status.STATUS_ALIVE;
96          replicationThread.start();
97      }
98  
99      /***
100      * RemoteDebugger method for the replicationQueue thread.
101      * <p/>
102      * Note that the replicationQueue thread locks the cache for the entire time it is writing elements to the disk.
103      */
104     private void replicationThreadMain() {
105         while (true) {
106             // Wait for elements in the replicationQueue
107             while (alive() && replicationQueue != null && replicationQueue.isEmpty()) {
108                 try {
109                     Thread.sleep(asynchronousReplicationInterval);
110                 } catch (InterruptedException e) {
111                     LOG.debug("Spool Thread interrupted.");
112                     return;
113                 }
114             }
115             if (notAlive()) {
116                 return;
117             }
118             try {
119                 flushReplicationQueue();
120             } catch (Throwable e) {
121                 LOG.error("Exception on flushing of replication queue: " + e.getMessage() + ". Continuing...", e);
122             }
123         }
124     }
125 
126 
127     /***
128      * {@inheritDoc}
129      * <p/>
130      * This implementation queues the put notification for in-order replication to peers.
131      *
132      * @param cache   the cache emitting the notification
133      * @param element the element which was just put into the cache.
134      */
135     public final void notifyElementPut(final Ehcache cache, final Element element) throws CacheException {
136         if (notAlive()) {
137             return;
138         }
139 
140         if (!replicatePuts) {
141             return;
142         }
143 
144         if (replicatePutsViaCopy) {
145             if (!element.isSerializable()) {
146                 if (LOG.isWarnEnabled()) {
147                     LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
148                 }
149                 return;
150             }
151             addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
152         } else {
153             if (!element.isKeySerializable()) {
154                 if (LOG.isWarnEnabled()) {
155                     LOG.warn("Object with key " + element.getObjectKey()
156                             + " does not have a Serializable key and cannot be replicated via invalidate.");
157                 }
158                 return;
159             }
160             addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
161         }
162 
163     }
164 
165     /***
166      * Called immediately after an element has been put into the cache and the element already
167      * existed in the cache. This is thus an update.
168      * <p/>
169      * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
170      * will block until this method returns.
171      * <p/>
172      * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
173      * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
174      *
175      * @param cache   the cache emitting the notification
176      * @param element the element which was just put into the cache.
177      */
178     public final void notifyElementUpdated(final Ehcache cache, final Element element) throws CacheException {
179         if (notAlive()) {
180             return;
181         }
182         if (!replicateUpdates) {
183             return;
184         }
185 
186         if (replicateUpdatesViaCopy) {
187             if (!element.isSerializable()) {
188                 if (LOG.isWarnEnabled()) {
189                     LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be updated via copy.");
190                 }
191                 return;
192             }
193             addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
194         } else {
195             if (!element.isKeySerializable()) {
196                 if (LOG.isWarnEnabled()) {
197                     LOG.warn("Object with key " + element.getObjectKey()
198                             + " does not have a Serializable key and cannot be replicated via invalidate.");
199                 }
200                 return;
201             }
202             addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
203         }
204     }
205 
206     /***
207      * Called immediately after an attempt to remove an element. The remove method will block until
208      * this method returns.
209      * <p/>
210      * This notification is received regardless of whether the cache had an element matching
211      * the removal key or not. If an element was removed, the element is passed to this method,
212      * otherwise a synthetic element, with only the key set is passed in.
213      * <p/>
214      *
215      * @param cache   the cache emitting the notification
216      * @param element the element just deleted, or a synthetic element with just the key set if
217      *                no element was removed.
218      */
219     public final void notifyElementRemoved(final Ehcache cache, final Element element) throws CacheException {
220         if (notAlive()) {
221             return;
222         }
223 
224         if (!replicateRemovals) {
225             return;
226         }
227 
228         if (!element.isKeySerializable()) {
229             if (LOG.isWarnEnabled()) {
230                 LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
231             }
232             return;
233         }
234         addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
235     }
236 
237 
238     /***
239      * Called during {@link net.sf.ehcache.Ehcache#removeAll()} to indicate that the all
240      * elements have been removed from the cache in a bulk operation. The usual
241      * {@link #notifyElementRemoved(net.sf.ehcache.Ehcache,net.sf.ehcache.Element)}
242      * is not called.
243      * <p/>
244      * This notification exists because clearing a cache is a special case. It is often
245      * not practical to serially process notifications where potentially millions of elements
246      * have been bulk deleted.
247      *
248      * @param cache the cache emitting the notification
249      */
250     public void notifyRemoveAll(final Ehcache cache) {
251         if (notAlive()) {
252             return;
253         }
254 
255         if (!replicateRemovals) {
256             return;
257         }
258 
259         addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE_ALL, cache, null, null));
260     }
261 
262 
263     /***
264      * Adds a message to the queue.
265      * <p/>
266      * This method checks the state of the replication thread and warns
267      * if it has stopped and then discards the message.
268      *
269      * @param cacheEventMessage
270      */
271     protected void addToReplicationQueue(CacheEventMessage cacheEventMessage) {
272         if (!replicationThread.isAlive()) {
273             LOG.error("CacheEventMessages cannot be added to the replication queue because the replication thread has died.");
274         } else {
275             replicationQueue.add(cacheEventMessage);
276         }
277     }
278 
279 
280     /***
281      * Gets called once per {@link #asynchronousReplicationInterval}.
282      * <p/>
283      * Sends accumulated messages in bulk to each peer. i.e. if ther are 100 messages and 1 peer,
284      * 1 RMI invocation results, not 100. Also, if a peer is unavailable this is discovered in only 1 try.
285      * <p/>
286      * Makes a copy of the queue so as not to hold up the enqueue operations.
287      * <p/>
288      * Any exceptions are caught so that the replication thread does not die, and because errors are expected,
289      * due to peers becoming unavailable.
290      * <p/>
291      * This method issues warnings for problems that can be fixed with configuration changes.
292      */
293     private void flushReplicationQueue() {
294         CacheEventMessage head = replicationQueue.peek();
295         if (head == null) {
296             return;
297         }
298         Ehcache cache = head.cache;
299         List cachePeers = listRemoteCachePeers(cache);
300 
301         int limit = replicationQueue.size();
302         List<EventMessage> resolvedEventMessages = extractAndResolveEventMessages(limit);
303 
304         for (int j = 0; j < cachePeers.size(); j++) {
305             CachePeer cachePeer = (CachePeer) cachePeers.get(j);
306             try {
307                 cachePeer.send(resolvedEventMessages);
308             } catch (UnmarshalException e) {
309                 String message = e.getMessage();
310                 if (message.contains("Read time out") || message.contains("Read timed out")) {
311                     LOG.warn("Unable to send message to remote peer due to socket read timeout. Consider increasing" +
312                             " the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. " +
313                             "Message was: " + message);
314                 } else {
315                     LOG.debug("Unable to send message to remote peer.  Message was: " + message);
316                 }
317             } catch (Throwable t) {
318                 LOG.warn("Unable to send message to remote peer.  Message was: " + t.getMessage(), t);
319             }
320         }
321         if (LOG.isWarnEnabled()) {
322             int eventMessagesNotResolved = limit - resolvedEventMessages.size();
323             if (eventMessagesNotResolved > 0) {
324                 LOG.warn(eventMessagesNotResolved + " messages were discarded on replicate due to reclamation of " +
325                         "SoftReferences by the VM. Consider increasing the maximum heap size and/or setting the " +
326                         "starting heap size to a higher value.");
327             }
328 
329         }
330     }
331 
332     /***
333      * Extracts CacheEventMessages and attempts to get a hard reference to the underlying EventMessage
334      * <p/>
335      * If an EventMessage has been invalidated due to SoftReference collection of the Element, it is not
336      * propagated. This only affects puts and updates via copy.
337      *
338      * @param replicationQueueCopy
339      * @return a list of EventMessages which were able to be resolved
340      */
341     private List extractAndResolveEventMessages(int limit) {
342         List list = new ArrayList();
343         for (int i = 0; i < limit; i++) {
344             CacheEventMessage message = replicationQueue.poll();
345             if (message == null) {
346                 break;
347             } else {
348                 EventMessage eventMessage = message.getEventMessage();
349                 if (eventMessage != null && eventMessage.isValid()) {
350                     list.add(eventMessage);
351                 }
352             }
353         }
354         return list;
355     }
356 
357     /***
358      * A background daemon thread that writes objects to the file.
359      */
360     private final class ReplicationThread extends Thread {
361         public ReplicationThread() {
362             super("Replication Thread");
363             setDaemon(true);
364             setPriority(Thread.NORM_PRIORITY);
365         }
366 
367         /***
368          * RemoteDebugger thread method.
369          */
370         public final void run() {
371             replicationThreadMain();
372         }
373     }
374 
375 
376     /***
377      * A wrapper around an EventMessage, which enables the element to be enqueued along with
378      * what is to be done with it.
379      * <p/>
380      * The wrapper holds a {@link java.lang.ref.SoftReference} to the {@link EventMessage}, so that the queue is never
381      * the cause of an {@link OutOfMemoryError}
382      */
383     private static class CacheEventMessage {
384 
385         private final Ehcache cache;
386         private final EventMessage eventMessage;
387 
388         public CacheEventMessage(int event, Ehcache cache, Element element, Serializable key) {
389             eventMessage = new EventMessage(event, key, element);
390             this.cache = cache;
391         }
392 
393         /***
394          * Gets the component EventMessage
395          */
396         public final EventMessage getEventMessage() {
397             return eventMessage;
398         }
399 
400     }
401 
402     /***
403      * Give the replicator a chance to flush the replication queue, then cleanup and free resources when no longer needed
404      */
405     public final void dispose() {
406         status = Status.STATUS_SHUTDOWN;
407         flushReplicationQueue();
408     }
409 
410 
411     /***
412      * Creates a clone of this listener. This method will only be called by ehcache before a cache is initialized.
413      * <p/>
414      * This may not be possible for listeners after they have been initialized. Implementations should throw
415      * CloneNotSupportedException if they do not support clone.
416      *
417      * @return a clone
418      * @throws CloneNotSupportedException if the listener could not be cloned.
419      */
420     public Object clone() throws CloneNotSupportedException {
421         //shutup checkstyle
422         super.clone();
423         return new RMIAsynchronousCacheReplicator(replicatePuts, replicatePutsViaCopy,
424                 replicateUpdates, replicateUpdatesViaCopy, replicateRemovals, asynchronousReplicationInterval);
425     }
426 
427 
428 }