1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.CacheException;
20 import net.sf.ehcache.CacheManager;
21 import net.sf.ehcache.Ehcache;
22 import net.sf.ehcache.Status;
23 import net.sf.ehcache.event.CacheEventListener;
24
25 import java.io.IOException;
26 import java.net.InetAddress;
27 import java.net.ServerSocket;
28 import java.net.UnknownHostException;
29 import java.rmi.Naming;
30 import java.rmi.NotBoundException;
31 import java.rmi.Remote;
32 import java.rmi.RemoteException;
33 import java.rmi.registry.LocateRegistry;
34 import java.rmi.registry.Registry;
35 import java.rmi.server.ExportException;
36 import java.rmi.server.UnicastRemoteObject;
37 import java.util.ArrayList;
38 import java.util.HashMap;
39 import java.util.Iterator;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.Set;
43
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /***
48 * A cache server which exposes available cache operations remotely through RMI.
49 * <p/>
50 * It acts as a Decorator to a Cache. It holds an instance of cache, which is a local cache it talks to.
51 * <p/>
52 * This class could specify a security manager with code like:
53 * <pre>
54 * if (System.getSecurityManager() == null) {
55 * System.setSecurityManager(new RMISecurityManager());
56 * }
57 * </pre>
58 * Doing so would require the addition of <code>grant</code> statements in the <code>java.policy</code> file.
59 * <p/>
60 * Per the JDK documentation: "If no security manager is specified no class loading, by RMI clients or servers, is allowed,
61 * aside from what can be found in the local CLASSPATH." The classpath of each instance of this class should have
62 * all required classes to enable distribution, so no remote classloading is required or desirable. Accordingly,
63 * no security manager is set and there are no special JVM configuration requirements.
64 * <p/>
65 * This class opens a ServerSocket. The dispose method should be called for orderly closure of that socket. This class
66 * has a shutdown hook which calls dispose() as a convenience feature for developers.
67 *
68 * @author Greg Luck
69 * @version $Id: RMICacheManagerPeerListener.html 13146 2011-08-01 17:12:39Z oletizi $
70 */
71 public class RMICacheManagerPeerListener implements CacheManagerPeerListener {
72
73 private static final Logger LOG = LoggerFactory.getLogger(RMICacheManagerPeerListener.class.getName());
74 private static final int MINIMUM_SENSIBLE_TIMEOUT = 200;
75 private static final int NAMING_UNBIND_RETRY_INTERVAL = 400;
76 private static final int NAMING_UNBIND_MAX_RETRIES = 10;
77
78 /***
79 * The cache peers. The value is an RMICachePeer.
80 */
81 protected final Map cachePeers = new HashMap();
82
83 /***
84 * status.
85 */
86 protected Status status;
87
88 /***
89 * The RMI listener port
90 */
91 protected Integer port;
92
93 private Registry registry;
94 private boolean registryCreated;
95 private final String hostName;
96
97 private CacheManager cacheManager;
98 private Integer socketTimeoutMillis;
99 private Integer remoteObjectPort;
100
101 /***
102 * Constructor with full arguments.
103 *
104 * @param hostName may be null, in which case the hostName will be looked up. Machines with multiple
105 * interfaces should specify this if they do not want it to be the default NIC.
106 * @param port a port in the range 1025 - 65536
107 * @param remoteObjectPort the port number on which the remote objects bound in the registry receive calls.
108 This defaults to a free port if not specified.
109 * @param cacheManager the CacheManager this listener belongs to
110 * @param socketTimeoutMillis TCP/IP Socket timeout when waiting on response
111 */
112 public RMICacheManagerPeerListener(String hostName, Integer port, Integer remoteObjectPort, CacheManager cacheManager,
113 Integer socketTimeoutMillis) throws UnknownHostException {
114
115 status = Status.STATUS_UNINITIALISED;
116
117 if (hostName != null && hostName.length() != 0) {
118 this.hostName = hostName;
119 if (hostName.equals("localhost")) {
120 LOG.warn("Explicitly setting the listener hostname to 'localhost' is not recommended. "
121 + "It will only work if all CacheManager peers are on the same machine.");
122 }
123 } else {
124 this.hostName = calculateHostAddress();
125 }
126 if (port == null || port.intValue() == 0) {
127 assignFreePort(false);
128 } else {
129 this.port = port;
130 }
131
132
133 this.remoteObjectPort = remoteObjectPort;
134
135 this.cacheManager = cacheManager;
136 if (socketTimeoutMillis == null || socketTimeoutMillis.intValue() < MINIMUM_SENSIBLE_TIMEOUT) {
137 throw new IllegalArgumentException("socketTimoutMillis must be a reasonable value greater than 200ms");
138 }
139 this.socketTimeoutMillis = socketTimeoutMillis;
140
141 }
142
143 /***
144 * Assigns a free port to be the listener port.
145 *
146 * @throws IllegalStateException if the statis of the listener is not {@link net.sf.ehcache.Status#STATUS_UNINITIALISED}
147 */
148 protected void assignFreePort(boolean forced) throws IllegalStateException {
149 if (status != Status.STATUS_UNINITIALISED) {
150 throw new IllegalStateException("Cannot change the port of an already started listener.");
151 }
152 this.port = Integer.valueOf(this.getFreePort());
153 if (forced) {
154 LOG.warn("Resolving RMI port conflict by automatically using a free TCP/IP port to listen on: " + this.port);
155 } else {
156 LOG.debug("Automatically finding a free TCP/IP port to listen on: " + this.port);
157 }
158 }
159
160
161 /***
162 * Calculates the host address as the default NICs IP address
163 *
164 * @throws UnknownHostException
165 */
166 protected String calculateHostAddress() throws UnknownHostException {
167 return InetAddress.getLocalHost().getHostAddress();
168 }
169
170
171 /***
172 * Gets a free server socket port.
173 *
174 * @return a number in the range 1025 - 65536 that was free at the time this method was executed
175 * @throws IllegalArgumentException
176 */
177 protected int getFreePort() throws IllegalArgumentException {
178 ServerSocket serverSocket = null;
179 try {
180 serverSocket = new ServerSocket(0);
181 return serverSocket.getLocalPort();
182 } catch (IOException e) {
183 throw new IllegalArgumentException("Could not acquire a free port number.");
184 } finally {
185 if (serverSocket != null && !serverSocket.isClosed()) {
186 try {
187 serverSocket.close();
188 } catch (Exception e) {
189 LOG.debug("Error closing ServerSocket: " + e.getMessage());
190 }
191 }
192 }
193 }
194
195
196 /***
197 * {@inheritDoc}
198 */
199 public void init() throws CacheException {
200 if (!status.equals(Status.STATUS_UNINITIALISED)) {
201 return;
202 }
203 RMICachePeer rmiCachePeer = null;
204 try {
205 startRegistry();
206 int counter = 0;
207 populateListOfRemoteCachePeers();
208 synchronized (cachePeers) {
209 for (Iterator iterator = cachePeers.values().iterator(); iterator.hasNext();) {
210 rmiCachePeer = (RMICachePeer) iterator.next();
211 bind(rmiCachePeer.getUrl(), rmiCachePeer);
212 counter++;
213 }
214 }
215 LOG.debug(counter + " RMICachePeers bound in registry for RMI listener");
216 status = Status.STATUS_ALIVE;
217 } catch (Exception e) {
218 String url = null;
219 if (rmiCachePeer != null) {
220 url = rmiCachePeer.getUrl();
221 }
222
223 throw new CacheException("Problem starting listener for RMICachePeer "
224 + url + ". Initial cause was " + e.getMessage(), e);
225 }
226 }
227
228 /***
229 * Bind a cache peer
230 *
231 * @param rmiCachePeer
232 */
233 protected void bind(String peerName, RMICachePeer rmiCachePeer) throws Exception {
234 Naming.rebind(peerName, rmiCachePeer);
235 }
236
237 /***
238 * Returns a list of bound objects.
239 * <p/>
240 * This should match the list of cachePeers i.e. they should always be bound
241 *
242 * @return a list of String representations of <code>RMICachePeer</code> objects
243 */
244 protected String[] listBoundRMICachePeers() throws CacheException {
245 try {
246 return registry.list();
247 } catch (RemoteException e) {
248 throw new CacheException("Unable to list cache peers " + e.getMessage());
249 }
250 }
251
252 /***
253 * Returns a reference to the remote object.
254 *
255 * @param name the name of the cache e.g. <code>sampleCache1</code>
256 */
257 protected Remote lookupPeer(String name) throws CacheException {
258 try {
259 return registry.lookup(name);
260 } catch (Exception e) {
261 throw new CacheException("Unable to lookup peer for replicated cache " + name + " "
262 + e.getMessage());
263 }
264 }
265
266 /***
267 * Should be called on init because this is one of the last things that should happen on CacheManager startup.
268 */
269 protected void populateListOfRemoteCachePeers() throws RemoteException {
270 String[] names = cacheManager.getCacheNames();
271 for (int i = 0; i < names.length; i++) {
272 String name = names[i];
273 Ehcache cache = cacheManager.getEhcache(name);
274 synchronized (cachePeers) {
275 if (cachePeers.get(name) == null) {
276 if (isDistributed(cache)) {
277 RMICachePeer peer;
278 if (cache.getCacheConfiguration().getTransactionalMode().isTransactional()) {
279 peer = new TransactionalRMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis);
280 } else {
281 peer = new RMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis);
282 }
283 cachePeers.put(name, peer);
284 }
285 }
286 }
287 }
288
289 }
290
291 /***
292 * Determine if the given cache is distributed.
293 *
294 * @param cache the cache to check
295 * @return true if a <code>CacheReplicator</code> is found in the listeners
296 */
297 protected boolean isDistributed(Ehcache cache) {
298 Set listeners = cache.getCacheEventNotificationService().getCacheEventListeners();
299 for (Iterator iterator = listeners.iterator(); iterator.hasNext();) {
300 CacheEventListener cacheEventListener = (CacheEventListener) iterator.next();
301 if (cacheEventListener instanceof CacheReplicator) {
302 return true;
303 }
304 }
305 return false;
306 }
307
308 /***
309 * Start the rmiregistry.
310 * <p/>
311 * The alternative is to use the <code>rmiregistry</code> binary, in which case:
312 * <ol/>
313 * <li>rmiregistry running
314 * <li>-Djava.rmi.server.codebase="file:///Users/gluck/work/ehcache/build/classes/ file:///Users/gluck/work/ehcache/lib/commons-logging-1.0.4.jar"
315 * </ol>
316 *
317 * @throws RemoteException
318 */
319 protected void startRegistry() throws RemoteException {
320 try {
321 registry = LocateRegistry.getRegistry(port.intValue());
322 try {
323 registry.list();
324 } catch (RemoteException e) {
325
326 registry = LocateRegistry.createRegistry(port.intValue());
327 registryCreated = true;
328 }
329 } catch (ExportException exception) {
330 LOG.error("Exception starting RMI registry. Error was " + exception.getMessage(), exception);
331 }
332 }
333
334 /***
335 * Stop the rmiregistry if it was started by this class.
336 *
337 * @throws RemoteException
338 */
339 protected void stopRegistry() throws RemoteException {
340 if (registryCreated) {
341
342
343
344 boolean success = UnicastRemoteObject.unexportObject(registry, true);
345 if (success) {
346 LOG.debug("rmiregistry unexported.");
347 } else {
348 LOG.warn("Could not unexport rmiregistry.");
349 }
350 }
351 }
352
353 /***
354 * Stop the listener. It
355 * <ul>
356 * <li>unbinds the objects from the registry
357 * <li>unexports Remote objects
358 * </ul>
359 */
360 public void dispose() throws CacheException {
361 if (!status.equals(Status.STATUS_ALIVE)) {
362 return;
363 }
364 try {
365 int counter = 0;
366 synchronized (cachePeers) {
367 for (Iterator iterator = cachePeers.values().iterator(); iterator.hasNext();) {
368 RMICachePeer rmiCachePeer = (RMICachePeer) iterator.next();
369 disposeRMICachePeer(rmiCachePeer);
370 counter++;
371 }
372 stopRegistry();
373 }
374 LOG.debug(counter + " RMICachePeers unbound from registry in RMI listener");
375 status = Status.STATUS_SHUTDOWN;
376 } catch (Exception e) {
377 throw new CacheException("Problem unbinding remote cache peers. Initial cause was " + e.getMessage(), e);
378 }
379 }
380
381 /***
382 * A template method to dispose an individual RMICachePeer. This consists of:
383 * <ol>
384 * <li>Unbinding the peer from the naming service
385 * <li>Unexporting the peer
386 * </ol>
387 * Override to specialise behaviour
388 *
389 * @param rmiCachePeer the cache peer to dispose of
390 * @throws Exception thrown if something goes wrong
391 */
392 protected void disposeRMICachePeer(RMICachePeer rmiCachePeer) throws Exception {
393 unbind(rmiCachePeer);
394 }
395
396 /***
397 * Unbinds an RMICachePeer and unexports it.
398 * <p/>
399 * We unbind from the registry first before unexporting.
400 * Unbinding first removes the very small possibility of a client
401 * getting the object from the registry while we are trying to unexport it.
402 * <p/>
403 * This method may take up to 4 seconds to complete, if we are having trouble
404 * unexporting the peer.
405 *
406 * @param rmiCachePeer the bound and exported cache peer
407 * @throws Exception
408 */
409 protected void unbind(RMICachePeer rmiCachePeer) throws Exception {
410 String url = rmiCachePeer.getUrl();
411 try {
412 Naming.unbind(url);
413 } catch (NotBoundException e) {
414 LOG.warn(url + " not bound therefore not unbinding.");
415 }
416
417 boolean unexported = UnicastRemoteObject.unexportObject(rmiCachePeer, false);
418 for (int count = 1; (count < NAMING_UNBIND_MAX_RETRIES) && !unexported; count++) {
419 try {
420 Thread.sleep(NAMING_UNBIND_RETRY_INTERVAL);
421 } catch (InterruptedException ie) {
422
423 break;
424 }
425 unexported = UnicastRemoteObject.unexportObject(rmiCachePeer, false);
426 }
427
428
429
430 if (!unexported) {
431 if (!UnicastRemoteObject.unexportObject(rmiCachePeer, true)) {
432 LOG.warn("Unable to unexport rmiCachePeer: " + rmiCachePeer.getUrl() + ". Skipping.");
433 }
434 }
435 }
436
437 /***
438 * All of the caches which are listening for remote changes.
439 *
440 * @return a list of <code>RMICachePeer</code> objects. The list if not live
441 */
442 public List getBoundCachePeers() {
443 List cachePeerList = new ArrayList();
444 synchronized (cachePeers) {
445 for (Iterator iterator = cachePeers.values().iterator(); iterator.hasNext();) {
446 RMICachePeer rmiCachePeer = (RMICachePeer) iterator.next();
447 cachePeerList.add(rmiCachePeer);
448 }
449 }
450 return cachePeerList;
451 }
452
453 /***
454 * Returns the listener status.
455 */
456 public Status getStatus() {
457 return status;
458 }
459
460 /***
461 * A listener will normally have a resource that only one instance can use at the same time,
462 * such as a port. This identifier is used to tell if it is unique and will not conflict with an
463 * existing instance using the resource.
464 *
465 * @return a String identifier for the resource
466 */
467 public String getUniqueResourceIdentifier() {
468 return "RMI listener port: " + port;
469 }
470
471 /***
472 * If a conflict is detected in unique resource use, this method signals the listener to attempt
473 * automatic resolution of the resource conflict.
474 *
475 * @throws IllegalStateException if the statis of the listener is not {@link net.sf.ehcache.Status#STATUS_UNINITIALISED}
476 */
477 public void attemptResolutionOfUniqueResourceConflict() throws IllegalStateException, CacheException {
478 assignFreePort(true);
479 }
480
481 /***
482 * The replication scheme this listener interacts with.
483 * Each peer provider has a scheme name, which can be used by caches to specify for replication and bootstrap purposes.
484 *
485 * @return the well-known scheme name, which is determined by the replication provider author.
486 */
487 public String getScheme() {
488 return "RMI";
489 }
490
491 /***
492 * Called immediately after a cache has been added and activated.
493 * <p/>
494 * Note that the CacheManager calls this method from a synchronized method. Any attempt to call a synchronized
495 * method on CacheManager from this method will cause a deadlock.
496 * <p/>
497 * Note that activation will also cause a CacheEventListener status change notification from
498 * {@link net.sf.ehcache.Status#STATUS_UNINITIALISED} to {@link net.sf.ehcache.Status#STATUS_ALIVE}. Care should be
499 * taken on processing that notification because:
500 * <ul>
501 * <li>the cache will not yet be accessible from the CacheManager.
502 * <li>the addCaches methods whih cause this notification are synchronized on the CacheManager. An attempt to call
503 * {@link net.sf.ehcache.CacheManager#getCache(String)} will cause a deadlock.
504 * </ul>
505 * The calling method will block until this method returns.
506 * <p/>
507 * Repopulates the list of cache peers and rebinds the list.
508 * This method should be called if a cache is dynamically added
509 *
510 * @param cacheName the name of the <code>Cache</code> the operation relates to
511 * @see net.sf.ehcache.event.CacheEventListener
512 */
513 public void notifyCacheAdded(String cacheName) throws CacheException {
514
515
516 LOG.debug("Adding to RMI listener", cacheName);
517
518
519 synchronized (cachePeers) {
520 if (cachePeers.get(cacheName) != null) {
521 return;
522 }
523 }
524
525 Ehcache cache = cacheManager.getEhcache(cacheName);
526 if (isDistributed(cache)) {
527 RMICachePeer rmiCachePeer = null;
528 String url = null;
529 try {
530 if (cache.getCacheConfiguration().getTransactionalMode().isTransactional()) {
531 rmiCachePeer = new TransactionalRMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis);
532 } else {
533 rmiCachePeer = new RMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis);
534 }
535 url = rmiCachePeer.getUrl();
536 bind(url, rmiCachePeer);
537 } catch (Exception e) {
538 throw new CacheException("Problem starting listener for RMICachePeer "
539 + url + ". Initial cause was " + e.getMessage(), e);
540 }
541
542 synchronized (cachePeers) {
543 cachePeers.put(cacheName, rmiCachePeer);
544 }
545
546 }
547 if (LOG.isDebugEnabled()) {
548 LOG.debug(cachePeers.size() + " RMICachePeers bound in registry for RMI listener");
549 }
550 }
551
552 /***
553 * Called immediately after a cache has been disposed and removed. The calling method will block until
554 * this method returns.
555 * <p/>
556 * Note that the CacheManager calls this method from a synchronized method. Any attempt to call a synchronized
557 * method on CacheManager from this method will cause a deadlock.
558 * <p/>
559 * Note that a {@link net.sf.ehcache.event.CacheEventListener} status changed will also be triggered. Any attempt from that notification
560 * to access CacheManager will also result in a deadlock.
561 *
562 * @param cacheName the name of the <code>Cache</code> the operation relates to
563 */
564 public void notifyCacheRemoved(String cacheName) {
565
566
567 LOG.debug("Removing from RMI listener", cacheName);
568
569
570 synchronized (cachePeers) {
571 if (cachePeers.get(cacheName) == null) {
572 return;
573 }
574 }
575
576 RMICachePeer rmiCachePeer;
577 synchronized (cachePeers) {
578 rmiCachePeer = (RMICachePeer) cachePeers.remove(cacheName);
579 }
580 String url = null;
581 try {
582 unbind(rmiCachePeer);
583 } catch (Exception e) {
584 throw new CacheException("Error removing Cache Peer "
585 + url + " from listener. Message was: " + e.getMessage(), e);
586 }
587
588 if (LOG.isDebugEnabled()) {
589 LOG.debug(cachePeers.size() + " RMICachePeers bound in registry for RMI listener");
590 }
591 }
592
593
594 /***
595 * Package local method for testing
596 */
597 void addCachePeer(String name, RMICachePeer peer) {
598 synchronized (cachePeers) {
599 cachePeers.put(name, peer);
600
601 }
602 }
603 }