View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.writer;
18  
19  import static org.hamcrest.core.Is.is;
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.assertNull;
24  import static org.junit.Assert.assertThat;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  
28  import java.util.Properties;
29  import java.util.concurrent.BrokenBarrierException;
30  import java.util.concurrent.Callable;
31  import java.util.concurrent.CyclicBarrier;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.TimeoutException;
34  
35  import net.sf.ehcache.AbstractCacheTest;
36  import net.sf.ehcache.Cache;
37  import net.sf.ehcache.CacheEntry;
38  import net.sf.ehcache.CacheException;
39  import net.sf.ehcache.CacheManager;
40  import net.sf.ehcache.Element;
41  import net.sf.ehcache.Status;
42  import net.sf.ehcache.config.CacheConfiguration;
43  import net.sf.ehcache.config.CacheWriterConfiguration;
44  import net.sf.ehcache.event.CountingCacheEventListener;
45  import net.sf.ehcache.util.RetryAssert;
46  import net.sf.ehcache.writer.TestCacheWriterRetries.WriterEvent;
47  
48  import org.hamcrest.core.Is;
49  import org.junit.After;
50  import org.junit.Before;
51  import org.junit.Test;
52  import org.slf4j.Logger;
53  import org.slf4j.LoggerFactory;
54  
55  /***
56   * Tests for a CacheWriters
57   *
58   * @author Geert Bevin
59   * @version $Id: CacheWriterTest.html 13146 2011-08-01 17:12:39Z oletizi $
60   */
61  public class CacheWriterTest extends AbstractCacheTest {
62  
63      private static final Logger LOG = LoggerFactory.getLogger(CacheWriterTest.class.getName());
64  
65      protected CacheManager manager;
66  
67      @Override
68      @Before
69      public void setUp() throws Exception {
70          manager = CacheManager.create(AbstractCacheTest.TEST_CONFIG_DIR + "ehcache-writer.xml");
71      }
72  
73      @Override
74      @After
75      public void tearDown() throws Exception {
76          if (!manager.getStatus().equals(Status.STATUS_SHUTDOWN)) {
77              manager.shutdown();
78          }
79      }
80  
81      @Test
82      public void testWriteThroughXml() {
83          Cache cache = manager.getCache("writeThroughCacheXml");
84          assertNotNull(cache.getRegisteredCacheWriter());
85  
86          TestCacheWriter writer = (TestCacheWriter) cache.getRegisteredCacheWriter();
87          assertTrue(writer.isInitialized());
88  
89          Element el1 = new Element("key1", "value1");
90          Element el2 = new Element("key2", "value2");
91          Element el3 = new Element("key3", "value3");
92          assertEquals(0, writer.getWrittenElements().size());
93          cache.putWithWriter(el1);
94          assertEquals(1, writer.getWrittenElements().size());
95          cache.putWithWriter(el2);
96          assertEquals(2, writer.getWrittenElements().size());
97          cache.putWithWriter(el3);
98          assertEquals(3, writer.getWrittenElements().size());
99          assertEquals("value1", writer.getWrittenElements().get("key1").getValue());
100         assertEquals("value2", writer.getWrittenElements().get("key2").getValue());
101         assertEquals("value3", writer.getWrittenElements().get("key3").getValue());
102         cache.removeWithWriter(el2.getKey());
103         assertEquals(3, writer.getWrittenElements().size());
104         assertNotNull(writer.getWrittenElements().get("key1"));
105         assertNotNull(writer.getWrittenElements().get("key2"));
106         assertNotNull(writer.getWrittenElements().get("key3"));
107         assertEquals(1, writer.getDeletedElements().size());
108         assertTrue(writer.getDeletedElements().containsKey("key2"));
109     }
110 
111     @Test
112     public void testWriteThroughXmlProperties() {
113         Cache cache = manager.getCache("writeThroughCacheXmlProperties");
114         assertNotNull(cache.getRegisteredCacheWriter());
115 
116         TestCacheWriter writer = (TestCacheWriter) cache.getRegisteredCacheWriter();
117         assertTrue(writer.isInitialized());
118 
119         Element el1 = new Element("key1", "value1");
120         Element el2 = new Element("key2", "value2");
121         Element el3 = new Element("key3", "value3");
122         assertEquals(0, writer.getWrittenElements().size());
123         cache.putWithWriter(el1);
124         assertEquals(1, writer.getWrittenElements().size());
125         cache.putWithWriter(el2);
126         assertEquals(2, writer.getWrittenElements().size());
127         cache.putWithWriter(el3);
128         assertEquals(3, writer.getWrittenElements().size());
129         assertNull(writer.getWrittenElements().get("key1"));
130         assertNull(writer.getWrittenElements().get("key2"));
131         assertNull(writer.getWrittenElements().get("key3"));
132         assertEquals("value1", writer.getWrittenElements().get("prekey1suff").getValue());
133         assertEquals("value2", writer.getWrittenElements().get("prekey2suff").getValue());
134         assertEquals("value3", writer.getWrittenElements().get("prekey3suff").getValue());
135         cache.removeWithWriter(el1.getKey());
136         assertEquals(3, writer.getWrittenElements().size());
137         assertNotNull(writer.getWrittenElements().get("prekey1suff"));
138         assertNotNull(writer.getWrittenElements().get("prekey2suff"));
139         assertNotNull(writer.getWrittenElements().get("prekey3suff"));
140         assertEquals(1, writer.getDeletedElements().size());
141         assertTrue(writer.getDeletedElements().containsKey("prekey1suff"));
142     }
143 
144     @Test
145     public void testWriteThroughJavaRegistration() {
146         Cache cache = manager.getCache("writeThroughCacheJavaRegistration");
147         assertNull(cache.getRegisteredCacheWriter());
148 
149         TestCacheWriter writer = new TestCacheWriter(new Properties());
150         cache.registerCacheWriter(writer);
151         assertTrue(writer.isInitialized());
152 
153         Element el1 = new Element("key1", "value1");
154         Element el2 = new Element("key2", "value2");
155         Element el3 = new Element("key3", "value3");
156         Element el4 = new Element("key4", "value4");
157         assertEquals(0, writer.getWrittenElements().size());
158         cache.putWithWriter(el1);
159         assertEquals(1, writer.getWrittenElements().size());
160         cache.putWithWriter(el2);
161         assertEquals(2, writer.getWrittenElements().size());
162         cache.putWithWriter(el3);
163         assertEquals(3, writer.getWrittenElements().size());
164         cache.putWithWriter(el4);
165         assertEquals(4, writer.getWrittenElements().size());
166         assertEquals("value1", writer.getWrittenElements().get("key1").getValue());
167         assertEquals("value2", writer.getWrittenElements().get("key2").getValue());
168         assertEquals("value3", writer.getWrittenElements().get("key3").getValue());
169         assertEquals("value4", writer.getWrittenElements().get("key4").getValue());
170         cache.removeWithWriter(el1.getKey());
171         cache.removeWithWriter(el2.getKey());
172         cache.removeWithWriter(el3.getKey());
173         assertEquals(4, writer.getWrittenElements().size());
174         assertNotNull(writer.getWrittenElements().get("key1"));
175         assertNotNull(writer.getWrittenElements().get("key2"));
176         assertNotNull(writer.getWrittenElements().get("key3"));
177         assertNotNull(writer.getWrittenElements().get("key4"));
178         assertEquals(3, writer.getDeletedElements().size());
179         assertTrue(writer.getDeletedElements().containsKey("key1"));
180         assertTrue(writer.getDeletedElements().containsKey("key2"));
181         assertTrue(writer.getDeletedElements().containsKey("key3"));
182     }
183 
184     @Test
185     public void testWriteThroughNotifyListeners() {
186         Cache cache = new Cache(new CacheConfiguration("writeThroughCacheOnly", 10)
187                 .cacheEventListenerFactory(new CacheConfiguration.CacheEventListenerFactoryConfiguration()
188                         .className("net.sf.ehcache.event.CountingCacheEventListenerFactory")));
189         assertNull(cache.getRegisteredCacheWriter());
190 
191         CacheManager.getInstance().addCache(cache);
192 
193         TestCacheWriterException writer = new TestCacheWriterException();
194         cache.registerCacheWriter(writer);
195         assertTrue(writer.isInitialized());
196 
197         // without listeners notification
198         cache.getCacheConfiguration().getCacheWriterConfiguration().setNotifyListenersOnException(false);
199 
200         CountingCacheEventListener.resetCounters();
201 
202         try {
203             cache.putWithWriter(new Element("key1", "value1"));
204             fail("Expected UnsupportedOperationException");
205         } catch (UnsupportedOperationException e) {
206             // expected
207             assertEquals(0, CountingCacheEventListener.getCacheElementsPut(cache).size());
208         } catch(Throwable e) {
209             fail("Didn't expect a " + e.getClass().getName());
210         }
211 
212         CountingCacheEventListener.resetCounters();
213 
214         try {
215             cache.putWithWriter(new Element("key1", "value1"));
216             fail("Expected UnsupportedOperationException");
217         } catch (UnsupportedOperationException e) {
218             // expected
219             assertEquals(0, CountingCacheEventListener.getCacheElementsUpdated(cache).size());
220         } catch(Throwable e) {
221             fail("Didn't expect a " + e.getClass().getName());
222         }
223 
224         CountingCacheEventListener.resetCounters();
225 
226         try {
227             cache.removeWithWriter("key1");
228             fail("Expected UnsupportedOperationException");
229         } catch (UnsupportedOperationException e) {
230             // expected
231             assertEquals(0, CountingCacheEventListener.getCacheElementsRemoved(cache).size());
232         } catch(Throwable e) {
233             fail("Didn't expect a " + e.getClass().getName());
234         }
235 
236         // with listeners notification
237         cache.getCacheConfiguration().getCacheWriterConfiguration().setNotifyListenersOnException(true);
238 
239         CountingCacheEventListener.resetCounters();
240 
241         try {
242             cache.putWithWriter(new Element("key1", "value1"));
243             fail("Expected UnsupportedOperationException");
244         } catch (UnsupportedOperationException e) {
245             // expected
246             assertEquals(0, CountingCacheEventListener.getCacheElementsUpdated(cache).size());
247             assertEquals(1, CountingCacheEventListener.getCacheElementsPut(cache).size());
248         } catch(Throwable e) {
249             fail("Didn't expect a " + e.getClass().getName());
250         }
251 
252         CountingCacheEventListener.resetCounters();
253 
254         try {
255             assertNotNull(cache.get("key1"));
256             cache.putWithWriter(new Element("key1", "value1"));
257             fail("Expected UnsupportedOperationException");
258         } catch (UnsupportedOperationException e) {
259             // expected
260             assertEquals(0, CountingCacheEventListener.getCacheElementsPut(cache).size());
261             assertEquals(1, CountingCacheEventListener.getCacheElementsUpdated(cache).size());
262         } catch(Throwable e) {
263             fail("Didn't expect a " + e.getClass().getName());
264         }
265 
266         CountingCacheEventListener.resetCounters();
267 
268         try {
269             cache.removeWithWriter("key1");
270             fail("Expected UnsupportedOperationException");
271         } catch (UnsupportedOperationException e) {
272             // expected
273             assertEquals(1, CountingCacheEventListener.getCacheElementsRemoved(cache).size());
274         }
275     }
276 
277     @Test
278     public void testWriteBehindSolelyJava() throws InterruptedException {
279         Cache cache = new Cache(
280                 new CacheConfiguration("writeBehindSolelyJava", 10)
281                         .cacheWriter(new CacheWriterConfiguration()
282                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
283                                 .minWriteDelay(2)
284                                 .maxWriteDelay(8)
285                                 .cacheWriterFactory(new CacheWriterConfiguration.CacheWriterFactoryConfiguration()
286                                         .className("net.sf.ehcache.writer.TestCacheWriterFactory"))));
287         assertNotNull(cache.getRegisteredCacheWriter());
288 
289         CacheManager.getInstance().addCache(cache);
290         TestCacheWriter writer = (TestCacheWriter) cache.getRegisteredCacheWriter();
291         assertTrue(writer.isInitialized());
292         assertEquals(0, writer.getWrittenElements().size());
293 
294         Element el1 = new Element("key1", "value1");
295         Element el2 = new Element("key2", "value2");
296         Element el3 = new Element("key3", "value3");
297         cache.putWithWriter(el1);
298         cache.putWithWriter(el2);
299         cache.putWithWriter(el3);
300 
301         Thread.sleep(3000);
302 
303         assertEquals(3, writer.getWrittenElements().size());
304         assertNotNull(writer.getWrittenElements().get("key1"));
305         assertNotNull(writer.getWrittenElements().get("key2"));
306         assertNotNull(writer.getWrittenElements().get("key3"));
307         assertEquals(0, writer.getDeletedElements().size());
308 
309         cache.removeWithWriter(el2.getKey());
310         cache.removeWithWriter(el3.getKey());
311 
312         assertEquals(3, writer.getWrittenElements().size());
313         assertEquals(0, writer.getDeletedElements().size());
314 
315         Thread.sleep(3000);
316 
317         assertEquals(3, writer.getWrittenElements().size());
318         assertEquals(2, writer.getDeletedElements().size());
319         assertTrue(writer.getDeletedElements().containsKey("key2"));
320         assertTrue(writer.getDeletedElements().containsKey("key3"));
321     }
322 
323     @Test
324     public void testWriteBehindStopWaitsForEmptyQueue() throws InterruptedException {
325         Cache cache = new Cache(
326                 new CacheConfiguration("writeBehindSolelyJavaStopWaitsForEmptyQueue", 10)
327                         .cacheWriter(new CacheWriterConfiguration()
328                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
329                                 .minWriteDelay(2)
330                                 .maxWriteDelay(8)));
331         TestCacheWriterSlow writer = new TestCacheWriterSlow();
332         cache.registerCacheWriter(writer);
333         assertNotNull(cache.getRegisteredCacheWriter());
334 
335         CacheManager.getInstance().addCache(cache);
336         assertTrue(writer.isInitialized());
337         assertEquals(0, writer.getWrittenElements().size());
338 
339         Element el1 = new Element("key1", "value1");
340         Element el2 = new Element("key2", "value2");
341         Element el3 = new Element("key3", "value3");
342         cache.putWithWriter(el1);
343         cache.putWithWriter(el2);
344         cache.putWithWriter(el3);
345         cache.removeWithWriter(el2.getKey());
346         cache.removeWithWriter(el3.getKey());
347         cache.dispose();
348 
349         Thread.sleep(3000);
350 
351         assertEquals(3, writer.getWrittenElements().size());
352         assertNotNull(writer.getWrittenElements().get("key1"));
353         assertNotNull(writer.getWrittenElements().get("key2"));
354         assertNotNull(writer.getWrittenElements().get("key3"));
355         assertEquals(3, writer.getWrittenElements().size());
356         assertEquals(2, writer.getDeletedElements().size());
357         assertTrue(writer.getDeletedElements().containsKey("key2"));
358         assertTrue(writer.getDeletedElements().containsKey("key3"));
359     }
360 
361     @Test
362     public void testWriteBehindBlocksWhenFull() throws Exception {
363         final Cache cache = new Cache(
364                 new CacheConfiguration("writeBehindBlocksWhenFull", 10)
365                         .cacheWriter(new CacheWriterConfiguration()
366                              .writeBehindMaxQueueSize(3)
367                             .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)));
368         final CyclicBarrier barrier = new CyclicBarrier(2);
369         TestCacheWriter writer = new TestCacheWriter(new Properties()) {
370 
371             @Override
372             public void write(final Element element) throws CacheException {
373                 if(super.getWrittenElements().size() == 0) {
374                     try {
375                         barrier.await();
376                         barrier.await(5, TimeUnit.SECONDS);
377                     } catch (TimeoutException e) {
378                         // expected
379                     } catch (Exception e) {
380                         throw new RuntimeException(e);
381                     }
382                 }
383                 super.write(element);
384             }
385 
386             @Override
387             public void delete(final CacheEntry entry) throws CacheException {
388                 super.delete(entry);
389             }
390         };
391         cache.registerCacheWriter(writer);
392         assertNotNull(cache.getRegisteredCacheWriter());
393 
394         CacheManager.getInstance().addCache(cache);
395         assertTrue(writer.isInitialized());
396         assertEquals(0, writer.getWrittenElements().size());
397 
398         Element el1 = new Element("key1", "value1");
399         Element el2 = new Element("key2", "value2");
400         Element el3 = new Element("key3", "value3");
401         cache.putWithWriter(el1);
402         barrier.await();
403         long before = System.nanoTime();
404         cache.putWithWriter(el2);
405         assertThat(TimeUnit.SECONDS.convert(System.nanoTime() - before, TimeUnit.NANOSECONDS), is(0L));
406         before = System.nanoTime();
407         cache.putWithWriter(el3);
408         assertThat(TimeUnit.SECONDS.convert(System.nanoTime() - before, TimeUnit.NANOSECONDS), is(0L));
409         before = System.nanoTime();
410         cache.removeWithWriter(el2.getKey());
411         assertThat(TimeUnit.SECONDS.convert(System.nanoTime() - before, TimeUnit.NANOSECONDS), is(0L));
412         before = System.nanoTime();
413         cache.removeWithWriter(el3.getKey());
414         assertThat(TimeUnit.SECONDS.convert(System.nanoTime() - before, TimeUnit.NANOSECONDS) > 4, is(true));
415         try {
416             barrier.await();
417             fail("this should have failed!");
418         } catch (BrokenBarrierException e) {
419             // expected!
420         }
421 
422         cache.dispose();
423 
424         Thread.sleep(500);
425 
426         assertEquals(3, writer.getWrittenElements().size());
427         assertNotNull(writer.getWrittenElements().get("key1"));
428         assertNotNull(writer.getWrittenElements().get("key2"));
429         assertNotNull(writer.getWrittenElements().get("key3"));
430         assertEquals(3, writer.getWrittenElements().size());
431         assertEquals(2, writer.getDeletedElements().size());
432         assertTrue(writer.getDeletedElements().containsKey("key2"));
433         assertTrue(writer.getDeletedElements().containsKey("key3"));
434     }
435 
436     @Test
437     public void testWriteBehindBatched() throws InterruptedException {
438         Cache cache = new Cache(
439                 new CacheConfiguration("writeBehindBatched", 10)
440                         .cacheWriter(new CacheWriterConfiguration()
441                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
442                                 .minWriteDelay(1)
443                                 .maxWriteDelay(4)
444                                 .writeBatching(true)
445                                 .writeBatchSize(10)
446                                 .cacheWriterFactory(new CacheWriterConfiguration.CacheWriterFactoryConfiguration()
447                                         .className("net.sf.ehcache.writer.TestCacheWriterFactory")
448                                         .properties("key.prefix=pre2; key.suffix=suff2")
449                                         .propertySeparator(";"))));
450         assertNotNull(cache.getRegisteredCacheWriter());
451 
452         CacheManager.getInstance().addCache(cache);
453         TestCacheWriter writer = (TestCacheWriter) cache.getRegisteredCacheWriter();
454         assertTrue(writer.isInitialized());
455         assertEquals(0, writer.getWrittenElements().size());
456 
457         Element el1 = new Element("key1", "value1");
458         Element el2 = new Element("key2", "value2");
459         Element el3 = new Element("key3", "value3");
460         cache.putWithWriter(el1);
461         cache.putWithWriter(el2);
462         cache.putWithWriter(el3);
463 
464         Thread.sleep(3000);
465 
466         assertEquals(0, writer.getWrittenElements().size());
467 
468         Thread.sleep(2000);
469 
470         assertEquals(3, writer.getWrittenElements().size());
471         assertFalse(writer.getWrittenElements().containsKey("key1"));
472         assertFalse(writer.getWrittenElements().containsKey("key2"));
473         assertFalse(writer.getWrittenElements().containsKey("key3"));
474         assertFalse(writer.getWrittenElements().containsKey("pre2key1suff2"));
475         assertFalse(writer.getWrittenElements().containsKey("pre2key2suff2"));
476         assertFalse(writer.getWrittenElements().containsKey("pre2key3suff2"));
477         assertTrue(writer.getWrittenElements().containsKey("pre2key1suff2-batched"));
478         assertTrue(writer.getWrittenElements().containsKey("pre2key2suff2-batched"));
479         assertTrue(writer.getWrittenElements().containsKey("pre2key3suff2-batched"));
480 
481         assertEquals(0, writer.getDeletedElements().size());
482 
483         cache.removeWithWriter(el2.getKey());
484         cache.removeWithWriter(el3.getKey());
485 
486         Thread.sleep(2000);
487 
488         assertEquals(3, writer.getWrittenElements().size());
489         assertEquals(0, writer.getDeletedElements().size());
490 
491         Thread.sleep(3000);
492 
493         assertEquals(3, writer.getWrittenElements().size());
494         assertEquals(2, writer.getDeletedElements().size());
495         assertTrue(writer.getDeletedElements().containsKey("pre2key2suff2-batched"));
496         assertTrue(writer.getDeletedElements().containsKey("pre2key3suff2-batched"));
497     }
498 
499     @Test
500     public void testWriteBehindBatchedCoalescing() throws InterruptedException {
501         Cache cache = new Cache(
502                 new CacheConfiguration("writeBehindBatchedCoalescing", 10)
503                         .cacheWriter(new CacheWriterConfiguration()
504                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
505                                 .minWriteDelay(1)
506                                 .maxWriteDelay(1)
507                                 .writeBatching(true)
508                                 .writeBatchSize(10)
509                                 .writeCoalescing(true)
510                                 .cacheWriterFactory(new CacheWriterConfiguration.CacheWriterFactoryConfiguration()
511                                         .className("net.sf.ehcache.writer.TestCacheWriterFactory"))));
512         assertNotNull(cache.getRegisteredCacheWriter());
513 
514         CacheManager.getInstance().addCache(cache);
515         TestCacheWriter writer = (TestCacheWriter) cache.getRegisteredCacheWriter();
516         assertTrue(writer.isInitialized());
517         assertEquals(0, writer.getWrittenElements().size());
518 
519         Element el1 = new Element("key1", "value1");
520         Element el2a = new Element("key2", "value2a");
521         Element el2b = new Element("key2", "value2b");
522         Element el3 = new Element("key3", "value3");
523         cache.putWithWriter(el1);
524         cache.putWithWriter(el2a);
525         cache.putWithWriter(el3);
526         cache.putWithWriter(el2b);
527         cache.removeWithWriter("key1");
528 
529         Thread.sleep(2000);
530 
531         assertEquals(2, writer.getWrittenElements().size());
532 
533         assertFalse(writer.getWrittenElements().containsKey("key1-batched"));
534         assertTrue(writer.getWrittenElements().containsKey("key2-batched"));
535         assertTrue(writer.getWrittenElements().containsKey("key3-batched"));
536         assertEquals("value2b", writer.getWrittenElements().get("key2-batched").getObjectValue());
537         assertEquals("value3", writer.getWrittenElements().get("key3-batched").getObjectValue());
538 
539         assertEquals(1, writer.getDeletedElements().size());
540 
541         assertTrue(writer.getDeletedElements().containsKey("key1-batched"));
542     }
543 
544     @Test
545     public void testWriteBehindExceptionWithoutRetrying() throws InterruptedException {
546         Cache cache = new Cache(
547                 new CacheConfiguration("writeBehindExceptionWithoutRetrying", 10)
548                         .cacheWriter(new CacheWriterConfiguration()
549                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
550                                 .minWriteDelay(0)
551                                 .maxWriteDelay(0)
552                                 .retryAttempts(0)
553                                 .retryAttemptDelaySeconds(0)));
554         TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
555         cache.registerCacheWriter(writer);
556 
557         CacheManager.getInstance().addCache(cache);
558         assertTrue(writer.isInitialized());
559         assertEquals(0, writer.getWriterEvents().size());
560 
561         cache.putWithWriter(new Element("key1", "value1"));
562       try {
563         cache.putWithWriter(new Element("key2", "value1"));
564         cache.putWithWriter(new Element("key3", "value1"));
565         cache.removeWithWriter("key2");
566       } catch (CacheException e) {
567         System.err.println("We had an error trying to write: " + e.getMessage()
568                            + "\nBut that's OK: the writer got shutdown faster than we could *WithWriter");
569       }
570 
571       Thread.sleep(2000);
572 
573         assertEquals(0, writer.getWriterEvents().size());
574     }
575 
576     @Test
577     public void testWriteBehindRetryWithoutExceptions() throws InterruptedException {
578         Cache cache = new Cache(
579                 new CacheConfiguration("writeBehindRetryWithoutExceptions", 10)
580                         .cacheWriter(new CacheWriterConfiguration()
581                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
582                                 .minWriteDelay(0)
583                                 .maxWriteDelay(0)
584                                 .retryAttempts(3)
585                                 .retryAttemptDelaySeconds(0)));
586         TestCacheWriterRetries writer = new TestCacheWriterRetries(0);
587         cache.registerCacheWriter(writer);
588 
589         CacheManager.getInstance().addCache(cache);
590         assertTrue(writer.isInitialized());
591         assertEquals(0, writer.getWriterEvents().size());
592 
593         cache.putWithWriter(new Element("key1", "value1"));
594         cache.putWithWriter(new Element("key2", "value1"));
595         cache.putWithWriter(new Element("key3", "value1"));
596         cache.removeWithWriter("key2");
597 
598         Thread.sleep(2000);
599 
600         assertEquals(4, writer.getWriterEvents().size());
601         assertEquals(1, (long) writer.getWriteCount().get("key1"));
602         assertEquals(1, (long) writer.getWriteCount().get("key2"));
603         assertEquals(1, (long) writer.getWriteCount().get("key3"));
604         assertFalse(writer.getDeleteCount().containsKey("key1"));
605         assertEquals(1, (long) writer.getDeleteCount().get("key2"));
606         assertFalse(writer.getDeleteCount().containsKey("key3"));
607     }
608 
609     @Test
610     public void testWriteBehindRetryWithoutDelay() throws InterruptedException {
611         Cache cache = new Cache(
612                 new CacheConfiguration("writeBehindRetryWithoutDelay", 10)
613                         .cacheWriter(new CacheWriterConfiguration()
614                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
615                                 .minWriteDelay(0)
616                                 .maxWriteDelay(0)
617                                 .retryAttempts(3)
618                                 .retryAttemptDelaySeconds(0)));
619         TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
620         cache.registerCacheWriter(writer);
621 
622         CacheManager.getInstance().addCache(cache);
623         assertTrue(writer.isInitialized());
624         assertEquals(0, writer.getWriterEvents().size());
625 
626         cache.putWithWriter(new Element("key1", "value1"));
627         cache.putWithWriter(new Element("key2", "value2"));
628         cache.putWithWriter(new Element("key3", "value3"));
629         cache.removeWithWriter("key2");
630 
631         Thread.sleep(2000);
632 
633         assertEquals(4, writer.getWriterEvents().size());
634         assertEquals(1, (long) writer.getWriteCount().get("key1"));
635         assertEquals(1, (long) writer.getWriteCount().get("key2"));
636         assertEquals(1, (long) writer.getWriteCount().get("key3"));
637         assertEquals(1, (long) writer.getDeleteCount().get("key2"));
638     }
639 
640     @Test
641     public void testWriteBehindRetryWithDelay() throws InterruptedException {
642         final long tolerance = TimeUnit.MILLISECONDS.toNanos(200);
643 
644         Cache cache = new Cache(
645                 new CacheConfiguration("writeBehindRetryWithDelay", 10)
646                         .cacheWriter(new CacheWriterConfiguration()
647                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
648                                 .minWriteDelay(0)
649                                 .maxWriteDelay(0)
650                                 .retryAttempts(3)
651                                 .retryAttemptDelaySeconds(1)));
652         final TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
653         cache.registerCacheWriter(writer);
654 
655         CacheManager.getInstance().addCache(cache);
656         assertTrue(writer.isInitialized());
657         assertEquals(0, writer.getWriterEvents().size());
658 
659         long start = System.nanoTime();
660         cache.putWithWriter(new Element("key1", "value1"));
661         cache.putWithWriter(new Element("key2", "value2"));
662         cache.putWithWriter(new Element("key3", "value3"));
663         cache.removeWithWriter("key2");
664 
665         RetryAssert.assertBy(30, TimeUnit.SECONDS, new Callable<Integer>() {
666             public Integer call() throws Exception {
667                 return writer.getWriterEvents().size();
668             }
669         }, Is.is(4));
670 
671         {
672             WriterEvent event = writer.getWriterEvents().get(0);
673             assertEquals(0, event.getWrittenSize());
674             assertEquals(0, event.getWriteCount("key1"));
675             assertEquals(0, event.getWriteCount("key2"));
676             assertEquals(0, event.getWriteCount("key3"));
677             assertEquals(0, event.getDeleteCount("key2"));
678             assertEquals("key1", event.getAddedElement().getObjectKey());
679             long time = event.getTime() - start;
680             long expected = TimeUnit.SECONDS.toNanos(3);
681             assertTrue("Write-1 time : " + time, time >= (expected - tolerance));
682         }
683 
684         {
685             WriterEvent event = writer.getWriterEvents().get(1);
686             assertEquals(1, event.getWrittenSize());
687             assertEquals(1, event.getWriteCount("key1"));
688             assertEquals(0, event.getWriteCount("key2"));
689             assertEquals(0, event.getWriteCount("key3"));
690             assertEquals(0, event.getDeleteCount("key2"));
691             assertEquals("key2", event.getAddedElement().getObjectKey());
692             long time = event.getTime() - start;
693             long expected = TimeUnit.SECONDS.toNanos(6);
694             assertTrue("Write-2 time : " + time, time >= (expected - tolerance));
695         }
696 
697         {
698             WriterEvent event = writer.getWriterEvents().get(2);
699             assertEquals(2, event.getWrittenSize());
700             assertEquals(1, event.getWriteCount("key1"));
701             assertEquals(1, event.getWriteCount("key2"));
702             assertEquals(0, event.getWriteCount("key3"));
703             assertEquals(0, event.getDeleteCount("key2"));
704             assertEquals("key3", event.getAddedElement().getObjectKey());
705             long time = event.getTime() - start;
706             long expected = TimeUnit.SECONDS.toNanos(9);
707             assertTrue("Write-3 time : " + time, time >= (expected - tolerance));
708         }
709 
710         {
711             WriterEvent event = writer.getWriterEvents().get(3);
712             assertEquals(3, event.getWrittenSize());
713             assertEquals(1, event.getWriteCount("key1"));
714             assertEquals(1, event.getWriteCount("key2"));
715             assertEquals(1, event.getWriteCount("key3"));
716             assertEquals(0, event.getDeleteCount("key2"));
717             assertEquals("key2", event.getRemovedKey());
718             long time = event.getTime() - start;
719             long expected = TimeUnit.SECONDS.toNanos(12);
720             assertTrue("Delete-2 time : " + time, time >= (expected - tolerance));
721         }
722 
723         assertEquals(4, writer.getWriterEvents().size());
724         assertEquals(1, writer.getWriteCount().get("key1").intValue());
725         assertEquals(1, writer.getWriteCount().get("key2").intValue());
726         assertEquals(1, writer.getWriteCount().get("key3").intValue());
727         assertEquals(1, writer.getDeleteCount().get("key2").intValue());
728     }
729 
730     @Test
731     public void testWriteBehindExceptionWithoutRetryingBatched() throws InterruptedException {
732         Cache cache = new Cache(
733                 new CacheConfiguration("writeBehindExceptionWithoutRetryingBatched", 10)
734                         .cacheWriter(new CacheWriterConfiguration()
735                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
736                                 .minWriteDelay(1)
737                                 .maxWriteDelay(1)
738                                 .writeBatching(true)
739                                 .writeBatchSize(10)
740                                 .retryAttempts(0)
741                                 .retryAttemptDelaySeconds(0)));
742         TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
743         cache.registerCacheWriter(writer);
744 
745         CacheManager.getInstance().addCache(cache);
746         assertTrue(writer.isInitialized());
747         assertEquals(0, writer.getWriterEvents().size());
748 
749         cache.putWithWriter(new Element("key1", "value1"));
750         cache.putWithWriter(new Element("key2", "value1"));
751         cache.putWithWriter(new Element("key3", "value1"));
752         cache.removeWithWriter("key2");
753 
754         Thread.sleep(2000);
755 
756         assertEquals(2, writer.getWriterEvents().size());
757         assertEquals(1, (long) writer.getWriteCount().get("key1"));
758         assertEquals(1, (long) writer.getWriteCount().get("key2"));
759         assertFalse(writer.getWriteCount().containsKey("key3"));
760         assertFalse(writer.getDeleteCount().containsKey("key1"));
761         assertFalse(writer.getDeleteCount().containsKey("key2"));
762         assertFalse(writer.getDeleteCount().containsKey("key3"));
763     }
764 
765     @Test
766     public void testWriteBehindRetryWithoutExceptionsBatched() throws InterruptedException {
767         Cache cache = new Cache(
768                 new CacheConfiguration("writeBehindRetryWithoutExceptionsBatched", 10)
769                         .cacheWriter(new CacheWriterConfiguration()
770                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
771                                 .minWriteDelay(1)
772                                 .maxWriteDelay(1)
773                                 .writeBatching(true)
774                                 .writeBatchSize(10)
775                                 .retryAttempts(3)
776                                 .retryAttemptDelaySeconds(0)));
777         TestCacheWriterRetries writer = new TestCacheWriterRetries(0);
778         cache.registerCacheWriter(writer);
779 
780         CacheManager.getInstance().addCache(cache);
781         assertTrue(writer.isInitialized());
782         assertEquals(0, writer.getWriterEvents().size());
783 
784         cache.putWithWriter(new Element("key1", "value1"));
785         cache.putWithWriter(new Element("key2", "value2"));
786         cache.putWithWriter(new Element("key3", "value3"));
787         cache.removeWithWriter("key2");
788 
789         Thread.sleep(2000);
790 
791         assertEquals(4, writer.getWriterEvents().size());
792         assertEquals(1, (long) writer.getWriteCount().get("key1"));
793         assertEquals(1, (long) writer.getWriteCount().get("key2"));
794         assertEquals(1, (long) writer.getWriteCount().get("key3"));
795         assertFalse(writer.getDeleteCount().containsKey("key1"));
796         assertEquals(1, (long) writer.getDeleteCount().get("key2"));
797         assertFalse(writer.getDeleteCount().containsKey("key3"));
798     }
799 
800     @Test
801     public void testWriteBehindRetryWithoutDelayBatched() throws InterruptedException {
802         Cache cache = new Cache(
803                 new CacheConfiguration("writeBehindRetryWithoutDelayBatched", 10)
804                         .cacheWriter(new CacheWriterConfiguration()
805                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
806                                 .minWriteDelay(1)
807                                 .maxWriteDelay(1)
808                                 .writeBatching(true)
809                                 .writeBatchSize(10)
810                                 .retryAttempts(3)
811                                 .retryAttemptDelaySeconds(0)));
812         TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
813         cache.registerCacheWriter(writer);
814 
815         CacheManager.getInstance().addCache(cache);
816         assertTrue(writer.isInitialized());
817         assertEquals(0, writer.getWriterEvents().size());
818 
819         cache.putWithWriter(new Element("key1", "value1"));
820         cache.putWithWriter(new Element("key2", "value2"));
821         cache.putWithWriter(new Element("key3", "value3"));
822         cache.removeWithWriter("key2");
823 
824         Thread.sleep(2000);
825 
826         assertEquals(10, writer.getWriterEvents().size());
827         assertEquals(4, (long) writer.getWriteCount().get("key1"));
828         assertEquals(4, (long) writer.getWriteCount().get("key2"));
829         assertEquals(1, (long) writer.getWriteCount().get("key3"));
830         assertEquals(1, (long) writer.getDeleteCount().get("key2"));
831     }
832 
833     @Test
834     public void testWriteBehindRetryWithDelayBatched() throws InterruptedException {
835         Cache cache = new Cache(
836                 new CacheConfiguration("writeBehindRetryWithDelayBatched", 10)
837                         .cacheWriter(new CacheWriterConfiguration()
838                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
839                                 .minWriteDelay(1)
840                                 .maxWriteDelay(1)
841                                 .writeBatching(true)
842                                 .writeBatchSize(10)
843                                 .retryAttempts(3)
844                                 .retryAttemptDelaySeconds(1)));
845         TestCacheWriterRetries writer = new TestCacheWriterRetries(3);
846         cache.registerCacheWriter(writer);
847 
848         CacheManager.getInstance().addCache(cache);
849         assertTrue(writer.isInitialized());
850         assertEquals(0, writer.getWriterEvents().size());
851 
852         cache.putWithWriter(new Element("key1", "value1"));
853         cache.putWithWriter(new Element("key2", "value2"));
854         cache.putWithWriter(new Element("key3", "value3"));
855         cache.removeWithWriter("key2");
856 
857         Thread.sleep(3000);
858 
859         assertEquals(4, writer.getWriterEvents().size());
860         assertTrue(writer.getWriteCount().containsKey("key1"));
861         assertTrue(writer.getWriteCount().containsKey("key2"));
862         assertFalse(writer.getWriteCount().containsKey("key3"));
863         assertFalse(writer.getDeleteCount().containsKey("key1"));
864         assertFalse(writer.getDeleteCount().containsKey("key2"));
865         assertFalse(writer.getDeleteCount().containsKey("key3"));
866 
867         Thread.sleep(3000);
868 
869         assertEquals(9, writer.getWriterEvents().size());
870         assertEquals(4, (long) writer.getWriteCount().get("key1"));
871         assertEquals(4, (long) writer.getWriteCount().get("key2"));
872         assertEquals(1, (long) writer.getWriteCount().get("key3"));
873         assertFalse(writer.getDeleteCount().containsKey("key1"));
874         assertFalse(writer.getDeleteCount().containsKey("key2"));
875         assertFalse(writer.getDeleteCount().containsKey("key3"));
876 
877         Thread.sleep(5000);
878 
879         assertEquals(10, writer.getWriterEvents().size());
880         assertEquals(4, (long) writer.getWriteCount().get("key1"));
881         assertEquals(4, (long) writer.getWriteCount().get("key2"));
882         assertEquals(1, (long) writer.getWriteCount().get("key3"));
883         assertFalse(writer.getDeleteCount().containsKey("key1"));
884         assertEquals(1, (long) writer.getDeleteCount().get("key2"));
885         assertFalse(writer.getDeleteCount().containsKey("key3"));
886     }
887 
888     @Test
889     public void testWriteBehindRateLimitBatched() throws InterruptedException {
890         Cache cache = new Cache(
891                 new CacheConfiguration("writeBehindRetryWithDelayBatched", 100)
892                         .cacheWriter(new CacheWriterConfiguration()
893                                 .writeMode(CacheWriterConfiguration.WriteMode.WRITE_BEHIND)
894                                 .minWriteDelay(1)
895                                 .writeBatching(true)
896                                 .writeBatchSize(10)
897                                 .rateLimitPerSecond(5)));
898         TestCacheWriter writer = new TestCacheWriter(new Properties());
899         cache.registerCacheWriter(writer);
900 
901         CacheManager.getInstance().addCache(cache);
902         assertTrue(writer.isInitialized());
903         assertEquals(0, writer.getWrittenElements().size());
904 
905         for (int i = 0; i < 30; i++) {
906             cache.putWithWriter(new Element("key" + i, "value" + i));
907         }
908 
909         Thread.sleep(1000);
910 
911         assertEquals(0, writer.getWrittenElements().size());
912 
913         Thread.sleep(1500);
914 
915         assertEquals(10, writer.getWrittenElements().size());
916 
917         Thread.sleep(1000);
918 
919         assertEquals(10, writer.getWrittenElements().size());
920 
921         Thread.sleep(1000);
922 
923         assertEquals(20, writer.getWrittenElements().size());
924 
925         Thread.sleep(1000);
926 
927         assertEquals(20, writer.getWrittenElements().size());
928 
929         Thread.sleep(1000);
930 
931         assertEquals(30, writer.getWrittenElements().size());
932     }
933 }