1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import static net.sf.ehcache.util.RetryAssert.assertBy;
20 import static org.hamcrest.core.Is.is;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.fail;
25
26 import java.util.Arrays;
27 import java.util.List;
28 import java.util.Set;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.TimeUnit;
31 import java.util.logging.Level;
32 import java.util.logging.Logger;
33
34 import junit.framework.Assert;
35 import net.sf.ehcache.AbstractCacheTest;
36 import net.sf.ehcache.CacheException;
37 import net.sf.ehcache.CacheManager;
38 import net.sf.ehcache.Element;
39 import net.sf.ehcache.event.CountingCacheEventListener;
40 import net.sf.ehcache.util.RetryAssert;
41
42 import org.hamcrest.collection.IsEmptyCollection;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46
47 /***
48 * Tests replication of Cache events with large payloads exceeding MTU
49 * <p/>
50 * Note these tests need a live network interface running in multicast mode to work
51 * <p/>
52 *
53 * @author Abhishek Sanoujam
54 */
55 public class RMICacheReplicatorWithLargePayloadTest extends AbstractRMITest {
56
57 private static final Logger LOG = Logger.getLogger(RMICacheReplicatorWithLargePayloadTest.class.getName());
58
59 private static int MB = 1024 * 1024;
60
61 /***
62 * CacheManager 1 in the cluster
63 */
64 protected CacheManager manager1;
65 /***
66 * CacheManager 2 in the cluster
67 */
68 protected CacheManager manager2;
69 /***
70 * CacheManager 3 in the cluster
71 */
72 protected CacheManager manager3;
73
74 /***
75 * {@inheritDoc} Sets up two caches: cache1 is local. cache2 is to be receive updates
76 *
77 * @throws Exception
78 */
79 @Before
80 public void setUp() throws Exception {
81 failFastInsufficientMemory();
82
83 MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
84
85 CountingCacheEventListener.resetCounters();
86 manager1 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-big-payload-1.xml");
87 manager2 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-big-payload-2.xml");
88 manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-big-payload-3.xml");
89
90 waitForClusterMembership(10, TimeUnit.SECONDS, Arrays.asList(manager1.getCacheNames()), manager1, manager2, manager3);
91 }
92
93 private void failFastInsufficientMemory() {
94
95 long totalMemory = Runtime.getRuntime().totalMemory();
96 if (totalMemory < 200 * MB) {
97 String msg = "Insufficient heap (approx. " + (totalMemory / MB) + " MB detected), this test requires at least 256 MB to run.\n";
98 msg += "Steps to take:\n";
99 msg += " 1) If you are running with eclipse: specify \"-Xms256m -Xmx256m\" as VM arguments in the \"Run Confuguration\" for this test\n";
100 msg += " 2) If you are running using mvn with \"mvn test -Dtest=" + this.getClass().getSimpleName()
101 + "\", add this in the command line: -DargLine=\"-Xms256m -Xmx256m\"\n";
102 msg += " Run the test like: mvn test -Dtest=" + this.getClass().getSimpleName() + " -DargLine=\"-Xms256m -Xmx256m\"";
103 LOG.log(Level.WARNING, msg);
104 fail(msg);
105 }
106 }
107
108 /***
109 * {@inheritDoc}
110 *
111 * @throws Exception
112 */
113 @After
114 public void tearDown() throws Exception {
115
116 if (manager1 != null) {
117 manager1.shutdown();
118 }
119 if (manager2 != null) {
120 manager2.shutdown();
121 }
122 if (manager3 != null) {
123 manager3.shutdown();
124 }
125
126 RetryAssert.assertBy(30, TimeUnit.SECONDS, new Callable<Set<Thread>>() {
127 public Set<Thread> call() throws Exception {
128 return getActiveReplicationThreads();
129 }
130 }, IsEmptyCollection.<Thread>empty());
131 }
132
133 @Test
134 public void testAssertBigPayload() {
135 List<CachePeer> localPeers = manager1.getCachePeerListener("RMI").getBoundCachePeers();
136 List<byte[]> payloadList = PayloadUtil.createCompressedPayloadList(localPeers, 150);
137 Assert.assertTrue("Payload is not big enough for cacheManager-1", payloadList.size() > 1);
138
139 localPeers = manager2.getCachePeerListener("RMI").getBoundCachePeers();
140 payloadList = PayloadUtil.createCompressedPayloadList(localPeers, 150);
141 Assert.assertTrue("Payload is not big enough for cacheManager-2", payloadList.size() > 1);
142
143 localPeers = manager3.getCachePeerListener("RMI").getBoundCachePeers();
144 payloadList = PayloadUtil.createCompressedPayloadList(localPeers, 150);
145 Assert.assertTrue("Payload is not big enough for cacheManager-3", payloadList.size() > 1);
146
147 CacheManager manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-big-payload-4.xml");
148 try {
149 localPeers = manager4.getCachePeerListener("RMI").getBoundCachePeers();
150 payloadList = PayloadUtil.createCompressedPayloadList(localPeers, 150);
151 Assert.assertTrue("Payload is not big enough for cacheManager-4", payloadList.size() > 1);
152 } finally {
153 manager4.shutdown();
154 }
155 }
156
157 /***
158 * Does a new cache manager in the cluster get detected?
159 */
160 @Test
161 public void testRemoteCachePeersDetectsNewCacheManager() throws InterruptedException {
162
163 CacheManager manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-big-payload-4.xml");
164 try {
165
166 waitForClusterMembership(10020, TimeUnit.MILLISECONDS, Arrays.asList(manager1.getCacheNames()), manager1, manager2, manager3, manager4);
167 } finally {
168 manager4.shutdown();
169 }
170 }
171
172 /***
173 * Does a down cache manager in the cluster get removed?
174 */
175 @Test
176 public void testRemoteCachePeersDetectsDownCacheManager() throws InterruptedException {
177
178 manager3.shutdown();
179
180 waitForClusterMembership(11020, TimeUnit.MILLISECONDS, Arrays.asList(manager1.getCacheNames()), manager1, manager2);
181 }
182
183 /***
184 * Does a down cache manager in the cluster get removed?
185 */
186 @Test
187 public void testRemoteCachePeersDetectsDownCacheManagerSlow() throws InterruptedException {
188 MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(2000);
189 try {
190 Thread.sleep(2000);
191
192 manager3.shutdown();
193
194
195 CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
196 for (String cacheName : manager1.getCacheNames()) {
197 List remotePeersOfCache1 = provider.listRemoteCachePeers(manager1.getCache(cacheName));
198 assertEquals(2, remotePeersOfCache1.size());
199 }
200 } finally {
201 MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
202 Thread.sleep(2000);
203 }
204 }
205
206 /***
207 * Tests put and remove initiated from cache1 in a cluster
208 * <p/>
209 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
210 */
211 @Test
212 public void testPutProgagatesFromAndToEveryCacheManagerAndCache() throws CacheException, InterruptedException {
213
214
215 final String[] cacheNames = manager1.getCacheNames();
216 Arrays.sort(cacheNames);
217 for (int i = 0; i < cacheNames.length; i++) {
218 String name = cacheNames[i];
219 manager1.getCache(name).put(new Element(Integer.toString(i), Integer.valueOf(i)));
220
221 manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
222 }
223
224 assertBy(10, TimeUnit.SECONDS, new Callable<Boolean>() {
225
226 public Boolean call() throws Exception {
227 for (int i = 0; i < cacheNames.length; i++) {
228 String name = cacheNames[i];
229 for (CacheManager manager : new CacheManager[] {manager2, manager3}) {
230 Element element = manager.getCache(name).get(Integer.toString(i));
231 assertNotNull("Cache : " + name, element);
232 assertEquals(Integer.toString(i), element.getKey());
233 assertEquals(Integer.valueOf(i), element.getValue());
234
235 assertNull(manager.getCache(name).get("nonSerializable" + i));
236 }
237 }
238 return Boolean.TRUE;
239 }
240 }, is(Boolean.TRUE));
241 }
242 }