1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.constructs.nonstop;
18
19 import static org.mockito.Matchers.any;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.when;
22
23 import java.util.Date;
24
25 import junit.framework.Assert;
26 import junit.framework.TestCase;
27 import net.sf.ehcache.Cache;
28 import net.sf.ehcache.CacheManager;
29 import net.sf.ehcache.Ehcache;
30 import net.sf.ehcache.Element;
31 import net.sf.ehcache.cluster.ClusterScheme;
32 import net.sf.ehcache.terracotta.BasicRejoinTest.ClusterRejoinListener;
33 import net.sf.ehcache.terracotta.ClusteredInstanceFactory;
34 import net.sf.ehcache.terracotta.MockCacheCluster;
35 import net.sf.ehcache.terracotta.TerracottaUnitTesting;
36 import net.sf.ehcache.terracotta.TestRejoinStore;
37 import net.sf.ehcache.terracotta.TestRejoinStore.StoreAction;
38
39 import org.junit.Test;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 public class TimeoutOnRejoinTest extends TestCase {
44
45 private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutOnRejoinTest.class);
46 private static final long NON_STOP_TIMEOUT_MILLIS = 3000;
47 private static final long DELTA_MILLIS = 500;
48
49 @Test
50 public void testTimeoutOnRejoin() throws Exception {
51 final ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
52 TerracottaUnitTesting.setupTerracottaTesting(mockFactory);
53 TestRejoinStore testRejoinStore = new TestRejoinStore();
54 when(mockFactory.createStore((Ehcache) any())).thenReturn(testRejoinStore);
55
56 MockCacheCluster mockCacheCluster = new MockCacheCluster();
57 when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
58
59 CacheManager cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
60 Cache cache = cacheManager.getCache("test");
61 assertNotNull(cache);
62
63 cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().timeoutMillis(NON_STOP_TIMEOUT_MILLIS);
64
65 cache.put(new Element("key", "value"));
66
67 Element element = cache.get("key");
68 assertNotNull(element);
69 assertEquals("value", element.getValue());
70
71 ClusterRejoinListener rejoinListener = new ClusterRejoinListener();
72 cacheManager.getCluster(ClusterScheme.TERRACOTTA).addTopologyListener(rejoinListener);
73
74
75 mockCacheCluster.fireClusterOffline();
76
77 cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().immediateTimeout(false);
78 assertOperationsTimeout(cache, NON_STOP_TIMEOUT_MILLIS, true);
79
80 cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().immediateTimeout(true);
81 assertOperationsTimeout(cache, 0, true);
82
83
84 testRejoinStore.setStoreAction(StoreAction.EXCEPTION);
85 mockCacheCluster.fireCurrentNodeLeft();
86
87 long start = System.currentTimeMillis();
88 cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().immediateTimeout(false);
89 while (true) {
90
91 if (System.currentTimeMillis() - start > 60000) {
92 break;
93 }
94 LOGGER.info("Asserting operations times out with set timeoutMillis (immediateTimeout=false)");
95 assertOperationsTimeout(cache, NON_STOP_TIMEOUT_MILLIS, true);
96 }
97
98 start = System.currentTimeMillis();
99 cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().immediateTimeout(true);
100 LOGGER.info("Asserting operations times out with set timeoutMillis (immediateTimeout=true)");
101 while (true) {
102
103 if (System.currentTimeMillis() - start > 4000) {
104 break;
105 }
106 assertOperationsTimeout(cache, 0, false);
107 }
108
109
110 testRejoinStore.setStoreAction(StoreAction.NONE);
111
112
113
114
115
116
117 int count = 0;
118 while (true) {
119 if (rejoinListener.getRejoinedCount().get() > 0) {
120 break;
121 }
122 LOGGER.info("Waiting for rejoin to complete.. sleeping 1 sec, count=" + count);
123 Thread.sleep(1000);
124 if (++count >= 60) {
125 LOGGER.info(ThreadDump.takeThreadDump());
126 fail("Rejoin did not happen even after 60 seconds. Something wrong.");
127 }
128 }
129
130 assertEquals(1, rejoinListener.getRejoinedCount().get());
131 cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().immediateTimeout(false);
132
133 System.out.println(new Date() + ": Asserting operations go through");
134 assertOperationsGoThrough(cache);
135 System.out.println(new Date() + ": Test passed successfully");
136 }
137
138 private void assertOperationsGoThrough(Cache cache) throws Exception {
139 try {
140 Element element;
141
142 System.out.println(new Date() + ": Doing get");
143 element = cache.get("key");
144 assertNotNull(element);
145 assertEquals("value", element.getValue());
146
147 System.out.println(new Date() + ": Doing put");
148 cache.put(new Element("newKey", "newValue"));
149
150 element = cache.get("newKey");
151 assertNotNull(element);
152 assertEquals("newValue", element.getValue());
153 System.out.println(new Date() + ": Test Done");
154 } catch (Exception e) {
155 System.out.println(new Date() + ": Test failed");
156 throw e;
157 }
158 }
159
160 private void assertOperationsTimeout(Cache cache, long expectedTimeoutMillis, boolean log) {
161
162 long start = System.currentTimeMillis();
163 try {
164 cache.get("key");
165 fail("Get should have thrown exception after cluster went offline");
166 } catch (NonStopCacheException e) {
167 long elapsed = System.currentTimeMillis() - start;
168 if (log) {
169 LOGGER.info("+++++++ Caught expected exception on get: " + e);
170 LOGGER.info("+++++++ Elapsed time before getting nonstop cache exception: " + elapsed);
171 }
172 Assert.assertTrue("expected timeout: " + expectedTimeoutMillis + " actual: " + elapsed,
173 elapsed + DELTA_MILLIS >= expectedTimeoutMillis);
174 }
175 start = System.currentTimeMillis();
176 try {
177 cache.put(new Element("newKey", "newValue"));
178 fail("put should have thrown exception after cluster went offline");
179 } catch (NonStopCacheException e) {
180 long elapsed = System.currentTimeMillis() - start;
181 if (log) {
182 LOGGER.info("+++++++ Caught expected exception on put: " + e);
183 LOGGER.info("+++++++ Elapsed time before getting nonstop cache exception: " + elapsed);
184 }
185 Assert.assertTrue("expected timeout: " + expectedTimeoutMillis + " actual: " + elapsed,
186 elapsed + DELTA_MILLIS >= expectedTimeoutMillis);
187 }
188 }
189
190 }