View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import java.io.IOException;
20  import java.net.DatagramPacket;
21  import java.net.InetAddress;
22  import java.net.MulticastSocket;
23  import java.rmi.RemoteException;
24  import java.util.Collections;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.Set;
28  import java.util.StringTokenizer;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Executors;
31  
32  import net.sf.ehcache.CacheManager;
33  import net.sf.ehcache.util.NamedThreadFactory;
34  
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  /***
39   * Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there.
40   * <p/>
41   * Our own multicast heartbeats are ignored.
42   *
43   * @author Greg Luck
44   * @version $Id: MulticastKeepaliveHeartbeatReceiver.html 13146 2011-08-01 17:12:39Z oletizi $
45   */
46  public final class MulticastKeepaliveHeartbeatReceiver {
47  
48      private static final Logger LOG = LoggerFactory.getLogger(MulticastKeepaliveHeartbeatReceiver.class.getName());
49  
50      private ExecutorService processingThreadPool;
51      private Set rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet());
52      private final InetAddress groupMulticastAddress;
53      private final Integer groupMulticastPort;
54      private MulticastReceiverThread receiverThread;
55      private MulticastSocket socket;
56      private volatile boolean stopped;
57      private final MulticastRMICacheManagerPeerProvider peerProvider;
58      private InetAddress hostAddress;
59  
60      /***
61       * Constructor.
62       *
63       * @param peerProvider
64       * @param multicastAddress
65       * @param multicastPort
66       * @param hostAddress
67       */
68      public MulticastKeepaliveHeartbeatReceiver(
69              MulticastRMICacheManagerPeerProvider peerProvider, InetAddress multicastAddress, Integer multicastPort,
70              InetAddress hostAddress) {
71          this.peerProvider = peerProvider;
72          this.groupMulticastAddress = multicastAddress;
73          this.groupMulticastPort = multicastPort;
74          this.hostAddress = hostAddress;
75      }
76  
77  
78      /***
79       * Start.
80       *
81       * @throws IOException
82       */
83      final void init() throws IOException {
84          socket = new MulticastSocket(groupMulticastPort.intValue());
85          if (hostAddress != null) {
86              socket.setInterface(hostAddress);
87          }
88          socket.joinGroup(groupMulticastAddress);
89          receiverThread = new MulticastReceiverThread();
90          receiverThread.start();
91          processingThreadPool = Executors.newCachedThreadPool(new NamedThreadFactory("Multicast keep-alive Heartbeat Receiver"));
92      }
93  
94      /***
95       * Shutdown the heartbeat.
96       */
97      public final void dispose() {
98          LOG.debug("dispose called");
99          processingThreadPool.shutdownNow();
100         stopped = true;
101         receiverThread.interrupt();
102     }
103 
104     /***
105      * A multicast receiver which continously receives heartbeats.
106      */
107     private final class MulticastReceiverThread extends Thread {
108 
109         /***
110          * Constructor
111          */
112         public MulticastReceiverThread() {
113             super("Multicast Heartbeat Receiver Thread");
114             setDaemon(true);
115         }
116 
117         @Override
118         public final void run() {
119             byte[] buf = new byte[PayloadUtil.MTU];
120             try {
121                 while (!stopped) {
122                     DatagramPacket packet = new DatagramPacket(buf, buf.length);
123                     try {
124                         socket.receive(packet);
125                         byte[] payload = packet.getData();
126                         processPayload(payload);
127 
128 
129                     } catch (IOException e) {
130                         if (!stopped) {
131                             LOG.error("Error receiving heartbeat. " + e.getMessage() +
132                                     ". Initial cause was " + e.getMessage(), e);
133                         }
134                     }
135                 }
136             } catch (Throwable t) {
137                 LOG.error("Multicast receiver thread caught throwable. Cause was " + t.getMessage() + ". Continuing...");
138             }
139         }
140 
141         private void processPayload(byte[] compressedPayload) {
142             byte[] payload = PayloadUtil.ungzip(compressedPayload);
143             String rmiUrls = new String(payload);
144             if (self(rmiUrls)) {
145                 return;
146             }
147             rmiUrls = rmiUrls.trim();
148                 LOG.debug("rmiUrls received {}", rmiUrls);
149             processRmiUrls(rmiUrls);
150         }
151 
152         /***
153          * This method forks a new executor to process the received heartbeat in a thread pool.
154          * That way each remote cache manager cannot interfere with others.
155          * <p/>
156          * In the worst case, we have as many concurrent threads as remote cache managers.
157          *
158          * @param rmiUrls
159          */
160         private void processRmiUrls(final String rmiUrls) {
161             if (rmiUrlsProcessingQueue.contains(rmiUrls)) {
162 
163                     LOG.debug("We are already processing these rmiUrls. Another heartbeat came before we finished: {}", rmiUrls);
164                 return;
165             }
166 
167             if (processingThreadPool == null) {
168                 return;
169             }
170 
171             processingThreadPool.execute(new Runnable() {
172                 public void run() {
173                     try {
174                         // Add the rmiUrls we are processing.
175                         rmiUrlsProcessingQueue.add(rmiUrls);
176                         for (StringTokenizer stringTokenizer = new StringTokenizer(rmiUrls,
177                                 PayloadUtil.URL_DELIMITER); stringTokenizer.hasMoreTokens();) {
178                             if (stopped) {
179                                 return;
180                             }
181                             String rmiUrl = stringTokenizer.nextToken();
182                             registerNotification(rmiUrl);
183                             if (!peerProvider.peerUrls.containsKey(rmiUrl)) {
184                                     LOG.debug("Aborting processing of rmiUrls since failed to add rmiUrl: {}", rmiUrl);
185                                 return;
186                             }
187                         }
188                     } finally {
189                         // Remove the rmiUrls we just processed
190                         rmiUrlsProcessingQueue.remove(rmiUrls);
191                     }
192                 }
193             });
194         }
195 
196 
197         /***
198          * @param rmiUrls
199          * @return true if our own hostname and listener port are found in the list. This then means we have
200          *         caught our onw multicast, and should be ignored.
201          */
202         private boolean self(String rmiUrls) {
203             CacheManager cacheManager = peerProvider.getCacheManager();
204             CacheManagerPeerListener cacheManagerPeerListener = cacheManager.getCachePeerListener("RMI");
205             if (cacheManagerPeerListener == null) {
206                 return false;
207             }
208             List boundCachePeers = cacheManagerPeerListener.getBoundCachePeers();
209             if (boundCachePeers == null || boundCachePeers.size() == 0) {
210                 return false;
211             }
212             CachePeer peer = (CachePeer) boundCachePeers.get(0);
213             try {
214                 String cacheManagerUrlBase = peer.getUrlBase();
215                 int baseUrlMatch = rmiUrls.indexOf(cacheManagerUrlBase);
216                 return baseUrlMatch != -1;
217             } catch (RemoteException e) {
218                 LOG.error("Error geting url base", e);
219                 return false;
220             }
221         }
222 
223         private void registerNotification(String rmiUrl) {
224             peerProvider.registerPeer(rmiUrl);
225         }
226 
227 
228         /***
229          * {@inheritDoc}
230          */
231         @Override
232         public final void interrupt() {
233             try {
234                 socket.leaveGroup(groupMulticastAddress);
235             } catch (IOException e) {
236                 LOG.error("Error leaving group");
237             }
238             socket.close();
239             super.interrupt();
240         }
241     }
242 
243 
244 }