1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.CacheException;
20 import net.sf.ehcache.CacheManager;
21 import net.sf.ehcache.Ehcache;
22
23 import java.io.IOException;
24 import java.net.InetAddress;
25 import java.rmi.NotBoundException;
26 import java.util.ArrayList;
27 import java.util.Date;
28 import java.util.Iterator;
29 import java.util.List;
30
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /***
35 * A peer provider which discovers peers using Multicast.
36 * <p/>
37 * Hosts can be in three different levels of conformance with the Multicast specification (RFC1112), according to the requirements they meet.
38 * <ol>
39 * <li>Level 0 is the "no support for IP Multicasting" level. Lots of hosts and routers in the Internet are in this state,
40 * as multicast support is not mandatory in IPv4 (it is, however, in IPv6).
41 * Not too much explanation is needed here: hosts in this level can neither send nor receive multicast packets.
42 * They must ignore the ones sent by other multicast capable hosts.
43 * <li>Level 1 is the "support for sending but not receiving multicast IP datagrams" level.
44 * Thus, note that it is not necessary to join a multicast group to be able to send datagrams to it.
45 * Very few additions are needed in the IP module to make a "Level 0" host "Level 1-compliant".
46 * <li>Level 2 is the "full support for IP multicasting" level.
47 * Level 2 hosts must be able to both send and receive multicast traffic.
48 * They must know the way to join and leave multicast groups and to propagate this information to multicast routers.
49 * Thus, they must include an Internet Group Management Protocol (IGMP) implementation in their TCP/IP stack.
50 * </ol>
51 * <p/>
52 * The list of CachePeers is maintained via heartbeats. rmiUrls are looked up using RMI and converted to CachePeers on
53 * registration. On lookup any stale references are removed.
54 *
55 * @author Greg Luck
56 * @version $Id: MulticastRMICacheManagerPeerProvider.html 13146 2011-08-01 17:12:39Z oletizi $
57 */
58 public final class MulticastRMICacheManagerPeerProvider extends RMICacheManagerPeerProvider implements CacheManagerPeerProvider {
59
60 /***
61 * One tenth of a second, in ms
62 */
63 protected static final int SHORT_DELAY = 100;
64
65 private static final Logger LOG = LoggerFactory.getLogger(MulticastRMICacheManagerPeerProvider.class.getName());
66
67
68 private final MulticastKeepaliveHeartbeatReceiver heartBeatReceiver;
69 private final MulticastKeepaliveHeartbeatSender heartBeatSender;
70
71 /***
72 * Creates and starts a multicast peer provider
73 *
74 * @param groupMulticastAddress 224.0.0.1 to 239.255.255.255 e.g. 230.0.0.1
75 * @param groupMulticastPort 1025 to 65536 e.g. 4446
76 * @param hostAddress the address of the interface to use for sending and receiving multicast. May be null.
77 */
78 public MulticastRMICacheManagerPeerProvider(CacheManager cacheManager, InetAddress groupMulticastAddress,
79 Integer groupMulticastPort, Integer timeToLive, InetAddress hostAddress) {
80 super(cacheManager);
81
82
83
84 heartBeatReceiver = new MulticastKeepaliveHeartbeatReceiver(this, groupMulticastAddress,
85 groupMulticastPort, hostAddress);
86 heartBeatSender = new MulticastKeepaliveHeartbeatSender(cacheManager, groupMulticastAddress,
87 groupMulticastPort, timeToLive, hostAddress);
88 }
89
90 /***
91 * {@inheritDoc}
92 */
93 public final void init() throws CacheException {
94 try {
95 heartBeatReceiver.init();
96 heartBeatSender.init();
97 } catch (IOException exception) {
98 LOG.error("Error starting heartbeat. Error was: " + exception.getMessage(), exception);
99 throw new CacheException(exception.getMessage());
100 }
101 }
102
103 /***
104 * Register a new peer, but only if the peer is new, otherwise the last seen timestamp is updated.
105 * <p/>
106 * This method is thread-safe. It relies on peerUrls being a synchronizedMap
107 *
108 * @param rmiUrl
109 */
110 public final void registerPeer(String rmiUrl) {
111 try {
112 CachePeerEntry cachePeerEntry = (CachePeerEntry) peerUrls.get(rmiUrl);
113 if (cachePeerEntry == null || stale(cachePeerEntry.date)) {
114
115 CachePeer cachePeer = lookupRemoteCachePeer(rmiUrl);
116 cachePeerEntry = new CachePeerEntry(cachePeer, new Date());
117
118 peerUrls.put(rmiUrl, cachePeerEntry);
119 } else {
120 cachePeerEntry.date = new Date();
121 }
122 } catch (IOException e) {
123 if (LOG.isDebugEnabled()) {
124 LOG.debug("Unable to lookup remote cache peer for " + rmiUrl + ". Removing from peer list. Cause was: "
125 + e.getMessage());
126 }
127 unregisterPeer(rmiUrl);
128 } catch (NotBoundException e) {
129 peerUrls.remove(rmiUrl);
130 if (LOG.isDebugEnabled()) {
131 LOG.debug("Unable to lookup remote cache peer for " + rmiUrl + ". Removing from peer list. Cause was: "
132 + e.getMessage());
133 }
134 } catch (Throwable t) {
135 LOG.error("Unable to lookup remote cache peer for " + rmiUrl
136 + ". Cause was not due to an IOException or NotBoundException which will occur in normal operation:" +
137 " " + t.getMessage());
138 }
139 }
140
141 /***
142 * @return a list of {@link CachePeer} peers, excluding the local peer.
143 */
144 public final synchronized List listRemoteCachePeers(Ehcache cache) throws CacheException {
145 List remoteCachePeers = new ArrayList();
146 List staleList = new ArrayList();
147 synchronized (peerUrls) {
148 for (Iterator iterator = peerUrls.keySet().iterator(); iterator.hasNext();) {
149 String rmiUrl = (String) iterator.next();
150 String rmiUrlCacheName = extractCacheName(rmiUrl);
151 try {
152 if (!rmiUrlCacheName.equals(cache.getName())) {
153 continue;
154 }
155 CachePeerEntry cachePeerEntry = (CachePeerEntry) peerUrls.get(rmiUrl);
156 Date date = cachePeerEntry.date;
157 if (!stale(date)) {
158 CachePeer cachePeer = cachePeerEntry.cachePeer;
159 remoteCachePeers.add(cachePeer);
160 } else {
161
162 LOG.debug("rmiUrl is stale. Either the remote peer is shutdown or the " +
163 "network connectivity has been interrupted. Will be removed from list of remote cache peers",
164 rmiUrl);
165 staleList.add(rmiUrl);
166 }
167 } catch (Exception exception) {
168 LOG.error(exception.getMessage(), exception);
169 throw new CacheException("Unable to list remote cache peers. Error was " + exception.getMessage());
170 }
171 }
172
173 for (int i = 0; i < staleList.size(); i++) {
174 String rmiUrl = (String) staleList.get(i);
175 peerUrls.remove(rmiUrl);
176 }
177 }
178 return remoteCachePeers;
179 }
180
181
182 /***
183 * Shutdown the heartbeat
184 */
185 public final void dispose() {
186 heartBeatSender.dispose();
187 heartBeatReceiver.dispose();
188 }
189
190 /***
191 * Time for a cluster to form. This varies considerably, depending on the implementation.
192 *
193 * @return the time in ms, for a cluster to form
194 */
195 public long getTimeForClusterToForm() {
196 return getStaleTime();
197 }
198
199 /***
200 * The time after which an unrefreshed peer provider entry is considered stale.
201 */
202 protected long getStaleTime() {
203 return MulticastKeepaliveHeartbeatSender.getHeartBeatInterval() * 2 + SHORT_DELAY;
204 }
205
206 /***
207 * Whether the entry should be considered stale.
208 * This will depend on the type of RMICacheManagerPeerProvider.
209 * This method should be overridden for implementations that go stale based on date
210 *
211 * @param date the date the entry was created
212 * @return true if stale
213 */
214 protected final boolean stale(Date date) {
215 long now = System.currentTimeMillis();
216 return date.getTime() < (now - getStaleTime());
217 }
218
219
220 /***
221 * Entry containing a looked up CachePeer and date
222 */
223 protected static final class CachePeerEntry {
224
225 private final CachePeer cachePeer;
226 private Date date;
227
228 /***
229 * Constructor
230 *
231 * @param cachePeer the cache peer part of this entry
232 * @param date the date part of this entry
233 */
234 public CachePeerEntry(CachePeer cachePeer, Date date) {
235 this.cachePeer = cachePeer;
236 this.date = date;
237 }
238
239 /***
240 * @return the cache peer part of this entry
241 */
242 public final CachePeer getCachePeer() {
243 return cachePeer;
244 }
245
246
247 /***
248 * @return the date part of this entry
249 */
250 public final Date getDate() {
251 return date;
252 }
253
254 }
255
256 /***
257 * @return the MulticastKeepaliveHeartbeatReceiver
258 */
259 public MulticastKeepaliveHeartbeatReceiver getHeartBeatReceiver() {
260 return heartBeatReceiver;
261 }
262
263 /***
264 * @return the MulticastKeepaliveHeartbeatSender
265 */
266 public MulticastKeepaliveHeartbeatSender getHeartBeatSender() {
267 return heartBeatSender;
268 }
269 }