View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.CacheManager;
20  
21  import java.io.IOException;
22  import java.net.DatagramPacket;
23  import java.net.InetAddress;
24  import java.net.MulticastSocket;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  /***
33   * Sends heartbeats to a multicast group containing a compressed list of URLs.
34   * <p/>
35   * You can control how far the multicast packets propagate by setting the badly misnamed "TTL".
36   * Using the multicast IP protocol, the TTL value indicates the scope or range in which a packet may be forwarded.
37   * By convention:
38   * <ul>
39   * <li>0 is restricted to the same host
40   * <li>1 is restricted to the same subnet
41   * <li>32 is restricted to the same site
42   * <li>64 is restricted to the same region
43   * <li>128 is restricted to the same continent
44   * <li>255 is unrestricted
45   * </ul>
46   * You can also control how often the heartbeat sends by setting the interval.
47   *
48   * @author Greg Luck
49   * @version $Id: MulticastKeepaliveHeartbeatSender.html 13146 2011-08-01 17:12:39Z oletizi $
50   */
51  public final class MulticastKeepaliveHeartbeatSender {
52  
53  
54      private static final Logger LOG = LoggerFactory.getLogger(MulticastKeepaliveHeartbeatSender.class.getName());
55  
56      private static final int DEFAULT_HEARTBEAT_INTERVAL = 5000;
57      private static final int MINIMUM_HEARTBEAT_INTERVAL = 1000;
58      private static final int MAXIMUM_PEERS_PER_SEND = 150;
59  
60      private static long heartBeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
61  
62      private final InetAddress groupMulticastAddress;
63      private final Integer groupMulticastPort;
64      private final Integer timeToLive;
65      private MulticastServerThread serverThread;
66      private volatile boolean stopped;
67      private final CacheManager cacheManager;
68      private InetAddress hostAddress;
69  
70      /***
71       * Constructor.
72       *
73       * @param cacheManager     the bound CacheManager. Each CacheManager has a maximum of one sender
74       * @param multicastAddress
75       * @param multicastPort
76       * @param timeToLive       See class description for the meaning of this parameter.
77       */
78      public MulticastKeepaliveHeartbeatSender(CacheManager cacheManager,
79                                               InetAddress multicastAddress, Integer multicastPort,
80                                               Integer timeToLive,
81                                               InetAddress hostAddress) {
82          this.cacheManager = cacheManager;
83          this.groupMulticastAddress = multicastAddress;
84          this.groupMulticastPort = multicastPort;
85          this.timeToLive = timeToLive;
86          this.hostAddress = hostAddress;
87  
88      }
89  
90      /***
91       * Start the heartbeat thread
92       */
93      public final void init() {
94          serverThread = new MulticastServerThread();
95          serverThread.start();
96      }
97  
98      /***
99       * Shutdown this heartbeat sender
100      */
101     public final synchronized void dispose() {
102         stopped = true;
103         notifyAll();
104         serverThread.interrupt();
105     }
106 
107     /***
108      * A thread which sends a multicast heartbeat every second
109      */
110     private final class MulticastServerThread extends Thread {
111 
112         private MulticastSocket socket;
113         private List compressedUrlListList = new ArrayList();
114         private int cachePeersHash;
115 
116 
117         /***
118          * Constructor
119          */
120         public MulticastServerThread() {
121             super("Multicast Heartbeat Sender Thread");
122             setDaemon(true);
123         }
124 
125         @Override
126         public final void run() {
127             while (!stopped) {
128                 try {
129                     socket = new MulticastSocket(groupMulticastPort.intValue());
130                     if (hostAddress != null) {
131                         socket.setInterface(hostAddress);
132                     }
133                     socket.setTimeToLive(timeToLive.intValue());
134                     socket.joinGroup(groupMulticastAddress);
135 
136                     while (!stopped) {
137                         List buffers = createCachePeersPayload();
138                         for (Iterator iter = buffers.iterator(); iter.hasNext();) {
139                             byte[] buffer = (byte[]) iter.next();
140                             DatagramPacket packet = new DatagramPacket(buffer, buffer.length, groupMulticastAddress,
141                                     groupMulticastPort.intValue());
142                             socket.send(packet);
143                         }
144                         try {
145                             synchronized (this) {
146                                 wait(heartBeatInterval);
147                             }
148                         } catch (InterruptedException e) {
149                             if (!stopped) {
150                                 LOG.error("Error receiving heartbeat. Initial cause was " + e.getMessage(), e);
151                             }
152                         }
153                     }
154                 } catch (IOException e) {
155                     LOG.debug("Error on multicast socket", e);
156                 } catch (Throwable e) {
157                     LOG.info("Unexpected throwable in run thread. Continuing..." + e.getMessage(), e);
158                 } finally {
159                     closeSocket();
160                 }
161                 if (!stopped) {
162                     try {
163                         sleep(heartBeatInterval);
164                     } catch (InterruptedException e) {
165                         LOG.error("Sleep after error interrupted. Initial cause was " + e.getMessage(), e);
166                     }
167                 }
168             }
169         }
170 
171         /***
172          * Creates a gzipped payload.
173          * <p/>
174          * The last gzipped payload is retained and only recalculated if the list of cache peers
175          * has changed.
176          *
177          * @return a gzipped byte[]
178          */
179         private List createCachePeersPayload() {
180 
181             CacheManagerPeerListener cacheManagerPeerListener = cacheManager.getCachePeerListener("RMI");
182             if (cacheManagerPeerListener == null) {
183                 LOG.warn("The RMICacheManagerPeerListener is missing. You need to configure a cacheManagerPeerListenerFactory" +
184                         " with class=\"net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory\" in ehcache.xml.");
185                 return new ArrayList();
186             }
187             List localCachePeers = cacheManagerPeerListener.getBoundCachePeers();
188             int newCachePeersHash = localCachePeers.hashCode();
189             if (cachePeersHash != newCachePeersHash) {
190                 cachePeersHash = newCachePeersHash;
191                 compressedUrlListList = PayloadUtil.createCompressedPayloadList(localCachePeers, MAXIMUM_PEERS_PER_SEND);
192             }
193             return compressedUrlListList;
194         }
195 
196 
197         /***
198          * Interrupts this thread.
199          * <p/>
200          * <p> Unless the current thread is interrupting itself, which is
201          * always permitted, the {@link #checkAccess() checkAccess} method
202          * of this thread is invoked, which may cause a {@link
203          * SecurityException} to be thrown.
204          * <p/>
205          * <p> If this thread is blocked in an invocation of the {@link
206          * Object#wait() wait()}, {@link Object#wait(long) wait(long)}, or {@link
207          * Object#wait(long,int) wait(long, int)} methods of the {@link Object}
208          * class, or of the {@link #join()}, {@link #join(long)}, {@link
209          * #join(long,int)}, {@link #sleep(long)}, or {@link #sleep(long,int)},
210          * methods of this class, then its interrupt status will be cleared and it
211          * will receive an {@link InterruptedException}.
212          * <p/>
213          * <p> If this thread is blocked in an I/O operation upon an {@link
214          * java.nio.channels.InterruptibleChannel </code>interruptible
215          * channel<code>} then the channel will be closed, the thread's interrupt
216          * status will be set, and the thread will receive a {@link
217          * java.nio.channels.ClosedByInterruptException}.
218          * <p/>
219          * <p> If this thread is blocked in a {@link java.nio.channels.Selector}
220          * then the thread's interrupt status will be set and it will return
221          * immediately from the selection operation, possibly with a non-zero
222          * value, just as if the selector's {@link
223          * java.nio.channels.Selector#wakeup wakeup} method were invoked.
224          * <p/>
225          * <p> If none of the previous conditions hold then this thread's interrupt
226          * status will be set. </p>
227          *
228          * @throws SecurityException if the current thread cannot modify this thread
229          */
230         @Override
231         public final void interrupt() {
232             closeSocket();
233             super.interrupt();
234         }
235 
236         private void closeSocket() {
237             try {
238                 if (socket != null && !socket.isClosed()) {
239                     try {
240                         socket.leaveGroup(groupMulticastAddress);
241                     } catch (IOException e) {
242                         LOG.error("Error leaving multicast group. Message was " + e.getMessage());
243                     }
244                     socket.close();
245                 }
246             } catch (NoSuchMethodError e) {
247                 LOG.debug("socket.isClosed is not supported by JDK1.3");
248                 try {
249                     socket.leaveGroup(groupMulticastAddress);
250                 } catch (IOException ex) {
251                     LOG.error("Error leaving multicast group. Message was " + ex.getMessage());
252                 }
253                 socket.close();
254             }
255         }
256 
257     }
258 
259     /***
260      * Sets the heartbeat interval to something other than the default of 5000ms. This is useful for testing,
261      * but not recommended for production. This method is static and so affects the heartbeat interval of all
262      * senders. The change takes effect after the next scheduled heartbeat.
263      *
264      * @param heartBeatInterval a time in ms, greater than 1000
265      */
266     public static void setHeartBeatInterval(long heartBeatInterval) {
267         if (heartBeatInterval < MINIMUM_HEARTBEAT_INTERVAL) {
268             LOG.warn("Trying to set heartbeat interval too low. Using MINIMUM_HEARTBEAT_INTERVAL instead.");
269             MulticastKeepaliveHeartbeatSender.heartBeatInterval = MINIMUM_HEARTBEAT_INTERVAL;
270         } else {
271             MulticastKeepaliveHeartbeatSender.heartBeatInterval = heartBeatInterval;
272         }
273     }
274 
275     /***
276      * Returns the heartbeat interval.
277      */
278     public static long getHeartBeatInterval() {
279         return heartBeatInterval;
280     }
281 
282     /***
283      * @return the TTL
284      */
285     public Integer getTimeToLive() {
286         return timeToLive;
287     }
288 }