View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import static net.sf.ehcache.util.RetryAssert.assertBy;
20  import static org.hamcrest.core.Is.is;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.assertNull;
24  import static org.junit.Assert.fail;
25  
26  import java.util.Arrays;
27  import java.util.List;
28  import java.util.Set;
29  import java.util.concurrent.Callable;
30  import java.util.concurrent.TimeUnit;
31  import java.util.logging.Level;
32  import java.util.logging.Logger;
33  
34  import junit.framework.Assert;
35  import net.sf.ehcache.AbstractCacheTest;
36  import net.sf.ehcache.CacheException;
37  import net.sf.ehcache.CacheManager;
38  import net.sf.ehcache.Element;
39  import net.sf.ehcache.event.CountingCacheEventListener;
40  import net.sf.ehcache.util.RetryAssert;
41  
42  import org.hamcrest.collection.IsEmptyCollection;
43  import org.junit.After;
44  import org.junit.Before;
45  import org.junit.Test;
46  
47  /***
48   * Tests replication of Cache events with large payloads exceeding MTU
49   * <p/>
50   * Note these tests need a live network interface running in multicast mode to work
51   * <p/>
52   *
53   * @author Abhishek Sanoujam
54   */
55  public class RMICacheReplicatorWithLargePayloadTest extends AbstractRMITest {
56  
57      private static final Logger LOG = Logger.getLogger(RMICacheReplicatorWithLargePayloadTest.class.getName());
58  
59      private static int MB = 1024 * 1024;
60  
61      /***
62       * CacheManager 1 in the cluster
63       */
64      protected CacheManager manager1;
65      /***
66       * CacheManager 2 in the cluster
67       */
68      protected CacheManager manager2;
69      /***
70       * CacheManager 3 in the cluster
71       */
72      protected CacheManager manager3;
73  
74      /***
75       * {@inheritDoc} Sets up two caches: cache1 is local. cache2 is to be receive updates
76       *
77       * @throws Exception
78       */
79      @Before
80      public void setUp() throws Exception {
81          failFastInsufficientMemory();
82  
83          MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
84  
85          CountingCacheEventListener.resetCounters();
86          manager1 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-big-payload-1.xml");
87          manager2 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-big-payload-2.xml");
88          manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-big-payload-3.xml");
89          // allow cluster to be established
90          waitForClusterMembership(10, TimeUnit.SECONDS, Arrays.asList(manager1.getCacheNames()), manager1, manager2, manager3);
91      }
92  
93      private void failFastInsufficientMemory() {
94          // fail fast if running with insufficient heap
95          long totalMemory = Runtime.getRuntime().totalMemory();
96          if (totalMemory < 200 * MB) {
97              String msg = "Insufficient heap (approx. " + (totalMemory / MB) + " MB detected), this test requires at least 256 MB to run.\n";
98              msg += "Steps to take:\n";
99              msg += "   1) If you are running with eclipse: specify \"-Xms256m -Xmx256m\" as VM arguments in the \"Run Confuguration\" for this test\n";
100             msg += "   2) If you are running using mvn with \"mvn test -Dtest=" + this.getClass().getSimpleName()
101                     + "\", add this in the command line: -DargLine=\"-Xms256m -Xmx256m\"\n";
102             msg += "      Run the test like: mvn test -Dtest=" + this.getClass().getSimpleName() + " -DargLine=\"-Xms256m -Xmx256m\"";
103             LOG.log(Level.WARNING, msg);
104             fail(msg);
105         }
106     }
107 
108     /***
109      * {@inheritDoc}
110      *
111      * @throws Exception
112      */
113     @After
114     public void tearDown() throws Exception {
115 
116         if (manager1 != null) {
117             manager1.shutdown();
118         }
119         if (manager2 != null) {
120             manager2.shutdown();
121         }
122         if (manager3 != null) {
123             manager3.shutdown();
124         }
125 
126         RetryAssert.assertBy(30, TimeUnit.SECONDS, new Callable<Set<Thread>>() {
127             public Set<Thread> call() throws Exception {
128                 return getActiveReplicationThreads();
129             }
130         }, IsEmptyCollection.<Thread>empty());
131     }
132 
133     @Test
134     public void testAssertBigPayload() {
135         List<CachePeer> localPeers = manager1.getCachePeerListener("RMI").getBoundCachePeers();
136         List<byte[]> payloadList = PayloadUtil.createCompressedPayloadList(localPeers, 150);
137         Assert.assertTrue("Payload is not big enough for cacheManager-1", payloadList.size() > 1);
138 
139         localPeers = manager2.getCachePeerListener("RMI").getBoundCachePeers();
140         payloadList = PayloadUtil.createCompressedPayloadList(localPeers, 150);
141         Assert.assertTrue("Payload is not big enough for cacheManager-2", payloadList.size() > 1);
142 
143         localPeers = manager3.getCachePeerListener("RMI").getBoundCachePeers();
144         payloadList = PayloadUtil.createCompressedPayloadList(localPeers, 150);
145         Assert.assertTrue("Payload is not big enough for cacheManager-3", payloadList.size() > 1);
146 
147         CacheManager manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-big-payload-4.xml");
148         try {
149             localPeers = manager4.getCachePeerListener("RMI").getBoundCachePeers();
150             payloadList = PayloadUtil.createCompressedPayloadList(localPeers, 150);
151             Assert.assertTrue("Payload is not big enough for cacheManager-4", payloadList.size() > 1);
152         } finally {
153             manager4.shutdown();
154         }
155     }
156 
157     /***
158      * Does a new cache manager in the cluster get detected?
159      */
160     @Test
161     public void testRemoteCachePeersDetectsNewCacheManager() throws InterruptedException {
162         // Add new CacheManager to cluster
163         CacheManager manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-big-payload-4.xml");
164         try {
165             // Allow detection to occur
166             waitForClusterMembership(10020, TimeUnit.MILLISECONDS, Arrays.asList(manager1.getCacheNames()), manager1, manager2, manager3, manager4);
167         } finally {
168             manager4.shutdown();
169         }
170     }
171 
172     /***
173      * Does a down cache manager in the cluster get removed?
174      */
175     @Test
176     public void testRemoteCachePeersDetectsDownCacheManager() throws InterruptedException {
177         // Drop a CacheManager from the cluster
178         manager3.shutdown();
179         // Allow change detection to occur. Heartbeat 1 second and is not stale until 5000
180         waitForClusterMembership(11020, TimeUnit.MILLISECONDS, Arrays.asList(manager1.getCacheNames()), manager1, manager2);
181     }
182 
183     /***
184      * Does a down cache manager in the cluster get removed?
185      */
186     @Test
187     public void testRemoteCachePeersDetectsDownCacheManagerSlow() throws InterruptedException {
188         MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(2000);
189         try {
190             Thread.sleep(2000);
191             // Drop a CacheManager from the cluster
192             manager3.shutdown();
193 
194             // Insufficient time, should be alive till now
195             CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
196             for (String cacheName : manager1.getCacheNames()) {
197                 List remotePeersOfCache1 = provider.listRemoteCachePeers(manager1.getCache(cacheName));
198                 assertEquals(2, remotePeersOfCache1.size());
199             }
200         } finally {
201             MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
202             Thread.sleep(2000);
203         }
204     }
205 
206     /***
207      * Tests put and remove initiated from cache1 in a cluster
208      * <p/>
209      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
210      */
211     @Test
212     public void testPutProgagatesFromAndToEveryCacheManagerAndCache() throws CacheException, InterruptedException {
213 
214         // Put
215         final String[] cacheNames = manager1.getCacheNames();
216         Arrays.sort(cacheNames);
217         for (int i = 0; i < cacheNames.length; i++) {
218             String name = cacheNames[i];
219             manager1.getCache(name).put(new Element(Integer.toString(i), Integer.valueOf(i)));
220             // Add some non serializable elements that should not get propagated
221             manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
222         }
223 
224         assertBy(10, TimeUnit.SECONDS, new Callable<Boolean>() {
225 
226             public Boolean call() throws Exception {
227                 for (int i = 0; i < cacheNames.length; i++) {
228                     String name = cacheNames[i];
229                     for (CacheManager manager : new CacheManager[] {manager2, manager3}) {
230                         Element element = manager.getCache(name).get(Integer.toString(i));
231                         assertNotNull("Cache : " + name, element);
232                         assertEquals(Integer.toString(i), element.getKey());
233                         assertEquals(Integer.valueOf(i), element.getValue());
234 
235                         assertNull(manager.getCache(name).get("nonSerializable" + i));
236                     }
237                 }
238                 return Boolean.TRUE;
239             }
240         }, is(Boolean.TRUE));
241     }
242 }