1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.terracotta;
18
19 import java.lang.reflect.Method;
20 import java.util.Collections;
21 import java.util.Map;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ThreadFactory;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import net.sf.ehcache.CacheException;
29 import net.sf.ehcache.CacheManager;
30 import net.sf.ehcache.cluster.CacheCluster;
31 import net.sf.ehcache.cluster.ClusterNode;
32 import net.sf.ehcache.cluster.ClusterTopologyListener;
33 import net.sf.ehcache.config.CacheConfiguration;
34 import net.sf.ehcache.config.InvalidConfigurationException;
35 import net.sf.ehcache.config.TerracottaClientConfiguration;
36 import net.sf.ehcache.config.TerracottaConfiguration.StorageStrategy;
37 import net.sf.ehcache.terracotta.TerracottaClusteredInstanceHelper.TerracottaRuntimeType;
38
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /***
43 * Class encapsulating the idea of a Terracotta client. Provides access to the {@link ClusteredInstanceFactory} for the cluster
44 *
45 * @author Abhishek Sanoujam
46 *
47 */
48 public class TerracottaClient {
49
50 private static final Logger LOGGER = LoggerFactory.getLogger(TerracottaClient.class);
51 private static final int REJOIN_SLEEP_MILLIS_ON_EXCEPTION = Integer.getInteger("net.sf.ehcache.rejoin.sleepMillisOnException", 5000);
52
53 private final TerracottaClientConfiguration terracottaClientConfiguration;
54 private volatile ClusteredInstanceFactoryWrapper clusteredInstanceFactory;
55 private final TerracottaCacheCluster cacheCluster = new TerracottaCacheCluster();
56 private final RejoinWorker rejoinWorker = new RejoinWorker();
57 private final TerracottaClientRejoinListener rejoinListener;
58 private final ExecutorService l1TerminatorThreadPool = Executors.newCachedThreadPool(new ThreadFactory() {
59 public Thread newThread(Runnable runnable) {
60 Thread t = new Thread(runnable, "L1 Terminator");
61 t.setDaemon(true);
62 return t;
63 }
64 });
65 private final CacheManager cacheManager;
66
67 /***
68 * Constructor accepting the {@link TerracottaClientRejoinListener} and the {@link TerracottaClientConfiguration}
69 *
70 * @param cacheManager
71 * @param rejoinAction
72 * @param terracottaClientConfiguration
73 */
74 public TerracottaClient(CacheManager cacheManager, TerracottaClientRejoinListener rejoinAction,
75 TerracottaClientConfiguration terracottaClientConfiguration) {
76 this.cacheManager = cacheManager;
77 this.rejoinListener = rejoinAction;
78 this.terracottaClientConfiguration = terracottaClientConfiguration;
79 if (terracottaClientConfiguration != null) {
80 terracottaClientConfiguration.freezeConfig();
81 }
82 if (isRejoinEnabled()) {
83 TerracottaRuntimeType type = TerracottaClusteredInstanceHelper.getInstance().getTerracottaRuntimeTypeOrNull();
84 if (type == null) {
85 throw new InvalidConfigurationException(
86 "Terracotta Rejoin is enabled but can't determine Terracotta Runtime. You are probably missing Terracotta jar(s).");
87 }
88 if (type != TerracottaRuntimeType.EnterpriseExpress && type != TerracottaRuntimeType.Express) {
89 throw new InvalidConfigurationException("Rejoin cannot be used in Terracotta DSO mode.");
90 }
91 Thread rejoinThread = new Thread(rejoinWorker, "Rejoin Worker Thread [cacheManager: " + cacheManager.getName() + "]");
92 rejoinThread.setDaemon(true);
93 rejoinThread.start();
94 }
95 }
96
97 /***
98 * Returns the default {@link StorageStrategy} type for the current Terracotta runtime.
99 *
100 * @param cacheConfiguration the cache's configuration
101 *
102 * @return the default {@link StorageStrategy} type for the current Terracotta runtime.
103 */
104 public static StorageStrategy getTerracottaDefaultStrategyForCurrentRuntime(final CacheConfiguration cacheConfiguration) {
105 return TerracottaClusteredInstanceHelper.getInstance().getDefaultStorageStrategyForCurrentRuntime(cacheConfiguration);
106 }
107
108
109
110
111
112
113
114 private static void setTestMode(TerracottaClusteredInstanceHelper testHelper) {
115 try {
116 Method method = TerracottaClusteredInstanceHelper.class.getDeclaredMethod("setTestMode",
117 TerracottaClusteredInstanceHelper.class);
118 method.setAccessible(true);
119 method.invoke(null, testHelper);
120 } catch (Exception e) {
121
122 e.printStackTrace();
123 }
124 }
125
126 /***
127 * Returns the {@link ClusteredInstanceFactory} associated with this client
128 *
129 * @return The ClusteredInstanceFactory
130 */
131 public ClusteredInstanceFactory getClusteredInstanceFactory() {
132 rejoinWorker.waitUntilRejoinComplete();
133 return clusteredInstanceFactory;
134 }
135
136 /***
137 * Returns true if the clusteredInstanceFactory was created, otherwise returns false.
138 * Multiple threads calling this method block and only one of them creates the factory.
139 *
140 * @param cacheConfigs
141 * @return true if the clusteredInstanceFactory was created, otherwise returns false
142 */
143 public boolean createClusteredInstanceFactory(Map<String, CacheConfiguration> cacheConfigs) {
144 rejoinWorker.waitUntilRejoinComplete();
145 if (clusteredInstanceFactory != null) {
146 return false;
147 }
148 final boolean created;
149 synchronized (this) {
150 if (clusteredInstanceFactory == null) {
151 clusteredInstanceFactory = createNewClusteredInstanceFactory(cacheConfigs);
152 created = true;
153 } else {
154 created = false;
155 }
156 }
157 return created;
158 }
159
160 /***
161 * Get the {@link CacheCluster} associated with this client
162 *
163 * @return the {@link CacheCluster} associated with this client
164 */
165 public TerracottaCacheCluster getCacheCluster() {
166 rejoinWorker.waitUntilRejoinComplete();
167 if (clusteredInstanceFactory == null) {
168 throw new CacheException("Cannot get CacheCluster as ClusteredInstanceFactory has not been initialized yet.");
169 }
170 return cacheCluster;
171 }
172
173 /***
174 * Shuts down the client
175 */
176 public synchronized void shutdown() {
177 rejoinWorker.waitUntilRejoinComplete();
178 rejoinWorker.shutdown();
179 if (clusteredInstanceFactory != null) {
180 shutdownClusteredInstanceFactoryWrapper(clusteredInstanceFactory);
181 }
182 }
183
184 private void shutdownClusteredInstanceFactoryWrapper(ClusteredInstanceFactoryWrapper clusteredInstanceFactory) {
185 clusteredInstanceFactory.getActualFactory().getTopology().getTopologyListeners().clear();
186 clusteredInstanceFactory.shutdown();
187 }
188
/**
 * Shuts down the currently held factory wrapper (if any) and creates a new clustered instance factory
 * wrapped in a {@link ClusteredInstanceFactoryWrapper}. The caller is responsible for storing the result;
 * this method does not assign the field.
 *
 * @param cacheConfigs cache configurations handed to the helper that creates the factory
 * @return a new wrapper around the freshly created factory
 */
private synchronized ClusteredInstanceFactoryWrapper createNewClusteredInstanceFactory(Map<String, CacheConfiguration> cacheConfigs) {
    // shut down the old factory, if there is one
    if (clusteredInstanceFactory != null) {
        info("Shutting down old ClusteredInstanceFactory...");
        shutdownClusteredInstanceFactoryWrapper(clusteredInstanceFactory);
    }
    info("Creating new ClusteredInstanceFactory");
    ClusteredInstanceFactory factory;
    CacheCluster underlyingCacheCluster = null;
    try {
        factory = TerracottaClusteredInstanceHelper.getInstance().newClusteredInstanceFactory(cacheConfigs,
                terracottaClientConfiguration);
        underlyingCacheCluster = factory.getTopology();
    } finally {
        // runs even when creation throws; the node-left listener is registered whenever a topology was
        // obtained, otherwise only a warning is logged (and the exception, if any, propagates)
        if (isRejoinEnabled()) {
            if (underlyingCacheCluster != null) {
                underlyingCacheCluster.addTopologyListener(new NodeLeftListener(this, underlyingCacheCluster
                        .waitUntilNodeJoinsCluster()));
            } else {
                warn("Unable to register node left listener for rejoin");
            }
        }
    }

    if (!rejoinWorker.isRejoinInProgress()) {
        // publish the new topology only when no rejoin is in progress;
        // during a rejoin the worker's fireClusterRejoinedEvent sets it instead
        cacheCluster.setUnderlyingCacheCluster(underlyingCacheCluster);
    }

    return new ClusteredInstanceFactoryWrapper(this, factory);
}
222
223 /***
224 * Block thread until rejoin is complete
225 */
226 protected void waitUntilRejoinComplete() {
227 rejoinWorker.waitUntilRejoinComplete();
228 }
229
/**
 * Requests a rejoin of the cluster after the given node has left. Does nothing when rejoin is disabled.
 * <p>
 * If a rejoin is already in progress, the worker is flagged for forced shutdown and the request is handed
 * to the "L1 Terminator" thread pool; otherwise the request is submitted on the calling thread.
 *
 * @param oldNode the node that left the cluster
 */
private void rejoinCluster(final ClusterNode oldNode) {
    if (!isRejoinEnabled()) {
        return;
    }
    final Runnable rejoinRunnable = new Runnable() {
        public void run() {
            if (rejoinWorker.isRejoinInProgress()) {
                debug("Current node (" + oldNode.getId() + ") left before rejoin could complete, force terminating current client");
                if (clusteredInstanceFactory != null) {
                    // a rejoin was still running: shut down the current factory wrapper
                    // and drop the reference to it
                    info("Shutting down old client");
                    shutdownClusteredInstanceFactoryWrapper(clusteredInstanceFactory);
                    clusteredInstanceFactory = null;
                } else {
                    warn("Current node (" + oldNode.getId() + ") left before rejoin could complete, but previous client is null");
                }
                // interrupt the worker thread so that the rejoin attempt it is
                // currently running gets a chance to abort
                debug("Interrupting rejoin thread");
                rejoinWorker.rejoinThread.interrupt();
            }
            debug("Going to initiate rejoin");
            // queue the new request and wake up the worker
            rejoinWorker.startRejoin(oldNode);
        }

    };
    if (rejoinWorker.isRejoinInProgress()) {
        // another rejoin is already running: mark it as forcibly shut down and run the
        // termination on a pool thread rather than on the calling thread
        // NOTE(review): presumably so the thread delivering nodeLeft is not blocked by the shutdown — confirm
        rejoinWorker.setForcedShutdown();
        l1TerminatorThreadPool.execute(rejoinRunnable);
    } else {
        // no rejoin in progress: submit the request inline
        rejoinRunnable.run();
    }
}
272
273 private boolean isRejoinEnabled() {
274 return terracottaClientConfiguration != null && terracottaClientConfiguration.isRejoin();
275 }
276
277 private void info(String msg) {
278 info(msg, null);
279 }
280
281 private void info(String msg, Throwable t) {
282 if (t == null) {
283 LOGGER.info(getLogPrefix() + msg);
284 } else {
285 LOGGER.info(getLogPrefix() + msg, t);
286 }
287 }
288
289 private String getLogPrefix() {
290 return "Thread [" + Thread.currentThread().getName() + "] [cacheManager: " + getCacheManagerName() + "]: ";
291 }
292
293 private void debug(String msg) {
294 LOGGER.debug(getLogPrefix() + msg);
295 }
296
297 private void warn(String msg) {
298 LOGGER.warn(getLogPrefix() + msg);
299 }
300
301 private String getCacheManagerName() {
302 if (cacheManager.isNamed()) {
303 return "'" + cacheManager.getName() + "'";
304 } else {
305 return "no name";
306 }
307 }
308
309 /***
310 * Private class responsible for carrying out rejoin
311 *
312 * @author Abhishek Sanoujam
313 *
314 */
315 private class RejoinWorker implements Runnable {
316
317 private final Object rejoinSync = new Object();
318 private final RejoinStatus rejoinStatus = new RejoinStatus();
319 private final AtomicInteger rejoinCount = new AtomicInteger();
320 private final RejoinRequestHolder rejoinRequestHolder = new RejoinRequestHolder();
321 private volatile boolean shutdown;
322 private volatile Thread rejoinThread;
323 private volatile boolean forcedShutdown;
324
325 public void run() {
326 rejoinThread = Thread.currentThread();
327 while (!shutdown) {
328 waitUntilRejoinRequested();
329 if (shutdown) {
330 break;
331 }
332 boolean rejoined = false;
333 final RejoinRequest rejoinRequest = rejoinRequestHolder.consume();
334 debug("Going to start rejoin for request: " + rejoinRequest);
335 while (!rejoined) {
336 try {
337 doRejoin(rejoinRequest);
338 rejoined = true;
339 } catch (Exception e) {
340 boolean forced = getAndClearForcedShutdown();
341 if (forced) {
342 info("Client was shutdown forcefully before rejoin completed", e);
343 break;
344 }
345 LOGGER.warn("Caught exception while trying to rejoin cluster", e);
346 info("Trying to rejoin again in " + REJOIN_SLEEP_MILLIS_ON_EXCEPTION + " msecs...");
347 sleep(REJOIN_SLEEP_MILLIS_ON_EXCEPTION);
348 }
349 }
350 }
351 }
352
353 public synchronized boolean getAndClearForcedShutdown() {
354 boolean rv = forcedShutdown;
355 forcedShutdown = false;
356 return rv;
357 }
358
359 public synchronized void setForcedShutdown() {
360 forcedShutdown = true;
361 }
362
363 public boolean isRejoinInProgress() {
364 return rejoinStatus.isRejoinInProgress();
365 }
366
367 private void sleep(long sleepMillis) {
368 try {
369 Thread.sleep(sleepMillis);
370 } catch (InterruptedException e1) {
371
372 }
373 }
374
375 public void shutdown() {
376 synchronized (rejoinSync) {
377 shutdown = true;
378 rejoinSync.notifyAll();
379 }
380 }
381
382 private void doRejoin(RejoinRequest rejoinRequest) {
383 if (rejoinRequest == null) {
384 return;
385 }
386 final ClusterNode oldNodeReference = rejoinRequest.getRejoinOldNode();
387 rejoinStatus.rejoinStarted();
388 if (Thread.currentThread().isInterrupted()) {
389
390 info("Clearing interrupt state of rejoin thread");
391 Thread.currentThread().interrupted();
392 }
393 int rejoinNumber = rejoinCount.incrementAndGet();
394 info("Starting Terracotta Rejoin (as client id: " + (oldNodeReference == null ? "null" : oldNodeReference.getId())
395 + " left the cluster) [rejoin count = " + rejoinNumber + "] ... ");
396 rejoinListener.clusterRejoinStarted();
397 clusteredInstanceFactory = createNewClusteredInstanceFactory(Collections.EMPTY_MAP);
398
399 rejoinListener.clusterRejoinComplete();
400
401 fireClusterRejoinedEvent(oldNodeReference);
402 info("Rejoin Complete [rejoin count = " + rejoinNumber + "]");
403 rejoinStatus.rejoinComplete();
404 }
405
406 private void fireClusterRejoinedEvent(final ClusterNode oldNodeReference) {
407
408 cacheCluster.setUnderlyingCacheCluster(clusteredInstanceFactory.getActualFactory().getTopology());
409
410 final CountDownLatch latch = new CountDownLatch(2);
411 FireRejoinEventListener fireRejoinEventListener = new FireRejoinEventListener(clusteredInstanceFactory.getActualFactory()
412 .getTopology().waitUntilNodeJoinsCluster(), latch);
413 clusteredInstanceFactory.getActualFactory().getTopology().addTopologyListener(fireRejoinEventListener);
414
415 waitUntilLatchOpen(latch);
416 try {
417 cacheCluster.fireNodeRejoinedEvent(oldNodeReference, cacheCluster.getCurrentNode());
418 } catch (Throwable e) {
419 LOGGER.error("Caught exception while firing rejoin event", e);
420 }
421 clusteredInstanceFactory.getActualFactory().getTopology().removeTopologyListener(fireRejoinEventListener);
422 }
423
424 private void waitUntilLatchOpen(CountDownLatch latch) {
425 boolean done = false;
426 do {
427 try {
428 latch.await();
429 done = true;
430 } catch (InterruptedException e) {
431 if (forcedShutdown) {
432 throw new CacheException(e);
433 } else {
434 LOGGER.info("Ignoring interrupted exception while waiting for latch");
435 }
436 }
437 } while (!done);
438 }
439
440 private void waitUntilRejoinRequested() {
441 info("Rejoin worker waiting until rejoin requested...");
442 synchronized (rejoinSync) {
443 while (!rejoinRequestHolder.isRejoinRequested()) {
444 if (shutdown) {
445 break;
446 }
447 try {
448 rejoinSync.wait();
449 } catch (InterruptedException e) {
450
451 }
452 }
453 }
454 }
455
456 public void startRejoin(ClusterNode oldNode) {
457 synchronized (rejoinSync) {
458 rejoinRequestHolder.addRejoinRequest(oldNode);
459 rejoinSync.notifyAll();
460 }
461 }
462
463 private void waitUntilRejoinComplete() {
464 if (rejoinThread == Thread.currentThread()) {
465 return;
466 }
467 if (isRejoinEnabled()) {
468 rejoinStatus.waitUntilRejoinComplete();
469 }
470 }
471 }
472
473 /***
474 * Private class maintaining rejoin requests
475 *
476 * @author Abhishek Sanoujam
477 *
478 */
479 private static class RejoinRequestHolder {
480 private RejoinRequest outstandingRequest;
481
482 public synchronized void addRejoinRequest(ClusterNode oldNode) {
483
484 outstandingRequest = new RejoinRequest(oldNode);
485 }
486
487 public synchronized RejoinRequest consume() {
488 if (outstandingRequest == null) {
489 return null;
490 }
491 RejoinRequest rv = outstandingRequest;
492 outstandingRequest = null;
493 return rv;
494 }
495
496 public synchronized boolean isRejoinRequested() {
497 return outstandingRequest != null;
498 }
499 }
500
501 /***
502 * Private class - Rejoin request bean
503 *
504 * @author Abhishek Sanoujam
505 *
506 */
507 private static class RejoinRequest {
508 private final ClusterNode oldNode;
509
510 public RejoinRequest(ClusterNode oldNode) {
511 this.oldNode = oldNode;
512 }
513
514 public ClusterNode getRejoinOldNode() {
515 return oldNode;
516 }
517
518 @Override
519 public String toString() {
520 return "RejoinRequest [oldNode=" + oldNode.getId() + "]";
521 }
522
523 }
524
525 /***
526 *
527 * A {@link ClusterTopologyListener} that listens for node left event for a node
528 *
529 * @author Abhishek Sanoujam
530 *
531 */
532 private static class NodeLeftListener implements ClusterTopologyListener {
533
534 private final ClusterNode currentNode;
535 private final TerracottaClient client;
536
537 /***
538 * Constructor accepting the client and the node to listen for
539 */
540 public NodeLeftListener(TerracottaClient client, ClusterNode currentNode) {
541 this.client = client;
542 this.currentNode = currentNode;
543 client.info("Registered interest for rejoin, current node: " + currentNode.getId());
544 }
545
546 /***
547 * {@inheritDoc}
548 */
549 public void nodeLeft(ClusterNode node) {
550 client.info("ClusterNode [id=" + node.getId() + "] left the cluster (currentNode=" + currentNode.getId() + ")");
551 if (node.equals(currentNode)) {
552 client.rejoinCluster(node);
553 }
554 }
555
556 /***
557 * {@inheritDoc}
558 */
559 public void clusterOffline(ClusterNode node) {
560 client.info("ClusterNode [id=" + node.getId() + "] went offline (currentNode=" + currentNode.getId() + ")");
561 }
562
563 /***
564 * {@inheritDoc}
565 */
566 public void clusterOnline(ClusterNode node) {
567 client.info("ClusterNode [id=" + node.getId() + "] became online (currentNode=" + currentNode.getId() + ")");
568 }
569
570 /***
571 * {@inheritDoc}
572 */
573 public void nodeJoined(ClusterNode node) {
574 client.info("ClusterNode [id=" + node.getId() + "] joined the cluster (currentNode=" + currentNode.getId() + ")");
575 }
576
577 /***
578 * {@inheritDoc}
579 */
580 public void clusterRejoined(ClusterNode oldNode, ClusterNode newNode) {
581 client.info("ClusterNode [id=" + oldNode.getId() + "] rejoined cluster as ClusterNode [id=" + newNode.getId()
582 + "] (currentNode=" + currentNode.getId() + ")");
583 }
584
585 }
586
587 /***
588 * Private class maintaining the rejoin state of the client
589 *
590 * @author Abhishek Sanoujam
591 *
592 */
593 private static class RejoinStatus {
594
595 /***
596 * Rejoin state enum
597 *
598 * @author Abhishek Sanoujam
599 *
600 */
601 enum RejoinState {
602 IN_PROGRESS, NOT_IN_PROGRESS;
603 }
604
605 private volatile RejoinState state = RejoinState.NOT_IN_PROGRESS;
606
607 /***
608 * Returns true if rejoin is in progress
609 *
610 * @return true if rejoin is in progress
611 */
612 public boolean isRejoinInProgress() {
613 return state == RejoinState.IN_PROGRESS;
614 }
615
616 /***
617 * Waits until rejoin is complete if in progress
618 */
619 public synchronized void waitUntilRejoinComplete() {
620 boolean interrupted = false;
621 while (state == RejoinState.IN_PROGRESS) {
622 try {
623 wait();
624 } catch (InterruptedException e) {
625 interrupted = true;
626 }
627 }
628 if (interrupted) {
629 Thread.currentThread().interrupt();
630 }
631 }
632
633 /***
634 * Set the status to rejoin in progress
635 */
636 public synchronized void rejoinStarted() {
637 state = RejoinState.IN_PROGRESS;
638 notifyAll();
639 }
640
641 /***
642 * Set the rejoin status to not in progress
643 */
644 public synchronized void rejoinComplete() {
645 state = RejoinState.NOT_IN_PROGRESS;
646 notifyAll();
647 }
648
649 }
650
651 /***
652 * Event listener that counts down on receiving node join and online event
653 *
654 * @author Abhishek Sanoujam
655 *
656 */
657 private static class FireRejoinEventListener implements ClusterTopologyListener {
658
659 private final CountDownLatch latch;
660 private final ClusterNode currentNode;
661
662 /***
663 * Constructor
664 *
665 * @param clusterNode
666 * @param latch
667 */
668 public FireRejoinEventListener(ClusterNode currentNode, CountDownLatch latch) {
669 this.currentNode = currentNode;
670 this.latch = latch;
671 }
672
673 /***
674 * {@inheritDoc}
675 */
676 public void nodeJoined(ClusterNode node) {
677 if (node.equals(currentNode)) {
678 latch.countDown();
679 }
680 }
681
682 /***
683 * {@inheritDoc}
684 */
685 public void clusterOnline(ClusterNode node) {
686 if (node.equals(currentNode)) {
687 latch.countDown();
688 }
689 }
690
691 /***
692 * {@inheritDoc}
693 */
694 public void nodeLeft(ClusterNode node) {
695
696 }
697
698 /***
699 * {@inheritDoc}
700 */
701 public void clusterOffline(ClusterNode node) {
702
703 }
704
705 /***
706 * {@inheritDoc}
707 */
708 public void clusterRejoined(ClusterNode oldNode, ClusterNode newNode) {
709
710 }
711
712 }
713
714 }