View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.CacheException;
20  import net.sf.ehcache.CacheManager;
21  import net.sf.ehcache.Ehcache;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.rmi.NotBoundException;
26  import java.util.ArrayList;
27  import java.util.Date;
28  import java.util.Iterator;
29  import java.util.List;
30  
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  /***
35   * A peer provider which discovers peers using Multicast.
36   * <p/>
37   * Hosts can be in three different levels of conformance with the Multicast specification (RFC1112), according to the requirements they meet.
38   * <ol>
39   * <li>Level 0 is the "no support for IP Multicasting" level. Lots of hosts and routers in the Internet are in this state,
40   * as multicast support is not mandatory in IPv4 (it is, however, in IPv6).
41   * Not too much explanation is needed here: hosts in this level can neither send nor receive multicast packets.
42   * They must ignore the ones sent by other multicast capable hosts.
43   * <li>Level 1 is the "support for sending but not receiving multicast IP datagrams" level.
44   * Thus, note that it is not necessary to join a multicast group to be able to send datagrams to it.
45   * Very few additions are needed in the IP module to make a "Level 0" host "Level 1-compliant".
46   * <li>Level 2 is the "full support for IP multicasting" level.
47   * Level 2 hosts must be able to both send and receive multicast traffic.
48   * They must know the way to join and leave multicast groups and to propagate this information to multicast routers.
49   * Thus, they must include an Internet Group Management Protocol (IGMP) implementation in their TCP/IP stack.
50   * </ol>
51   * <p/>
52   * The list of CachePeers is maintained via heartbeats. rmiUrls are looked up using RMI and converted to CachePeers on
53   * registration. On lookup any stale references are removed.
54   *
55   * @author Greg Luck
56   * @version $Id: MulticastRMICacheManagerPeerProvider.html 13146 2011-08-01 17:12:39Z oletizi $
57   */
58  public final class MulticastRMICacheManagerPeerProvider extends RMICacheManagerPeerProvider implements CacheManagerPeerProvider {
59  
60      /***
61       * One tenth of a second, in ms
62       */
63      protected static final int SHORT_DELAY = 100;
64  
65      private static final Logger LOG = LoggerFactory.getLogger(MulticastRMICacheManagerPeerProvider.class.getName());
66  
67  
68      private final MulticastKeepaliveHeartbeatReceiver heartBeatReceiver;
69      private final MulticastKeepaliveHeartbeatSender heartBeatSender;
70  
71      /***
72       * Creates and starts a multicast peer provider
73       *
74       * @param groupMulticastAddress 224.0.0.1 to 239.255.255.255 e.g. 230.0.0.1
75       * @param groupMulticastPort    1025 to 65536 e.g. 4446
76       * @param hostAddress the address of the interface to use for sending and receiving multicast. May be null.
77       */
78      public MulticastRMICacheManagerPeerProvider(CacheManager cacheManager, InetAddress groupMulticastAddress,
79                                                  Integer groupMulticastPort, Integer timeToLive, InetAddress hostAddress) {
80          super(cacheManager);
81  
82  
83  
84          heartBeatReceiver = new MulticastKeepaliveHeartbeatReceiver(this, groupMulticastAddress,
85                  groupMulticastPort, hostAddress);
86          heartBeatSender = new MulticastKeepaliveHeartbeatSender(cacheManager, groupMulticastAddress,
87                          groupMulticastPort, timeToLive, hostAddress);
88      }
89  
90      /***
91       * {@inheritDoc}
92       */
93      public final void init() throws CacheException {
94          try {
95              heartBeatReceiver.init();
96              heartBeatSender.init();
97          } catch (IOException exception) {
98              LOG.error("Error starting heartbeat. Error was: " + exception.getMessage(), exception);
99              throw new CacheException(exception.getMessage());
100         }
101     }
102 
103     /***
104      * Register a new peer, but only if the peer is new, otherwise the last seen timestamp is updated.
105      * <p/>
106      * This method is thread-safe. It relies on peerUrls being a synchronizedMap
107      *
108      * @param rmiUrl
109      */
110     public final void registerPeer(String rmiUrl) {
111         try {
112             CachePeerEntry cachePeerEntry = (CachePeerEntry) peerUrls.get(rmiUrl);
113             if (cachePeerEntry == null || stale(cachePeerEntry.date)) {
114                 //can take seconds if there is a problem
115                 CachePeer cachePeer = lookupRemoteCachePeer(rmiUrl);
116                 cachePeerEntry = new CachePeerEntry(cachePeer, new Date());
117                 //synchronized due to peerUrls being a synchronizedMap
118                 peerUrls.put(rmiUrl, cachePeerEntry);
119             } else {
120                 cachePeerEntry.date = new Date();
121             }
122         } catch (IOException e) {
123             if (LOG.isDebugEnabled()) {
124                 LOG.debug("Unable to lookup remote cache peer for " + rmiUrl + ". Removing from peer list. Cause was: "
125                         + e.getMessage());
126             }
127             unregisterPeer(rmiUrl);
128         } catch (NotBoundException e) {
129             peerUrls.remove(rmiUrl);
130             if (LOG.isDebugEnabled()) {
131                 LOG.debug("Unable to lookup remote cache peer for " + rmiUrl + ". Removing from peer list. Cause was: "
132                         + e.getMessage());
133             }
134         } catch (Throwable t) {
135             LOG.error("Unable to lookup remote cache peer for " + rmiUrl
136                     + ". Cause was not due to an IOException or NotBoundException which will occur in normal operation:" +
137                     " " + t.getMessage());
138         }
139     }
140 
141     /***
142      * @return a list of {@link CachePeer} peers, excluding the local peer.
143      */
144     public final synchronized List listRemoteCachePeers(Ehcache cache) throws CacheException {
145         List remoteCachePeers = new ArrayList();
146         List staleList = new ArrayList();
147         synchronized (peerUrls) {
148             for (Iterator iterator = peerUrls.keySet().iterator(); iterator.hasNext();) {
149                 String rmiUrl = (String) iterator.next();
150                 String rmiUrlCacheName = extractCacheName(rmiUrl);
151                 try {
152                     if (!rmiUrlCacheName.equals(cache.getName())) {
153                         continue;
154                     }
155                     CachePeerEntry cachePeerEntry = (CachePeerEntry) peerUrls.get(rmiUrl);
156                     Date date = cachePeerEntry.date;
157                     if (!stale(date)) {
158                         CachePeer cachePeer = cachePeerEntry.cachePeer;
159                         remoteCachePeers.add(cachePeer);
160                     } else {
161 
162                             LOG.debug("rmiUrl is stale. Either the remote peer is shutdown or the " +
163                                     "network connectivity has been interrupted. Will be removed from list of remote cache peers",
164                                     rmiUrl);
165                         staleList.add(rmiUrl);
166                     }
167                 } catch (Exception exception) {
168                     LOG.error(exception.getMessage(), exception);
169                     throw new CacheException("Unable to list remote cache peers. Error was " + exception.getMessage());
170                 }
171             }
172             //Must remove entries after we have finished iterating over them
173             for (int i = 0; i < staleList.size(); i++) {
174                 String rmiUrl = (String) staleList.get(i);
175                 peerUrls.remove(rmiUrl);
176             }
177         }
178         return remoteCachePeers;
179     }
180 
181 
182     /***
183      * Shutdown the heartbeat
184      */
185     public final void dispose() {
186         heartBeatSender.dispose();
187         heartBeatReceiver.dispose();
188     }
189 
190     /***
191      * Time for a cluster to form. This varies considerably, depending on the implementation.
192      *
193      * @return the time in ms, for a cluster to form
194      */
195     public long getTimeForClusterToForm() {
196         return getStaleTime();
197     }
198 
199     /***
200      * The time after which an unrefreshed peer provider entry is considered stale.
201      */
202     protected long getStaleTime() {
203         return MulticastKeepaliveHeartbeatSender.getHeartBeatInterval() * 2 + SHORT_DELAY;
204     }
205 
206     /***
207      * Whether the entry should be considered stale.
208      * This will depend on the type of RMICacheManagerPeerProvider.
209      * This method should be overridden for implementations that go stale based on date
210      *
211      * @param date the date the entry was created
212      * @return true if stale
213      */
214     protected final boolean stale(Date date) {
215         long now = System.currentTimeMillis();
216         return date.getTime() < (now - getStaleTime());
217     }
218 
219 
220     /***
221      * Entry containing a looked up CachePeer and date
222      */
223     protected static final class CachePeerEntry {
224 
225         private final CachePeer cachePeer;
226         private Date date;
227 
228         /***
229          * Constructor
230          *
231          * @param cachePeer the cache peer part of this entry
232          * @param date      the date part of this entry
233          */
234         public CachePeerEntry(CachePeer cachePeer, Date date) {
235             this.cachePeer = cachePeer;
236             this.date = date;
237         }
238 
239         /***
240          * @return the cache peer part of this entry
241          */
242         public final CachePeer getCachePeer() {
243             return cachePeer;
244         }
245 
246 
247         /***
248          * @return the date part of this entry
249          */
250         public final Date getDate() {
251             return date;
252         }
253 
254     }
255 
256     /***
257      * @return the MulticastKeepaliveHeartbeatReceiver
258      */
259     public MulticastKeepaliveHeartbeatReceiver getHeartBeatReceiver() {
260         return heartBeatReceiver;
261     }
262 
263     /***
264      * @return the MulticastKeepaliveHeartbeatSender
265      */
266     public MulticastKeepaliveHeartbeatSender getHeartBeatSender() {
267         return heartBeatSender;
268     }
269 }