1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.event;
18
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertNull;
21 import static org.junit.Assert.assertTrue;
22
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Random;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import net.sf.ehcache.AbstractCacheTest;
32 import net.sf.ehcache.Cache;
33 import net.sf.ehcache.CacheException;
34 import net.sf.ehcache.Ehcache;
35 import net.sf.ehcache.Element;
36 import net.sf.ehcache.config.CacheConfiguration;
37 import net.sf.ehcache.event.CacheEventListener;
38
39 import org.junit.Test;
40
41 public class BulkOpsEventListenerTest extends AbstractCacheTest {
42
43 @Test
44 public void testBulkOpsEventListener() throws Throwable {
45 CacheConfiguration cacheConfiguration = new CacheConfiguration();
46 cacheConfiguration.setName("cache");
47 cacheConfiguration.setMaxElementsInMemory(1000);
48 cacheConfiguration.setOverflowToDisk(true);
49 cacheConfiguration.setEternal(false);
50 cacheConfiguration.setTimeToLiveSeconds(100000);
51 cacheConfiguration.setTimeToIdleSeconds(200000);
52 cacheConfiguration.setDiskPersistent(false);
53 cacheConfiguration.setDiskExpiryThreadIntervalSeconds(1);
54 Cache cache = new Cache(cacheConfiguration);
55 manager.addCache(cache);
56
57 TestCacheEventListener eventListener = new TestCacheEventListener();
58 cache.getCacheEventNotificationService().registerListener(eventListener);
59
60 int numOfElements = 100;
61 Set<Element> elements = new HashSet<Element>();
62 for(int i = 0; i < numOfElements; i++){
63 elements.add(new Element("key" + i, "value" + i));
64 }
65 cache.putAll(elements);
66 assertEquals(numOfElements, cache.getSize());
67 assertEquals(numOfElements, eventListener.elementsPut.size());
68 assertEquals(elements, eventListener.elementsPut);
69
70 Set keySet1 = new HashSet<String>();
71 for(int i = 0; i < numOfElements; i++){
72 keySet1.add("key"+i);
73 }
74
75 Map<Object, Element> rv = cache.getAll(keySet1);
76 assertEquals(numOfElements, rv.size());
77
78 for(Element element : rv.values()){
79 assertTrue(elements.contains(element));
80 }
81
82 Collection<Element> values = rv.values();
83 for(Element element : elements){
84 assertTrue(values.contains(element));
85 }
86
87 Random rand = new Random();
88 Set keySet2 = new HashSet<String>();
89 for(int i = 0; i < numOfElements/2; i++){
90 keySet2.add("key" + rand.nextInt(numOfElements));
91 }
92
93 rv = cache.getAll(keySet2);
94 assertEquals(keySet2.size(), rv.size());
95
96 for(Element element : rv.values()){
97 assertTrue(elements.contains(element));
98 }
99
100 assertEquals(keySet2, rv.keySet());
101
102 cache.removeAll(keySet2);
103 assertEquals(numOfElements - keySet2.size(), cache.getSize());
104 assertEquals(keySet2.size(), eventListener.elementsRemoved.size());
105 Set<String> removedKeySet = new HashSet<String>();
106 for(Element element : eventListener.elementsRemoved){
107 removedKeySet.add(element.getKey().toString());
108 }
109 assertEquals(keySet2, removedKeySet);
110
111 for(Object key : keySet2){
112 assertNull(cache.get(key));
113 }
114
115 cache.removeAll();
116 assertEquals(0, cache.getSize());
117 }
118
119 @Test
120 public void testMultiThreadedBulkOps() throws InterruptedException{
121 Cache cache = new Cache("cache", 1000000, true, false, 100000, 200000, false, 1);
122 manager.addCache(cache);
123
124 TestCacheEventListener eventListener = new TestCacheEventListener();
125 cache.getCacheEventNotificationService().registerListener(eventListener);
126
127 Producer p1 = new Producer(cache, 0, 3 * 60 * 1000);
128 Producer p2 = new Producer(cache, 1000000, 3 * 60 * 1000);
129 Thread[] th = new Thread[4];
130 th[0] = new Thread(p1, "p1");
131 th[1] = new Thread(p2, "p2");
132 th[0].start();
133 th[1].start();
134
135 Consumer c1 = new Consumer(cache, 0, 2 * 60 * 1000);
136 Consumer c2 = new Consumer(cache, 1000000, 2 * 60 * 1000);
137 th[2] = new Thread(c1, "c1");
138 th[3] = new Thread(c2, "c2");
139
140 Thread.sleep(10000);
141 th[2].start();
142 th[3].start();
143
144 for(Thread t : th){
145 t.join();
146 }
147
148 assertEquals(p1.numPuts.intValue() + p2.numPuts.intValue(), eventListener.elementsPut.size());
149 assertEquals(c1.numRemoved.intValue() + c2.numRemoved.intValue(), eventListener.elementsRemoved.size());
150 }
151
152 private static class Producer implements Runnable {
153 private final AtomicInteger numPuts = new AtomicInteger(0);
154 private final Cache cache;
155 private final int startIndex;
156 private final int timeToRunMills;
157 private final long startTime = System.currentTimeMillis();
158
159 public Producer(Cache cache, int start, int timeToRunMills) {
160 this.cache = cache;
161 this.startIndex = start;
162 this.timeToRunMills = timeToRunMills;
163 }
164
165 public void run() {
166 int i = startIndex;
167 Random rand = new Random();
168 while(System.currentTimeMillis() - startTime <= timeToRunMills){
169 int batch = rand.nextInt(100);
170 Set<Element> elements = new HashSet<Element>();
171 for(int j = 0; j < batch; j++){
172 elements.add(new Element("key" + i, "value" + i));
173 i++;
174 }
175 this.cache.putAll(elements);
176 numPuts.addAndGet(batch);
177 try {
178 Thread.sleep(500);
179 } catch (InterruptedException e) {
180 e.printStackTrace();
181 }
182 }
183 }
184 }
185
186 private static class Consumer implements Runnable{
187 private final AtomicInteger numRemoved = new AtomicInteger(0);
188 private final Cache cache;
189 private final int startIndex;
190 private final int timeToRunMills;
191 private final long startTime = System.currentTimeMillis();
192
193 public Consumer(Cache cache, int start, int timeToRunMills) {
194 this.cache = cache;
195 this.startIndex = start;
196 this.timeToRunMills = timeToRunMills;
197 }
198
199 public void run() {
200 int i = startIndex;
201 Random rand = new Random();
202 while(System.currentTimeMillis() - startTime <= timeToRunMills){
203 int batch = rand.nextInt(100);
204 Set elements = new HashSet<String>();
205 for(int j = 0; j < batch; j++){
206 elements.add("key" + i);
207 i++;
208 }
209 this.cache.removeAll(elements);
210 numRemoved.addAndGet(batch);
211 try {
212 Thread.sleep(1000);
213 } catch (InterruptedException e) {
214 e.printStackTrace();
215 }
216 }
217 }
218 }
219
220 private static class TestCacheEventListener implements CacheEventListener{
221 Set<Element> elementsPut = Collections.synchronizedSet(new HashSet<Element>());
222 Set<Element> elementsUpdated = Collections.synchronizedSet(new HashSet<Element>());
223 Set<Element> elementsRemoved = Collections.synchronizedSet(new HashSet<Element>());
224
225 public void dispose() {
226
227
228 }
229
230 public void notifyElementEvicted(Ehcache cache, Element element) {
231
232
233 }
234
235 public void notifyElementExpired(Ehcache cache, Element element) {
236
237
238 }
239
240 public void notifyElementPut(Ehcache cache, Element element) throws CacheException {
241 elementsPut.add(element);
242 }
243
244 public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException {
245 elementsRemoved.add(element);
246 }
247
248 public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException {
249 elementsUpdated.add(element);
250 }
251
252 public void notifyRemoveAll(Ehcache cache) {
253
254 }
255
256 @Override
257 public Object clone() throws CloneNotSupportedException {
258 return super.clone();
259 }
260
261 }
262 }