/**
 *  Copyright 2003-2010 Terracotta, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
16  
17  package net.sf.ehcache.distribution;
18  
19  
20  import net.sf.ehcache.AbstractCacheTest;
21  import net.sf.ehcache.Cache;
22  import net.sf.ehcache.CacheManager;
23  import net.sf.ehcache.Ehcache;
24  import net.sf.ehcache.Element;
25  import net.sf.ehcache.util.RetryAssert;
26  
27  import org.hamcrest.collection.IsEmptyCollection;
28  import org.junit.After;
29  
30  import static org.junit.Assert.assertEquals;
31  import static org.junit.Assert.fail;
32  
33  import org.junit.Before;
34  import org.junit.Test;
35  
36  import java.net.SocketTimeoutException;
37  import java.rmi.RemoteException;
38  import java.rmi.UnmarshalException;
39  import java.util.ArrayList;
40  import java.util.Date;
41  import java.util.List;
42  import java.util.Set;
43  import java.util.concurrent.Callable;
44  import java.util.concurrent.TimeUnit;
45  
46  import org.slf4j.Logger;
47  import org.slf4j.LoggerFactory;
48  
49  /***
50   * Unit tests for RMICachePeer
51   * <p/>
52   * Note these tests need a live network interface running in multicast mode to work
53   *
54   * @author <a href="mailto:gluck@thoughtworks.com">Greg Luck</a>
55   * @version $Id: RMICacheManagerPeerTest.html 13146 2011-08-01 17:12:39Z oletizi $
56   */
57  public class RMICacheManagerPeerTest extends AbstractRMITest {
58  
59      private static final Logger LOG = LoggerFactory.getLogger(RMICacheManagerPeerTest.class.getName());
60  
61  
62      /***
63       * manager
64       */
65      protected CacheManager manager;
66      private final String hostName = "localhost";
67      private final Integer port = Integer.valueOf(40000);
68      private RMICacheManagerPeerListener peerListener;
69      private Cache cache;
70  
71  
72      /***
73       * {@inheritDoc}
74       *
75       * @throws Exception
76       */
77      @Before
78      public void setUp() throws Exception {
79          manager = CacheManager.create(AbstractCacheTest.TEST_CONFIG_DIR + "ehcache.xml");
80          cache = new Cache("test", 10, false, false, 10, 10);
81  
82          peerListener = new RMICacheManagerPeerListener(hostName, port, Integer.valueOf(0), manager, Integer.valueOf(2000));
83      }
84  
85      /***
86       * Shutdown the cache
87       */
88      @After
89      public void tearDown() throws InterruptedException {
90          Thread.sleep(20);
91          if (peerListener != null) {
92              peerListener.dispose();
93          }
94          manager.shutdown();
95  
96          RetryAssert.assertBy(30, TimeUnit.SECONDS, new Callable<Set<Thread>>() {
97              public Set<Thread> call() throws Exception {
98                  return getActiveReplicationThreads();
99              }
100         }, IsEmptyCollection.<Thread>empty());
101     }
102 
103 
104     /***
105      * Can we create the peer using remote port of 0?
106      */
107     @Test
108     public void testCreatePeerWithAutomaticRemotePort() throws RemoteException {
109         for (int i = 0; i < 10; i++) {
110             new RMICachePeer(cache, hostName, port, Integer.valueOf(0), Integer.valueOf(2000));
111         }
112     }
113 
114 
115     /***
116      * Can we create the peer using a specified free remote port of 45000
117      */
118     @Test
119     public void testCreatePeerWithSpecificRemotePort() throws RemoteException {
120         for (int i = 0; i < 10; i++) {
121             new RMICachePeer(cache, hostName, port, Integer.valueOf(45000), Integer.valueOf(2000));
122         }
123     }
124 
125 
126     /***
127      * See if socket.setSoTimeout(socketTimeoutMillis) works. Should throw a SocketTimeoutException
128      *
129      * @throws RemoteException
130      */
131     @Test
132     public void testFailsIfTimeoutExceeded() throws Exception {
133 
134         RMICachePeer rmiCachePeer = new SlowRMICachePeer(cache, hostName, port, Integer.valueOf(1000));
135         peerListener.addCachePeer(cache.getName(), rmiCachePeer);
136         peerListener.init();
137 
138 
139         try {
140             CachePeer cachePeer = new ManualRMICacheManagerPeerProvider().lookupRemoteCachePeer(rmiCachePeer.getUrl());
141             cachePeer.put(new Element("1", new Date()));
142             fail();
143         } catch (UnmarshalException e) {
144             assertEquals(SocketTimeoutException.class, e.getCause().getClass());
145         }
146     }
147 
148     /***
149      * See if socket.setSoTimeout(socketTimeoutMillis) works.
150      * Should not fail because the put takes less than the timeout.
151      *
152      * @throws RemoteException
153      */
154     @Test
155     public void testWorksIfTimeoutNotExceeded() throws Exception {
156 
157         cache = new Cache("test", 10, false, false, 10, 10);
158         RMICachePeer rmiCachePeer = new SlowRMICachePeer(cache, hostName, port, Integer.valueOf(2100));
159 
160         peerListener.addCachePeer(cache.getName(), rmiCachePeer);
161         peerListener.init();
162 
163         CachePeer cachePeer = new ManualRMICacheManagerPeerProvider().lookupRemoteCachePeer(rmiCachePeer.getUrl());
164         cachePeer.put(new Element("1", new Date()));
165     }
166 
167     /***
168      * Test send.
169      * <p/>
170      * This is a unit test because it was throwing AbstractMethodError if a method has changed signature,
171      * or NoSuchMethodError is a new one is added. The problem is that rmic needs
172      * to recompile the stub after any changes are made to the CachePeer source, something done by ant
173      * compile but not by the IDE.
174      */
175     @Test
176     public void testSend() throws Exception {
177 
178         cache = new Cache("test", 10, false, false, 10, 10);
179         RMICachePeer rmiCachePeer = new RMICachePeer(cache, hostName, port, Integer.valueOf(0), Integer.valueOf(2100));
180         manager.addCache(cache);
181 
182         peerListener.addCachePeer(cache.getName(), rmiCachePeer);
183         peerListener.init();
184 
185         CachePeer cachePeer = new ManualRMICacheManagerPeerProvider().lookupRemoteCachePeer(rmiCachePeer.getUrl());
186         Element element = new Element("1", new Date());
187         EventMessage eventMessage = new EventMessage(EventMessage.PUT, null, element);
188         List eventMessages = new ArrayList();
189         eventMessages.add(eventMessage);
190         cachePeer.send(eventMessages);
191     }
192 
193 
194     /***
195      * RMICachePeer that breaks in lots of interesting ways.
196      */
197     class SlowRMICachePeer extends RMICachePeer {
198 
199         /***
200          * Constructor
201          *
202          * @param cache
203          * @param hostName
204          * @param port
205          * @param socketTimeoutMillis
206          * @throws RemoteException
207          */
208         public SlowRMICachePeer(Ehcache cache, String hostName, Integer port, Integer socketTimeoutMillis)
209                 throws RemoteException {
210             super(cache, hostName, port, Integer.valueOf(0), socketTimeoutMillis);
211         }
212 
213         /***
214          * Puts an Element into the underlying cache without notifying listeners or updating statistics.
215          *
216          * @param element
217          * @throws java.rmi.RemoteException
218          * @throws IllegalArgumentException
219          * @throws IllegalStateException
220          */
221         @Override
222         public void put(Element element) throws RemoteException, IllegalArgumentException, IllegalStateException {
223             try {
224                 Thread.sleep(2000);
225             } catch (InterruptedException exception) {
226                 LOG.error(exception.getMessage(), exception);
227             }
228         }
229     }
230 }