View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.constructs.locking;
18  
19  import java.util.Date;
20  import java.util.concurrent.CyclicBarrier;
21  
22  import junit.framework.TestCase;
23  import net.sf.ehcache.Cache;
24  import net.sf.ehcache.CacheManager;
25  import net.sf.ehcache.Element;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  
29  /***
30   * @author Abhishek Sanoujam
31   */
32  public class ExplicitLockApiTest extends TestCase {
33  
34      private static final Logger LOG = LoggerFactory.getLogger(ExplicitLockApiTest.class);
35  
36      public void testExplicitLockApi() throws Exception {
37          CacheManager cm = CacheManager.create(ExplicitLockApiTest.class.getResourceAsStream("/nonstop/nonstop-config-test.xml"));
38          Cache cache = cm.getCache("defaultConfig");
39          basicCacheTest(cache);
40          explicitApiTest(cache);
41      }
42  
43      private void basicCacheTest(Cache cache) {
44          debug("Basic Cache Test");
45          assertNotNull(cache);
46          cache.put(new Element("key", "value"));
47          Element element = cache.get("key");
48          assertNotNull(element);
49          assertEquals("value", element.getValue());
50          debug("Basic Cache Test Done");
51      }
52  
53      private void explicitApiTest(Cache cache) throws Exception {
54          debug("Explicit API Test");
55          String key = "key";
56          CyclicBarrier barrier = new CyclicBarrier(3);
57          final Reader reader = new Reader(barrier, cache, key);
58          final Writer writer = new Writer(barrier, cache, key);
59  
60          Thread t1 = new Thread(reader, "Reader Thread");
61          Thread t2 = new Thread(writer, "Writer Thread");
62  
63          debug("Old Element for key: " + cache.get(key));
64          assertNotSame("new-value", cache.get(key).getValue());
65  
66          assertFalse(writer.writeLockAcquired);
67          assertFalse(writer.updatedValue);
68          assertFalse(writer.writeLockReleased);
69          assertFalse(writer.finished);
70  
71          assertFalse(reader.readLockAcquired);
72          assertFalse(reader.assertedNewValue);
73          assertFalse(reader.finished);
74  
75          t1.start();
76          t2.start();
77  
78          barrier.await();
79  
80          // acquire write lock
81          debug("Signalling writer to acquire write lock");
82          writer.signal();
83          writer.waitUntilSignalProcessed();
84          assertTrue(writer.writeLockAcquired);
85          assertFalse(writer.updatedValue);
86          assertFalse(writer.writeLockReleased);
87  
88          // attempt read lock
89          debug("Letting reader to attempt read lock");
90          reader.signal();
91  
92          long start = System.currentTimeMillis();
93          while (true) {
94              if (System.currentTimeMillis() - start >= 5000) {
95                  break;
96              }
97              debug("Asserting Read lock call is blocked");
98              assertFalse(reader.readLockAcquired);
99              Thread.sleep(1000);
100         }
101 
102         // update the value
103         debug("Signalling writer to update value");
104         writer.signal();
105         writer.waitUntilSignalProcessed();
106         Thread.sleep(1000);
107         assertTrue(writer.writeLockAcquired);
108         assertTrue(writer.updatedValue);
109         assertFalse(writer.writeLockReleased);
110 
111         // assert read lock is still blocked
112         start = System.currentTimeMillis();
113         while (true) {
114             if (System.currentTimeMillis() - start >= 5000) {
115                 break;
116             }
117             debug("Asserting Read lock call is blocked");
118             assertFalse(reader.readLockAcquired);
119             Thread.sleep(1000);
120         }
121 
122         // release write lock
123         debug("Signalling writer to release lock");
124         writer.signal();
125         writer.waitUntilSignalProcessed();
126         assertTrue(writer.writeLockAcquired);
127         assertTrue(writer.updatedValue);
128         assertTrue(writer.writeLockReleased);
129 
130         // wait until reader has processed the initial attempt-read-lock signal
131         reader.waitUntilSignalProcessed();
132         // assert acquired read lock
133         debug("Asserting read lock acquired");
134         assertTrue(reader.readLockAcquired);
135 
136         // assert new updated value in reader
137         debug("Letting reader check new updated value");
138         reader.signal();
139         reader.waitUntilSignalProcessed();
140         debug("Asserting reader got new updated value");
141         assertTrue(reader.assertedNewValue);
142 
143         t1.join();
144         t2.join();
145 
146         assertTrue(reader.finished);
147         assertTrue(writer.finished);
148         assertNull(reader.error);
149         assertNull(writer.error);
150 
151         debug("Explicit API Test Done");
152     }
153 
154     private static void debug(String string) {
155         LOG.info("[" + Thread.currentThread().getName() + "] [" + new Date().toString() + "] " + string);
156     }
157 
158     private abstract static class SignalRunnable implements Runnable {
159         private volatile boolean signalReceived = false;
160         private volatile boolean signalProcessed = true;
161         private final String name;
162 
163         public SignalRunnable(String name) {
164             this.name = name;
165         }
166 
167         public void waitUntilSignalProcessed() {
168             while (!signalProcessed) {
169                 try {
170                     debug("Signal[" + name + "]  not processed yet... sleeping for 1 sec");
171                     Thread.sleep(1000);
172                 } catch (InterruptedException e) {
173                     e.printStackTrace();
174                 }
175             }
176             debug("Last signal[" + name + "]  processed");
177         }
178 
179         protected void waitUntilSignalled() throws InterruptedException {
180             while (!signalReceived) {
181                 synchronized (this) {
182                     this.wait(500);
183                 }
184             }
185             debug("Received signal[" + name + "]  to go ahead");
186         }
187 
188         public void signal() {
189             synchronized (this) {
190                 signalReceived = true;
191                 signalProcessed = false;
192                 this.notifyAll();
193             }
194         }
195 
196         protected void markSignalProcessed() {
197             synchronized (this) {
198                 signalProcessed = true;
199                 signalReceived = false;
200             }
201         }
202     }
203 
204     private static class Reader extends SignalRunnable {
205         private final Cache cache;
206         private final String key;
207         private volatile Throwable error;
208         private volatile boolean finished;
209         private volatile boolean readLockAcquired;
210         private volatile boolean assertedNewValue;
211         private final CyclicBarrier barrier;
212 
213         public Reader(CyclicBarrier barrier, Cache cache, String key) {
214             super("Reader");
215             this.barrier = barrier;
216             this.cache = cache;
217             this.key = key;
218         }
219 
220         public void run() {
221             try {
222                 barrier.await();
223 
224                 waitUntilSignalled();
225                 cache.acquireReadLockOnKey(key);
226                 readLockAcquired = true;
227                 debug("Acquired read lock");
228                 markSignalProcessed();
229 
230                 waitUntilSignalled();
231                 Element element = cache.get(key);
232                 debug("Got element: " + element);
233                 cache.releaseReadLockOnKey(key);
234                 assertNotNull(element);
235                 assertEquals("new-value", element.getValue());
236                 assertedNewValue = true;
237                 markSignalProcessed();
238 
239             } catch (Throwable e) {
240                 e.printStackTrace();
241                 error = e;
242             } finally {
243                 finished = true;
244             }
245         }
246 
247     }
248 
249     private static class Writer extends SignalRunnable {
250         private final Cache cache;
251         private final String key;
252         private volatile Throwable error;
253         private volatile boolean finished;
254         private volatile boolean writeLockAcquired;
255         private volatile boolean updatedValue;
256         private volatile boolean writeLockReleased;
257         private final CyclicBarrier barrier;
258 
259         public Writer(CyclicBarrier barrier, Cache cache, String key) {
260             super("Writer");
261             this.barrier = barrier;
262             this.cache = cache;
263             this.key = key;
264         }
265 
266         public void run() {
267             try {
268                 barrier.await();
269 
270                 waitUntilSignalled();
271                 cache.acquireWriteLockOnKey(key);
272                 writeLockAcquired = true;
273                 debug("Write Lock Acquired");
274                 markSignalProcessed();
275 
276                 waitUntilSignalled();
277                 debug("Old Element for key: " + cache.get(key));
278                 cache.put(new Element(key, "new-value"));
279                 updatedValue = true;
280                 debug("Updated value");
281                 debug("Updated Element for key: " + cache.get(key));
282                 markSignalProcessed();
283 
284                 waitUntilSignalled();
285                 debug("Element for key: " + cache.get(key));
286                 cache.releaseWriteLockOnKey(key);
287                 writeLockReleased = true;
288                 debug("Write lock released");
289                 markSignalProcessed();
290 
291             } catch (Throwable e) {
292                 e.printStackTrace();
293                 error = e;
294             } finally {
295                 finished = true;
296             }
297         }
298 
299     }
300 
301 }