1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package net.sf.ehcache.writer.writebehind;
17
18 import net.sf.ehcache.CacheEntry;
19 import net.sf.ehcache.CacheException;
20 import net.sf.ehcache.Element;
21 import net.sf.ehcache.config.CacheConfiguration;
22 import net.sf.ehcache.config.CacheWriterConfiguration;
23 import net.sf.ehcache.writer.CacheWriter;
24 import net.sf.ehcache.writer.writebehind.operations.DeleteOperation;
25 import net.sf.ehcache.writer.writebehind.operations.SingleOperation;
26 import net.sf.ehcache.writer.writebehind.operations.SingleOperationType;
27 import net.sf.ehcache.writer.writebehind.operations.WriteOperation;
28
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.TreeMap;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicLong;
36 import java.util.concurrent.locks.Condition;
37 import java.util.concurrent.locks.ReentrantReadWriteLock;
38 import java.util.logging.Level;
39 import java.util.logging.Logger;
40
41 /***
42 * An implementation of write behind with a queue that is kept in non durable local heap.
43 *
44 * @author Geert Bevin
45 * @version $Id: WriteBehindQueue.html 13146 2011-08-01 17:12:39Z oletizi $
46 */
47 class WriteBehindQueue {
48
49 private static final Logger LOGGER = Logger.getLogger(WriteBehindQueue.class.getName());
50
51 private static final int MS_IN_SEC = 1000;
52
53 private final String cacheName;
54 private final long minWriteDelayMs;
55 private final long maxWriteDelayMs;
56 private final int rateLimitPerSecond;
57 private final int maxQueueSize;
58 private final boolean writeBatching;
59 private final int writeBatchSize;
60 private final int retryAttempts;
61 private final int retryAttemptDelaySeconds;
62 private final Thread processingThread;
63
64 private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
65 private final ReentrantReadWriteLock.ReadLock queueReadLock = queueLock.readLock();
66 private final ReentrantReadWriteLock.WriteLock queueWriteLock = queueLock.writeLock();
67 private final Condition queueIsFull = queueWriteLock.newCondition();
68 private final Condition queueIsEmpty = queueWriteLock.newCondition();
69 private final Condition queueIsStopped = queueWriteLock.newCondition();
70
71 private final AtomicLong lastProcessing = new AtomicLong(System.currentTimeMillis());
72 private final AtomicLong lastWorkDone = new AtomicLong(System.currentTimeMillis());
73 private final AtomicBoolean busyProcessing = new AtomicBoolean(false);
74
75 private volatile OperationsFilter filter;
76
77 private List<SingleOperation> waiting = new ArrayList<SingleOperation>();
78 private CacheWriter cacheWriter;
79 private boolean stopping;
80 private boolean stopped;
81
82 /***
83 * Create a new write behind queue.
84 *
85 * @param config the configuration for the queue
86 */
87 public WriteBehindQueue(CacheConfiguration config) {
88 this.stopping = false;
89 this.stopped = true;
90
91 this.cacheName = config.getName();
92
93
94 final CacheWriterConfiguration cacheWriterConfig = config.getCacheWriterConfiguration();
95 this.minWriteDelayMs = cacheWriterConfig.getMinWriteDelay() * MS_IN_SEC;
96 this.maxWriteDelayMs = cacheWriterConfig.getMaxWriteDelay() * MS_IN_SEC;
97 this.rateLimitPerSecond = cacheWriterConfig.getRateLimitPerSecond();
98 this.maxQueueSize = cacheWriterConfig.getWriteBehindMaxQueueSize();
99 this.writeBatching = cacheWriterConfig.getWriteBatching();
100 this.writeBatchSize = cacheWriterConfig.getWriteBatchSize();
101 this.retryAttempts = cacheWriterConfig.getRetryAttempts();
102 this.retryAttemptDelaySeconds = cacheWriterConfig.getRetryAttemptDelaySeconds();
103
104 this.processingThread = new Thread(new ProcessingThread(), cacheName + " write-behind");
105 this.processingThread.setDaemon(true);
106 }
107
108 /***
109 * {@inheritDoc}
110 */
111 public void start(CacheWriter writer) {
112 queueWriteLock.lock();
113 try {
114 if (!stopped) {
115 throw new CacheException("The write-behind queue for cache '" + cacheName + "' can't be started more than once");
116 }
117
118 if (processingThread.isAlive()) {
119 throw new CacheException("The thread with name " + processingThread.getName() + " already exists and is still running");
120 }
121
122 this.stopping = false;
123 this.stopped = false;
124 this.cacheWriter = writer;
125
126 processingThread.start();
127 } finally {
128 queueWriteLock.unlock();
129 }
130 }
131
132 /***
133 * {@inheritDoc}
134 */
135 public void setOperationsFilter(OperationsFilter filter) {
136 this.filter = filter;
137 }
138
139 private long getLastProcessing() {
140 return lastProcessing.get();
141 }
142
143 /***
144 * Thread this will continuously process the items in the queue.
145 */
146 private final class ProcessingThread implements Runnable {
147 public void run() {
148 try {
149 while (!isStopped()) {
150
151 processItems();
152
153 queueWriteLock.lock();
154 try {
155
156
157
158
159
160 try {
161 if (minWriteDelayMs != 0) {
162 long delay = minWriteDelayMs;
163 do {
164 queueIsEmpty.await(delay, TimeUnit.MILLISECONDS);
165 long actualDelay = System.currentTimeMillis() - getLastProcessing();
166 if (actualDelay < minWriteDelayMs) {
167 delay = minWriteDelayMs - actualDelay;
168 } else {
169 delay = 0;
170 }
171 } while (delay > 0);
172 } else {
173 while (!stopping && waiting.size() == 0) {
174 queueIsEmpty.await();
175 }
176 }
177 } catch (final InterruptedException e) {
178
179 stop();
180 Thread.currentThread().interrupt();
181 }
182
183
184 if (stopping && waiting.isEmpty()) {
185 stopTheQueueThread();
186 }
187 queueIsFull.signal();
188 } finally {
189 queueWriteLock.unlock();
190 }
191 }
192 } finally {
193 stopTheQueueThread();
194 }
195 }
196
197 private void stopTheQueueThread() {
198
199 queueWriteLock.lock();
200 try {
201 stopped = true;
202 stopping = false;
203 queueIsStopped.signalAll();
204 } finally {
205 queueWriteLock.unlock();
206 }
207 }
208 }
209
210 private void processItems() throws CacheException {
211
212 if (busyProcessing.get()) {
213 throw new CacheException("The write behind queue for cache '" + cacheName + "' is already busy processing.");
214 }
215
216
217 busyProcessing.set(true);
218 lastProcessing.set(System.currentTimeMillis());
219
220 try {
221 final int workSize;
222 final List<SingleOperation> quarantined;
223
224 queueWriteLock.lock();
225 try {
226
227 if (waiting.size() > 0) {
228 quarantined = waiting;
229 waiting = new ArrayList<SingleOperation>();
230 } else {
231 quarantined = null;
232 }
233
234
235 if (quarantined != null) {
236 workSize = quarantined.size();
237 } else {
238 workSize = 0;
239 }
240 } finally {
241 queueWriteLock.unlock();
242 }
243
244
245 if (0 == workSize) {
246 if (LOGGER.isLoggable(Level.FINER)) {
247 LOGGER.finer(getThreadName() + " : processItems() : nothing to process");
248 }
249 return;
250 }
251
252 try {
253 filterQuarantined(quarantined);
254
255
256
257 if (writeBatching && writeBatchSize > 0) {
258
259
260 if (workSize < writeBatchSize && maxWriteDelayMs > lastProcessing.get() - lastWorkDone.get()) {
261 waitUntilEnoughWorkItemsAvailable(quarantined, workSize);
262 return;
263 }
264
265
266 if (rateLimitPerSecond > 0) {
267 final long secondsSinceLastWorkDone = (System.currentTimeMillis() - lastWorkDone.get()) / MS_IN_SEC;
268 final long maxBatchSizeSinceLastWorkDone = rateLimitPerSecond * secondsSinceLastWorkDone;
269 final int batchSize = determineBatchSize(quarantined);
270 if (batchSize > maxBatchSizeSinceLastWorkDone) {
271 waitUntilEnoughTimeHasPassed(quarantined, batchSize, secondsSinceLastWorkDone);
272 return;
273 }
274 }
275 }
276
277
278 lastWorkDone.set(System.currentTimeMillis());
279
280 if (LOGGER.isLoggable(Level.FINER)) {
281 LOGGER.finer(getThreadName() + " : processItems() : processing started");
282 }
283
284
285 processQuarantinedItems(quarantined);
286 } catch (final RuntimeException e) {
287 reassemble(quarantined);
288 throw e;
289 } catch (final Error e) {
290 reassemble(quarantined);
291 throw e;
292 }
293 } finally {
294 busyProcessing.set(false);
295
296 if (LOGGER.isLoggable(Level.FINER)) {
297 LOGGER.finer(getThreadName() + " : processItems() : processing finished");
298 }
299 }
300 }
301
302 private void waitUntilEnoughWorkItemsAvailable(List<SingleOperation> quarantined, int workSize) {
303 if (LOGGER.isLoggable(Level.FINER)) {
304 LOGGER.finer(getThreadName() + " : processItems() : only " + workSize + " work items available, waiting for "
305 + writeBatchSize + " items to fill up a batch");
306 }
307 reassemble(quarantined);
308 }
309
310 private void waitUntilEnoughTimeHasPassed(List<SingleOperation> quarantined, int batchSize, long secondsSinceLastWorkDone) {
311 if (LOGGER.isLoggable(Level.FINER)) {
312 LOGGER.finer(getThreadName() + " : processItems() : last work was done " + secondsSinceLastWorkDone
313 + " seconds ago, processing " + batchSize + " batch items would exceed the rate limit of "
314 + rateLimitPerSecond + ", waiting for a while.");
315 }
316 reassemble(quarantined);
317 }
318
319 private int determineBatchSize(List<SingleOperation> quarantined) {
320 int batchSize = writeBatchSize;
321 if (quarantined.size() < batchSize) {
322 batchSize = quarantined.size();
323 }
324 return batchSize;
325 }
326
327 private void filterQuarantined(List<SingleOperation> quarantined) {
328 OperationsFilter operationsFilter = this.filter;
329 if (operationsFilter != null) {
330 operationsFilter.filter(quarantined, CastingOperationConverter.getInstance());
331 }
332 }
333
334 private void processQuarantinedItems(List<SingleOperation> quarantined) {
335 if (LOGGER.isLoggable(Level.CONFIG)) {
336 LOGGER.config(getThreadName() + " : processItems() : processing " + quarantined.size() + " quarantined items");
337 }
338
339 if (writeBatching && writeBatchSize > 0) {
340 processBatchedOperations(quarantined);
341 } else {
342 processSingleOperation(quarantined);
343
344 }
345 }
346
347 private void processBatchedOperations(List<SingleOperation> quarantined) {
348 final int batchSize = determineBatchSize(quarantined);
349
350
351 final Map<SingleOperationType, List<SingleOperation>> separatedItemsPerType =
352 new TreeMap<SingleOperationType, List<SingleOperation>>();
353 for (int i = 0; i < batchSize; i++) {
354 final SingleOperation item = quarantined.get(i);
355
356 if (LOGGER.isLoggable(Level.CONFIG)) {
357 LOGGER.config(getThreadName() + " : processItems() : adding " + item + " to next batch");
358 }
359
360 List<SingleOperation> itemsPerType = separatedItemsPerType.get(item.getType());
361 if (null == itemsPerType) {
362 itemsPerType = new ArrayList<SingleOperation>();
363 separatedItemsPerType.put(item.getType(), itemsPerType);
364 }
365
366 itemsPerType.add(item);
367 }
368
369
370 for (List<SingleOperation> itemsPerType : separatedItemsPerType.values()) {
371 int executionsLeft = retryAttempts + 1;
372 while (executionsLeft-- > 0) {
373 try {
374 itemsPerType.get(0).createBatchOperation(itemsPerType).performBatchOperation(cacheWriter);
375 break;
376 } catch (final RuntimeException e) {
377 if (executionsLeft <= 0) {
378 throw e;
379 } else {
380 LOGGER.warning("Exception while processing write behind queue, retrying in " + retryAttemptDelaySeconds
381 + " seconds, " + executionsLeft + " retries left : " + e.getMessage());
382 try {
383 Thread.sleep(retryAttemptDelaySeconds * MS_IN_SEC);
384 } catch (InterruptedException e1) {
385 Thread.currentThread().interrupt();
386 throw e;
387 }
388 }
389 }
390 }
391 }
392
393
394 for (int i = 0; i < batchSize; i++) {
395 quarantined.remove(0);
396 }
397
398 if (!quarantined.isEmpty()) {
399 reassemble(quarantined);
400 }
401 }
402
403 private void processSingleOperation(List<SingleOperation> quarantined) {
404 while (!quarantined.isEmpty()) {
405
406 final SingleOperation item = quarantined.get(0);
407 if (LOGGER.isLoggable(Level.CONFIG)) {
408 LOGGER.config(getThreadName() + " : processItems() : processing " + item);
409 }
410
411 int executionsLeft = retryAttempts + 1;
412 while (executionsLeft-- > 0) {
413 try {
414 item.performSingleOperation(cacheWriter);
415 break;
416 } catch (final RuntimeException e) {
417 if (executionsLeft <= 0) {
418 throw e;
419 } else {
420 LOGGER.warning("Exception while processing write behind queue, retrying in " + retryAttemptDelaySeconds
421 + " seconds, " + executionsLeft + " retries left : " + e.getMessage());
422 try {
423 Thread.sleep(retryAttemptDelaySeconds * MS_IN_SEC);
424 } catch (InterruptedException e1) {
425 Thread.currentThread().interrupt();
426 throw e;
427 }
428 }
429 }
430 }
431
432 quarantined.remove(0);
433 }
434 }
435
436 /***
437 * {@inheritDoc}
438 */
439 public void write(Element element) {
440 queueWriteLock.lock();
441 try {
442 waitForQueueSizeToDrop();
443 if (stopping || stopped) {
444 throw new CacheException("The element '" + element + "' couldn't be added through the write-behind queue for cache '"
445 + cacheName + "' since it's not started.");
446 }
447 waiting.add(new WriteOperation(element));
448 if (waiting.size() + 1 < maxQueueSize) {
449 queueIsFull.signal();
450 }
451 queueIsEmpty.signal();
452 } finally {
453 queueWriteLock.unlock();
454 }
455 }
456
457 private void waitForQueueSizeToDrop() {
458 if (maxQueueSize > 0) {
459 while (getQueueSize() >= maxQueueSize) {
460 try {
461 queueIsFull.await();
462 } catch (InterruptedException e) {
463 stop();
464 Thread.currentThread().interrupt();
465 }
466 }
467 }
468 }
469
470 /***
471 * {@inheritDoc}
472 */
473 public void delete(CacheEntry entry) {
474 queueWriteLock.lock();
475 try {
476 waitForQueueSizeToDrop();
477 if (stopping || stopped) {
478 throw new CacheException("The entry for key '" + entry.getKey() + "' couldn't be deleted through the write-behind "
479 + "queue for cache '" + cacheName + "' since it's not started.");
480 }
481 waiting.add(new DeleteOperation(entry));
482 if (waiting.size() + 1 < maxQueueSize) {
483 queueIsFull.signal();
484 }
485 queueIsEmpty.signal();
486 } finally {
487 queueWriteLock.unlock();
488 }
489 }
490
491 /***
492 * {@inheritDoc}
493 */
494 public void stop() throws CacheException {
495 queueWriteLock.lock();
496 try {
497 if (stopped) {
498 return;
499 }
500
501 stopping = true;
502 queueIsEmpty.signal();
503 while (!stopped) {
504 queueIsStopped.await();
505 }
506 } catch (InterruptedException e) {
507 Thread.currentThread().interrupt();
508 throw new CacheException(e);
509 } finally {
510 queueWriteLock.unlock();
511 }
512 }
513
514 /***
515 * Gets the best estimate for items in the queue still awaiting processing.
516 * Not including elements currently processed
517 * @return the amount of elements still awaiting processing.
518 */
519 public long getQueueSize() {
520 return waiting.size();
521 }
522
523 private boolean isStopped() {
524 queueReadLock.lock();
525 try {
526 return stopped;
527 } finally {
528 queueReadLock.unlock();
529 }
530 }
531
532 private String getThreadName() {
533 return processingThread.getName();
534 }
535
536 private void reassemble(List<SingleOperation> quarantined) {
537 queueWriteLock.lock();
538 try {
539 if (null == quarantined) {
540 return;
541 }
542
543 quarantined.addAll(waiting);
544
545 waiting = quarantined;
546
547 queueIsEmpty.signal();
548 } finally {
549 queueWriteLock.unlock();
550 }
551 }
552 }