1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.CacheManager;
20
21 import java.io.IOException;
22 import java.net.DatagramPacket;
23 import java.net.InetAddress;
24 import java.net.MulticastSocket;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /***
33 * Sends heartbeats to a multicast group containing a compressed list of URLs.
34 * <p/>
35 * You can control how far the multicast packets propagate by setting the badly misnamed "TTL".
36 * Using the multicast IP protocol, the TTL value indicates the scope or range in which a packet may be forwarded.
37 * By convention:
38 * <ul>
39 * <li>0 is restricted to the same host
40 * <li>1 is restricted to the same subnet
41 * <li>32 is restricted to the same site
42 * <li>64 is restricted to the same region
43 * <li>128 is restricted to the same continent
44 * <li>255 is unrestricted
45 * </ul>
46 * You can also control how often the heartbeat sends by setting the interval.
47 *
48 * @author Greg Luck
49 * @version $Id: MulticastKeepaliveHeartbeatSender.html 13146 2011-08-01 17:12:39Z oletizi $
50 */
51 public final class MulticastKeepaliveHeartbeatSender {
52
53
54 private static final Logger LOG = LoggerFactory.getLogger(MulticastKeepaliveHeartbeatSender.class.getName());
55
56 private static final int DEFAULT_HEARTBEAT_INTERVAL = 5000;
57 private static final int MINIMUM_HEARTBEAT_INTERVAL = 1000;
58 private static final int MAXIMUM_PEERS_PER_SEND = 150;
59
60 private static long heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
61
62 private final InetAddress groupMulticastAddress;
63 private final Integer groupMulticastPort;
64 private final Integer timeToLive;
65 private MulticastServerThread serverThread;
66 private volatile boolean stopped;
67 private final CacheManager cacheManager;
68 private InetAddress hostAddress;
69
70 /***
71 * Constructor.
72 *
73 * @param cacheManager the bound CacheManager. Each CacheManager has a maximum of one sender
74 * @param multicastAddress
75 * @param multicastPort
76 * @param timeToLive See class description for the meaning of this parameter.
77 */
78 public MulticastKeepaliveHeartbeatSender(CacheManager cacheManager,
79 InetAddress multicastAddress, Integer multicastPort,
80 Integer timeToLive,
81 InetAddress hostAddress) {
82 this.cacheManager = cacheManager;
83 this.groupMulticastAddress = multicastAddress;
84 this.groupMulticastPort = multicastPort;
85 this.timeToLive = timeToLive;
86 this.hostAddress = hostAddress;
87
88 }
89
90 /***
91 * Start the heartbeat thread
92 */
93 public final void init() {
94 serverThread = new MulticastServerThread();
95 serverThread.start();
96 }
97
98 /***
99 * Shutdown this heartbeat sender
100 */
101 public final synchronized void dispose() {
102 stopped = true;
103 notifyAll();
104 serverThread.interrupt();
105 }
106
107 /***
108 * A thread which sends a multicast heartbeat every second
109 */
110 private final class MulticastServerThread extends Thread {
111
112 private MulticastSocket socket;
113 private List compressedUrlListList = new ArrayList();
114 private int cachePeersHash;
115
116
117 /***
118 * Constructor
119 */
120 public MulticastServerThread() {
121 super("Multicast Heartbeat Sender Thread");
122 setDaemon(true);
123 }
124
125 @Override
126 public final void run() {
127 while (!stopped) {
128 try {
129 socket = new MulticastSocket(groupMulticastPort.intValue());
130 if (hostAddress != null) {
131 socket.setInterface(hostAddress);
132 }
133 socket.setTimeToLive(timeToLive.intValue());
134 socket.joinGroup(groupMulticastAddress);
135
136 while (!stopped) {
137 List buffers = createCachePeersPayload();
138 for (Iterator iter = buffers.iterator(); iter.hasNext();) {
139 byte[] buffer = (byte[]) iter.next();
140 DatagramPacket packet = new DatagramPacket(buffer, buffer.length, groupMulticastAddress,
141 groupMulticastPort.intValue());
142 socket.send(packet);
143 }
144 try {
145 synchronized (this) {
146 wait(heartBeatInterval);
147 }
148 } catch (InterruptedException e) {
149 if (!stopped) {
150 LOG.error("Error receiving heartbeat. Initial cause was " + e.getMessage(), e);
151 }
152 }
153 }
154 } catch (IOException e) {
155 LOG.debug("Error on multicast socket", e);
156 } catch (Throwable e) {
157 LOG.info("Unexpected throwable in run thread. Continuing..." + e.getMessage(), e);
158 } finally {
159 closeSocket();
160 }
161 if (!stopped) {
162 try {
163 sleep(heartBeatInterval);
164 } catch (InterruptedException e) {
165 LOG.error("Sleep after error interrupted. Initial cause was " + e.getMessage(), e);
166 }
167 }
168 }
169 }
170
171 /***
172 * Creates a gzipped payload.
173 * <p/>
174 * The last gzipped payload is retained and only recalculated if the list of cache peers
175 * has changed.
176 *
177 * @return a gzipped byte[]
178 */
179 private List createCachePeersPayload() {
180
181 CacheManagerPeerListener cacheManagerPeerListener = cacheManager.getCachePeerListener("RMI");
182 if (cacheManagerPeerListener == null) {
183 LOG.warn("The RMICacheManagerPeerListener is missing. You need to configure a cacheManagerPeerListenerFactory" +
184 " with class=\"net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory\" in ehcache.xml.");
185 return new ArrayList();
186 }
187 List localCachePeers = cacheManagerPeerListener.getBoundCachePeers();
188 int newCachePeersHash = localCachePeers.hashCode();
189 if (cachePeersHash != newCachePeersHash) {
190 cachePeersHash = newCachePeersHash;
191 compressedUrlListList = PayloadUtil.createCompressedPayloadList(localCachePeers, MAXIMUM_PEERS_PER_SEND);
192 }
193 return compressedUrlListList;
194 }
195
196
197 /***
198 * Interrupts this thread.
199 * <p/>
200 * <p> Unless the current thread is interrupting itself, which is
201 * always permitted, the {@link #checkAccess() checkAccess} method
202 * of this thread is invoked, which may cause a {@link
203 * SecurityException} to be thrown.
204 * <p/>
205 * <p> If this thread is blocked in an invocation of the {@link
206 * Object#wait() wait()}, {@link Object#wait(long) wait(long)}, or {@link
207 * Object#wait(long,int) wait(long, int)} methods of the {@link Object}
208 * class, or of the {@link #join()}, {@link #join(long)}, {@link
209 * #join(long,int)}, {@link #sleep(long)}, or {@link #sleep(long,int)},
210 * methods of this class, then its interrupt status will be cleared and it
211 * will receive an {@link InterruptedException}.
212 * <p/>
213 * <p> If this thread is blocked in an I/O operation upon an {@link
214 * java.nio.channels.InterruptibleChannel </code>interruptible
215 * channel<code>} then the channel will be closed, the thread's interrupt
216 * status will be set, and the thread will receive a {@link
217 * java.nio.channels.ClosedByInterruptException}.
218 * <p/>
219 * <p> If this thread is blocked in a {@link java.nio.channels.Selector}
220 * then the thread's interrupt status will be set and it will return
221 * immediately from the selection operation, possibly with a non-zero
222 * value, just as if the selector's {@link
223 * java.nio.channels.Selector#wakeup wakeup} method were invoked.
224 * <p/>
225 * <p> If none of the previous conditions hold then this thread's interrupt
226 * status will be set. </p>
227 *
228 * @throws SecurityException if the current thread cannot modify this thread
229 */
230 @Override
231 public final void interrupt() {
232 closeSocket();
233 super.interrupt();
234 }
235
236 private void closeSocket() {
237 try {
238 if (socket != null && !socket.isClosed()) {
239 try {
240 socket.leaveGroup(groupMulticastAddress);
241 } catch (IOException e) {
242 LOG.error("Error leaving multicast group. Message was " + e.getMessage());
243 }
244 socket.close();
245 }
246 } catch (NoSuchMethodError e) {
247 LOG.debug("socket.isClosed is not supported by JDK1.3");
248 try {
249 socket.leaveGroup(groupMulticastAddress);
250 } catch (IOException ex) {
251 LOG.error("Error leaving multicast group. Message was " + ex.getMessage());
252 }
253 socket.close();
254 }
255 }
256
257 }
258
259 /***
260 * Sets the heartbeat interval to something other than the default of 5000ms. This is useful for testing,
261 * but not recommended for production. This method is static and so affects the heartbeat interval of all
262 * senders. The change takes effect after the next scheduled heartbeat.
263 *
264 * @param heartBeatInterval a time in ms, greater than 1000
265 */
266 public static void setHeartBeatInterval(long heartBeatInterval) {
267 if (heartBeatInterval < MINIMUM_HEARTBEAT_INTERVAL) {
268 LOG.warn("Trying to set heartbeat interval too low. Using MINIMUM_HEARTBEAT_INTERVAL instead.");
269 MulticastKeepaliveHeartbeatSender.heartBeatInterval = MINIMUM_HEARTBEAT_INTERVAL;
270 } else {
271 MulticastKeepaliveHeartbeatSender.heartBeatInterval = heartBeatInterval;
272 }
273 }
274
275 /***
276 * Returns the heartbeat interval.
277 */
278 public static long getHeartBeatInterval() {
279 return heartBeatInterval;
280 }
281
282 /***
283 * @return the TTL
284 */
285 public Integer getTimeToLive() {
286 return timeToLive;
287 }
288 }