1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import java.io.IOException;
20 import java.net.DatagramPacket;
21 import java.net.InetAddress;
22 import java.net.MulticastSocket;
23 import java.rmi.RemoteException;
24 import java.util.Collections;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.StringTokenizer;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31
32 import net.sf.ehcache.CacheManager;
33 import net.sf.ehcache.util.NamedThreadFactory;
34
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /***
39 * Receives heartbeats from any {@link MulticastKeepaliveHeartbeatSender}s out there.
40 * <p/>
41 * Our own multicast heartbeats are ignored.
42 *
43 * @author Greg Luck
44 * @version $Id: MulticastKeepaliveHeartbeatReceiver.html 13146 2011-08-01 17:12:39Z oletizi $
45 */
46 public final class MulticastKeepaliveHeartbeatReceiver {
47
48 private static final Logger LOG = LoggerFactory.getLogger(MulticastKeepaliveHeartbeatReceiver.class.getName());
49
50 private ExecutorService processingThreadPool;
51 private Set rmiUrlsProcessingQueue = Collections.synchronizedSet(new HashSet());
52 private final InetAddress groupMulticastAddress;
53 private final Integer groupMulticastPort;
54 private MulticastReceiverThread receiverThread;
55 private MulticastSocket socket;
56 private volatile boolean stopped;
57 private final MulticastRMICacheManagerPeerProvider peerProvider;
58 private InetAddress hostAddress;
59
60 /***
61 * Constructor.
62 *
63 * @param peerProvider
64 * @param multicastAddress
65 * @param multicastPort
66 * @param hostAddress
67 */
68 public MulticastKeepaliveHeartbeatReceiver(
69 MulticastRMICacheManagerPeerProvider peerProvider, InetAddress multicastAddress, Integer multicastPort,
70 InetAddress hostAddress) {
71 this.peerProvider = peerProvider;
72 this.groupMulticastAddress = multicastAddress;
73 this.groupMulticastPort = multicastPort;
74 this.hostAddress = hostAddress;
75 }
76
77
78 /***
79 * Start.
80 *
81 * @throws IOException
82 */
83 final void init() throws IOException {
84 socket = new MulticastSocket(groupMulticastPort.intValue());
85 if (hostAddress != null) {
86 socket.setInterface(hostAddress);
87 }
88 socket.joinGroup(groupMulticastAddress);
89 receiverThread = new MulticastReceiverThread();
90 receiverThread.start();
91 processingThreadPool = Executors.newCachedThreadPool(new NamedThreadFactory("Multicast keep-alive Heartbeat Receiver"));
92 }
93
94 /***
95 * Shutdown the heartbeat.
96 */
97 public final void dispose() {
98 LOG.debug("dispose called");
99 processingThreadPool.shutdownNow();
100 stopped = true;
101 receiverThread.interrupt();
102 }
103
104 /***
105 * A multicast receiver which continously receives heartbeats.
106 */
107 private final class MulticastReceiverThread extends Thread {
108
109 /***
110 * Constructor
111 */
112 public MulticastReceiverThread() {
113 super("Multicast Heartbeat Receiver Thread");
114 setDaemon(true);
115 }
116
117 @Override
118 public final void run() {
119 byte[] buf = new byte[PayloadUtil.MTU];
120 try {
121 while (!stopped) {
122 DatagramPacket packet = new DatagramPacket(buf, buf.length);
123 try {
124 socket.receive(packet);
125 byte[] payload = packet.getData();
126 processPayload(payload);
127
128
129 } catch (IOException e) {
130 if (!stopped) {
131 LOG.error("Error receiving heartbeat. " + e.getMessage() +
132 ". Initial cause was " + e.getMessage(), e);
133 }
134 }
135 }
136 } catch (Throwable t) {
137 LOG.error("Multicast receiver thread caught throwable. Cause was " + t.getMessage() + ". Continuing...");
138 }
139 }
140
141 private void processPayload(byte[] compressedPayload) {
142 byte[] payload = PayloadUtil.ungzip(compressedPayload);
143 String rmiUrls = new String(payload);
144 if (self(rmiUrls)) {
145 return;
146 }
147 rmiUrls = rmiUrls.trim();
148 LOG.debug("rmiUrls received {}", rmiUrls);
149 processRmiUrls(rmiUrls);
150 }
151
152 /***
153 * This method forks a new executor to process the received heartbeat in a thread pool.
154 * That way each remote cache manager cannot interfere with others.
155 * <p/>
156 * In the worst case, we have as many concurrent threads as remote cache managers.
157 *
158 * @param rmiUrls
159 */
160 private void processRmiUrls(final String rmiUrls) {
161 if (rmiUrlsProcessingQueue.contains(rmiUrls)) {
162
163 LOG.debug("We are already processing these rmiUrls. Another heartbeat came before we finished: {}", rmiUrls);
164 return;
165 }
166
167 if (processingThreadPool == null) {
168 return;
169 }
170
171 processingThreadPool.execute(new Runnable() {
172 public void run() {
173 try {
174
175 rmiUrlsProcessingQueue.add(rmiUrls);
176 for (StringTokenizer stringTokenizer = new StringTokenizer(rmiUrls,
177 PayloadUtil.URL_DELIMITER); stringTokenizer.hasMoreTokens();) {
178 if (stopped) {
179 return;
180 }
181 String rmiUrl = stringTokenizer.nextToken();
182 registerNotification(rmiUrl);
183 if (!peerProvider.peerUrls.containsKey(rmiUrl)) {
184 LOG.debug("Aborting processing of rmiUrls since failed to add rmiUrl: {}", rmiUrl);
185 return;
186 }
187 }
188 } finally {
189
190 rmiUrlsProcessingQueue.remove(rmiUrls);
191 }
192 }
193 });
194 }
195
196
197 /***
198 * @param rmiUrls
199 * @return true if our own hostname and listener port are found in the list. This then means we have
200 * caught our onw multicast, and should be ignored.
201 */
202 private boolean self(String rmiUrls) {
203 CacheManager cacheManager = peerProvider.getCacheManager();
204 CacheManagerPeerListener cacheManagerPeerListener = cacheManager.getCachePeerListener("RMI");
205 if (cacheManagerPeerListener == null) {
206 return false;
207 }
208 List boundCachePeers = cacheManagerPeerListener.getBoundCachePeers();
209 if (boundCachePeers == null || boundCachePeers.size() == 0) {
210 return false;
211 }
212 CachePeer peer = (CachePeer) boundCachePeers.get(0);
213 try {
214 String cacheManagerUrlBase = peer.getUrlBase();
215 int baseUrlMatch = rmiUrls.indexOf(cacheManagerUrlBase);
216 return baseUrlMatch != -1;
217 } catch (RemoteException e) {
218 LOG.error("Error geting url base", e);
219 return false;
220 }
221 }
222
223 private void registerNotification(String rmiUrl) {
224 peerProvider.registerPeer(rmiUrl);
225 }
226
227
228 /***
229 * {@inheritDoc}
230 */
231 @Override
232 public final void interrupt() {
233 try {
234 socket.leaveGroup(groupMulticastAddress);
235 } catch (IOException e) {
236 LOG.error("Error leaving group");
237 }
238 socket.close();
239 super.interrupt();
240 }
241 }
242
243
244 }