View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.terracotta;
18  
19  import static org.mockito.Matchers.any;
20  import static org.mockito.Mockito.mock;
21  import static org.mockito.Mockito.when;
22  
23  import java.util.Arrays;
24  import java.util.List;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import junit.framework.Assert;
28  import junit.framework.TestCase;
29  import net.sf.ehcache.Cache;
30  import net.sf.ehcache.CacheManager;
31  import net.sf.ehcache.Ehcache;
32  import net.sf.ehcache.Element;
33  import net.sf.ehcache.cluster.CacheCluster;
34  import net.sf.ehcache.cluster.ClusterNode;
35  import net.sf.ehcache.cluster.ClusterScheme;
36  import net.sf.ehcache.cluster.ClusterTopologyListener;
37  import net.sf.ehcache.config.CacheConfiguration;
38  import net.sf.ehcache.config.InvalidConfigurationException;
39  import net.sf.ehcache.config.TerracottaConfiguration;
40  import net.sf.ehcache.config.TerracottaConfiguration.StorageStrategy;
41  import net.sf.ehcache.constructs.nonstop.NonStopCacheException;
42  import net.sf.ehcache.constructs.nonstop.ThreadDump;
43  import net.sf.ehcache.terracotta.TerracottaClusteredInstanceHelper.TerracottaRuntimeType;
44  import net.sf.ehcache.terracotta.TestRejoinStore.StoreAction;
45  
46  import org.junit.Test;
47  import org.junit.runner.RunWith;
48  import org.mockito.runners.MockitoJUnitRunner;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  @RunWith(MockitoJUnitRunner.class)
53  public class BasicRejoinTest extends TestCase {
54  
55      private static final Logger LOG = LoggerFactory.getLogger(BasicRejoinTest.class);
56  
57      private static final String ERROR_MSG_REJOIN_CUSTOM = "Rejoin cannot be used in Terracotta DSO mode";
58      private static final String ERROR_MSG_REJOIN_NO_NONSTOP = "Terracotta clustered caches must be nonstop when rejoin is enabled";
59      private static final CharSequence ERROR_MSG_REJOIN_NO_TC = "Terracotta Rejoin is enabled but can't determine Terracotta Runtime. "
60              + "You are probably missing Terracotta jar(s)";
61  
62      @Test
63      public void testInvalidRejoinWithoutNonstop() throws Exception {
64          ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
65          TerracottaUnitTesting.setupTerracottaTesting(mockFactory);
66  
67          CacheCluster mockCacheCluster = new MockCacheCluster();
68          when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
69  
70          try {
71              new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/invalid-rejoin-no-nonstop-test.xml"));
72              fail("Trying to run rejoin without nonstop terracotta caches should fail");
73          } catch (InvalidConfigurationException e) {
74              LOG.info("Caught expected exception: " + e);
75              assertTrue(e.getMessage().contains(ERROR_MSG_REJOIN_NO_NONSTOP));
76          }
77      }
78  
79      @Test
80      public void testInvalidRejoinInCustom() throws Exception {
81          ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
82          TerracottaUnitTesting.setupTerracottaTesting(mockFactory, TerracottaRuntimeType.Custom, StorageStrategy.CLASSIC);
83  
84          CacheCluster mockCacheCluster = new MockCacheCluster();
85          when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
86  
87          try {
88              new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
89              fail("Running rejoin in custom mode should fail");
90          } catch (InvalidConfigurationException e) {
91              LOG.info("Caught Expected exception: " + e);
92              assertTrue(e.getMessage().contains(ERROR_MSG_REJOIN_CUSTOM));
93          }
94      }
95  
96      @Test
97      public void testInvalidRejoinWithoutTerracotta() throws Exception {
98          ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
99          TerracottaUnitTesting.setupTerracottaTesting(mockFactory, (TerracottaRuntimeType) null, StorageStrategy.CLASSIC);
100 
101         CacheCluster mockCacheCluster = new MockCacheCluster();
102         when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
103 
104         try {
105             new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
106             fail("Running rejoin without Terracotta should fail");
107         } catch (InvalidConfigurationException e) {
108             LOG.info("Caught Expected exception: " + e);
109             assertTrue(e.getMessage().contains(ERROR_MSG_REJOIN_NO_TC));
110         }
111     }
112 
113     @Test
114     public void testAddNoNonstopCache() throws Exception {
115         ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
116         TerracottaUnitTesting.setupTerracottaTesting(mockFactory);
117 
118         CacheCluster mockCacheCluster = new MockCacheCluster();
119         when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
120 
121         final String cacheName = "someName";
122         CacheManager cacheManager = null;
123         try {
124             cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
125 
126             CacheConfiguration config = new CacheConfiguration(cacheName, 10);
127             config.addTerracotta(new TerracottaConfiguration().clustered(true));
128 
129             TerracottaConfiguration terracottaConfiguration = config.getTerracottaConfiguration();
130             if (terracottaConfiguration.getNonstopConfiguration() != null) {
131                 terracottaConfiguration.getNonstopConfiguration().enabled(false);
132             }
133 
134             cacheManager.addCache(new Cache(config));
135             fail("Adding Terracotta caches without nonstop should fail");
136         } catch (InvalidConfigurationException e) {
137             LOG.info("Caught Expected exception: " + e);
138             assertTrue(e.getMessage().contains(ERROR_MSG_REJOIN_NO_NONSTOP));
139             assertTrue(e.getMessage().contains(cacheName));
140         } finally {
141             if (cacheManager != null) {
142                 cacheManager.shutdown();
143             }
144         }
145     }
146 
147     @Test
148     public void testAddUnclusteredCache() throws Exception {
149         ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
150         TerracottaUnitTesting.setupTerracottaTesting(mockFactory);
151 
152         CacheCluster mockCacheCluster = new MockCacheCluster();
153         when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
154 
155         final String cacheName = "someUnclusteredCacheName";
156         CacheManager cacheManager = null;
157         try {
158             cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
159 
160             CacheConfiguration config = new CacheConfiguration(cacheName, 1000);
161             cacheManager.addCache(new Cache(config));
162             List<String> cacheNames = Arrays.asList(cacheManager.getCacheNames());
163             Assert.assertTrue("Adding unclustered cache should not fail", cacheNames.contains(cacheName));
164 
165             Cache cache = cacheManager.getCache(cacheName);
166             Assert.assertFalse("Unclustered cache should have terracottaClustered = false", cache.getCacheConfiguration().isTerracottaClustered());
167             Assert.assertNull("Unclustered cache should have null terracotta config", cache.getCacheConfiguration().getTerracottaConfiguration());
168             for (int i = 0; i < 100; i++) {
169                 cache.put(new Element("key-" + i, "value-" + i));
170             }
171 
172             for (int i = 0; i < 100; i++) {
173                 String key = "key-" + i;
174                 Element element = cache.get(key);
175                 Assert.assertNotNull("Element should not be null for key: " + key, element);
176                 Assert.assertEquals(key, element.getKey());
177                 Assert.assertEquals("value-"  + i, element.getValue());
178             }
179 
180         } finally {
181             if (cacheManager != null) {
182                 cacheManager.shutdown();
183             }
184         }
185     }
186 
187     @Test
188     public void testBasicRejoin() throws Exception {
189         final ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
190         final AtomicInteger factoryCreationCount = new AtomicInteger();
191         TerracottaUnitTesting.setupTerracottaTesting(mockFactory, new Runnable() {
192             public void run() {
193                 factoryCreationCount.incrementAndGet();
194             }
195         });
196         TestRejoinStore testRejoinStore = new TestRejoinStore();
197         when(mockFactory.createStore((Ehcache) any())).thenReturn(testRejoinStore);
198 
199         MockCacheCluster mockCacheCluster = new MockCacheCluster();
200         when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
201 
202         CacheManager cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
203         assertEquals(1, factoryCreationCount.get());
204         Cache cache = cacheManager.getCache("test");
205         assertNotNull(cache);
206 
207         cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().timeoutMillis(2000);
208 
209         cache.put(new Element("key", "value"));
210 
211         Element element = cache.get("key");
212         assertNotNull(element);
213         assertEquals("value", element.getValue());
214 
215         ClusterRejoinListener rejoinListener = new ClusterRejoinListener();
216         cacheManager.getCluster(ClusterScheme.TERRACOTTA).addTopologyListener(rejoinListener);
217 
218         // lets simulate cluster offline with blocking behavior on test store
219         testRejoinStore.setBlocking(true);
220 
221         // now gets/puts should throw exception as nonstop is configured the default behavior - "exception"
222         try {
223             cache.get("key");
224             fail("Get should have thrown exception after cluster went offline");
225         } catch (NonStopCacheException e) {
226             LOG.info("Caught expected exception on get: " + e);
227         }
228         try {
229             cache.put(new Element("newKey", "newValue"));
230             fail("put should have thrown exception after cluster went offline");
231         } catch (NonStopCacheException e) {
232             LOG.info("Caught expected exception on put: " + e);
233         }
234 
235         // lets make the cluster rejoin
236         // don't forget to unblock the test store
237         testRejoinStore.setBlocking(false);
238         mockCacheCluster.fireCurrentNodeLeft();
239 
240         int count = 0;
241         while (true) {
242             if (rejoinListener.rejoinedCount.get() > 0) {
243                 break;
244             }
245             LOG.info("Waiting for rejoin to complete.. sleeping 1 sec, count=" + count);
246             Thread.sleep(1000);
247             if (++count >= 60) {
248                 LOG.info(ThreadDump.takeThreadDump());
249                 fail("Rejoin did not happen even after 60 seconds. Something wrong.");
250             }
251         }
252         // assert rejoin event fired
253         assertEquals(1, rejoinListener.rejoinedCount.get());
254         // assert new factory created
255         assertEquals(2, factoryCreationCount.get());
256 
257         // now gets/puts should go through
258         element = cache.get("key");
259         assertNotNull(element);
260         assertEquals("value", element.getValue());
261 
262         cache.put(new Element("newKey", "newValue"));
263         // assert new key-value
264         element = cache.get("newKey");
265         assertNotNull(element);
266         assertEquals("newValue", element.getValue());
267     }
268 
269     @Test
270     public void testDisposeCalledOnRejoin() throws Exception {
271         final ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
272         final AtomicInteger factoryCreationCount = new AtomicInteger();
273         TerracottaUnitTesting.setupTerracottaTesting(mockFactory, new Runnable() {
274             public void run() {
275                 factoryCreationCount.incrementAndGet();
276             }
277         });
278         TestRejoinStore testRejoinStore = new TestRejoinStore();
279         when(mockFactory.createStore((Ehcache) any())).thenReturn(testRejoinStore);
280 
281         MockCacheCluster mockCacheCluster = new MockCacheCluster();
282         when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
283 
284         CacheManager cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
285         assertEquals(1, factoryCreationCount.get());
286         Cache cache = cacheManager.getCache("test");
287         assertNotNull(cache);
288 
289         cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().timeoutMillis(2000);
290 
291         cache.put(new Element("key", "value"));
292 
293         Element element = cache.get("key");
294         assertNotNull(element);
295         assertEquals("value", element.getValue());
296 
297         ClusterRejoinListener rejoinListener = new ClusterRejoinListener();
298         cacheManager.getCluster(ClusterScheme.TERRACOTTA).addTopologyListener(rejoinListener);
299 
300         // clear all methods called prior to here
301         testRejoinStore.getCalledMethods().clear();
302 
303         // lets make the cluster rejoin
304         mockCacheCluster.fireCurrentNodeLeft();
305 
306         int count = 0;
307         while (true) {
308             if (rejoinListener.rejoinedCount.get() > 0) {
309                 break;
310             }
311             LOG.info("Waiting for rejoin to complete.. sleeping 1 sec, count=" + count);
312             Thread.sleep(1000);
313             if (++count >= 60) {
314                 LOG.info(ThreadDump.takeThreadDump());
315                 fail("Rejoin did not happen even after 60 seconds. Something wrong.");
316             }
317         }
318         // assert rejoin event fired
319         assertEquals(1, rejoinListener.rejoinedCount.get());
320         // assert new factory created
321         assertEquals(2, factoryCreationCount.get());
322 
323         LOG.info("Methods called during rejoin: " + testRejoinStore.getCalledMethods());
324         Assert.assertTrue("dispose should have been called on rejoin", testRejoinStore.getCalledMethods().contains("dispose"));
325 
326     }
327 
328     @Test
329     public void testRejoinKeepsTryingOnException() throws Exception {
330         final ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
331         final AtomicInteger factoryCreationCount = new AtomicInteger();
332         TerracottaUnitTesting.setupTerracottaTesting(mockFactory, new Runnable() {
333             public void run() {
334                 factoryCreationCount.incrementAndGet();
335             }
336         });
337         TestRejoinStore testRejoinStore = new TestRejoinStore();
338         when(mockFactory.createStore((Ehcache) any())).thenReturn(testRejoinStore);
339 
340         MockCacheCluster mockCacheCluster = new MockCacheCluster();
341         when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
342 
343         CacheManager cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
344         assertEquals(1, factoryCreationCount.get());
345         Cache cache = cacheManager.getCache("test");
346         assertNotNull(cache);
347 
348         cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().timeoutMillis(2000);
349 
350         ClusterRejoinListener rejoinListener = new ClusterRejoinListener();
351         cacheManager.getCluster(ClusterScheme.TERRACOTTA).addTopologyListener(rejoinListener);
352 
353         // make the store keep throwing exception to fail the rejoin
354         testRejoinStore.setStoreAction(StoreAction.EXCEPTION);
355 
356         // lets make the cluster rejoin
357         mockCacheCluster.fireCurrentNodeLeft();
358 
359         int initialCalledMethodsSize = testRejoinStore.getCalledMethods().size();
360         int calledMethodsSize = 0;
361         int count = 0;
362         while (true) {
363             calledMethodsSize += testRejoinStore.getCalledMethods().size();
364             testRejoinStore.clearCalledMethods();
365             if (calledMethodsSize - initialCalledMethodsSize > 15) {
366                 // rejoin has been retrying, so number of called methods increasing
367                 break;
368             }
369             if (rejoinListener.rejoinedCount.get() > 0) {
370                 break;
371             }
372             LOG.info("Waiting for rejoin to complete.. sleeping 3 sec, count=" + count);
373             Thread.sleep(3000);
374             if (++count >= 20) {
375                 LOG.info(ThreadDump.takeThreadDump());
376                 fail("Shouldn't take 60 seconds for multiple rejoin tries");
377             }
378         }
379         LOG.info("calledMethodSize: " + calledMethodsSize + " initial:" + initialCalledMethodsSize);
380         assertTrue("Rejoin should have been retrying on getting exception ", calledMethodsSize > initialCalledMethodsSize);
381 
382         // now lets make the rejoin happen
383         testRejoinStore.setStoreAction(StoreAction.NONE);
384         count = 0;
385         while (true) {
386             if (rejoinListener.rejoinedCount.get() > 0) {
387                 break;
388             }
389             LOG.info("Waiting for rejoin to complete.. sleeping 1 sec, count=" + count);
390             Thread.sleep(1000);
391             if (++count >= 60) {
392                 LOG.info(ThreadDump.takeThreadDump());
393                 fail("Rejoin should have happened withing 60 seconds. Something wrong");
394             }
395         }
396 
397         // assert rejoin event fired
398         assertEquals(1, rejoinListener.rejoinedCount.get());
399 
400         LOG.info("Methods called during rejoin: " + testRejoinStore.getCalledMethods());
401         Assert.assertTrue("dispose should have been called on rejoin", testRejoinStore.getCalledMethods().contains("dispose"));
402 
403     }
404 
405     public static class ClusterRejoinListener implements ClusterTopologyListener {
406         private final AtomicInteger rejoinedCount = new AtomicInteger();
407 
408         public void clusterRejoined(ClusterNode oldNode, ClusterNode newNode) {
409             LOG.info("========= Got cluster rejoined event: oldNode=" + printNode(oldNode) + " newNode:" + printNode(newNode));
410             rejoinedCount.incrementAndGet();
411         }
412 
413         public AtomicInteger getRejoinedCount() {
414             return rejoinedCount;
415         }
416 
417         private String printNode(ClusterNode node) {
418             return "[ClusterNode: id=" + node.getId() + ", hostname=" + node.getHostname() + ", ip=" + node.getIp() + "]";
419         }
420 
421         public void clusterOffline(ClusterNode node) {
422             LOG.info("========= Got OFFLINE event: node=" + printNode(node));
423         }
424 
425         public void clusterOnline(ClusterNode node) {
426             LOG.info("========= Got ONLINE event: node=" + printNode(node));
427         }
428 
429         public void nodeJoined(ClusterNode node) {
430             LOG.info("========= Got NODE_JOINED event: node=" + printNode(node));
431         }
432 
433         public void nodeLeft(ClusterNode node) {
434             LOG.info("========= Got NODE_LEFT event: node=" + printNode(node));
435         }
436 
437     }
438 
439 }