View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.event;
18  
19  import static org.junit.Assert.assertEquals;
20  import static org.junit.Assert.assertNull;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.HashSet;
26  import java.util.Map;
27  import java.util.Random;
28  import java.util.Set;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import net.sf.ehcache.AbstractCacheTest;
32  import net.sf.ehcache.Cache;
33  import net.sf.ehcache.CacheException;
34  import net.sf.ehcache.Ehcache;
35  import net.sf.ehcache.Element;
36  import net.sf.ehcache.config.CacheConfiguration;
37  import net.sf.ehcache.event.CacheEventListener;
38  
39  import org.junit.Test;
40  
41  public class BulkOpsEventListenerTest extends AbstractCacheTest {
42  
43      @Test
44      public void testBulkOpsEventListener() throws Throwable {
45          CacheConfiguration cacheConfiguration = new CacheConfiguration();
46          cacheConfiguration.setName("cache");
47          cacheConfiguration.setMaxElementsInMemory(1000);
48          cacheConfiguration.setOverflowToDisk(true);
49          cacheConfiguration.setEternal(false);
50          cacheConfiguration.setTimeToLiveSeconds(100000);
51          cacheConfiguration.setTimeToIdleSeconds(200000);
52          cacheConfiguration.setDiskPersistent(false);
53          cacheConfiguration.setDiskExpiryThreadIntervalSeconds(1);
54          Cache cache = new Cache(cacheConfiguration);
55          manager.addCache(cache);
56  
57          TestCacheEventListener eventListener = new TestCacheEventListener();
58          cache.getCacheEventNotificationService().registerListener(eventListener);
59  
60          int numOfElements = 100;
61          Set<Element> elements = new HashSet<Element>();
62          for(int i = 0; i < numOfElements; i++){
63              elements.add(new Element("key" + i, "value" + i));
64          }
65          cache.putAll(elements);
66          assertEquals(numOfElements, cache.getSize());
67          assertEquals(numOfElements, eventListener.elementsPut.size());
68          assertEquals(elements, eventListener.elementsPut);
69  
70          Set keySet1 = new HashSet<String>();
71          for(int i = 0; i < numOfElements; i++){
72              keySet1.add("key"+i);
73          }
74  
75          Map<Object, Element> rv = cache.getAll(keySet1);
76          assertEquals(numOfElements, rv.size());
77  
78          for(Element element : rv.values()){
79              assertTrue(elements.contains(element));
80          }
81  
82          Collection<Element> values = rv.values();
83          for(Element element : elements){
84              assertTrue(values.contains(element));
85          }
86  
87          Random rand = new Random();
88          Set keySet2 = new HashSet<String>();
89          for(int i = 0; i < numOfElements/2; i++){
90              keySet2.add("key" + rand.nextInt(numOfElements));
91          }
92  
93          rv = cache.getAll(keySet2);
94          assertEquals(keySet2.size(), rv.size());
95  
96          for(Element element : rv.values()){
97              assertTrue(elements.contains(element));
98          }
99  
100         assertEquals(keySet2, rv.keySet());
101 
102         cache.removeAll(keySet2);
103         assertEquals(numOfElements - keySet2.size(), cache.getSize());
104         assertEquals(keySet2.size(), eventListener.elementsRemoved.size());
105         Set<String> removedKeySet = new HashSet<String>();
106         for(Element element : eventListener.elementsRemoved){
107             removedKeySet.add(element.getKey().toString());
108         }
109         assertEquals(keySet2, removedKeySet);
110 
111         for(Object key : keySet2){
112             assertNull(cache.get(key));
113         }
114 
115         cache.removeAll();
116         assertEquals(0, cache.getSize());
117     }
118 
119     @Test
120     public void testMultiThreadedBulkOps() throws InterruptedException{
121         Cache cache = new Cache("cache", 1000000, true, false, 100000, 200000, false, 1);
122         manager.addCache(cache);
123 
124         TestCacheEventListener eventListener = new TestCacheEventListener();
125         cache.getCacheEventNotificationService().registerListener(eventListener);
126 
127         Producer p1 = new Producer(cache, 0, 3 * 60 * 1000);
128         Producer p2 = new Producer(cache, 1000000, 3 * 60 * 1000);
129         Thread[] th = new Thread[4];
130         th[0] = new Thread(p1, "p1");
131         th[1] = new Thread(p2, "p2");
132         th[0].start();
133         th[1].start();
134 
135         Consumer c1 = new Consumer(cache, 0, 2 * 60 * 1000);
136         Consumer c2 = new Consumer(cache, 1000000, 2 * 60 * 1000);
137         th[2] = new Thread(c1, "c1");
138         th[3] = new Thread(c2, "c2");
139 
140         Thread.sleep(10000);
141         th[2].start();
142         th[3].start();
143 
144         for(Thread t : th){
145             t.join();
146         }
147 
148         assertEquals(p1.numPuts.intValue() + p2.numPuts.intValue(), eventListener.elementsPut.size());
149         assertEquals(c1.numRemoved.intValue() + c2.numRemoved.intValue(), eventListener.elementsRemoved.size());
150     }
151 
152     private static class Producer implements Runnable {
153         private final AtomicInteger numPuts = new AtomicInteger(0);
154         private final Cache cache;
155         private final int startIndex;
156         private final int timeToRunMills;
157         private final long startTime = System.currentTimeMillis();
158 
159         public Producer(Cache cache, int start, int timeToRunMills) {
160             this.cache = cache;
161             this.startIndex = start;
162             this.timeToRunMills = timeToRunMills;
163         }
164 
165         public void run() {
166             int i = startIndex;
167             Random rand = new Random();
168             while(System.currentTimeMillis() - startTime <= timeToRunMills){
169                 int batch = rand.nextInt(100);
170                 Set<Element> elements = new HashSet<Element>();
171                 for(int j = 0; j < batch; j++){
172                     elements.add(new Element("key" + i, "value" + i));
173                     i++;
174                 }
175                 this.cache.putAll(elements);
176                 numPuts.addAndGet(batch);
177                 try {
178                     Thread.sleep(500);
179                 } catch (InterruptedException e) {
180                     e.printStackTrace();
181                 }
182             }
183         }
184     }
185 
186     private static class Consumer implements Runnable{
187         private final AtomicInteger numRemoved = new AtomicInteger(0);
188         private final Cache cache;
189         private final int startIndex;
190         private final int timeToRunMills;
191         private final long startTime = System.currentTimeMillis();
192 
193         public Consumer(Cache cache, int start, int timeToRunMills) {
194             this.cache = cache;
195             this.startIndex = start;
196             this.timeToRunMills = timeToRunMills;
197         }
198 
199         public void run() {
200             int i = startIndex;
201             Random rand = new Random();
202             while(System.currentTimeMillis() - startTime <= timeToRunMills){
203                 int batch = rand.nextInt(100);
204                 Set elements = new HashSet<String>();
205                 for(int j = 0; j < batch; j++){
206                     elements.add("key" + i);
207                     i++;
208                 }
209                 this.cache.removeAll(elements);
210                 numRemoved.addAndGet(batch);
211                 try {
212                     Thread.sleep(1000);
213                 } catch (InterruptedException e) {
214                     e.printStackTrace();
215                 }
216             }
217         }
218     }
219 
220     private static class TestCacheEventListener implements CacheEventListener{
221         Set<Element> elementsPut = Collections.synchronizedSet(new HashSet<Element>());
222         Set<Element> elementsUpdated = Collections.synchronizedSet(new HashSet<Element>());
223         Set<Element> elementsRemoved = Collections.synchronizedSet(new HashSet<Element>());
224 
225         public void dispose() {
226             // TODO Auto-generated method stub
227 
228         }
229 
230         public void notifyElementEvicted(Ehcache cache, Element element) {
231             // TODO Auto-generated method stub
232 
233         }
234 
235         public void notifyElementExpired(Ehcache cache, Element element) {
236             // TODO Auto-generated method stub
237 
238         }
239 
240         public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
241             elementsPut.add(element);
242         }
243 
244         public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
245             elementsRemoved.add(element);
246         }
247 
248         public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
249             elementsUpdated.add(element);
250         }
251 
252         public void notifyRemoveAll(Ehcache cache) {
253             // TODO Auto-generated method stub
254         }
255 
256         @Override
257         public Object clone() throws CloneNotSupportedException {
258             return super.clone();
259         }
260 
261     }
262 }