View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.constructs.nonstop;
18  
19  import static org.mockito.Matchers.any;
20  import static org.mockito.Mockito.mock;
21  import static org.mockito.Mockito.when;
22  
23  import java.util.Date;
24  
25  import junit.framework.Assert;
26  import junit.framework.TestCase;
27  import net.sf.ehcache.Cache;
28  import net.sf.ehcache.CacheManager;
29  import net.sf.ehcache.Ehcache;
30  import net.sf.ehcache.Element;
31  import net.sf.ehcache.cluster.ClusterScheme;
32  import net.sf.ehcache.terracotta.BasicRejoinTest.ClusterRejoinListener;
33  import net.sf.ehcache.terracotta.ClusteredInstanceFactory;
34  import net.sf.ehcache.terracotta.MockCacheCluster;
35  import net.sf.ehcache.terracotta.TerracottaUnitTesting;
36  import net.sf.ehcache.terracotta.TestRejoinStore;
37  import net.sf.ehcache.terracotta.TestRejoinStore.StoreAction;
38  
39  import org.junit.Test;
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  public class TimeoutOnRejoinTest extends TestCase {
44  
45      private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutOnRejoinTest.class);
46      private static final long NON_STOP_TIMEOUT_MILLIS = 3000;
47      private static final long DELTA_MILLIS = 500;
48  
49      @Test
50      public void testTimeoutOnRejoin() throws Exception {
51          final ClusteredInstanceFactory mockFactory = mock(ClusteredInstanceFactory.class);
52          TerracottaUnitTesting.setupTerracottaTesting(mockFactory);
53          TestRejoinStore testRejoinStore = new TestRejoinStore();
54          when(mockFactory.createStore((Ehcache) any())).thenReturn(testRejoinStore);
55  
56          MockCacheCluster mockCacheCluster = new MockCacheCluster();
57          when(mockFactory.getTopology()).thenReturn(mockCacheCluster);
58  
59          CacheManager cacheManager = new CacheManager(CacheManager.class.getResourceAsStream("/rejoin/basic-rejoin-test.xml"));
60          Cache cache = cacheManager.getCache("test");
61          assertNotNull(cache);
62  
63          cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().timeoutMillis(NON_STOP_TIMEOUT_MILLIS);
64  
65          cache.put(new Element("key", "value"));
66  
67          Element element = cache.get("key");
68          assertNotNull(element);
69          assertEquals("value", element.getValue());
70  
71          ClusterRejoinListener rejoinListener = new ClusterRejoinListener();
72          cacheManager.getCluster(ClusterScheme.TERRACOTTA).addTopologyListener(rejoinListener);
73  
74          // lets simulate cluster offline
75          mockCacheCluster.fireClusterOffline();
76          // assert time out happens when offline, immediateTimeout=false
77          cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().immediateTimeout(false);
78          assertOperationsTimeout(cache, NON_STOP_TIMEOUT_MILLIS, true);
79          // assert time out happens when offline, immediateTimeout=false
80          cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().immediateTimeout(true);
81          assertOperationsTimeout(cache, 0, true);
82  
83          // lets make the cluster rejoin, but let rejoin not succeed
84          testRejoinStore.setStoreAction(StoreAction.EXCEPTION);
85          mockCacheCluster.fireCurrentNodeLeft();
86  
87          long start = System.currentTimeMillis();
88          cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().immediateTimeout(false);
89          while (true) {
90              // keep doing for 60 seconds
91              if (System.currentTimeMillis() - start > 60000) {
92                  break;
93              }
94              LOGGER.info("Asserting operations times out with set timeoutMillis (immediateTimeout=false)");
95              assertOperationsTimeout(cache, NON_STOP_TIMEOUT_MILLIS, true);
96          }
97  
98          start = System.currentTimeMillis();
99          cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().immediateTimeout(true);
100         LOGGER.info("Asserting operations times out with set timeoutMillis (immediateTimeout=true)");
101         while (true) {
102             // keep doing for 4 seconds
103             if (System.currentTimeMillis() - start > 4000) {
104                 break;
105             }
106             assertOperationsTimeout(cache, 0, false);
107         }
108 
109         // now let the rejoin go through
110         testRejoinStore.setStoreAction(StoreAction.NONE);
111 
112         // ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
113         // ///////////////////////// READ ME /////////////////////////////////////////////////////////////////////////////////
114         // If this test fails, then probably methods in Store interface that are used in Cache.initialize() should be using
115         // forceExecuteWithExecutor() method in ExecutorServiceStore
116         // ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
117         int count = 0;
118         while (true) {
119             if (rejoinListener.getRejoinedCount().get() > 0) {
120                 break;
121             }
122             LOGGER.info("Waiting for rejoin to complete.. sleeping 1 sec, count=" + count);
123             Thread.sleep(1000);
124             if (++count >= 60) {
125                 LOGGER.info(ThreadDump.takeThreadDump());
126                 fail("Rejoin did not happen even after 60 seconds. Something wrong.");
127             }
128         }
129         // assert rejoin event fired
130         assertEquals(1, rejoinListener.getRejoinedCount().get());
131         cache.getCacheConfiguration().getTerracottaConfiguration().getNonstopConfiguration().immediateTimeout(false);
132 
133         System.out.println(new Date() + ": Asserting operations go through");
134         assertOperationsGoThrough(cache);
135         System.out.println(new Date() + ": Test passed successfully");
136     }
137 
138     private void assertOperationsGoThrough(Cache cache) throws Exception {
139         try {
140             Element element;
141             // now gets/puts should go through
142             System.out.println(new Date() + ": Doing get");
143             element = cache.get("key");
144             assertNotNull(element);
145             assertEquals("value", element.getValue());
146 
147             System.out.println(new Date() + ": Doing put");
148             cache.put(new Element("newKey", "newValue"));
149             // assert new key-value
150             element = cache.get("newKey");
151             assertNotNull(element);
152             assertEquals("newValue", element.getValue());
153             System.out.println(new Date() + ": Test Done");
154         } catch (Exception e) {
155             System.out.println(new Date() + ": Test failed");
156             throw e;
157         }
158     }
159 
160     private void assertOperationsTimeout(Cache cache, long expectedTimeoutMillis, boolean log) {
161         // now gets/puts should throw exception as nonstop is configured the default behavior - "exception"
162         long start = System.currentTimeMillis();
163         try {
164             cache.get("key");
165             fail("Get should have thrown exception after cluster went offline");
166         } catch (NonStopCacheException e) {
167             long elapsed = System.currentTimeMillis() - start;
168             if (log) {
169                 LOGGER.info("+++++++ Caught expected exception on get: " + e);
170                 LOGGER.info("+++++++ Elapsed time before getting nonstop cache exception: " + elapsed);
171             }
172             Assert.assertTrue("expected timeout: " + expectedTimeoutMillis + " actual: " + elapsed,
173                     elapsed + DELTA_MILLIS >= expectedTimeoutMillis);
174         }
175         start = System.currentTimeMillis();
176         try {
177             cache.put(new Element("newKey", "newValue"));
178             fail("put should have thrown exception after cluster went offline");
179         } catch (NonStopCacheException e) {
180             long elapsed = System.currentTimeMillis() - start;
181             if (log) {
182                 LOGGER.info("+++++++ Caught expected exception on put: " + e);
183                 LOGGER.info("+++++++ Elapsed time before getting nonstop cache exception: " + elapsed);
184             }
185             Assert.assertTrue("expected timeout: " + expectedTimeoutMillis + " actual: " + elapsed,
186                     elapsed + DELTA_MILLIS >= expectedTimeoutMillis);
187         }
188     }
189 
190 }