View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  package net.sf.ehcache.writer.writebehind;
17  
18  import net.sf.ehcache.CacheEntry;
19  import net.sf.ehcache.CacheException;
20  import net.sf.ehcache.Element;
21  import net.sf.ehcache.config.CacheConfiguration;
22  import net.sf.ehcache.config.CacheWriterConfiguration;
23  import net.sf.ehcache.writer.CacheWriter;
24  import net.sf.ehcache.writer.writebehind.operations.DeleteOperation;
25  import net.sf.ehcache.writer.writebehind.operations.SingleOperation;
26  import net.sf.ehcache.writer.writebehind.operations.SingleOperationType;
27  import net.sf.ehcache.writer.writebehind.operations.WriteOperation;
28  
29  import java.util.ArrayList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.TreeMap;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  import java.util.concurrent.atomic.AtomicLong;
36  import java.util.concurrent.locks.Condition;
37  import java.util.concurrent.locks.ReentrantReadWriteLock;
38  import java.util.logging.Level;
39  import java.util.logging.Logger;
40  
41  /***
42   * An implementation of write behind with a queue that is kept in non durable local heap.
43   *
44   * @author Geert Bevin
45   * @version $Id: WriteBehindQueue.html 13146 2011-08-01 17:12:39Z oletizi $
46   */
47  class WriteBehindQueue {
48    
49      private static final Logger LOGGER = Logger.getLogger(WriteBehindQueue.class.getName());
50  
51      private static final int MS_IN_SEC = 1000;
52  
53      private final String cacheName;
54      private final long minWriteDelayMs;
55      private final long maxWriteDelayMs;
56      private final int rateLimitPerSecond;
57      private final int maxQueueSize;
58      private final boolean writeBatching;
59      private final int writeBatchSize;
60      private final int retryAttempts;
61      private final int retryAttemptDelaySeconds;
62      private final Thread processingThread;
63  
64      private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
65      private final ReentrantReadWriteLock.ReadLock queueReadLock = queueLock.readLock();
66      private final ReentrantReadWriteLock.WriteLock queueWriteLock = queueLock.writeLock();
67      private final Condition queueIsFull = queueWriteLock.newCondition();
68      private final Condition queueIsEmpty = queueWriteLock.newCondition();
69      private final Condition queueIsStopped = queueWriteLock.newCondition();
70  
71      private final AtomicLong lastProcessing = new AtomicLong(System.currentTimeMillis());
72      private final AtomicLong lastWorkDone = new AtomicLong(System.currentTimeMillis());
73      private final AtomicBoolean busyProcessing = new AtomicBoolean(false);
74  
75      private volatile OperationsFilter filter;
76  
77      private List<SingleOperation> waiting = new ArrayList<SingleOperation>();
78      private CacheWriter cacheWriter;
79      private boolean stopping;
80      private boolean stopped;
81  
82      /***
83       * Create a new write behind queue.
84       *
85       * @param config the configuration for the queue
86       */
87      public WriteBehindQueue(CacheConfiguration config) {
88          this.stopping = false;
89          this.stopped = true;
90  
91          this.cacheName = config.getName();
92  
93          // making a copy of the configuration locally to ensure that it will not be changed at runtime
94          final CacheWriterConfiguration cacheWriterConfig = config.getCacheWriterConfiguration();
95          this.minWriteDelayMs = cacheWriterConfig.getMinWriteDelay() * MS_IN_SEC;
96          this.maxWriteDelayMs = cacheWriterConfig.getMaxWriteDelay() * MS_IN_SEC;
97          this.rateLimitPerSecond = cacheWriterConfig.getRateLimitPerSecond();
98          this.maxQueueSize = cacheWriterConfig.getWriteBehindMaxQueueSize();
99          this.writeBatching = cacheWriterConfig.getWriteBatching();
100         this.writeBatchSize = cacheWriterConfig.getWriteBatchSize();
101         this.retryAttempts = cacheWriterConfig.getRetryAttempts();
102         this.retryAttemptDelaySeconds = cacheWriterConfig.getRetryAttemptDelaySeconds();
103 
104         this.processingThread = new Thread(new ProcessingThread(), cacheName + " write-behind");
105         this.processingThread.setDaemon(true);
106     }
107 
108     /***
109      * {@inheritDoc}
110      */
111     public void start(CacheWriter writer) {
112         queueWriteLock.lock();
113         try {
114             if (!stopped) {
115                 throw new CacheException("The write-behind queue for cache '" + cacheName + "' can't be started more than once");
116             }
117 
118             if (processingThread.isAlive()) {
119                 throw new CacheException("The thread with name " + processingThread.getName() + " already exists and is still running");
120             }
121 
122             this.stopping = false;
123             this.stopped = false;
124             this.cacheWriter = writer;
125 
126             processingThread.start();
127         } finally {
128             queueWriteLock.unlock();
129         }
130     }
131 
132     /***
133      * {@inheritDoc}
134      */
135     public void setOperationsFilter(OperationsFilter filter) {
136         this.filter = filter;
137     }
138 
139     private long getLastProcessing() {
140         return lastProcessing.get();
141     }
142 
143     /***
144      * Thread this will continuously process the items in the queue.
145      */
146     private final class ProcessingThread implements Runnable {
147         public void run() {
148             try {
149                 while (!isStopped()) {
150 
151                     processItems();
152 
153                     queueWriteLock.lock();
154                     try {
155                         // Wait for new items or until the min write delay has expired.
156                         // Do not continue if the actual min write delay wasn't at least the one specified in the config
157                         // otherwise it's possible to create a new work list for just a couple of items in case
158                         // the item processor is very fast, causing a large amount of data churn.
159                         // However, if the write delay is expired, the processing should start immediately.
160                         try {
161                             if (minWriteDelayMs != 0) {
162                                 long delay = minWriteDelayMs;
163                                 do {
164                                     queueIsEmpty.await(delay, TimeUnit.MILLISECONDS);
165                                     long actualDelay = System.currentTimeMillis() - getLastProcessing();
166                                     if (actualDelay < minWriteDelayMs) {
167                                         delay = minWriteDelayMs - actualDelay;
168                                     } else {
169                                         delay = 0;
170                                     }
171                                 } while (delay > 0);
172                             } else {
173                                 while (!stopping && waiting.size() == 0) {
174                                     queueIsEmpty.await();
175                                 }
176                             }
177                         } catch (final InterruptedException e) {
178                             // if the wait for items is interrupted, act as if the bucket was cancelled
179                             stop();
180                             Thread.currentThread().interrupt();
181                         }
182 
183                         // If the queue is stopping and no more work is outstanding, perform the actual stop operation
184                         if (stopping && waiting.isEmpty()) {
185                             stopTheQueueThread();
186                         }
187                         queueIsFull.signal();
188                     } finally {
189                         queueWriteLock.unlock();
190                     }
191                 }
192             } finally {
193                 stopTheQueueThread();
194             }
195         }
196 
197         private void stopTheQueueThread() {
198             // Perform the actual stop operation and wake up everyone that is waiting for it.
199             queueWriteLock.lock();
200             try {
201                 stopped = true;
202                 stopping = false;
203                 queueIsStopped.signalAll();
204             } finally {
205                 queueWriteLock.unlock();
206             }
207         }
208     }
209 
210     private void processItems() throws CacheException {
211         // ensure that the items aren't already being processed
212         if (busyProcessing.get()) {
213             throw new CacheException("The write behind queue for cache '" + cacheName + "' is already busy processing.");
214         }
215 
216         // set some state related to this processing run
217         busyProcessing.set(true);
218         lastProcessing.set(System.currentTimeMillis());
219 
220         try {
221             final int workSize;
222             final List<SingleOperation> quarantined;
223 
224             queueWriteLock.lock();
225             try {
226                 // quarantine local work
227                 if (waiting.size() > 0) {
228                     quarantined = waiting;
229                     waiting = new ArrayList<SingleOperation>();
230                 } else {
231                     quarantined = null;
232                 }
233 
234                 // check if work was quarantined
235                 if (quarantined != null) {
236                     workSize = quarantined.size();
237                 } else {
238                     workSize = 0;
239                 }
240             } finally {
241                 queueWriteLock.unlock();
242             }
243 
244             // if there's no work that needs to be done, stop the processing
245             if (0 == workSize) {
246                 if (LOGGER.isLoggable(Level.FINER)) {
247                     LOGGER.finer(getThreadName() + " : processItems() : nothing to process");
248                 }
249                 return;
250             }
251 
252             try {
253                 filterQuarantined(quarantined);
254 
255                 // if the batching is enabled and work size is smaller than batch size, don't process anything as long as the
256                 // max allowed delay hasn't expired
257                 if (writeBatching && writeBatchSize > 0) {
258                     // wait for another round if the batch size hasn't been filled up yet and the max write delay
259                     // hasn't expired yet
260                     if (workSize < writeBatchSize && maxWriteDelayMs > lastProcessing.get() - lastWorkDone.get()) {
261                         waitUntilEnoughWorkItemsAvailable(quarantined, workSize);
262                         return;
263                     }
264                     // enforce the rate limit and wait for another round if too much would be processed compared to
265                     // the last time when a batch was executed
266                     if (rateLimitPerSecond > 0) {
267                         final long secondsSinceLastWorkDone = (System.currentTimeMillis() - lastWorkDone.get()) / MS_IN_SEC;
268                         final long maxBatchSizeSinceLastWorkDone = rateLimitPerSecond * secondsSinceLastWorkDone;
269                         final int batchSize = determineBatchSize(quarantined);
270                         if (batchSize > maxBatchSizeSinceLastWorkDone) {
271                             waitUntilEnoughTimeHasPassed(quarantined, batchSize, secondsSinceLastWorkDone);
272                             return;
273                         }
274                     }
275                 }
276 
277                 // set some state related to this processing run
278                 lastWorkDone.set(System.currentTimeMillis());
279 
280                 if (LOGGER.isLoggable(Level.FINER)) {
281                     LOGGER.finer(getThreadName() + " : processItems() : processing started");
282                 }
283 
284                 // process the quarantined items and remove them as they're processed
285                 processQuarantinedItems(quarantined);
286             } catch (final RuntimeException e) {
287                 reassemble(quarantined);
288                 throw e;
289             } catch (final Error e) {
290                 reassemble(quarantined);
291                 throw e;
292             }
293         } finally {
294             busyProcessing.set(false);
295 
296             if (LOGGER.isLoggable(Level.FINER)) {
297                 LOGGER.finer(getThreadName() + " : processItems() : processing finished");
298             }
299         }
300     }
301 
302     private void waitUntilEnoughWorkItemsAvailable(List<SingleOperation> quarantined, int workSize) {
303         if (LOGGER.isLoggable(Level.FINER)) {
304             LOGGER.finer(getThreadName() + " : processItems() : only " + workSize + " work items available, waiting for "
305                     + writeBatchSize + " items to fill up a batch");
306         }
307         reassemble(quarantined);
308     }
309 
310     private void waitUntilEnoughTimeHasPassed(List<SingleOperation> quarantined, int batchSize, long secondsSinceLastWorkDone) {
311         if (LOGGER.isLoggable(Level.FINER)) {
312             LOGGER.finer(getThreadName() + " : processItems() : last work was done " + secondsSinceLastWorkDone
313                     + " seconds ago, processing " + batchSize + " batch items would exceed the rate limit of "
314                     + rateLimitPerSecond + ", waiting for a while.");
315         }
316         reassemble(quarantined);
317     }
318 
319     private int determineBatchSize(List<SingleOperation> quarantined) {
320         int batchSize = writeBatchSize;
321         if (quarantined.size() < batchSize) {
322             batchSize = quarantined.size();
323         }
324         return batchSize;
325     }
326 
327     private void filterQuarantined(List<SingleOperation> quarantined) {
328         OperationsFilter operationsFilter = this.filter;
329         if (operationsFilter != null) {
330             operationsFilter.filter(quarantined, CastingOperationConverter.getInstance());
331         }
332     }
333 
334     private void processQuarantinedItems(List<SingleOperation> quarantined) {
335         if (LOGGER.isLoggable(Level.CONFIG)) {
336             LOGGER.config(getThreadName() + " : processItems() : processing " + quarantined.size() + " quarantined items");
337         }
338 
339         if (writeBatching && writeBatchSize > 0) {
340             processBatchedOperations(quarantined);
341         } else {
342             processSingleOperation(quarantined);
343 
344         }
345     }
346 
347     private void processBatchedOperations(List<SingleOperation> quarantined) {
348         final int batchSize = determineBatchSize(quarantined);
349 
350         // create batches that are separated by operation type
351         final Map<SingleOperationType, List<SingleOperation>> separatedItemsPerType =
352                 new TreeMap<SingleOperationType, List<SingleOperation>>();
353         for (int i = 0; i < batchSize; i++) {
354             final SingleOperation item = quarantined.get(i);
355 
356             if (LOGGER.isLoggable(Level.CONFIG)) {
357                 LOGGER.config(getThreadName() + " : processItems() : adding " + item + " to next batch");
358             }
359 
360             List<SingleOperation> itemsPerType = separatedItemsPerType.get(item.getType());
361             if (null == itemsPerType) {
362                 itemsPerType = new ArrayList<SingleOperation>();
363                 separatedItemsPerType.put(item.getType(), itemsPerType);
364             }
365 
366             itemsPerType.add(item);
367         }
368 
369         // execute the batch operations
370         for (List<SingleOperation> itemsPerType : separatedItemsPerType.values()) {
371             int executionsLeft = retryAttempts + 1;
372             while (executionsLeft-- > 0) {
373                 try {
374                     itemsPerType.get(0).createBatchOperation(itemsPerType).performBatchOperation(cacheWriter);
375                     break;
376                 } catch (final RuntimeException e) {
377                     if (executionsLeft <= 0) {
378                         throw e;
379                     } else {
380                         LOGGER.warning("Exception while processing write behind queue, retrying in " + retryAttemptDelaySeconds
381                                 + " seconds, " + executionsLeft + " retries left : " + e.getMessage());
382                         try {
383                             Thread.sleep(retryAttemptDelaySeconds * MS_IN_SEC);
384                         } catch (InterruptedException e1) {
385                             Thread.currentThread().interrupt();
386                             throw e;
387                         }
388                     }
389                 }
390             }
391         }
392 
393         // remove the batched items
394         for (int i = 0; i < batchSize; i++) {
395             quarantined.remove(0);
396         }
397 
398         if (!quarantined.isEmpty()) {
399             reassemble(quarantined);
400         }
401     }
402 
403     private void processSingleOperation(List<SingleOperation> quarantined) {
404         while (!quarantined.isEmpty()) {
405             // process the next item
406             final SingleOperation item = quarantined.get(0);
407             if (LOGGER.isLoggable(Level.CONFIG)) {
408                 LOGGER.config(getThreadName() + " : processItems() : processing " + item);
409             }
410 
411             int executionsLeft = retryAttempts + 1;
412             while (executionsLeft-- > 0) {
413                 try {
414                     item.performSingleOperation(cacheWriter);
415                     break;
416                 } catch (final RuntimeException e) {
417                     if (executionsLeft <= 0) {
418                         throw e;
419                     } else {
420                         LOGGER.warning("Exception while processing write behind queue, retrying in " + retryAttemptDelaySeconds
421                                 + " seconds, " + executionsLeft + " retries left : " + e.getMessage());
422                         try {
423                             Thread.sleep(retryAttemptDelaySeconds * MS_IN_SEC);
424                         } catch (InterruptedException e1) {
425                             Thread.currentThread().interrupt();
426                             throw e;
427                         }
428                     }
429                 }
430             }
431 
432             quarantined.remove(0);
433         }
434     }
435 
436     /***
437      * {@inheritDoc}
438      */
439     public void write(Element element) {
440         queueWriteLock.lock();
441         try {
442             waitForQueueSizeToDrop();
443             if (stopping || stopped) {
444                 throw new CacheException("The element '" + element + "' couldn't be added through the write-behind queue for cache '"
445                         + cacheName + "' since it's not started.");
446             }
447             waiting.add(new WriteOperation(element));
448             if (waiting.size() + 1 < maxQueueSize) {
449                 queueIsFull.signal();
450             }
451             queueIsEmpty.signal();
452         } finally {
453             queueWriteLock.unlock();
454         }
455     }
456 
457     private void waitForQueueSizeToDrop() {
458         if (maxQueueSize > 0) {
459             while (getQueueSize() >= maxQueueSize) {
460                 try {
461                     queueIsFull.await();
462                 } catch (InterruptedException e) {
463                     stop();
464                     Thread.currentThread().interrupt();
465                 }
466             }
467         }
468     }
469 
470     /***
471      * {@inheritDoc}
472      */
473     public void delete(CacheEntry entry) {
474         queueWriteLock.lock();
475         try {
476             waitForQueueSizeToDrop();
477             if (stopping || stopped) {
478                 throw new CacheException("The entry for key '" + entry.getKey() + "' couldn't be deleted through the write-behind "
479                         + "queue for cache '" + cacheName + "' since it's not started.");
480             }
481             waiting.add(new DeleteOperation(entry));
482             if (waiting.size() + 1 < maxQueueSize) {
483                 queueIsFull.signal();
484             }
485             queueIsEmpty.signal();
486         } finally {
487             queueWriteLock.unlock();
488         }
489     }
490 
491     /***
492      * {@inheritDoc}
493      */
494     public void stop() throws CacheException {
495         queueWriteLock.lock();
496         try {
497             if (stopped) {
498                 return;
499             }
500 
501             stopping = true;
502             queueIsEmpty.signal();
503             while (!stopped) {
504                 queueIsStopped.await();
505             }
506         } catch (InterruptedException e) {
507             Thread.currentThread().interrupt();
508             throw new CacheException(e);
509         } finally {
510             queueWriteLock.unlock();
511         }
512     }
513 
514     /***
515      * Gets the best estimate for items in the queue still awaiting processing.
516      * Not including elements currently processed
517      * @return the amount of elements still awaiting processing.
518      */
519     public long getQueueSize() {
520         return waiting.size();
521     }
522 
523     private boolean isStopped() {
524         queueReadLock.lock();
525         try {
526             return stopped;
527         } finally {
528             queueReadLock.unlock();
529         }
530     }
531 
532     private String getThreadName() {
533         return processingThread.getName();
534     }
535 
536     private void reassemble(List<SingleOperation> quarantined) {
537         queueWriteLock.lock();
538         try {
539             if (null == quarantined) {
540                 return;
541             }
542 
543             quarantined.addAll(waiting);
544 
545             waiting = quarantined;
546 
547             queueIsEmpty.signal();
548         } finally {
549             queueWriteLock.unlock();
550         }
551     }
552 }