1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.CacheException;
20 import net.sf.ehcache.Ehcache;
21 import net.sf.ehcache.Element;
22 import net.sf.ehcache.Status;
23
24 import java.io.Serializable;
25 import java.util.List;
26
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /***
31 * Listens to {@link net.sf.ehcache.CacheManager} and {@link net.sf.ehcache.Cache} events and propagates those to
32 * {@link CachePeer} peers of the Cache.
33 *
34 * @author Greg Luck
35 * @version $Id: RMISynchronousCacheReplicator.html 13146 2011-08-01 17:12:39Z oletizi $
36 */
37 public class RMISynchronousCacheReplicator implements CacheReplicator {
38
39 private static final Logger LOG = LoggerFactory.getLogger(RMISynchronousCacheReplicator.class.getName());
40
41
42 /***
43 * The status of the replicator. Only replicates when <code>STATUS_ALIVE</code>
44 */
45 protected Status status;
46
47 /***
48 * Whether to replicate puts.
49 */
50 protected final boolean replicatePuts;
51
52 /***
53 * Whether a put should replicated by copy or by invalidation, (a remove).
54 * <p/>
55 * By copy is best when the entry is expensive to produce. By invalidation is best when
56 * we are really trying to force other caches to sync back to a canonical source like a database.
57 * An example of a latter usage would be a read/write cache being used in Hibernate.
58 * <p/>
59 * This setting only has effect if <code>#replicateUpdates</code> is true.
60 */
61 protected boolean replicatePutsViaCopy;
62
63 /***
64 * Whether to replicate updates.
65 */
66 protected final boolean replicateUpdates;
67
68 /***
69 * Whether an update (a put) should be by copy or by invalidation, (a remove).
70 * <p/>
71 * By copy is best when the entry is expensive to produce. By invalidation is best when
72 * we are really trying to force other caches to sync back to a canonical source like a database.
73 * An example of a latter usage would be a read/write cache being used in Hibernate.
74 * <p/>
75 * This setting only has effect if <code>#replicateUpdates</code> is true.
76 */
77 protected final boolean replicateUpdatesViaCopy;
78 /***
79 * Whether to replicate removes
80 */
81 protected final boolean replicateRemovals;
82
83 /***
84 * Constructor for internal and subclass use
85 *
86 * @param replicatePuts
87 * @param replicateUpdates
88 * @param replicateUpdatesViaCopy
89 * @param replicateRemovals
90 */
91 public RMISynchronousCacheReplicator(
92 boolean replicatePuts,
93 boolean replicatePutsViaCopy,
94 boolean replicateUpdates,
95 boolean replicateUpdatesViaCopy,
96 boolean replicateRemovals) {
97 this.replicatePuts = replicatePuts;
98 this.replicatePutsViaCopy = replicatePutsViaCopy;
99 this.replicateUpdates = replicateUpdates;
100 this.replicateUpdatesViaCopy = replicateUpdatesViaCopy;
101 this.replicateRemovals = replicateRemovals;
102 status = Status.STATUS_ALIVE;
103 }
104
105 /***
106 * Called immediately after an element has been put into the cache. The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
107 * will block until this method returns.
108 * <p/>
109 * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
110 * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
111 *
112 * @param cache the cache emitting the notification
113 * @param element the element which was just put into the cache.
114 */
115 public void notifyElementPut(final Ehcache cache, final Element element) throws CacheException {
116 if (notAlive()) {
117 return;
118 }
119
120 if (!replicatePuts) {
121 return;
122 }
123
124 if (!element.isSerializable()) {
125 if (LOG.isWarnEnabled()) {
126 LOG.warn("Object with key " + element.getObjectKey() + " is not Serializable and cannot be replicated");
127 }
128 return;
129 }
130
131 if (replicatePutsViaCopy) {
132 replicatePutNotification(cache, element);
133 } else {
134 replicateRemovalNotification(cache, (Serializable) element.getObjectKey());
135 }
136 }
137
138 /***
139 * Does the actual RMI remote call.
140 * <p/>
141 * If a Throwable occurs a SEVERE log message will be logged, but attempts to replicate to the other
142 * peers will continue.
143 */
144 protected static void replicatePutNotification(Ehcache cache, Element element) throws RemoteCacheException {
145 List cachePeers = listRemoteCachePeers(cache);
146 for (Object cachePeer1 : cachePeers) {
147 CachePeer cachePeer = (CachePeer) cachePeer1;
148 try {
149 cachePeer.put(element);
150 } catch (Throwable t) {
151 LOG.error("Exception on replication of putNotification. " + t.getMessage() + ". Continuing...", t);
152 }
153 }
154 }
155
156
157 /***
158 * Called immediately after an element has been put into the cache and the element already
159 * existed in the cache. This is thus an update.
160 * <p/>
161 * The {@link net.sf.ehcache.Cache#put(net.sf.ehcache.Element)} method
162 * will block until this method returns.
163 * <p/>
164 * Implementers may wish to have access to the Element's fields, including value, so the element is provided.
165 * Implementers should be careful not to modify the element. The effect of any modifications is undefined.
166 *
167 * @param cache the cache emitting the notification
168 * @param element the element which was just put into the cache.
169 */
170 public void notifyElementUpdated(final Ehcache cache, final Element element) throws CacheException {
171 if (notAlive()) {
172 return;
173 }
174 if (!replicateUpdates) {
175 return;
176 }
177
178 if (replicateUpdatesViaCopy) {
179 if (!element.isSerializable()) {
180 if (LOG.isWarnEnabled()) {
181 LOG.warn("Object with key " + element.getObjectKey()
182 + " is not Serializable and cannot be updated via copy");
183 }
184 return;
185 }
186
187 replicatePutNotification(cache, element);
188 } else {
189 if (!element.isKeySerializable()) {
190 if (LOG.isWarnEnabled()) {
191 LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
192 }
193 return;
194 }
195
196 replicateRemovalNotification(cache, (Serializable) element.getObjectKey());
197 }
198 }
199
200 /***
201 * Called immediately after an attempt to remove an element. The remove method will block until
202 * this method returns.
203 * <p/>
204 * This notification is received regardless of whether the cache had an element matching
205 * the removal key or not. If an element was removed, the element is passed to this method,
206 * otherwise a synthetic element, with only the key set is passed in.
207 * <p/>
208 *
209 * @param cache the cache emitting the notification
210 * @param element the element just deleted, or a synthetic element with just the key set if
211 * no element was removed.param element just deleted
212 */
213 public void notifyElementRemoved(final Ehcache cache, final Element element) throws CacheException {
214 if (notAlive()) {
215 return;
216 }
217
218 if (!replicateRemovals) {
219 return;
220 }
221
222 if (!element.isKeySerializable()) {
223 if (LOG.isWarnEnabled()) {
224 LOG.warn("Key " + element.getObjectKey() + " is not Serializable and cannot be replicated.");
225 }
226 return;
227 }
228
229 replicateRemovalNotification(cache, (Serializable) element.getObjectKey());
230 }
231
232 /***
233 * Does the actual RMI remote call.
234 * <p/>
235 * If a Throwable occurs a SEVERE log message will be logged, but attempts to replicate to the other
236 * peers will continue.
237 */
238 protected static void replicateRemovalNotification(Ehcache cache, Serializable key) throws RemoteCacheException {
239 List cachePeers = listRemoteCachePeers(cache);
240 for (Object cachePeer1 : cachePeers) {
241 CachePeer cachePeer = (CachePeer) cachePeer1;
242 try {
243 cachePeer.remove(key);
244 } catch (Throwable t) {
245 LOG.error("Exception on replication of removeNotification. " + t.getMessage() + ". Continuing...", t);
246 }
247 }
248 }
249
250
251 /***
252 * {@inheritDoc}
253 * <p/>
254 * This implementation does not propagate expiries. It does not need to do anything because the element will
255 * expire in the remote cache at the same time. If the remote peer is not configured the same way they should
256 * not be in an cache cluster.
257 */
258 public final void notifyElementExpired(final Ehcache cache, final Element element) {
259
260
261
262 }
263
264 /***
265 * Called immediately after an element is evicted from the cache. Evicted in this sense
266 * means evicted from one store and not moved to another, so that it exists nowhere in the
267 * local cache.
268 * <p/>
269 * In a sense the Element has been <i>removed</i> from the cache, but it is different,
270 * thus the separate notification.
271 * <p/>
272 * This replicator does not propagate these events
273 *
274 * @param cache the cache emitting the notification
275 * @param element the element that has just been evicted
276 */
277 public void notifyElementEvicted(final Ehcache cache, final Element element) {
278 /***
279 * do not notify these
280 */
281 }
282
283
284 /***
285 * Called during {@link net.sf.ehcache.Ehcache#removeAll()} to indicate that the all
286 * elements have been removed from the cache in a bulk operation. The usual
287 * {@link #notifyElementRemoved(net.sf.ehcache.Ehcache,net.sf.ehcache.Element)}
288 * is not called.
289 * <p/>
290 * This notification exists because clearing a cache is a special case. It is often
291 * not practical to serially process notifications where potentially millions of elements
292 * have been bulk deleted.
293 *
294 * @param cache the cache emitting the notification
295 */
296 public void notifyRemoveAll(final Ehcache cache) {
297 if (notAlive()) {
298 return;
299 }
300
301 if (!replicateRemovals) {
302 return;
303 }
304
305 replicateRemoveAllNotification(cache);
306 }
307
308 /***
309 * Does the actual RMI remote call.
310 *
311 * If a Throwable occurs a SEVERE log message will be logged, but attempts to replicate to the other
312 * peers will continue.
313 *
314 */
315 protected void replicateRemoveAllNotification(Ehcache cache) {
316 List cachePeers = listRemoteCachePeers(cache);
317 for (Object cachePeer1 : cachePeers) {
318 CachePeer cachePeer = (CachePeer) cachePeer1;
319 try {
320 cachePeer.removeAll();
321 } catch (Throwable t) {
322 LOG.error("Exception on replication of removeAllNotification. " + t.getMessage() + ". Continuing...", t);
323 }
324 }
325 }
326
327 /***
328 * Package protected List of cache peers
329 *
330 * @param cache
331 * @return a list of {@link CachePeer} peers for the given cache, excluding the local peer.
332 */
333 static List listRemoteCachePeers(Ehcache cache) {
334 CacheManagerPeerProvider provider = cache.getCacheManager().getCacheManagerPeerProvider("RMI");
335 return provider.listRemoteCachePeers(cache);
336 }
337
338
339 /***
340 * @return whether update is through copy or invalidate
341 */
342 public final boolean isReplicateUpdatesViaCopy() {
343 return replicateUpdatesViaCopy;
344 }
345
346 /***
347 * Asserts that the replicator is active.
348 *
349 * @return true if the status is not STATUS_ALIVE
350 */
351 public final boolean notAlive() {
352 return !alive();
353 }
354
355 /***
356 * Checks that the replicator is is <code>STATUS_ALIVE</code>.
357 */
358 public final boolean alive() {
359 if (status == null) {
360 return false;
361 } else {
362 return (status.equals(Status.STATUS_ALIVE));
363 }
364 }
365
366 /***
367 * Give the replicator a chance to cleanup and free resources when no longer needed
368 */
369 public void dispose() {
370 status = Status.STATUS_SHUTDOWN;
371 }
372
373 /***
374 * Creates a clone of this listener. This method will only be called by ehcache before a cache is initialized.
375 * <p/>
376 * This may not be possible for listeners after they have been initialized. Implementations should throw
377 * CloneNotSupportedException if they do not support clone.
378 *
379 * @return a clone
380 * @throws CloneNotSupportedException if the listener could not be cloned.
381 */
382 public Object clone() throws CloneNotSupportedException {
383
384 super.clone();
385 return new RMISynchronousCacheReplicator(replicatePuts, replicatePutsViaCopy, replicateUpdates,
386 replicateUpdatesViaCopy, replicateRemovals);
387 }
388 }