1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.terracotta;
18
19 import static org.mockito.Matchers.any;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.when;
22
23 import java.util.Arrays;
24 import java.util.List;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import junit.framework.Assert;
28 import junit.framework.TestCase;
29 import net.sf.ehcache.Cache;
30 import net.sf.ehcache.CacheManager;
31 import net.sf.ehcache.Ehcache;
32 import net.sf.ehcache.Element;
33 import net.sf.ehcache.cluster.CacheCluster;
34 import net.sf.ehcache.cluster.ClusterNode;
35 import net.sf.ehcache.cluster.ClusterScheme;
36 import net.sf.ehcache.cluster.ClusterTopologyListener;
37 import net.sf.ehcache.config.CacheConfiguration;
38 import net.sf.ehcache.config.InvalidConfigurationException;
39 import net.sf.ehcache.config.TerracottaConfiguration;
40 import net.sf.ehcache.config.TerracottaConfiguration.StorageStrategy;
41 import net.sf.ehcache.constructs.nonstop.NonStopCacheException;
42 import net.sf.ehcache.constructs.nonstop.ThreadDump;
43 import net.sf.ehcache.terracotta.TerracottaClusteredInstanceHelper.TerracottaRuntimeType;
44 import net.sf.ehcache.terracotta.TestRejoinStore.StoreAction;
45
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.mockito.runners.MockitoJUnitRunner;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 @RunWith(MockitoJUnitRunner.class)
53 public class BasicRejoinTest extends TestCase {
54
55 private static final Logger LOG = LoggerFactory.getLogger(BasicRejoinTest.class);
56
57 private static final String ERROR_MSG_REJOIN_CUSTOM = "Rejoin cannot be used in Terracotta DSO mode";
58 private static final String ERROR_MSG_REJOIN_NO_NONSTOP = "Terracotta clustered caches must be nonstop when rejoin is enabled";
59 private static final CharSequence ERROR_MSG_REJOIN_NO_TC = "Terracotta Rejoin is enabled but can't determine Terracotta Runtime. "
60 + "You are probably missing Terracotta jar(s)";
61
62 @Test
63 public void testInvalidRejoinWithoutNonstop() throws Exception {
64 ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
65 TerracottaUnitTesting.setupTerracottaTesting(mockFactory);
66
67 CacheCluster mockCacheCluster = new MockCacheCluster();
68 when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
69
70 try {
71 new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/invalid-rejoin-no-nonstop-test.xml"));
72 fail("Trying to run rejoin without nonstop terracotta caches should fail");
73 } catch (InvalidConfigurationException e) {
74 LOG.info("Caught expected exception: " + e);
75 assertTrue(e.getMessage().contains(ERROR_MSG_REJOIN_NO_NONSTOP));
76 }
77 }
78
79 @Test
80 public void testInvalidRejoinInCustom() throws Exception {
81 ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
82 TerracottaUnitTesting.setupTerracottaTesting(mockFactory, TerracottaRuntimeType.Custom, StorageStrategy.CLASSIC);
83
84 CacheCluster mockCacheCluster = new MockCacheCluster();
85 when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
86
87 try {
88 new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
89 fail("Running rejoin in custom mode should fail");
90 } catch (InvalidConfigurationException e) {
91 LOG.info("Caught Expected exception: " + e);
92 assertTrue(e.getMessage().contains(ERROR_MSG_REJOIN_CUSTOM));
93 }
94 }
95
96 @Test
97 public void testInvalidRejoinWithoutTerracotta() throws Exception {
98 ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
99 TerracottaUnitTesting.setupTerracottaTesting(mockFactory, (TerracottaRuntimeType) null, StorageStrategy.CLASSIC);
100
101 CacheCluster mockCacheCluster = new MockCacheCluster();
102 when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
103
104 try {
105 new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
106 fail("Running rejoin without Terracotta should fail");
107 } catch (InvalidConfigurationException e) {
108 LOG.info("Caught Expected exception: " + e);
109 assertTrue(e.getMessage().contains(ERROR_MSG_REJOIN_NO_TC));
110 }
111 }
112
113 @Test
114 public void testAddNoNonstopCache() throws Exception {
115 ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
116 TerracottaUnitTesting.setupTerracottaTesting(mockFactory);
117
118 CacheCluster mockCacheCluster = new MockCacheCluster();
119 when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
120
121 final String cacheName = "someName";
122 CacheManager cacheManager = null;
123 try {
124 cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
125
126 CacheConfiguration config = new CacheConfiguration(cacheName, 10);
127 config.addTerracotta(new TerracottaConfiguration().clustered(true));
128
129 TerracottaConfiguration terracottaConfiguration = config.getTerracottaConfiguration();
130 if (terracottaConfiguration.getNonstopConfiguration() != null) {
131 terracottaConfiguration.getNonstopConfiguration().enabled(false);
132 }
133
134 cacheManager.addCache(new Cache(config));
135 fail("Adding Terracotta caches without nonstop should fail");
136 } catch (InvalidConfigurationException e) {
137 LOG.info("Caught Expected exception: " + e);
138 assertTrue(e.getMessage().contains(ERROR_MSG_REJOIN_NO_NONSTOP));
139 assertTrue(e.getMessage().contains(cacheName));
140 } finally {
141 if (cacheManager != null) {
142 cacheManager.shutdown();
143 }
144 }
145 }
146
147 @Test
148 public void testAddUnclusteredCache() throws Exception {
149 ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
150 TerracottaUnitTesting.setupTerracottaTesting(mockFactory);
151
152 CacheCluster mockCacheCluster = new MockCacheCluster();
153 when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
154
155 final String cacheName = "someUnclusteredCacheName";
156 CacheManager cacheManager = null;
157 try {
158 cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
159
160 CacheConfiguration config = new CacheConfiguration(cacheName, 1000);
161 cacheManager.addCache(new Cache(config));
162 List<String> cacheNames = Arrays.asList(cacheManager.getCacheNames());
163 Assert.assertTrue("Adding unclustered cache should not fail", cacheNames.contains(cacheName));
164
165 Cache cache = cacheManager.getCache(cacheName);
166 Assert.assertFalse("Unclustered cache should have terracottaClustered = false", cache.getCacheConfiguration().isTerracottaClustered());
167 Assert.assertNull("Unclustered cache should have null terracotta config", cache.getCacheConfiguration().getTerracottaConfiguration());
168 for (int i = 0; i < 100; i++) {
169 cache.put(new Element("key-" + i, "value-" + i));
170 }
171
172 for (int i = 0; i < 100; i++) {
173 String key = "key-" + i;
174 Element element = cache.get(key);
175 Assert.assertNotNull("Element should not be null for key: " + key, element);
176 Assert.assertEquals(key, element.getKey());
177 Assert.assertEquals("value-" + i, element.getValue());
178 }
179
180 } finally {
181 if (cacheManager != null) {
182 cacheManager.shutdown();
183 }
184 }
185 }
186
187 @Test
188 public void testBasicRejoin() throws Exception {
189 final ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
190 final AtomicInteger factoryCreationCount = new AtomicInteger();
191 TerracottaUnitTesting.setupTerracottaTesting(mockFactory, new Runnable() {
192 public void run() {
193 factoryCreationCount.incrementAndGet();
194 }
195 });
196 TestRejoinStore testRejoinStore = new TestRejoinStore();
197 when(mockFactory.createStore((Ehcache) any())).thenReturn(testRejoinStore);
198
199 MockCacheCluster mockCacheCluster = new MockCacheCluster();
200 when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
201
202 CacheManager cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
203 assertEquals(1, factoryCreationCount.get());
204 Cache cache = cacheManager.getCache("test");
205 assertNotNull(cache);
206
207 cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().timeoutMillis(2000);
208
209 cache.put(new Element("key", "value"));
210
211 Element element = cache.get("key");
212 assertNotNull(element);
213 assertEquals("value", element.getValue());
214
215 ClusterRejoinListener rejoinListener = new ClusterRejoinListener();
216 cacheManager.getCluster(ClusterScheme.TERRACOTTA).addTopologyListener(rejoinListener);
217
218
219 testRejoinStore.setBlocking(true);
220
221
222 try {
223 cache.get("key");
224 fail("Get should have thrown exception after cluster went offline");
225 } catch (NonStopCacheException e) {
226 LOG.info("Caught expected exception on get: " + e);
227 }
228 try {
229 cache.put(new Element("newKey", "newValue"));
230 fail("put should have thrown exception after cluster went offline");
231 } catch (NonStopCacheException e) {
232 LOG.info("Caught expected exception on put: " + e);
233 }
234
235
236
237 testRejoinStore.setBlocking(false);
238 mockCacheCluster.fireCurrentNodeLeft();
239
240 int count = 0;
241 while (true) {
242 if (rejoinListener.rejoinedCount.get() > 0) {
243 break;
244 }
245 LOG.info("Waiting for rejoin to complete.. sleeping 1 sec, count=" + count);
246 Thread.sleep(1000);
247 if (++count >= 60) {
248 LOG.info(ThreadDump.takeThreadDump());
249 fail("Rejoin did not happen even after 60 seconds. Something wrong.");
250 }
251 }
252
253 assertEquals(1, rejoinListener.rejoinedCount.get());
254
255 assertEquals(2, factoryCreationCount.get());
256
257
258 element = cache.get("key");
259 assertNotNull(element);
260 assertEquals("value", element.getValue());
261
262 cache.put(new Element("newKey", "newValue"));
263
264 element = cache.get("newKey");
265 assertNotNull(element);
266 assertEquals("newValue", element.getValue());
267 }
268
269 @Test
270 public void testDisposeCalledOnRejoin() throws Exception {
271 final ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
272 final AtomicInteger factoryCreationCount = new AtomicInteger();
273 TerracottaUnitTesting.setupTerracottaTesting(mockFactory, new Runnable() {
274 public void run() {
275 factoryCreationCount.incrementAndGet();
276 }
277 });
278 TestRejoinStore testRejoinStore = new TestRejoinStore();
279 when(mockFactory.createStore((Ehcache) any())).thenReturn(testRejoinStore);
280
281 MockCacheCluster mockCacheCluster = new MockCacheCluster();
282 when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
283
284 CacheManager cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
285 assertEquals(1, factoryCreationCount.get());
286 Cache cache = cacheManager.getCache("test");
287 assertNotNull(cache);
288
289 cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().timeoutMillis(2000);
290
291 cache.put(new Element("key", "value"));
292
293 Element element = cache.get("key");
294 assertNotNull(element);
295 assertEquals("value", element.getValue());
296
297 ClusterRejoinListener rejoinListener = new ClusterRejoinListener();
298 cacheManager.getCluster(ClusterScheme.TERRACOTTA).addTopologyListener(rejoinListener);
299
300
301 testRejoinStore.getCalledMethods().clear();
302
303
304 mockCacheCluster.fireCurrentNodeLeft();
305
306 int count = 0;
307 while (true) {
308 if (rejoinListener.rejoinedCount.get() > 0) {
309 break;
310 }
311 LOG.info("Waiting for rejoin to complete.. sleeping 1 sec, count=" + count);
312 Thread.sleep(1000);
313 if (++count >= 60) {
314 LOG.info(ThreadDump.takeThreadDump());
315 fail("Rejoin did not happen even after 60 seconds. Something wrong.");
316 }
317 }
318
319 assertEquals(1, rejoinListener.rejoinedCount.get());
320
321 assertEquals(2, factoryCreationCount.get());
322
323 LOG.info("Methods called during rejoin: " + testRejoinStore.getCalledMethods());
324 Assert.assertTrue("dispose should have been called on rejoin", testRejoinStore.getCalledMethods().contains("dispose"));
325
326 }
327
328 @Test
329 public void testRejoinKeepsTryingOnException() throws Exception {
330 final ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
331 final AtomicInteger factoryCreationCount = new AtomicInteger();
332 TerracottaUnitTesting.setupTerracottaTesting(mockFactory, new Runnable() {
333 public void run() {
334 factoryCreationCount.incrementAndGet();
335 }
336 });
337 TestRejoinStore testRejoinStore = new TestRejoinStore();
338 when(mockFactory.createStore((Ehcache) any())).thenReturn(testRejoinStore);
339
340 MockCacheCluster mockCacheCluster = new MockCacheCluster();
341 when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
342
343 CacheManager cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
344 assertEquals(1, factoryCreationCount.get());
345 Cache cache = cacheManager.getCache("test");
346 assertNotNull(cache);
347
348 cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().timeoutMillis(2000);
349
350 ClusterRejoinListener rejoinListener = new ClusterRejoinListener();
351 cacheManager.getCluster(ClusterScheme.TERRACOTTA).addTopologyListener(rejoinListener);
352
353
354 testRejoinStore.setStoreAction(StoreAction.EXCEPTION);
355
356
357 mockCacheCluster.fireCurrentNodeLeft();
358
359 int initialCalledMethodsSize = testRejoinStore.getCalledMethods().size();
360 int calledMethodsSize = 0;
361 int count = 0;
362 while (true) {
363 calledMethodsSize += testRejoinStore.getCalledMethods().size();
364 testRejoinStore.clearCalledMethods();
365 if (calledMethodsSize - initialCalledMethodsSize > 15) {
366
367 break;
368 }
369 if (rejoinListener.rejoinedCount.get() > 0) {
370 break;
371 }
372 LOG.info("Waiting for rejoin to complete.. sleeping 3 sec, count=" + count);
373 Thread.sleep(3000);
374 if (++count >= 20) {
375 LOG.info(ThreadDump.takeThreadDump());
376 fail("Shouldn't take 60 seconds for multiple rejoin tries");
377 }
378 }
379 LOG.info("calledMethodSize: " + calledMethodsSize + " initial:" + initialCalledMethodsSize);
380 assertTrue("Rejoin should have been retrying on getting exception ", calledMethodsSize > initialCalledMethodsSize);
381
382
383 testRejoinStore.setStoreAction(StoreAction.NONE);
384 count = 0;
385 while (true) {
386 if (rejoinListener.rejoinedCount.get() > 0) {
387 break;
388 }
389 LOG.info("Waiting for rejoin to complete.. sleeping 1 sec, count=" + count);
390 Thread.sleep(1000);
391 if (++count >= 60) {
392 LOG.info(ThreadDump.takeThreadDump());
393 fail("Rejoin should have happened withing 60 seconds. Something wrong");
394 }
395 }
396
397
398 assertEquals(1, rejoinListener.rejoinedCount.get());
399
400 LOG.info("Methods called during rejoin: " + testRejoinStore.getCalledMethods());
401 Assert.assertTrue("dispose should have been called on rejoin", testRejoinStore.getCalledMethods().contains("dispose"));
402
403 }
404
405 public static class ClusterRejoinListener implements ClusterTopologyListener {
406 private final AtomicInteger rejoinedCount = new AtomicInteger();
407
408 public void clusterRejoined(ClusterNode oldNode, ClusterNode newNode) {
409 LOG.info("========= Got cluster rejoined event: oldNode=" + printNode(oldNode) + " newNode:" + printNode(newNode));
410 rejoinedCount.incrementAndGet();
411 }
412
413 public AtomicInteger getRejoinedCount() {
414 return rejoinedCount;
415 }
416
417 private String printNode(ClusterNode node) {
418 return "[ClusterNode: id=" + node.getId() + ", hostname=" + node.getHostname() + ", ip=" + node.getIp() + "]";
419 }
420
421 public void clusterOffline(ClusterNode node) {
422 LOG.info("========= Got OFFLINE event: node=" + printNode(node));
423 }
424
425 public void clusterOnline(ClusterNode node) {
426 LOG.info("========= Got ONLINE event: node=" + printNode(node));
427 }
428
429 public void nodeJoined(ClusterNode node) {
430 LOG.info("========= Got NODE_JOINED event: node=" + printNode(node));
431 }
432
433 public void nodeLeft(ClusterNode node) {
434 LOG.info("========= Got NODE_LEFT event: node=" + printNode(node));
435 }
436
437 }
438
439 }