View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.CacheException;
20  import net.sf.ehcache.CacheManager;
21  import net.sf.ehcache.Ehcache;
22  import net.sf.ehcache.Status;
23  import net.sf.ehcache.event.CacheEventListener;
24  
25  import java.io.IOException;
26  import java.net.InetAddress;
27  import java.net.ServerSocket;
28  import java.net.UnknownHostException;
29  import java.rmi.Naming;
30  import java.rmi.NotBoundException;
31  import java.rmi.Remote;
32  import java.rmi.RemoteException;
33  import java.rmi.registry.LocateRegistry;
34  import java.rmi.registry.Registry;
35  import java.rmi.server.ExportException;
36  import java.rmi.server.UnicastRemoteObject;
37  import java.util.ArrayList;
38  import java.util.HashMap;
39  import java.util.Iterator;
40  import java.util.List;
41  import java.util.Map;
42  import java.util.Set;
43  
44  import org.slf4j.Logger;
45  import org.slf4j.LoggerFactory;
46  
47  /***
48   * A cache server which exposes available cache operations remotely through RMI.
49   * <p/>
50   * It acts as a Decorator to a Cache. It holds an instance of cache, which is a local cache it talks to.
51   * <p/>
52   * This class could specify a security manager with code like:
53   * <pre>
54   * if (System.getSecurityManager() == null) {
55   *     System.setSecurityManager(new RMISecurityManager());
56   * }
57   * </pre>
58   * Doing so would require the addition of <code>grant</code> statements in the <code>java.policy</code> file.
59   * <p/>
60   * Per the JDK documentation: "If no security manager is specified no class loading, by RMI clients or servers, is allowed,
61   * aside from what can be found in the local CLASSPATH." The classpath of each instance of this class should have
62   * all required classes to enable distribution, so no remote classloading is required or desirable. Accordingly,
63   * no security manager is set and there are no special JVM configuration requirements.
64   * <p/>
65   * This class opens a ServerSocket. The dispose method should be called for orderly closure of that socket. This class
66   * has a shutdown hook which calls dispose() as a convenience feature for developers.
67   *
68   * @author Greg Luck
69   * @version $Id: RMICacheManagerPeerListener.html 13146 2011-08-01 17:12:39Z oletizi $
70   */
71  public class RMICacheManagerPeerListener implements CacheManagerPeerListener {
72  
73      private static final Logger LOG = LoggerFactory.getLogger(RMICacheManagerPeerListener.class.getName());
74      private static final int MINIMUM_SENSIBLE_TIMEOUT = 200;
75      private static final int NAMING_UNBIND_RETRY_INTERVAL = 400;
76      private static final int NAMING_UNBIND_MAX_RETRIES = 10;
77  
78      /***
79       * The cache peers. The value is an RMICachePeer.
80       */
81      protected final Map cachePeers = new HashMap();
82  
83      /***
84       * status.
85       */
86      protected Status status;
87  
88      /***
89       * The RMI listener port
90       */
91      protected Integer port;
92  
93      private Registry registry;
94      private boolean registryCreated;
95      private final String hostName;
96  
97      private CacheManager cacheManager;
98      private Integer socketTimeoutMillis;
99      private Integer remoteObjectPort;
100 
101     /***
102      * Constructor with full arguments.
103      *
104      * @param hostName            may be null, in which case the hostName will be looked up. Machines with multiple
105      *                            interfaces should specify this if they do not want it to be the default NIC.
106      * @param port                a port in the range 1025 - 65536
107      * @param remoteObjectPort    the port number on which the remote objects bound in the registry receive calls.
108                                   This defaults to a free port if not specified.
109      * @param cacheManager        the CacheManager this listener belongs to
110      * @param socketTimeoutMillis TCP/IP Socket timeout when waiting on response
111      */
112     public RMICacheManagerPeerListener(String hostName, Integer port, Integer remoteObjectPort, CacheManager cacheManager,
113                                        Integer socketTimeoutMillis) throws UnknownHostException {
114 
115         status = Status.STATUS_UNINITIALISED;
116 
117         if (hostName != null && hostName.length() != 0) {
118             this.hostName = hostName;
119             if (hostName.equals("localhost")) {
120                 LOG.warn("Explicitly setting the listener hostname to 'localhost' is not recommended. "
121                         + "It will only work if all CacheManager peers are on the same machine.");
122             }
123         } else {
124             this.hostName = calculateHostAddress();
125         }
126         if (port == null || port.intValue() == 0) {
127             assignFreePort(false);
128         } else {
129             this.port = port;
130         }
131 
132         //by default is 0, which is ok.
133         this.remoteObjectPort = remoteObjectPort;
134 
135         this.cacheManager = cacheManager;
136         if (socketTimeoutMillis == null || socketTimeoutMillis.intValue() < MINIMUM_SENSIBLE_TIMEOUT) {
137             throw new IllegalArgumentException("socketTimoutMillis must be a reasonable value greater than 200ms");
138         }
139         this.socketTimeoutMillis = socketTimeoutMillis;
140 
141     }
142 
143     /***
144      * Assigns a free port to be the listener port.
145      *
146      * @throws IllegalStateException if the statis of the listener is not {@link net.sf.ehcache.Status#STATUS_UNINITIALISED}
147      */
148     protected void assignFreePort(boolean forced) throws IllegalStateException {
149         if (status != Status.STATUS_UNINITIALISED) {
150             throw new IllegalStateException("Cannot change the port of an already started listener.");
151         }
152         this.port = Integer.valueOf(this.getFreePort());
153         if (forced) {
154             LOG.warn("Resolving RMI port conflict by automatically using a free TCP/IP port to listen on: " + this.port);
155         } else {
156             LOG.debug("Automatically finding a free TCP/IP port to listen on: " + this.port);
157         }
158     }
159 
160 
161     /***
162      * Calculates the host address as the default NICs IP address
163      *
164      * @throws UnknownHostException
165      */
166     protected String calculateHostAddress() throws UnknownHostException {
167         return InetAddress.getLocalHost().getHostAddress();
168     }
169 
170 
171     /***
172      * Gets a free server socket port.
173      *
174      * @return a number in the range 1025 - 65536 that was free at the time this method was executed
175      * @throws IllegalArgumentException
176      */
177     protected int getFreePort() throws IllegalArgumentException {
178         ServerSocket serverSocket = null;
179         try {
180             serverSocket = new ServerSocket(0);
181             return serverSocket.getLocalPort();
182         } catch (IOException e) {
183             throw new IllegalArgumentException("Could not acquire a free port number.");
184         } finally {
185             if (serverSocket != null && !serverSocket.isClosed()) {
186                 try {
187                     serverSocket.close();
188                 } catch (Exception e) {
189                     LOG.debug("Error closing ServerSocket: " + e.getMessage());
190                 }
191             }
192         }
193     }
194 
195 
196     /***
197      * {@inheritDoc}
198      */
199     public void init() throws CacheException {
200         if (!status.equals(Status.STATUS_UNINITIALISED)) {
201             return;
202         }
203         RMICachePeer rmiCachePeer = null;
204         try {
205             startRegistry();
206             int counter = 0;
207             populateListOfRemoteCachePeers();
208             synchronized (cachePeers) {
209                 for (Iterator iterator = cachePeers.values().iterator(); iterator.hasNext();) {
210                     rmiCachePeer = (RMICachePeer) iterator.next();
211                     bind(rmiCachePeer.getUrl(), rmiCachePeer);
212                     counter++;
213                 }
214             }
215             LOG.debug(counter + " RMICachePeers bound in registry for RMI listener");
216             status = Status.STATUS_ALIVE;
217         } catch (Exception e) {
218             String url = null;
219             if (rmiCachePeer != null) {
220                 url = rmiCachePeer.getUrl();
221             }
222 
223             throw new CacheException("Problem starting listener for RMICachePeer "
224                     + url + ". Initial cause was " + e.getMessage(), e);
225         }
226     }
227 
228     /***
229      * Bind a cache peer
230      *
231      * @param rmiCachePeer
232      */
233     protected void bind(String peerName, RMICachePeer rmiCachePeer) throws Exception {
234         Naming.rebind(peerName, rmiCachePeer);
235     }
236 
237     /***
238      * Returns a list of bound objects.
239      * <p/>
240      * This should match the list of cachePeers i.e. they should always be bound
241      *
242      * @return a list of String representations of <code>RMICachePeer</code> objects
243      */
244     protected String[] listBoundRMICachePeers() throws CacheException {
245         try {
246             return registry.list();
247         } catch (RemoteException e) {
248             throw new CacheException("Unable to list cache peers " + e.getMessage());
249         }
250     }
251 
252     /***
253      * Returns a reference to the remote object.
254      *
255      * @param name the name of the cache e.g. <code>sampleCache1</code>
256      */
257     protected Remote lookupPeer(String name) throws CacheException {
258         try {
259             return registry.lookup(name);
260         } catch (Exception e) {
261             throw new CacheException("Unable to lookup peer for replicated cache " + name + " "
262                     + e.getMessage());
263         }
264     }
265 
266     /***
267      * Should be called on init because this is one of the last things that should happen on CacheManager startup.
268      */
269     protected void populateListOfRemoteCachePeers() throws RemoteException {
270         String[] names = cacheManager.getCacheNames();
271         for (int i = 0; i < names.length; i++) {
272             String name = names[i];
273             Ehcache cache = cacheManager.getEhcache(name);
274             synchronized (cachePeers) {
275                 if (cachePeers.get(name) == null) {
276                     if (isDistributed(cache)) {
277                         RMICachePeer peer;
278                         if (cache.getCacheConfiguration().getTransactionalMode().isTransactional()) {
279                             peer = new TransactionalRMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis);
280                         } else {
281                             peer = new RMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis);
282                         }
283                         cachePeers.put(name, peer);
284                     }
285                 }
286             }
287         }
288 
289     }
290 
291     /***
292      * Determine if the given cache is distributed.
293      *
294      * @param cache the cache to check
295      * @return true if a <code>CacheReplicator</code> is found in the listeners
296      */
297     protected boolean isDistributed(Ehcache cache) {
298         Set listeners = cache.getCacheEventNotificationService().getCacheEventListeners();
299         for (Iterator iterator = listeners.iterator(); iterator.hasNext();) {
300             CacheEventListener cacheEventListener = (CacheEventListener) iterator.next();
301             if (cacheEventListener instanceof CacheReplicator) {
302                 return true;
303             }
304         }
305         return false;
306     }
307 
308     /***
309      * Start the rmiregistry.
310      * <p/>
311      * The alternative is to use the <code>rmiregistry</code> binary, in which case:
312      * <ol/>
313      * <li>rmiregistry running
314      * <li>-Djava.rmi.server.codebase="file:///Users/gluck/work/ehcache/build/classes/ file:///Users/gluck/work/ehcache/lib/commons-logging-1.0.4.jar"
315      * </ol>
316      *
317      * @throws RemoteException
318      */
319     protected void startRegistry() throws RemoteException {
320         try {
321             registry = LocateRegistry.getRegistry(port.intValue());
322             try {
323                 registry.list();
324             } catch (RemoteException e) {
325                 //may not be created. Let's create it.
326                 registry = LocateRegistry.createRegistry(port.intValue());
327                 registryCreated = true;
328             }
329         } catch (ExportException exception) {
330             LOG.error("Exception starting RMI registry. Error was " + exception.getMessage(), exception);
331         }
332     }
333 
334     /***
335      * Stop the rmiregistry if it was started by this class.
336      *
337      * @throws RemoteException
338      */
339     protected void stopRegistry() throws RemoteException {
340         if (registryCreated) {
341             // the unexportObject call must be done on the Registry object returned
342             // by createRegistry not by getRegistry, a NoSuchObjectException is
343             // thrown otherwise
344             boolean success = UnicastRemoteObject.unexportObject(registry, true);
345             if (success) {
346                 LOG.debug("rmiregistry unexported.");
347             } else {
348                 LOG.warn("Could not unexport rmiregistry.");
349             }
350         }
351     }
352 
353     /***
354      * Stop the listener. It
355      * <ul>
356      * <li>unbinds the objects from the registry
357      * <li>unexports Remote objects
358      * </ul>
359      */
360     public void dispose() throws CacheException {
361         if (!status.equals(Status.STATUS_ALIVE)) {
362             return;
363         }
364         try {
365             int counter = 0;
366             synchronized (cachePeers) {
367                 for (Iterator iterator = cachePeers.values().iterator(); iterator.hasNext();) {
368                     RMICachePeer rmiCachePeer = (RMICachePeer) iterator.next();
369                     disposeRMICachePeer(rmiCachePeer);
370                     counter++;
371                 }
372                 stopRegistry();
373             }
374             LOG.debug(counter + " RMICachePeers unbound from registry in RMI listener");
375             status = Status.STATUS_SHUTDOWN;
376         } catch (Exception e) {
377             throw new CacheException("Problem unbinding remote cache peers. Initial cause was " + e.getMessage(), e);
378         }
379     }
380 
381     /***
382      * A template method to dispose an individual RMICachePeer. This consists of:
383      * <ol>
384      * <li>Unbinding the peer from the naming service
385      * <li>Unexporting the peer
386      * </ol>
387      * Override to specialise behaviour
388      *
389      * @param rmiCachePeer the cache peer to dispose of
390      * @throws Exception thrown if something goes wrong
391      */
392     protected void disposeRMICachePeer(RMICachePeer rmiCachePeer) throws Exception {
393         unbind(rmiCachePeer);
394     }
395 
396     /***
397      * Unbinds an RMICachePeer and unexports it.
398      * <p/>
399      * We unbind from the registry first before unexporting.
400      * Unbinding first removes the very small possibility of a client
401      * getting the object from the registry while we are trying to unexport it.
402      * <p/>
403      * This method may take up to 4 seconds to complete, if we are having trouble
404      * unexporting the peer.
405      *
406      * @param rmiCachePeer the bound and exported cache peer
407      * @throws Exception
408      */
409     protected void unbind(RMICachePeer rmiCachePeer) throws Exception {
410         String url = rmiCachePeer.getUrl();
411         try {
412             Naming.unbind(url);
413         } catch (NotBoundException e) {
414             LOG.warn(url + " not bound therefore not unbinding.");
415         }
416         // Try to gracefully unexport before forcing it.
417         boolean unexported = UnicastRemoteObject.unexportObject(rmiCachePeer, false);
418         for (int count = 1; (count < NAMING_UNBIND_MAX_RETRIES) && !unexported; count++) {
419             try {
420                 Thread.sleep(NAMING_UNBIND_RETRY_INTERVAL);
421             } catch (InterruptedException ie) {
422                 // break out of the unexportObject loop
423                 break;
424             }
425             unexported = UnicastRemoteObject.unexportObject(rmiCachePeer, false);
426         }
427 
428         // If we still haven't been able to unexport, force the unexport
429         // as a last resort.
430         if (!unexported) {
431             if (!UnicastRemoteObject.unexportObject(rmiCachePeer, true)) {
432                 LOG.warn("Unable to unexport rmiCachePeer: " + rmiCachePeer.getUrl() + ".  Skipping.");
433             }
434         }
435     }
436 
437     /***
438      * All of the caches which are listening for remote changes.
439      *
440      * @return a list of <code>RMICachePeer</code> objects. The list if not live
441      */
442     public List getBoundCachePeers() {
443         List cachePeerList = new ArrayList();
444         synchronized (cachePeers) {
445             for (Iterator iterator = cachePeers.values().iterator(); iterator.hasNext();) {
446                 RMICachePeer rmiCachePeer = (RMICachePeer) iterator.next();
447                 cachePeerList.add(rmiCachePeer);
448             }
449         }
450         return cachePeerList;
451     }
452 
453     /***
454      * Returns the listener status.
455      */
456     public Status getStatus() {
457         return status;
458     }
459 
460     /***
461      * A listener will normally have a resource that only one instance can use at the same time,
462      * such as a port. This identifier is used to tell if it is unique and will not conflict with an
463      * existing instance using the resource.
464      *
465      * @return a String identifier for the resource
466      */
467     public String getUniqueResourceIdentifier() {
468         return "RMI listener port: " + port;
469     }
470 
471     /***
472      * If a conflict is detected in unique resource use, this method signals the listener to attempt
473      * automatic resolution of the resource conflict.
474      *
475      * @throws IllegalStateException if the statis of the listener is not {@link net.sf.ehcache.Status#STATUS_UNINITIALISED}
476      */
477     public void attemptResolutionOfUniqueResourceConflict() throws IllegalStateException, CacheException {
478         assignFreePort(true);
479     }
480 
481     /***
482      * The replication scheme this listener interacts with.
483      * Each peer provider has a scheme name, which can be used by caches to specify for replication and bootstrap purposes.
484      *
485      * @return the well-known scheme name, which is determined by the replication provider author.
486      */
487     public String getScheme() {
488         return "RMI";
489     }
490 
491     /***
492      * Called immediately after a cache has been added and activated.
493      * <p/>
494      * Note that the CacheManager calls this method from a synchronized method. Any attempt to call a synchronized
495      * method on CacheManager from this method will cause a deadlock.
496      * <p/>
497      * Note that activation will also cause a CacheEventListener status change notification from
498      * {@link net.sf.ehcache.Status#STATUS_UNINITIALISED} to {@link net.sf.ehcache.Status#STATUS_ALIVE}. Care should be
499      * taken on processing that notification because:
500      * <ul>
501      * <li>the cache will not yet be accessible from the CacheManager.
502      * <li>the addCaches methods whih cause this notification are synchronized on the CacheManager. An attempt to call
503      * {@link net.sf.ehcache.CacheManager#getCache(String)} will cause a deadlock.
504      * </ul>
505      * The calling method will block until this method returns.
506      * <p/>
507      * Repopulates the list of cache peers and rebinds the list.
508      * This method should be called if a cache is dynamically added
509      *
510      * @param cacheName the name of the <code>Cache</code> the operation relates to
511      * @see net.sf.ehcache.event.CacheEventListener
512      */
513     public void notifyCacheAdded(String cacheName) throws CacheException {
514 
515 
516             LOG.debug("Adding to RMI listener", cacheName);
517 
518         //Don't add if exists.
519         synchronized (cachePeers) {
520             if (cachePeers.get(cacheName) != null) {
521                 return;
522             }
523         }
524 
525         Ehcache cache = cacheManager.getEhcache(cacheName);
526         if (isDistributed(cache)) {
527             RMICachePeer rmiCachePeer = null;
528             String url = null;
529             try {
530                 if (cache.getCacheConfiguration().getTransactionalMode().isTransactional()) {
531                     rmiCachePeer = new TransactionalRMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis);
532                 } else {
533                     rmiCachePeer = new RMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis);
534                 }
535                 url = rmiCachePeer.getUrl();
536                 bind(url, rmiCachePeer);
537             } catch (Exception e) {
538                 throw new CacheException("Problem starting listener for RMICachePeer "
539                         + url + ". Initial cause was " + e.getMessage(), e);
540             }
541 
542             synchronized (cachePeers) {
543                 cachePeers.put(cacheName, rmiCachePeer);
544             }
545 
546         }
547         if (LOG.isDebugEnabled()) {
548             LOG.debug(cachePeers.size() + " RMICachePeers bound in registry for RMI listener");
549         }
550     }
551 
552     /***
553      * Called immediately after a cache has been disposed and removed. The calling method will block until
554      * this method returns.
555      * <p/>
556      * Note that the CacheManager calls this method from a synchronized method. Any attempt to call a synchronized
557      * method on CacheManager from this method will cause a deadlock.
558      * <p/>
559      * Note that a {@link net.sf.ehcache.event.CacheEventListener} status changed will also be triggered. Any attempt from that notification
560      * to access CacheManager will also result in a deadlock.
561      *
562      * @param cacheName the name of the <code>Cache</code> the operation relates to
563      */
564     public void notifyCacheRemoved(String cacheName) {
565 
566 
567             LOG.debug("Removing from RMI listener", cacheName);
568 
569         //don't remove if already removed.
570         synchronized (cachePeers) {
571             if (cachePeers.get(cacheName) == null) {
572                 return;
573             }
574         }
575 
576         RMICachePeer rmiCachePeer;
577         synchronized (cachePeers) {
578             rmiCachePeer = (RMICachePeer) cachePeers.remove(cacheName);
579         }
580         String url = null;
581         try {
582             unbind(rmiCachePeer);
583         } catch (Exception e) {
584             throw new CacheException("Error removing Cache Peer "
585                     + url + " from listener. Message was: " + e.getMessage(), e);
586         }
587 
588         if (LOG.isDebugEnabled()) {
589             LOG.debug(cachePeers.size() + " RMICachePeers bound in registry for RMI listener");
590         }
591     }
592 
593 
594     /***
595      * Package local method for testing
596      */
597     void addCachePeer(String name, RMICachePeer peer) {
598         synchronized (cachePeers) {
599             cachePeers.put(name, peer);
600 
601         }
602     }
603 }