1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.writer;
18
19 import static org.hamcrest.core.Is.is;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertThat;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
27
28 import java.util.Properties;
29 import java.util.concurrent.BrokenBarrierException;
30 import java.util.concurrent.Callable;
31 import java.util.concurrent.CyclicBarrier;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34
35 import net.sf.ehcache.AbstractCacheTest;
36 import net.sf.ehcache.Cache;
37 import net.sf.ehcache.CacheEntry;
38 import net.sf.ehcache.CacheException;
39 import net.sf.ehcache.CacheManager;
40 import net.sf.ehcache.Element;
41 import net.sf.ehcache.Status;
42 import net.sf.ehcache.config.CacheConfiguration;
43 import net.sf.ehcache.config.CacheWriterConfiguration;
44 import net.sf.ehcache.event.CountingCacheEventListener;
45 import net.sf.ehcache.util.RetryAssert;
46 import net.sf.ehcache.writer.TestCacheWriterRetries.WriterEvent;
47
48 import org.hamcrest.core.Is;
49 import org.junit.After;
50 import org.junit.Before;
51 import org.junit.Test;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /***
56 * Tests for a CacheWriters
57 *
58 * @author Geert Bevin
59 * @version $Id: CacheWriterTest.html 13146 2011-08-01 17:12:39Z oletizi $
60 */
61 public class CacheWriterTest extends AbstractCacheTest {
62
63 private static final Logger LOG = LoggerFactory.getLogger(CacheWriterTest.class.getName());
64
65 protected CacheManager manager;
66
67 @Override
68 @Before
69 public void setUp() throws Exception {
70 manager = CacheManager.create(AbstractCacheTest.TEST_CONFIG_DIR + "ehcache-writer.xml");
71 }
72
73 @Override
74 @After
75 public void tearDown() throws Exception {
76 if (!manager.getStatus().equals(Status.STATUS_SHUTDOWN)) {
77 manager.shutdown();
78 }
79 }
80
81 @Test
82 public void testWriteThroughXml() {
83 Cache cache = manager.getCache("writeThroughCacheXml");
84 assertNotNull(cache.getRegisteredCacheWriter());
85
86 TestCacheWriter writer = (TestCacheWriter) cache.getRegisteredCacheWriter();
87 assertTrue(writer.isInitialized());
88
89 Element el1 = new Element("key1", "value1");
90 Element el2 = new Element("key2", "value2");
91 Element el3 = new Element("key3", "value3");
92 assertEquals(0, writer.getWrittenElements().size());
93 cache.putWithWriter(el1);
94 assertEquals(1, writer.getWrittenElements().size());
95 cache.putWithWriter(el2);
96 assertEquals(2, writer.getWrittenElements().size());
97 cache.putWithWriter(el3);
98 assertEquals(3, writer.getWrittenElements().size());
99 assertEquals("value1", writer.getWrittenElements().get("key1").getValue());
100 assertEquals("value2", writer.getWrittenElements().get("key2").getValue());
101 assertEquals("value3", writer.getWrittenElements().get("key3").getValue());
102 cache.removeWithWriter(el2.getKey());
103 assertEquals(3, writer.getWrittenElements().size());
104 assertNotNull(writer.getWrittenElements().get("key1"));
105 assertNotNull(writer.getWrittenElements().get("key2"));
106 assertNotNull(writer.getWrittenElements().get("key3"));
107 assertEquals(1, writer.getDeletedElements().size());
108 assertTrue(writer.getDeletedElements().containsKey("key2"));
109 }
110
111 @Test
112 public void testWriteThroughXmlProperties() {
113 Cache cache = manager.getCache("writeThroughCacheXmlProperties");
114 assertNotNull(cache.getRegisteredCacheWriter());
115
116 TestCacheWriter writer = (TestCacheWriter) cache.getRegisteredCacheWriter();
117 assertTrue(writer.isInitialized());
118
119 Element el1 = new Element("key1", "value1");
120 Element el2 = new Element("key2", "value2");
121 Element el3 = new Element("key3", "value3");
122 assertEquals(0, writer.getWrittenElements().size());
123 cache.putWithWriter(el1);
124 assertEquals(1, writer.getWrittenElements().size());
125 cache.putWithWriter(el2);
126 assertEquals(2, writer.getWrittenElements().size());
127 cache.putWithWriter(el3);
128 assertEquals(3, writer.getWrittenElements().size());
129 assertNull(writer.getWrittenElements().get("key1"));
130 assertNull(writer.getWrittenElements().get("key2"));
131 assertNull(writer.getWrittenElements().get("key3"));
132 assertEquals("value1", writer.getWrittenElements().get("prekey1suff").getValue());
133 assertEquals("value2", writer.getWrittenElements().get("prekey2suff").getValue());
134 assertEquals("value3", writer.getWrittenElements().get("prekey3suff").getValue());
135 cache.removeWithWriter(el1.getKey());
136 assertEquals(3, writer.getWrittenElements().size());
137 assertNotNull(writer.getWrittenElements().get("prekey1suff"));
138 assertNotNull(writer.getWrittenElements().get("prekey2suff"));
139 assertNotNull(writer.getWrittenElements().get("prekey3suff"));
140 assertEquals(1, writer.getDeletedElements().size());
141 assertTrue(writer.getDeletedElements().containsKey("prekey1suff"));
142 }
143
144 @Test
145 public void testWriteThroughJavaRegistration() {
146 Cache cache = manager.getCache("writeThroughCacheJavaRegistration");
147 assertNull(cache.getRegisteredCacheWriter());
148
149 TestCacheWriter writer = new TestCacheWriter(new Properties());
150 cache.registerCacheWriter(writer);
151 assertTrue(writer.isInitialized());
152
153 Element el1 = new Element("key1", "value1");
154 Element el2 = new Element("key2", "value2");
155 Element el3 = new Element("key3", "value3");
156 Element el4 = new Element("key4", "value4");
157 assertEquals(0, writer.getWrittenElements().size());
158 cache.putWithWriter(el1);
159 assertEquals(1, writer.getWrittenElements().size());
160 cache.putWithWriter(el2);
161 assertEquals(2, writer.getWrittenElements().size());
162 cache.putWithWriter(el3);
163 assertEquals(3, writer.getWrittenElements().size());
164 cache.putWithWriter(el4);
165 assertEquals(4, writer.getWrittenElements().size());
166 assertEquals("value1", writer.getWrittenElements().get("key1").getValue());
167 assertEquals("value2", writer.getWrittenElements().get("key2").getValue());
168 assertEquals("value3", writer.getWrittenElements().get("key3").getValue());
169 assertEquals("value4", writer.getWrittenElements().get("key4").getValue());
170 cache.removeWithWriter(el1.getKey());
171 cache.removeWithWriter(el2.getKey());
172 cache.removeWithWriter(el3.getKey());
173 assertEquals(4, writer.getWrittenElements().size());
174 assertNotNull(writer.getWrittenElements().get("key1"));
175 assertNotNull(writer.getWrittenElements().get("key2"));
176 assertNotNull(writer.getWrittenElements().get("key3"));
177 assertNotNull(writer.getWrittenElements().get("key4"));
178 assertEquals(3, writer.getDeletedElements().size());
179 assertTrue(writer.getDeletedElements().containsKey("key1"));
180 assertTrue(writer.getDeletedElements().containsKey("key2"));
181 assertTrue(writer.getDeletedElements().containsKey("key3"));
182 }
183
184 @Test
185 public void testWriteThroughNotifyListeners() {
186 Cache cache = new Cache(new CacheConfiguration("writeThroughCacheOnly", 10)
187 .cacheEventListenerFactory(new CacheConfiguration.CacheEventListenerFactoryConfiguration()
188 .className("net.sf.ehcache.event.CountingCacheEventListenerFactory")));
189 assertNull(cache.getRegisteredCacheWriter());
190
191 CacheManager.getInstance().addCache(cache);
192
193 TestCacheWriterException writer = new TestCacheWriterException();
194 cache.registerCacheWriter(writer);
195 assertTrue(writer.isInitialized());
196
197
198 cache.getCacheConfiguration().getCacheWriterConfiguration().setNotifyListenersOnException(false);
199
200 CountingCacheEventListener.resetCounters();
201
202 try {
203 cache.putWithWriter(new Element("key1", "value1"));
204 fail("Expected UnsupportedOperationException");
205 } catch (UnsupportedOperationException e) {
206
207 assertEquals(0, CountingCacheEventListener.getCacheElementsPut(cache).size());
208 } catch(Throwable e) {
209 fail("Didn't expect a " + e.getClass().getName());
210 }
211
212 CountingCacheEventListener.resetCounters();
213
214 try {
215 cache.putWithWriter(new Element("key1", "value1"));
216 fail("Expected UnsupportedOperationException");
217 } catch (UnsupportedOperationException e) {
218
219 assertEquals(0, CountingCacheEventListener.getCacheElementsUpdated(cache).size());
220 } catch(Throwable e) {
221 fail("Didn't expect a " + e.getClass().getName());
222 }
223
224 CountingCacheEventListener.resetCounters();
225
226 try {
227 cache.removeWithWriter("key1");
228 fail("Expected UnsupportedOperationException");
229 } catch (UnsupportedOperationException e) {
230
231 assertEquals(0, CountingCacheEventListener.getCacheElementsRemoved(cache).size());
232 } catch(Throwable e) {
233 fail("Didn't expect a " + e.getClass().getName());
234 }
235
236
237 cache.getCacheConfiguration().getCacheWriterConfiguration().setNotifyListenersOnException(true);
238
239 CountingCacheEventListener.resetCounters();
240
241 try {
242 cache.putWithWriter(new Element("key1", "value1"));
243 fail("Expected UnsupportedOperationException");
244 } catch (UnsupportedOperationException e) {
245
246 assertEquals(0, CountingCacheEventListener.getCacheElementsUpdated(cache).size());
247 assertEquals(1, CountingCacheEventListener.getCacheElementsPut(cache).size());
248 } catch(Throwable e) {
249 fail("Didn't expect a " + e.getClass().getName());
250 }
251
252 CountingCacheEventListener.resetCounters();
253
254 try {
255 assertNotNull(cache.get("key1"));
256 cache.putWithWriter(new Element("key1", "value1"));
257 fail("Expected UnsupportedOperationException");
258 } catch (UnsupportedOperationException e) {
259
260 assertEquals(0, CountingCacheEventListener.getCacheElementsPut(cache).size());
261 assertEquals(1, CountingCacheEventListener.getCacheElementsUpdated(cache).size());
262 } catch(Throwable e) {
263 fail("Didn't expect a " + e.getClass().getName());
264 }
265
266 CountingCacheEventListener.resetCounters();
267
268 try {
269 cache.removeWithWriter("key1");
270 fail("Expected UnsupportedOperationException");
271 } catch (UnsupportedOperationException e) {
272
273 assertEquals(1, CountingCacheEventListener.getCacheElementsRemoved(cache).size());
274 }
275 }
276
277 @Test
278 public void testWriteBehindSolelyJava() throws InterruptedException {
279 Cache cache = new Cache(
280 new CacheConfiguration("writeBehindSolelyJava", 10)
281 .cacheWriter(new CacheWriterConfiguration()
282 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
283 .minWriteDelay(2)
284 .maxWriteDelay(8)
285 .cacheWriterFactory(new CacheWriterConfiguration.CacheWriterFactoryConfiguration()
286 .className("net.sf.ehcache.writer.TestCacheWriterFactory"))));
287 assertNotNull(cache.getRegisteredCacheWriter());
288
289 CacheManager.getInstance().addCache(cache);
290 TestCacheWriter writer = (TestCacheWriter) cache.getRegisteredCacheWriter();
291 assertTrue(writer.isInitialized());
292 assertEquals(0, writer.getWrittenElements().size());
293
294 Element el1 = new Element("key1", "value1");
295 Element el2 = new Element("key2", "value2");
296 Element el3 = new Element("key3", "value3");
297 cache.putWithWriter(el1);
298 cache.putWithWriter(el2);
299 cache.putWithWriter(el3);
300
301 Thread.sleep(3000);
302
303 assertEquals(3, writer.getWrittenElements().size());
304 assertNotNull(writer.getWrittenElements().get("key1"));
305 assertNotNull(writer.getWrittenElements().get("key2"));
306 assertNotNull(writer.getWrittenElements().get("key3"));
307 assertEquals(0, writer.getDeletedElements().size());
308
309 cache.removeWithWriter(el2.getKey());
310 cache.removeWithWriter(el3.getKey());
311
312 assertEquals(3, writer.getWrittenElements().size());
313 assertEquals(0, writer.getDeletedElements().size());
314
315 Thread.sleep(3000);
316
317 assertEquals(3, writer.getWrittenElements().size());
318 assertEquals(2, writer.getDeletedElements().size());
319 assertTrue(writer.getDeletedElements().containsKey("key2"));
320 assertTrue(writer.getDeletedElements().containsKey("key3"));
321 }
322
323 @Test
324 public void testWriteBehindStopWaitsForEmptyQueue() throws InterruptedException {
325 Cache cache = new Cache(
326 new CacheConfiguration("writeBehindSolelyJavaStopWaitsForEmptyQueue", 10)
327 .cacheWriter(new CacheWriterConfiguration()
328 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
329 .minWriteDelay(2)
330 .maxWriteDelay(8)));
331 TestCacheWriterSlow writer = new TestCacheWriterSlow();
332 cache.registerCacheWriter(writer);
333 assertNotNull(cache.getRegisteredCacheWriter());
334
335 CacheManager.getInstance().addCache(cache);
336 assertTrue(writer.isInitialized());
337 assertEquals(0, writer.getWrittenElements().size());
338
339 Element el1 = new Element("key1", "value1");
340 Element el2 = new Element("key2", "value2");
341 Element el3 = new Element("key3", "value3");
342 cache.putWithWriter(el1);
343 cache.putWithWriter(el2);
344 cache.putWithWriter(el3);
345 cache.removeWithWriter(el2.getKey());
346 cache.removeWithWriter(el3.getKey());
347 cache.dispose();
348
349 Thread.sleep(3000);
350
351 assertEquals(3, writer.getWrittenElements().size());
352 assertNotNull(writer.getWrittenElements().get("key1"));
353 assertNotNull(writer.getWrittenElements().get("key2"));
354 assertNotNull(writer.getWrittenElements().get("key3"));
355 assertEquals(3, writer.getWrittenElements().size());
356 assertEquals(2, writer.getDeletedElements().size());
357 assertTrue(writer.getDeletedElements().containsKey("key2"));
358 assertTrue(writer.getDeletedElements().containsKey("key3"));
359 }
360
361 @Test
362 public void testWriteBehindBlocksWhenFull() throws Exception {
363 final Cache cache = new Cache(
364 new CacheConfiguration("writeBehindBlocksWhenFull", 10)
365 .cacheWriter(new CacheWriterConfiguration()
366 .writeBehindMaxQueueSize(3)
367 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)));
368 final CyclicBarrier barrier = new CyclicBarrier(2);
369 TestCacheWriter writer = new TestCacheWriter(new Properties()) {
370
371 @Override
372 public void write(final Element element) throws CacheException {
373 if(super.getWrittenElements().size() == 0) {
374 try {
375 barrier.await();
376 barrier.await(5, TimeUnit.SECONDS);
377 } catch (TimeoutException e) {
378
379 } catch (Exception e) {
380 throw new RuntimeException(e);
381 }
382 }
383 super.write(element);
384 }
385
386 @Override
387 public void delete(final CacheEntry entry) throws CacheException {
388 super.delete(entry);
389 }
390 };
391 cache.registerCacheWriter(writer);
392 assertNotNull(cache.getRegisteredCacheWriter());
393
394 CacheManager.getInstance().addCache(cache);
395 assertTrue(writer.isInitialized());
396 assertEquals(0, writer.getWrittenElements().size());
397
398 Element el1 = new Element("key1", "value1");
399 Element el2 = new Element("key2", "value2");
400 Element el3 = new Element("key3", "value3");
401 cache.putWithWriter(el1);
402 barrier.await();
403 long before = System.nanoTime();
404 cache.putWithWriter(el2);
405 assertThat(TimeUnit.SECONDS.convert(System.nanoTime() - before, TimeUnit.NANOSECONDS), is(0L));
406 before = System.nanoTime();
407 cache.putWithWriter(el3);
408 assertThat(TimeUnit.SECONDS.convert(System.nanoTime() - before, TimeUnit.NANOSECONDS), is(0L));
409 before = System.nanoTime();
410 cache.removeWithWriter(el2.getKey());
411 assertThat(TimeUnit.SECONDS.convert(System.nanoTime() - before, TimeUnit.NANOSECONDS), is(0L));
412 before = System.nanoTime();
413 cache.removeWithWriter(el3.getKey());
414 assertThat(TimeUnit.SECONDS.convert(System.nanoTime() - before, TimeUnit.NANOSECONDS) > 4, is(true));
415 try {
416 barrier.await();
417 fail("this should have failed!");
418 } catch (BrokenBarrierException e) {
419
420 }
421
422 cache.dispose();
423
424 Thread.sleep(500);
425
426 assertEquals(3, writer.getWrittenElements().size());
427 assertNotNull(writer.getWrittenElements().get("key1"));
428 assertNotNull(writer.getWrittenElements().get("key2"));
429 assertNotNull(writer.getWrittenElements().get("key3"));
430 assertEquals(3, writer.getWrittenElements().size());
431 assertEquals(2, writer.getDeletedElements().size());
432 assertTrue(writer.getDeletedElements().containsKey("key2"));
433 assertTrue(writer.getDeletedElements().containsKey("key3"));
434 }
435
436 @Test
437 public void testWriteBehindBatched() throws InterruptedException {
438 Cache cache = new Cache(
439 new CacheConfiguration("writeBehindBatched", 10)
440 .cacheWriter(new CacheWriterConfiguration()
441 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
442 .minWriteDelay(1)
443 .maxWriteDelay(4)
444 .writeBatching(true)
445 .writeBatchSize(10)
446 .cacheWriterFactory(new CacheWriterConfiguration.CacheWriterFactoryConfiguration()
447 .className("net.sf.ehcache.writer.TestCacheWriterFactory")
448 .properties("key.prefix=pre2; key.suffix=suff2")
449 .propertySeparator(";"))));
450 assertNotNull(cache.getRegisteredCacheWriter());
451
452 CacheManager.getInstance().addCache(cache);
453 TestCacheWriter writer = (TestCacheWriter) cache.getRegisteredCacheWriter();
454 assertTrue(writer.isInitialized());
455 assertEquals(0, writer.getWrittenElements().size());
456
457 Element el1 = new Element("key1", "value1");
458 Element el2 = new Element("key2", "value2");
459 Element el3 = new Element("key3", "value3");
460 cache.putWithWriter(el1);
461 cache.putWithWriter(el2);
462 cache.putWithWriter(el3);
463
464 Thread.sleep(3000);
465
466 assertEquals(0, writer.getWrittenElements().size());
467
468 Thread.sleep(2000);
469
470 assertEquals(3, writer.getWrittenElements().size());
471 assertFalse(writer.getWrittenElements().containsKey("key1"));
472 assertFalse(writer.getWrittenElements().containsKey("key2"));
473 assertFalse(writer.getWrittenElements().containsKey("key3"));
474 assertFalse(writer.getWrittenElements().containsKey("pre2key1suff2"));
475 assertFalse(writer.getWrittenElements().containsKey("pre2key2suff2"));
476 assertFalse(writer.getWrittenElements().containsKey("pre2key3suff2"));
477 assertTrue(writer.getWrittenElements().containsKey("pre2key1suff2-batched"));
478 assertTrue(writer.getWrittenElements().containsKey("pre2key2suff2-batched"));
479 assertTrue(writer.getWrittenElements().containsKey("pre2key3suff2-batched"));
480
481 assertEquals(0, writer.getDeletedElements().size());
482
483 cache.removeWithWriter(el2.getKey());
484 cache.removeWithWriter(el3.getKey());
485
486 Thread.sleep(2000);
487
488 assertEquals(3, writer.getWrittenElements().size());
489 assertEquals(0, writer.getDeletedElements().size());
490
491 Thread.sleep(3000);
492
493 assertEquals(3, writer.getWrittenElements().size());
494 assertEquals(2, writer.getDeletedElements().size());
495 assertTrue(writer.getDeletedElements().containsKey("pre2key2suff2-batched"));
496 assertTrue(writer.getDeletedElements().containsKey("pre2key3suff2-batched"));
497 }
498
499 @Test
500 public void testWriteBehindBatchedCoalescing() throws InterruptedException {
501 Cache cache = new Cache(
502 new CacheConfiguration("writeBehindBatchedCoalescing", 10)
503 .cacheWriter(new CacheWriterConfiguration()
504 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
505 .minWriteDelay(1)
506 .maxWriteDelay(1)
507 .writeBatching(true)
508 .writeBatchSize(10)
509 .writeCoalescing(true)
510 .cacheWriterFactory(new CacheWriterConfiguration.CacheWriterFactoryConfiguration()
511 .className("net.sf.ehcache.writer.TestCacheWriterFactory"))));
512 assertNotNull(cache.getRegisteredCacheWriter());
513
514 CacheManager.getInstance().addCache(cache);
515 TestCacheWriter writer = (TestCacheWriter) cache.getRegisteredCacheWriter();
516 assertTrue(writer.isInitialized());
517 assertEquals(0, writer.getWrittenElements().size());
518
519 Element el1 = new Element("key1", "value1");
520 Element el2a = new Element("key2", "value2a");
521 Element el2b = new Element("key2", "value2b");
522 Element el3 = new Element("key3", "value3");
523 cache.putWithWriter(el1);
524 cache.putWithWriter(el2a);
525 cache.putWithWriter(el3);
526 cache.putWithWriter(el2b);
527 cache.removeWithWriter("key1");
528
529 Thread.sleep(2000);
530
531 assertEquals(2, writer.getWrittenElements().size());
532
533 assertFalse(writer.getWrittenElements().containsKey("key1-batched"));
534 assertTrue(writer.getWrittenElements().containsKey("key2-batched"));
535 assertTrue(writer.getWrittenElements().containsKey("key3-batched"));
536 assertEquals("value2b", writer.getWrittenElements().get("key2-batched").getObjectValue());
537 assertEquals("value3", writer.getWrittenElements().get("key3-batched").getObjectValue());
538
539 assertEquals(1, writer.getDeletedElements().size());
540
541 assertTrue(writer.getDeletedElements().containsKey("key1-batched"));
542 }
543
544 @Test
545 public void testWriteBehindExceptionWithoutRetrying() throws InterruptedException {
546 Cache cache = new Cache(
547 new CacheConfiguration("writeBehindExceptionWithoutRetrying", 10)
548 .cacheWriter(new CacheWriterConfiguration()
549 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
550 .minWriteDelay(0)
551 .maxWriteDelay(0)
552 .retryAttempts(0)
553 .retryAttemptDelaySeconds(0)));
554 TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
555 cache.registerCacheWriter(writer);
556
557 CacheManager.getInstance().addCache(cache);
558 assertTrue(writer.isInitialized());
559 assertEquals(0, writer.getWriterEvents().size());
560
561 cache.putWithWriter(new Element("key1", "value1"));
562 try {
563 cache.putWithWriter(new Element("key2", "value1"));
564 cache.putWithWriter(new Element("key3", "value1"));
565 cache.removeWithWriter("key2");
566 } catch (CacheException e) {
567 System.err.println("We had an error trying to write: " + e.getMessage()
568 + "\nBut that's OK: the writer got shutdown faster than we could *WithWriter");
569 }
570
571 Thread.sleep(2000);
572
573 assertEquals(0, writer.getWriterEvents().size());
574 }
575
576 @Test
577 public void testWriteBehindRetryWithoutExceptions() throws InterruptedException {
578 Cache cache = new Cache(
579 new CacheConfiguration("writeBehindRetryWithoutExceptions", 10)
580 .cacheWriter(new CacheWriterConfiguration()
581 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
582 .minWriteDelay(0)
583 .maxWriteDelay(0)
584 .retryAttempts(3)
585 .retryAttemptDelaySeconds(0)));
586 TestCacheWriterRetries writer = new TestCacheWriterRetries(0);
587 cache.registerCacheWriter(writer);
588
589 CacheManager.getInstance().addCache(cache);
590 assertTrue(writer.isInitialized());
591 assertEquals(0, writer.getWriterEvents().size());
592
593 cache.putWithWriter(new Element("key1", "value1"));
594 cache.putWithWriter(new Element("key2", "value1"));
595 cache.putWithWriter(new Element("key3", "value1"));
596 cache.removeWithWriter("key2");
597
598 Thread.sleep(2000);
599
600 assertEquals(4, writer.getWriterEvents().size());
601 assertEquals(1, (long) writer.getWriteCount().get("key1"));
602 assertEquals(1, (long) writer.getWriteCount().get("key2"));
603 assertEquals(1, (long) writer.getWriteCount().get("key3"));
604 assertFalse(writer.getDeleteCount().containsKey("key1"));
605 assertEquals(1, (long) writer.getDeleteCount().get("key2"));
606 assertFalse(writer.getDeleteCount().containsKey("key3"));
607 }
608
609 @Test
610 public void testWriteBehindRetryWithoutDelay() throws InterruptedException {
611 Cache cache = new Cache(
612 new CacheConfiguration("writeBehindRetryWithoutDelay", 10)
613 .cacheWriter(new CacheWriterConfiguration()
614 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
615 .minWriteDelay(0)
616 .maxWriteDelay(0)
617 .retryAttempts(3)
618 .retryAttemptDelaySeconds(0)));
619 TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
620 cache.registerCacheWriter(writer);
621
622 CacheManager.getInstance().addCache(cache);
623 assertTrue(writer.isInitialized());
624 assertEquals(0, writer.getWriterEvents().size());
625
626 cache.putWithWriter(new Element("key1", "value1"));
627 cache.putWithWriter(new Element("key2", "value2"));
628 cache.putWithWriter(new Element("key3", "value3"));
629 cache.removeWithWriter("key2");
630
631 Thread.sleep(2000);
632
633 assertEquals(4, writer.getWriterEvents().size());
634 assertEquals(1, (long) writer.getWriteCount().get("key1"));
635 assertEquals(1, (long) writer.getWriteCount().get("key2"));
636 assertEquals(1, (long) writer.getWriteCount().get("key3"));
637 assertEquals(1, (long) writer.getDeleteCount().get("key2"));
638 }
639
640 @Test
641 public void testWriteBehindRetryWithDelay() throws InterruptedException {
642 final long tolerance = TimeUnit.MILLISECONDS.toNanos(200);
643
644 Cache cache = new Cache(
645 new CacheConfiguration("writeBehindRetryWithDelay", 10)
646 .cacheWriter(new CacheWriterConfiguration()
647 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
648 .minWriteDelay(0)
649 .maxWriteDelay(0)
650 .retryAttempts(3)
651 .retryAttemptDelaySeconds(1)));
652 final TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
653 cache.registerCacheWriter(writer);
654
655 CacheManager.getInstance().addCache(cache);
656 assertTrue(writer.isInitialized());
657 assertEquals(0, writer.getWriterEvents().size());
658
659 long start = System.nanoTime();
660 cache.putWithWriter(new Element("key1", "value1"));
661 cache.putWithWriter(new Element("key2", "value2"));
662 cache.putWithWriter(new Element("key3", "value3"));
663 cache.removeWithWriter("key2");
664
665 RetryAssert.assertBy(30, TimeUnit.SECONDS, new Callable<Integer>() {
666 public Integer call() throws Exception {
667 return writer.getWriterEvents().size();
668 }
669 }, Is.is(4));
670
671 {
672 WriterEvent event = writer.getWriterEvents().get(0);
673 assertEquals(0, event.getWrittenSize());
674 assertEquals(0, event.getWriteCount("key1"));
675 assertEquals(0, event.getWriteCount("key2"));
676 assertEquals(0, event.getWriteCount("key3"));
677 assertEquals(0, event.getDeleteCount("key2"));
678 assertEquals("key1", event.getAddedElement().getObjectKey());
679 long time = event.getTime() - start;
680 long expected = TimeUnit.SECONDS.toNanos(3);
681 assertTrue("Write-1 time : " + time, time >= (expected - tolerance));
682 }
683
684 {
685 WriterEvent event = writer.getWriterEvents().get(1);
686 assertEquals(1, event.getWrittenSize());
687 assertEquals(1, event.getWriteCount("key1"));
688 assertEquals(0, event.getWriteCount("key2"));
689 assertEquals(0, event.getWriteCount("key3"));
690 assertEquals(0, event.getDeleteCount("key2"));
691 assertEquals("key2", event.getAddedElement().getObjectKey());
692 long time = event.getTime() - start;
693 long expected = TimeUnit.SECONDS.toNanos(6);
694 assertTrue("Write-2 time : " + time, time >= (expected - tolerance));
695 }
696
697 {
698 WriterEvent event = writer.getWriterEvents().get(2);
699 assertEquals(2, event.getWrittenSize());
700 assertEquals(1, event.getWriteCount("key1"));
701 assertEquals(1, event.getWriteCount("key2"));
702 assertEquals(0, event.getWriteCount("key3"));
703 assertEquals(0, event.getDeleteCount("key2"));
704 assertEquals("key3", event.getAddedElement().getObjectKey());
705 long time = event.getTime() - start;
706 long expected = TimeUnit.SECONDS.toNanos(9);
707 assertTrue("Write-3 time : " + time, time >= (expected - tolerance));
708 }
709
710 {
711 WriterEvent event = writer.getWriterEvents().get(3);
712 assertEquals(3, event.getWrittenSize());
713 assertEquals(1, event.getWriteCount("key1"));
714 assertEquals(1, event.getWriteCount("key2"));
715 assertEquals(1, event.getWriteCount("key3"));
716 assertEquals(0, event.getDeleteCount("key2"));
717 assertEquals("key2", event.getRemovedKey());
718 long time = event.getTime() - start;
719 long expected = TimeUnit.SECONDS.toNanos(12);
720 assertTrue("Delete-2 time : " + time, time >= (expected - tolerance));
721 }
722
723 assertEquals(4, writer.getWriterEvents().size());
724 assertEquals(1, writer.getWriteCount().get("key1").intValue());
725 assertEquals(1, writer.getWriteCount().get("key2").intValue());
726 assertEquals(1, writer.getWriteCount().get("key3").intValue());
727 assertEquals(1, writer.getDeleteCount().get("key2").intValue());
728 }
729
730 @Test
731 public void testWriteBehindExceptionWithoutRetryingBatched() throws InterruptedException {
732 Cache cache = new Cache(
733 new CacheConfiguration("writeBehindExceptionWithoutRetryingBatched", 10)
734 .cacheWriter(new CacheWriterConfiguration()
735 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
736 .minWriteDelay(1)
737 .maxWriteDelay(1)
738 .writeBatching(true)
739 .writeBatchSize(10)
740 .retryAttempts(0)
741 .retryAttemptDelaySeconds(0)));
742 TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
743 cache.registerCacheWriter(writer);
744
745 CacheManager.getInstance().addCache(cache);
746 assertTrue(writer.isInitialized());
747 assertEquals(0, writer.getWriterEvents().size());
748
749 cache.putWithWriter(new Element("key1", "value1"));
750 cache.putWithWriter(new Element("key2", "value1"));
751 cache.putWithWriter(new Element("key3", "value1"));
752 cache.removeWithWriter("key2");
753
754 Thread.sleep(2000);
755
756 assertEquals(2, writer.getWriterEvents().size());
757 assertEquals(1, (long) writer.getWriteCount().get("key1"));
758 assertEquals(1, (long) writer.getWriteCount().get("key2"));
759 assertFalse(writer.getWriteCount().containsKey("key3"));
760 assertFalse(writer.getDeleteCount().containsKey("key1"));
761 assertFalse(writer.getDeleteCount().containsKey("key2"));
762 assertFalse(writer.getDeleteCount().containsKey("key3"));
763 }
764
765 @Test
766 public void testWriteBehindRetryWithoutExceptionsBatched() throws InterruptedException {
767 Cache cache = new Cache(
768 new CacheConfiguration("writeBehindRetryWithoutExceptionsBatched", 10)
769 .cacheWriter(new CacheWriterConfiguration()
770 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
771 .minWriteDelay(1)
772 .maxWriteDelay(1)
773 .writeBatching(true)
774 .writeBatchSize(10)
775 .retryAttempts(3)
776 .retryAttemptDelaySeconds(0)));
777 TestCacheWriterRetries writer = new TestCacheWriterRetries(0);
778 cache.registerCacheWriter(writer);
779
780 CacheManager.getInstance().addCache(cache);
781 assertTrue(writer.isInitialized());
782 assertEquals(0, writer.getWriterEvents().size());
783
784 cache.putWithWriter(new Element("key1", "value1"));
785 cache.putWithWriter(new Element("key2", "value2"));
786 cache.putWithWriter(new Element("key3", "value3"));
787 cache.removeWithWriter("key2");
788
789 Thread.sleep(2000);
790
791 assertEquals(4, writer.getWriterEvents().size());
792 assertEquals(1, (long) writer.getWriteCount().get("key1"));
793 assertEquals(1, (long) writer.getWriteCount().get("key2"));
794 assertEquals(1, (long) writer.getWriteCount().get("key3"));
795 assertFalse(writer.getDeleteCount().containsKey("key1"));
796 assertEquals(1, (long) writer.getDeleteCount().get("key2"));
797 assertFalse(writer.getDeleteCount().containsKey("key3"));
798 }
799
800 @Test
801 public void testWriteBehindRetryWithoutDelayBatched() throws InterruptedException {
802 Cache cache = new Cache(
803 new CacheConfiguration("writeBehindRetryWithoutDelayBatched", 10)
804 .cacheWriter(new CacheWriterConfiguration()
805 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
806 .minWriteDelay(1)
807 .maxWriteDelay(1)
808 .writeBatching(true)
809 .writeBatchSize(10)
810 .retryAttempts(3)
811 .retryAttemptDelaySeconds(0)));
812 TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
813 cache.registerCacheWriter(writer);
814
815 CacheManager.getInstance().addCache(cache);
816 assertTrue(writer.isInitialized());
817 assertEquals(0, writer.getWriterEvents().size());
818
819 cache.putWithWriter(new Element("key1", "value1"));
820 cache.putWithWriter(new Element("key2", "value2"));
821 cache.putWithWriter(new Element("key3", "value3"));
822 cache.removeWithWriter("key2");
823
824 Thread.sleep(2000);
825
826 assertEquals(10, writer.getWriterEvents().size());
827 assertEquals(4, (long) writer.getWriteCount().get("key1"));
828 assertEquals(4, (long) writer.getWriteCount().get("key2"));
829 assertEquals(1, (long) writer.getWriteCount().get("key3"));
830 assertEquals(1, (long) writer.getDeleteCount().get("key2"));
831 }
832
833 @Test
834 public void testWriteBehindRetryWithDelayBatched() throws InterruptedException {
835 Cache cache = new Cache(
836 new CacheConfiguration("writeBehindRetryWithDelayBatched", 10)
837 .cacheWriter(new CacheWriterConfiguration()
838 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
839 .minWriteDelay(1)
840 .maxWriteDelay(1)
841 .writeBatching(true)
842 .writeBatchSize(10)
843 .retryAttempts(3)
844 .retryAttemptDelaySeconds(1)));
845 TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
846 cache.registerCacheWriter(writer);
847
848 CacheManager.getInstance().addCache(cache);
849 assertTrue(writer.isInitialized());
850 assertEquals(0, writer.getWriterEvents().size());
851
852 cache.putWithWriter(new Element("key1", "value1"));
853 cache.putWithWriter(new Element("key2", "value2"));
854 cache.putWithWriter(new Element("key3", "value3"));
855 cache.removeWithWriter("key2");
856
857 Thread.sleep(3000);
858
859 assertEquals(4, writer.getWriterEvents().size());
860 assertTrue(writer.getWriteCount().containsKey("key1"));
861 assertTrue(writer.getWriteCount().containsKey("key2"));
862 assertFalse(writer.getWriteCount().containsKey("key3"));
863 assertFalse(writer.getDeleteCount().containsKey("key1"));
864 assertFalse(writer.getDeleteCount().containsKey("key2"));
865 assertFalse(writer.getDeleteCount().containsKey("key3"));
866
867 Thread.sleep(3000);
868
869 assertEquals(9, writer.getWriterEvents().size());
870 assertEquals(4, (long) writer.getWriteCount().get("key1"));
871 assertEquals(4, (long) writer.getWriteCount().get("key2"));
872 assertEquals(1, (long) writer.getWriteCount().get("key3"));
873 assertFalse(writer.getDeleteCount().containsKey("key1"));
874 assertFalse(writer.getDeleteCount().containsKey("key2"));
875 assertFalse(writer.getDeleteCount().containsKey("key3"));
876
877 Thread.sleep(5000);
878
879 assertEquals(10, writer.getWriterEvents().size());
880 assertEquals(4, (long) writer.getWriteCount().get("key1"));
881 assertEquals(4, (long) writer.getWriteCount().get("key2"));
882 assertEquals(1, (long) writer.getWriteCount().get("key3"));
883 assertFalse(writer.getDeleteCount().containsKey("key1"));
884 assertEquals(1, (long) writer.getDeleteCount().get("key2"));
885 assertFalse(writer.getDeleteCount().containsKey("key3"));
886 }
887
888 @Test
889 public void testWriteBehindRateLimitBatched() throws InterruptedException {
890 Cache cache = new Cache(
891 new CacheConfiguration("writeBehindRetryWithDelayBatched", 100)
892 .cacheWriter(new CacheWriterConfiguration()
893 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
894 .minWriteDelay(1)
895 .writeBatching(true)
896 .writeBatchSize(10)
897 .rateLimitPerSecond(5)));
898 TestCacheWriter writer = new TestCacheWriter(new Properties());
899 cache.registerCacheWriter(writer);
900
901 CacheManager.getInstance().addCache(cache);
902 assertTrue(writer.isInitialized());
903 assertEquals(0, writer.getWrittenElements().size());
904
905 for (int i = 0; i < 30; i++) {
906 cache.putWithWriter(new Element("key" + i, "value" + i));
907 }
908
909 Thread.sleep(1000);
910
911 assertEquals(0, writer.getWrittenElements().size());
912
913 Thread.sleep(1500);
914
915 assertEquals(10, writer.getWrittenElements().size());
916
917 Thread.sleep(1000);
918
919 assertEquals(10, writer.getWrittenElements().size());
920
921 Thread.sleep(1000);
922
923 assertEquals(20, writer.getWrittenElements().size());
924
925 Thread.sleep(1000);
926
927 assertEquals(20, writer.getWrittenElements().size());
928
929 Thread.sleep(1000);
930
931 assertEquals(30, writer.getWrittenElements().size());
932 }
933 }