1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import static net.sf.ehcache.util.RetryAssert.assertBy;
20 import static org.hamcrest.core.Is.is;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertThat;
23
24 import java.io.IOException;
25 import java.lang.management.ManagementFactory;
26 import java.rmi.server.RMISocketFactory;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.HashSet;
30 import java.util.Set;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.TimeUnit;
33
34 import javax.management.MBeanServer;
35 import javax.management.ObjectName;
36
37 import net.sf.ehcache.CacheManager;
38
39 import org.hamcrest.collection.IsEmptyCollection;
40 import org.junit.Assert;
41 import org.junit.BeforeClass;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 public abstract class AbstractRMITest {
46
47 private static final Logger LOG = LoggerFactory.getLogger(AbstractRMITest.class);
48
49 @BeforeClass
50 public static void installRMISocketFactory() {
51 RMISocketFactory current = RMISocketFactory.getSocketFactory();
52 if (current == null) {
53 current = RMISocketFactory.getDefaultSocketFactory();
54 }
55 assertNotNull(current);
56 try {
57 RMISocketFactory.setSocketFactory(new SocketReusingRMISocketFactory(current));
58 LOG.info("Installed the SO_REUSEADDR setting socket factory");
59 } catch (IOException e) {
60 LOG.warn("Couldn't register the SO_REUSEADDR setting socket factory", e);
61 }
62 }
63
64 @BeforeClass
65 public static void checkActiveThreads() {
66 assertThat(getActiveReplicationThreads(), IsEmptyCollection.<Thread>empty());
67 }
68
69 protected static Set<Thread> getActiveReplicationThreads() {
70 Set<Thread> threads = new HashSet<Thread>();
71 for (Thread thread : JVMUtil.enumerateThreads()) {
72 if (thread.getName().equals("Replication Thread")) {
73 threads.add(thread);
74 }
75 }
76 return threads;
77 }
78
79 protected static final void setHeapDumpOnOutOfMemoryError(boolean value) {
80 try {
81 MBeanServer server = ManagementFactory.getPlatformMBeanServer();
82 ObjectName beanName = ObjectName.getInstance("com.sun.management:type=HotSpotDiagnostic");
83 Object vmOption = server.invoke(beanName, "setVMOption", new Object[] { "HeapDumpOnOutOfMemoryError", Boolean.toString(value) },
84 new String[] { "java.lang.String", "java.lang.String" });
85 LOG.info("Set HeapDumpOnOutOfMemoryError to: " + value);
86 } catch (Throwable t) {
87 LOG.info("Set HeapDumpOnOutOfMemoryError to: " + value + " - failed", t);
88 }
89 }
90
91 protected static Collection<Throwable> runTasks(Collection<Callable<Void>> tasks) {
92 final long endTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
93 final Collection<Throwable> errors = new ArrayList<Throwable>();
94
95
96 Collection<Thread> threads = new ArrayList<Thread>(tasks.size());
97 for (final Callable<Void> task : tasks) {
98 Assert.assertNotNull(task);
99 threads.add(new Thread() {
100 @Override
101 public void run() {
102 try {
103
104 while (System.nanoTime() < endTime) {
105 task.call();
106 }
107 } catch (Throwable t) {
108
109 errors.add(t);
110 }
111 }
112
113 });
114 }
115
116 for (Thread t : threads) {
117 t.start();
118 }
119
120 boolean interrupted = false;
121 try {
122 for (Thread t : threads) {
123 while (t.isAlive()) {
124 try {
125 t.join();
126 } catch (InterruptedException e) {
127 interrupted = true;
128 }
129 }
130 }
131 } finally {
132 if (interrupted) {
133 Thread.currentThread().interrupt();
134 }
135 }
136
137 return errors;
138 }
139
140 protected static void waitForClusterMembership(int time, TimeUnit unit, final Collection<String> cacheNames, final CacheManager ... managers) {
141 assertBy(time, unit, new Callable<Integer>() {
142
143 public Integer call() throws Exception {
144 Integer minimumPeers = null;
145 for (CacheManager manager : managers) {
146 CacheManagerPeerProvider peerProvider = manager.getCacheManagerPeerProvider("RMI");
147 for (String cacheName : cacheNames) {
148 int peers = peerProvider.listRemoteCachePeers(manager.getEhcache(cacheName)).size();
149 if (minimumPeers == null || peers < minimumPeers) {
150 minimumPeers = peers;
151 }
152 }
153 }
154 if (minimumPeers == null) {
155 return 0;
156 } else {
157 return minimumPeers + 1;
158 }
159 }
160 }, is(managers.length));
161 }
162 }