View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.CacheException;
20  import net.sf.ehcache.Ehcache;
21  import net.sf.ehcache.Element;
22  import net.sf.ehcache.Status;
23  
24  import java.io.Serializable;
25  import java.util.List;
26  
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  
30  /***
31   * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
32   * {@link CachePeer} peers of the Cache.
33   *
34   * @author Greg Luck
35   * @version $Id: RMISynchronousCacheReplicator.html 13146 2011-08-01 17:12:39Z oletizi $
36   */
37  public class RMISynchronousCacheReplicator implements CacheReplicator {
38  
39      private static final Logger LOG = LoggerFactory.getLogger(RMISynchronousCacheReplicator.class.getName());
40  
41  
42      /***
43       * The status of the replicator. Only replicates when <code>STATUS_ALIVE</code>
44       */
45      protected Status status;
46  
47      /***
48       * Whether to replicate puts.
49       */
50      protected final boolean replicatePuts;
51  
52      /***
53       * Whether a put should replicated by copy or by invalidation, (a remove).
54       * <p/>
55       * By copy is best when the entry is expensive to produce. By invalidation is best when
56       * we are really trying to force other caches to sync back to a canonical source like a database.
57       * An example of a latter usage would be a read/write cache being used in Hibernate.
58       * <p/>
59       * This setting only has effect if <code>#replicateUpdates</code> is true.
60       */
61      protected boolean replicatePutsViaCopy;
62  
63      /***
64       * Whether to replicate updates.
65       */
66      protected final boolean replicateUpdates;
67  
68      /***
69       * Whether an update (a put) should be by copy or by invalidation, (a remove).
70       * <p/>
71       * By copy is best when the entry is expensive to produce. By invalidation is best when
72       * we are really trying to force other caches to sync back to a canonical source like a database.
73       * An example of a latter usage would be a read/write cache being used in Hibernate.
74       * <p/>
75       * This setting only has effect if <code>#replicateUpdates</code> is true.
76       */
77      protected final boolean replicateUpdatesViaCopy;
78      /***
79       * Whether to replicate removes
80       */
81      protected final boolean replicateRemovals;
82  
83      /***
84       * Constructor for internal and subclass use
85       *
86       * @param replicatePuts
87       * @param replicateUpdates
88       * @param replicateUpdatesViaCopy
89       * @param replicateRemovals
90       */
91      public RMISynchronousCacheReplicator(
92              boolean replicatePuts,
93              boolean replicatePutsViaCopy,
94              boolean replicateUpdates,
95              boolean replicateUpdatesViaCopy,
96              boolean replicateRemovals) {
97          this.replicatePuts = replicatePuts;
98          this.replicatePutsViaCopy = replicatePutsViaCopy;
99          this.replicateUpdates = replicateUpdates;
100         this.replicateUpdatesViaCopy = replicateUpdatesViaCopy;
101         this.replicateRemovals = replicateRemovals;
102         status = Status.STATUS_ALIVE;
103     }
104 
105     /***
106      * Called immediately after an element has been put into the cache. The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
107      * will block until this method returns.
108      * <p/>
109      * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
110      * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
111      *
112      * @param cache   the cache emitting the notification
113      * @param element the element which was just put into the cache.
114      */
115     public void notifyElementPut(final Ehcache cache, final Element element) throws CacheException {
116         if (notAlive()) {
117             return;
118         }
119 
120         if (!replicatePuts) {
121             return;
122         }
123 
124         if (!element.isSerializable()) {
125             if (LOG.isWarnEnabled()) {
126                 LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated");
127             }
128             return;
129         }
130 
131         if (replicatePutsViaCopy) {
132             replicatePutNotification(cache, element);
133         } else {
134             replicateRemovalNotification(cache, (Serializable) element.getObjectKey());
135         }
136     }
137 
138     /***
139      * Does the actual RMI remote call.
140      * <p/>
141      * If a Throwable occurs a SEVERE log message will be logged, but attempts to replicate to the other
142      * peers will continue.
143      */
144     protected static void replicatePutNotification(Ehcache cache, Element element) throws RemoteCacheException {
145         List cachePeers = listRemoteCachePeers(cache);
146         for (Object cachePeer1 : cachePeers) {
147             CachePeer cachePeer = (CachePeer) cachePeer1;
148             try {
149                 cachePeer.put(element);
150             } catch (Throwable t) {
151                 LOG.error("Exception on replication of putNotification. " + t.getMessage() + ". Continuing...", t);
152             }
153         }
154     }
155 
156 
157     /***
158      * Called immediately after an element has been put into the cache and the element already
159      * existed in the cache. This is thus an update.
160      * <p/>
161      * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
162      * will block until this method returns.
163      * <p/>
164      * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
165      * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
166      *
167      * @param cache   the cache emitting the notification
168      * @param element the element which was just put into the cache.
169      */
170     public void notifyElementUpdated(final Ehcache cache, final Element element) throws CacheException {
171         if (notAlive()) {
172             return;
173         }
174         if (!replicateUpdates) {
175             return;
176         }
177 
178         if (replicateUpdatesViaCopy) {
179             if (!element.isSerializable()) {
180                 if (LOG.isWarnEnabled()) {
181                     LOG.warn("Object with key " + element.getObjectKey()
182                             + " is not Serializable and cannot be updated via copy");
183                 }
184                 return;
185             }
186 
187             replicatePutNotification(cache, element);
188         } else {
189             if (!element.isKeySerializable()) {
190                 if (LOG.isWarnEnabled()) {
191                     LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
192                 }
193                 return;
194             }
195 
196             replicateRemovalNotification(cache, (Serializable) element.getObjectKey());
197         }
198     }
199 
200     /***
201      * Called immediately after an attempt to remove an element. The remove method will block until
202      * this method returns.
203      * <p/>
204      * This notification is received regardless of whether the cache had an element matching
205      * the removal key or not. If an element was removed, the element is passed to this method,
206      * otherwise a synthetic element, with only the key set is passed in.
207      * <p/>
208      *
209      * @param cache   the cache emitting the notification
210      * @param element the element just deleted, or a synthetic element with just the key set if
211      *                no element was removed.param element just deleted
212      */
213     public void notifyElementRemoved(final Ehcache cache, final Element element) throws CacheException {
214         if (notAlive()) {
215             return;
216         }
217 
218         if (!replicateRemovals) {
219             return;
220         }
221 
222         if (!element.isKeySerializable()) {
223             if (LOG.isWarnEnabled()) {
224                 LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
225             }
226             return;
227         }
228 
229         replicateRemovalNotification(cache, (Serializable) element.getObjectKey());
230     }
231 
232     /***
233      * Does the actual RMI remote call.
234      * <p/>
235      * If a Throwable occurs a SEVERE log message will be logged, but attempts to replicate to the other
236      * peers will continue.
237      */
238     protected static void replicateRemovalNotification(Ehcache cache, Serializable key) throws RemoteCacheException {
239         List cachePeers = listRemoteCachePeers(cache);
240         for (Object cachePeer1 : cachePeers) {
241             CachePeer cachePeer = (CachePeer) cachePeer1;
242             try {
243                 cachePeer.remove(key);
244             } catch (Throwable t) {
245                 LOG.error("Exception on replication of removeNotification. " + t.getMessage() + ". Continuing...", t);
246             }
247         }
248     }
249 
250 
251     /***
252      * {@inheritDoc}
253      * <p/>
254      * This implementation does not propagate expiries. It does not need to do anything because the element will
255      * expire in the remote cache at the same time. If the remote peer is not configured the same way they should
256      * not be in an cache cluster.
257      */
258     public final void notifyElementExpired(final Ehcache cache, final Element element) {
259         /*do not propagate expiries. The element should expire in the remote cache at the same time, thus
260           preseerving coherency.
261           */
262     }
263 
264     /***
265      * Called immediately after an element is evicted from the cache. Evicted in this sense
266      * means evicted from one store and not moved to another, so that it exists nowhere in the
267      * local cache.
268      * <p/>
269      * In a sense the Element has been <i>removed</i> from the cache, but it is different,
270      * thus the separate notification.
271      * <p/>
272      * This replicator does not propagate these events
273      *
274      * @param cache   the cache emitting the notification
275      * @param element the element that has just been evicted
276      */
277     public void notifyElementEvicted(final Ehcache cache, final Element element) {
278         /***
279          * do not notify these
280          */
281     }
282 
283 
284     /***
285      * Called during {@link net.sf.ehcache.Ehcache#removeAll()} to indicate that the all
286      * elements have been removed from the cache in a bulk operation. The usual
287      * {@link #notifyElementRemoved(net.sf.ehcache.Ehcache,net.sf.ehcache.Element)}
288      * is not called.
289      * <p/>
290      * This notification exists because clearing a cache is a special case. It is often
291      * not practical to serially process notifications where potentially millions of elements
292      * have been bulk deleted.
293      *
294      * @param cache the cache emitting the notification
295      */
296     public void notifyRemoveAll(final Ehcache cache) {
297         if (notAlive()) {
298             return;
299         }
300 
301         if (!replicateRemovals) {
302             return;
303         }
304 
305         replicateRemoveAllNotification(cache);
306     }
307 
308     /***
309      * Does the actual RMI remote call.
310      *
311      * If a Throwable occurs a SEVERE log message will be logged, but attempts to replicate to the other
312      * peers will continue.
313      *
314      */
315     protected void replicateRemoveAllNotification(Ehcache cache) {
316         List cachePeers = listRemoteCachePeers(cache);
317         for (Object cachePeer1 : cachePeers) {
318             CachePeer cachePeer = (CachePeer) cachePeer1;
319             try {
320                 cachePeer.removeAll();
321             } catch (Throwable t) {
322                 LOG.error("Exception on replication of removeAllNotification. " + t.getMessage() + ". Continuing...", t);
323             }
324         }
325     }
326 
327     /***
328      * Package protected List of cache peers
329      *
330      * @param cache
331      * @return a list of {@link CachePeer} peers for the given cache, excluding the local peer.
332      */
333     static List listRemoteCachePeers(Ehcache cache) {
334         CacheManagerPeerProvider provider = cache.getCacheManager().getCacheManagerPeerProvider("RMI");
335         return provider.listRemoteCachePeers(cache);
336     }
337 
338 
339     /***
340      * @return whether update is through copy or invalidate
341      */
342     public final boolean isReplicateUpdatesViaCopy() {
343         return replicateUpdatesViaCopy;
344     }
345 
346     /***
347      * Asserts that the replicator is active.
348      *
349      * @return true if the status is not STATUS_ALIVE
350      */
351     public final boolean notAlive() {
352         return !alive();
353     }
354 
355     /***
356      * Checks that the replicator is is <code>STATUS_ALIVE</code>.
357      */
358     public final boolean alive() {
359         if (status == null) {
360             return false;
361         } else {
362             return (status.equals(Status.STATUS_ALIVE));
363         }
364     }
365 
366     /***
367      * Give the replicator a chance to cleanup and free resources when no longer needed
368      */
369     public void dispose() {
370         status = Status.STATUS_SHUTDOWN;
371     }
372 
373     /***
374      * Creates a clone of this listener. This method will only be called by ehcache before a cache is initialized.
375      * <p/>
376      * This may not be possible for listeners after they have been initialized. Implementations should throw
377      * CloneNotSupportedException if they do not support clone.
378      *
379      * @return a clone
380      * @throws CloneNotSupportedException if the listener could not be cloned.
381      */
382     public Object clone() throws CloneNotSupportedException {
383         //shutup checkstyle
384         super.clone();
385         return new RMISynchronousCacheReplicator(replicatePuts, replicatePutsViaCopy, replicateUpdates,
386                 replicateUpdatesViaCopy, replicateRemovals);
387     }
388 }