1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.CacheException;
20 import net.sf.ehcache.Ehcache;
21 import net.sf.ehcache.Element;
22 import net.sf.ehcache.Status;
23
24 import java.io.Serializable;
25 import java.rmi.UnmarshalException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30
31 import org.slf4j.LoggerFactory;
32 import org.slf4j.Logger;
33
34 /***
35 * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
36 * {@link CachePeer} peers of the Cache asynchronously.
37 * <p/>
38 * Updates are guaranteed to be replicated in the order in which they are received.
39 * <p/>
 * While much faster in operation than {@link RMISynchronousCacheReplicator}, it does suffer from a number
 * of problems. Elements which are being spooled to the DiskStore may stay around in memory because references
 * are held to them from the {@link EventMessage}s which are queued up. The replication thread checks the queue
 * once per {@code asynchronousReplicationInterval} milliseconds (e.g. once per second), limiting the build up.
 * However, a lot of elements can be put into a cache in that time. We do not want to get an
 * {@link OutOfMemoryError} using distribution in circumstances when it would not happen if we were
 * just using the DiskStore.
46 * <p/>
47 * Accordingly, the Element values in {@link EventMessage}s are held by {@link java.lang.ref.SoftReference} in the queue,
48 * so that they can be discarded if required by the GC to avoid an {@link OutOfMemoryError}. A log message
49 * will be issued on each flush of the queue if there were any forced discards. One problem with GC collection
50 * of SoftReferences is that the VM (JDK1.5 anyway) will do that rather than grow the heap size to the maximum.
51 * The workaround is to either set minimum heap size to the maximum heap size to force heap allocation at start
52 * up, or put up with a few lost messages while the heap grows.
53 *
54 * @author Greg Luck
55 * @version $Id: RMIAsynchronousCacheReplicator.html 13146 2011-08-01 17:12:39Z oletizi $
56 */
57 public class RMIAsynchronousCacheReplicator extends RMISynchronousCacheReplicator {
58
59
60 private static final Logger LOG = LoggerFactory.getLogger(RMIAsynchronousCacheReplicator.class.getName());
61
62
63 /***
64 * A thread which handles replication, so that replication can take place asynchronously and not hold up the cache
65 */
66 protected Thread replicationThread = new ReplicationThread();
67
68 /***
69 * The amount of time the replication thread sleeps after it detects the replicationQueue is empty
70 * before checking again.
71 */
72 protected int asynchronousReplicationInterval;
73
74 /***
75 * A queue of updates.
76 */
77 protected final Queue<CacheEventMessage> replicationQueue = new ConcurrentLinkedQueue<CacheEventMessage>();
78
/**
 * Creates the replicator, marks it alive and starts its replication thread.
 * <p/>
 * The first five flags are passed unchanged to the superclass constructor.
 *
 * @param replicatePuts                   whether put notifications are replicated at all
 * @param replicatePutsViaCopy            if true a put is replicated by sending the element, otherwise by
 *                                        sending a remove for its key
 * @param replicateUpdates                whether update notifications are replicated at all
 * @param replicateUpdatesViaCopy         if true an update is replicated by sending the element, otherwise by
 *                                        sending a remove for its key
 * @param replicateRemovals               whether remove and removeAll notifications are replicated
 * @param asynchronousReplicationInterval how long, in milliseconds, the replication thread sleeps when it
 *                                        finds the queue empty
 */
public RMIAsynchronousCacheReplicator(
        boolean replicatePuts,
        boolean replicatePutsViaCopy,
        boolean replicateUpdates,
        boolean replicateUpdatesViaCopy,
        boolean replicateRemovals,
        int asynchronousReplicationInterval) {
    super(replicatePuts, replicatePutsViaCopy, replicateUpdates, replicateUpdatesViaCopy, replicateRemovals);
    // Both assignments have to happen before the thread is started: the replication loop
    // reads the interval and checks alive()/notAlive() as soon as it runs.
    status = Status.STATUS_ALIVE;
    this.asynchronousReplicationInterval = asynchronousReplicationInterval;
    replicationThread.start();
}
98
99 /***
100 * RemoteDebugger method for the replicationQueue thread.
101 * <p/>
102 * Note that the replicationQueue thread locks the cache for the entire time it is writing elements to the disk.
103 */
104 private void replicationThreadMain() {
105 while (true) {
106
107 while (alive() && replicationQueue != null && replicationQueue.isEmpty()) {
108 try {
109 Thread.sleep(asynchronousReplicationInterval);
110 } catch (InterruptedException e) {
111 LOG.debug("Spool Thread interrupted.");
112 return;
113 }
114 }
115 if (notAlive()) {
116 return;
117 }
118 try {
119 flushReplicationQueue();
120 } catch (Throwable e) {
121 LOG.error("Exception on flushing of replication queue: " + e.getMessage() + ". Continuing...", e);
122 }
123 }
124 }
125
126
127 /***
128 * {@inheritDoc}
129 * <p/>
130 * This implementation queues the put notification for in-order replication to peers.
131 *
132 * @param cache the cache emitting the notification
133 * @param element the element which was just put into the cache.
134 */
135 public final void notifyElementPut(final Ehcache cache, final Element element) throws CacheException {
136 if (notAlive()) {
137 return;
138 }
139
140 if (!replicatePuts) {
141 return;
142 }
143
144 if (replicatePutsViaCopy) {
145 if (!element.isSerializable()) {
146 if (LOG.isWarnEnabled()) {
147 LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
148 }
149 return;
150 }
151 addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
152 } else {
153 if (!element.isKeySerializable()) {
154 if (LOG.isWarnEnabled()) {
155 LOG.warn("Object with key " + element.getObjectKey()
156 + " does not have a Serializable key and cannot be replicated via invalidate.");
157 }
158 return;
159 }
160 addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
161 }
162
163 }
164
165 /***
166 * Called immediately after an element has been put into the cache and the element already
167 * existed in the cache. This is thus an update.
168 * <p/>
169 * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
170 * will block until this method returns.
171 * <p/>
172 * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
173 * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
174 *
175 * @param cache the cache emitting the notification
176 * @param element the element which was just put into the cache.
177 */
178 public final void notifyElementUpdated(final Ehcache cache, final Element element) throws CacheException {
179 if (notAlive()) {
180 return;
181 }
182 if (!replicateUpdates) {
183 return;
184 }
185
186 if (replicateUpdatesViaCopy) {
187 if (!element.isSerializable()) {
188 if (LOG.isWarnEnabled()) {
189 LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be updated via copy.");
190 }
191 return;
192 }
193 addToReplicationQueue(new CacheEventMessage(EventMessage.PUT, cache, element, null));
194 } else {
195 if (!element.isKeySerializable()) {
196 if (LOG.isWarnEnabled()) {
197 LOG.warn("Object with key " + element.getObjectKey()
198 + " does not have a Serializable key and cannot be replicated via invalidate.");
199 }
200 return;
201 }
202 addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
203 }
204 }
205
206 /***
207 * Called immediately after an attempt to remove an element. The remove method will block until
208 * this method returns.
209 * <p/>
210 * This notification is received regardless of whether the cache had an element matching
211 * the removal key or not. If an element was removed, the element is passed to this method,
212 * otherwise a synthetic element, with only the key set is passed in.
213 * <p/>
214 *
215 * @param cache the cache emitting the notification
216 * @param element the element just deleted, or a synthetic element with just the key set if
217 * no element was removed.
218 */
219 public final void notifyElementRemoved(final Ehcache cache, final Element element) throws CacheException {
220 if (notAlive()) {
221 return;
222 }
223
224 if (!replicateRemovals) {
225 return;
226 }
227
228 if (!element.isKeySerializable()) {
229 if (LOG.isWarnEnabled()) {
230 LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
231 }
232 return;
233 }
234 addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE, cache, null, element.getKey()));
235 }
236
237
238 /***
239 * Called during {@link net.sf.ehcache.Ehcache#removeAll()} to indicate that the all
240 * elements have been removed from the cache in a bulk operation. The usual
241 * {@link #notifyElementRemoved(net.sf.ehcache.Ehcache,net.sf.ehcache.Element)}
242 * is not called.
243 * <p/>
244 * This notification exists because clearing a cache is a special case. It is often
245 * not practical to serially process notifications where potentially millions of elements
246 * have been bulk deleted.
247 *
248 * @param cache the cache emitting the notification
249 */
250 public void notifyRemoveAll(final Ehcache cache) {
251 if (notAlive()) {
252 return;
253 }
254
255 if (!replicateRemovals) {
256 return;
257 }
258
259 addToReplicationQueue(new CacheEventMessage(EventMessage.REMOVE_ALL, cache, null, null));
260 }
261
262
263 /***
264 * Adds a message to the queue.
265 * <p/>
266 * This method checks the state of the replication thread and warns
267 * if it has stopped and then discards the message.
268 *
269 * @param cacheEventMessage
270 */
271 protected void addToReplicationQueue(CacheEventMessage cacheEventMessage) {
272 if (!replicationThread.isAlive()) {
273 LOG.error("CacheEventMessages cannot be added to the replication queue because the replication thread has died.");
274 } else {
275 replicationQueue.add(cacheEventMessage);
276 }
277 }
278
279
280 /***
281 * Gets called once per {@link #asynchronousReplicationInterval}.
282 * <p/>
283 * Sends accumulated messages in bulk to each peer. i.e. if ther are 100 messages and 1 peer,
284 * 1 RMI invocation results, not 100. Also, if a peer is unavailable this is discovered in only 1 try.
285 * <p/>
286 * Makes a copy of the queue so as not to hold up the enqueue operations.
287 * <p/>
288 * Any exceptions are caught so that the replication thread does not die, and because errors are expected,
289 * due to peers becoming unavailable.
290 * <p/>
291 * This method issues warnings for problems that can be fixed with configuration changes.
292 */
293 private void flushReplicationQueue() {
294 CacheEventMessage head = replicationQueue.peek();
295 if (head == null) {
296 return;
297 }
298 Ehcache cache = head.cache;
299 List cachePeers = listRemoteCachePeers(cache);
300
301 int limit = replicationQueue.size();
302 List<EventMessage> resolvedEventMessages = extractAndResolveEventMessages(limit);
303
304 for (int j = 0; j < cachePeers.size(); j++) {
305 CachePeer cachePeer = (CachePeer) cachePeers.get(j);
306 try {
307 cachePeer.send(resolvedEventMessages);
308 } catch (UnmarshalException e) {
309 String message = e.getMessage();
310 if (message.contains("Read time out") || message.contains("Read timed out")) {
311 LOG.warn("Unable to send message to remote peer due to socket read timeout. Consider increasing" +
312 " the socketTimeoutMillis setting in the cacheManagerPeerListenerFactory. " +
313 "Message was: " + message);
314 } else {
315 LOG.debug("Unable to send message to remote peer. Message was: " + message);
316 }
317 } catch (Throwable t) {
318 LOG.warn("Unable to send message to remote peer. Message was: " + t.getMessage(), t);
319 }
320 }
321 if (LOG.isWarnEnabled()) {
322 int eventMessagesNotResolved = limit - resolvedEventMessages.size();
323 if (eventMessagesNotResolved > 0) {
324 LOG.warn(eventMessagesNotResolved + " messages were discarded on replicate due to reclamation of " +
325 "SoftReferences by the VM. Consider increasing the maximum heap size and/or setting the " +
326 "starting heap size to a higher value.");
327 }
328
329 }
330 }
331
332 /***
333 * Extracts CacheEventMessages and attempts to get a hard reference to the underlying EventMessage
334 * <p/>
335 * If an EventMessage has been invalidated due to SoftReference collection of the Element, it is not
336 * propagated. This only affects puts and updates via copy.
337 *
338 * @param replicationQueueCopy
339 * @return a list of EventMessages which were able to be resolved
340 */
341 private List extractAndResolveEventMessages(int limit) {
342 List list = new ArrayList();
343 for (int i = 0; i < limit; i++) {
344 CacheEventMessage message = replicationQueue.poll();
345 if (message == null) {
346 break;
347 } else {
348 EventMessage eventMessage = message.getEventMessage();
349 if (eventMessage != null && eventMessage.isValid()) {
350 list.add(eventMessage);
351 }
352 }
353 }
354 return list;
355 }
356
357 /***
358 * A background daemon thread that writes objects to the file.
359 */
360 private final class ReplicationThread extends Thread {
361 public ReplicationThread() {
362 super("Replication Thread");
363 setDaemon(true);
364 setPriority(Thread.NORM_PRIORITY);
365 }
366
367 /***
368 * RemoteDebugger thread method.
369 */
370 public final void run() {
371 replicationThreadMain();
372 }
373 }
374
375
376 /***
377 * A wrapper around an EventMessage, which enables the element to be enqueued along with
378 * what is to be done with it.
379 * <p/>
380 * The wrapper holds a {@link java.lang.ref.SoftReference} to the {@link EventMessage}, so that the queue is never
381 * the cause of an {@link OutOfMemoryError}
382 */
383 private static class CacheEventMessage {
384
385 private final Ehcache cache;
386 private final EventMessage eventMessage;
387
388 public CacheEventMessage(int event, Ehcache cache, Element element, Serializable key) {
389 eventMessage = new EventMessage(event, key, element);
390 this.cache = cache;
391 }
392
393 /***
394 * Gets the component EventMessage
395 */
396 public final EventMessage getEventMessage() {
397 return eventMessage;
398 }
399
400 }
401
402 /***
403 * Give the replicator a chance to flush the replication queue, then cleanup and free resources when no longer needed
404 */
405 public final void dispose() {
406 status = Status.STATUS_SHUTDOWN;
407 flushReplicationQueue();
408 }
409
410
411 /***
412 * Creates a clone of this listener. This method will only be called by ehcache before a cache is initialized.
413 * <p/>
414 * This may not be possible for listeners after they have been initialized. Implementations should throw
415 * CloneNotSupportedException if they do not support clone.
416 *
417 * @return a clone
418 * @throws CloneNotSupportedException if the listener could not be cloned.
419 */
420 public Object clone() throws CloneNotSupportedException {
421
422 super.clone();
423 return new RMIAsynchronousCacheReplicator(replicatePuts, replicatePutsViaCopy,
424 replicateUpdates, replicateUpdatesViaCopy, replicateRemovals, asynchronousReplicationInterval);
425 }
426
427
428 }