1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.constructs.nonstop.store;
18
19 import java.io.IOException;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicBoolean;
29
30 import net.sf.ehcache.CacheException;
31 import net.sf.ehcache.Element;
32 import net.sf.ehcache.Status;
33 import net.sf.ehcache.cluster.CacheCluster;
34 import net.sf.ehcache.cluster.ClusterNode;
35 import net.sf.ehcache.cluster.ClusterTopologyListener;
36 import net.sf.ehcache.config.NonstopConfiguration;
37 import net.sf.ehcache.constructs.nonstop.ClusterOperation;
38 import net.sf.ehcache.constructs.nonstop.NonstopActiveDelegateHolder;
39 import net.sf.ehcache.constructs.nonstop.concurrency.CacheOperationUnderExplicitLockCallable;
40 import net.sf.ehcache.constructs.nonstop.concurrency.ExplicitLockingContextThreadLocal;
41 import net.sf.ehcache.search.Attribute;
42 import net.sf.ehcache.search.Results;
43 import net.sf.ehcache.search.attribute.AttributeExtractor;
44 import net.sf.ehcache.store.ElementValueComparator;
45 import net.sf.ehcache.store.Policy;
46 import net.sf.ehcache.store.StoreListener;
47 import net.sf.ehcache.store.StoreQuery;
48 import net.sf.ehcache.store.TerracottaStore;
49 import net.sf.ehcache.writer.CacheWriterManager;
50
51 /***
52 * This implementation executes all operations using a NonstopExecutorService. On Timeout, uses the
53 * {@link NonstopTimeoutBehaviorStoreResolver} to
54 * resolve the timeout behavior store and execute it.
55 * <p/>
56 *
57 * @author Abhishek Sanoujam
58 *
59 */
60 public class ExecutorServiceStore implements RejoinAwareNonstopStore {
61
62 /***
63 * The NonstopConfiguration of the cache using this store
64 */
65 protected final NonstopConfiguration nonstopConfiguration;
66 private final NonstopActiveDelegateHolder nonstopActiveDelegateHolder;
67 private final NonstopTimeoutBehaviorStoreResolver timeoutBehaviorResolver;
68 private final AtomicBoolean clusterOffline = new AtomicBoolean();
69 private final List<RejoinAwareBlockingOperation> rejoinAwareOperations = new CopyOnWriteArrayList<RejoinAwareBlockingOperation>();
70 private final ExplicitLockingContextThreadLocal explicitLockingContextThreadLocal;
71
72 /***
73 * Constructor accepting the {@link NonstopActiveDelegateHolder}, {@link NonstopConfiguration} and
74 * {@link NonstopTimeoutBehaviorStoreResolver}
75 *
76 * @param explicitLockingContextThreadLocal
77 *
78 */
79 public ExecutorServiceStore(final NonstopActiveDelegateHolder nonstopActiveDelegateHolder,
80 final NonstopConfiguration nonstopConfiguration, final NonstopTimeoutBehaviorStoreResolver timeoutBehaviorResolver,
81 CacheCluster cacheCluster, ExplicitLockingContextThreadLocal explicitLockingContextThreadLocal) {
82 this.nonstopActiveDelegateHolder = nonstopActiveDelegateHolder;
83 this.nonstopConfiguration = nonstopConfiguration;
84 this.timeoutBehaviorResolver = timeoutBehaviorResolver;
85 this.explicitLockingContextThreadLocal = explicitLockingContextThreadLocal;
86 cacheCluster.addTopologyListener(new ClusterStatusListener(this, cacheCluster));
87 }
88
89 /***
90 * Make the cluster offline as cluster rejoin is beginning
91 */
92 void clusterOffline() {
93 clusterOffline.set(true);
94 synchronized (clusterOffline) {
95 clusterOffline.notifyAll();
96 }
97 }
98
99 /***
100 * Make the cluster online
101 */
102 void clusterOnline() {
103 clusterOffline.set(false);
104 synchronized (clusterOffline) {
105 clusterOffline.notifyAll();
106 }
107 }
108
109 private <V> V forceExecuteWithExecutor(final Callable<V> callable) throws CacheException, TimeoutException {
110 return forceExecuteWithExecutor(callable, nonstopConfiguration.getTimeoutMillis());
111 }
112
113 private <V> V forceExecuteWithExecutor(final Callable<V> callable, final long timeoutMillis) throws CacheException, TimeoutException {
114 return executeWithExecutor(callable, timeoutMillis, true);
115 }
116
117 /***
118 * Execute call within NonStop executor
119 *
120 * @param callable
121 * @param <V>
122 * @throws CacheException
123 * @throws TimeoutException
124 * @return returns the result of the callable
125 */
126 protected <V> V executeWithExecutor(final Callable<V> callable) throws CacheException, TimeoutException {
127 return executeWithExecutor(callable, nonstopConfiguration.getTimeoutMillis(), false);
128 }
129
130 /***
131 * Execute call within NonStop executor
132 *
133 * @param callable
134 * @param timeoutMillis
135 * @param <V>
136 * @throws CacheException
137 * @throws TimeoutException
138 * @return the result of the callable
139 */
140 protected <V> V executeWithExecutor(final Callable<V> callable, final long timeoutMillis) throws CacheException, TimeoutException {
141 return executeWithExecutor(callable, timeoutMillis, false);
142 }
143
144 private <V> V executeWithExecutor(final Callable<V> callable, final long timeOutMills, final boolean force) throws CacheException,
145 TimeoutException {
146 Callable<V> effectiveCallable = callable;
147 final long start = System.nanoTime();
148 if (!force) {
149 checkForClusterOffline(start, timeOutMills);
150 }
151 final boolean operationUnderExplicitLock = explicitLockingContextThreadLocal.areAnyExplicitLocksAcquired();
152 if (operationUnderExplicitLock) {
153 effectiveCallable = new CacheOperationUnderExplicitLockCallable<V>(
154 explicitLockingContextThreadLocal.getCurrentThreadLockContext(), nonstopConfiguration, callable);
155 }
156 try {
157 final long remaining = timeOutMills - TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
158 return nonstopActiveDelegateHolder.getNonstopExecutorService().execute(effectiveCallable, remaining);
159 } catch (InterruptedException e) {
160
161 throw new CacheException(e);
162 }
163 }
164
165 /***
166 * Get the underlying Terracotta store
167 *
168 * @return the underlying Terracotta store
169 */
170 protected TerracottaStore underlyingTerracottaStore() {
171 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore();
172 }
173
174 /***
175 * Get the timeout behavior resolver NonstopStore
176 *
177 * @return the timeout behavior resolver NonstopStore
178 */
179 protected NonstopStore resolveTimeoutBehaviorStore() {
180 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore();
181 }
182
183 private void checkForClusterOffline(final long start, final long timeoutMills) throws TimeoutException {
184 while (clusterOffline.get()) {
185 if (nonstopConfiguration.isImmediateTimeout()) {
186 throw new TimeoutException("Cluster is currently offline");
187 }
188 final long remaining = timeoutMills - TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
189 if (remaining <= 0) {
190 break;
191 }
192 synchronized (clusterOffline) {
193 try {
194 clusterOffline.wait(remaining);
195 } catch (InterruptedException e) {
196
197 throw new CacheException(e);
198 }
199 }
200 }
201 if (clusterOffline.get()) {
202
203 throw new TimeoutException("Cluster is currently offline");
204 }
205 }
206
207
208
209
210
211
212
213
214 /***
215 * {@inheritDoc}.
216 */
217 public void dispose() {
218 try {
219
220 forceExecuteWithExecutor(new Callable<Void>() {
221 public Void call() throws Exception {
222 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().dispose();
223 return null;
224 }
225 });
226 } catch (TimeoutException e) {
227 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().dispose();
228 }
229 }
230
231 /***
232 * {@inheritDoc}.
233 * The timeout used by this method is {@link NonstopConfiguration#getBulkOpsTimeoutMultiplyFactor()} times the timeout value in the
234 * config.
235 */
236 public void setNodeCoherent(final boolean coherent) throws UnsupportedOperationException {
237 try {
238
239 forceExecuteWithExecutor(new Callable<Void>() {
240 public Void call() throws Exception {
241 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().setNodeCoherent(coherent);
242 return null;
243 }
244 }, nonstopConfiguration.getTimeoutMillis() * nonstopConfiguration.getBulkOpsTimeoutMultiplyFactor());
245 } catch (TimeoutException e) {
246 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().setNodeCoherent(coherent);
247 }
248 }
249
250 /***
251 * {@inheritDoc}.
252 */
253 public void setAttributeExtractors(final Map<String, AttributeExtractor> extractors) {
254 try {
255
256 forceExecuteWithExecutor(new Callable<Void>() {
257 public Void call() throws Exception {
258 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().setAttributeExtractors(extractors);
259 return null;
260 }
261 });
262 } catch (TimeoutException e) {
263 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().setAttributeExtractors(extractors);
264 }
265 }
266
267 /***
268 * {@inheritDoc}.
269 */
270 public void addStoreListener(final StoreListener listener) {
271 try {
272
273 forceExecuteWithExecutor(new Callable<Void>() {
274 public Void call() throws Exception {
275 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().addStoreListener(listener);
276 return null;
277 }
278 });
279 } catch (TimeoutException e) {
280 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().addStoreListener(listener);
281 }
282 }
283
284
285
286
287
288 /***
289 * {@inheritDoc}.
290 */
291 public void removeStoreListener(final StoreListener listener) {
292 try {
293 executeWithExecutor(new Callable<Void>() {
294 public Void call() throws Exception {
295 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().removeStoreListener(listener);
296 return null;
297 }
298 });
299 } catch (TimeoutException e) {
300 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().removeStoreListener(listener);
301 }
302 }
303
304 /***
305 * {@inheritDoc}.
306 */
307 public boolean put(final Element element) throws CacheException {
308 boolean rv = false;
309 try {
310 rv = executeWithExecutor(new Callable<Boolean>() {
311 public Boolean call() throws Exception {
312 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().put(element);
313 }
314 });
315 } catch (TimeoutException e) {
316 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().put(element);
317 }
318 return rv;
319 }
320
321 /***
322 * {@inheritDoc}.
323 */
324 public void putAll(final Collection<Element> elements) throws CacheException {
325 try {
326 executeWithExecutor(new Callable<Void>() {
327 public Void call() throws Exception {
328 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().putAll(elements);
329 return null;
330 }
331 }, nonstopConfiguration.getTimeoutMillis() * nonstopConfiguration.getBulkOpsTimeoutMultiplyFactor());
332 } catch (TimeoutException e) {
333 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().putAll(elements);
334 }
335 }
336
337 /***
338 * {@inheritDoc}.
339 */
340 public boolean putWithWriter(final Element element, final CacheWriterManager writerManager) throws CacheException {
341 boolean rv = false;
342 try {
343 rv = executeWithExecutor(new Callable<Boolean>() {
344 public Boolean call() throws Exception {
345 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().putWithWriter(element, writerManager);
346 }
347 });
348 } catch (TimeoutException e) {
349 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().putWithWriter(element, writerManager);
350 }
351 return rv;
352 }
353
354 /***
355 * {@inheritDoc}.
356 */
357 public Element get(final Object key) {
358 Element rv = null;
359 try {
360 rv = executeWithExecutor(new Callable<Element>() {
361 public Element call() throws Exception {
362 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().get(key);
363 }
364 });
365 } catch (TimeoutException e) {
366 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().get(key);
367 }
368 return rv;
369 }
370
371 /***
372 * {@inheritDoc}.
373 */
374 public Element getQuiet(final Object key) {
375 Element rv = null;
376 try {
377 rv = executeWithExecutor(new Callable<Element>() {
378 public Element call() throws Exception {
379 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getQuiet(key);
380 }
381 });
382 } catch (TimeoutException e) {
383 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getQuiet(key);
384 }
385 return rv;
386 }
387
388 /***
389 * {@inheritDoc}.
390 */
391 public List getKeys() {
392 List rv = null;
393 try {
394 rv = executeWithExecutor(new Callable<List>() {
395 public List call() throws Exception {
396 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getKeys();
397 }
398 });
399 } catch (TimeoutException e) {
400 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getKeys();
401 }
402 return rv;
403 }
404
405 /***
406 * {@inheritDoc}.
407 */
408 public Element remove(final Object key) {
409 Element rv = null;
410 try {
411 rv = executeWithExecutor(new Callable<Element>() {
412 public Element call() throws Exception {
413 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().remove(key);
414 }
415 });
416 } catch (TimeoutException e) {
417 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().remove(key);
418 }
419 return rv;
420 }
421
422 /***
423 * {@inheritDoc}.
424 */
425 public void removeAll(final Collection<Object> keys) {
426 try {
427 executeWithExecutor(new Callable<Void>() {
428 public Void call() throws Exception {
429 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().removeAll(keys);
430 return null;
431 }
432 }, nonstopConfiguration.getTimeoutMillis() * nonstopConfiguration.getBulkOpsTimeoutMultiplyFactor());
433 } catch (TimeoutException e) {
434 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().removeAll(keys);
435 }
436 }
437
438 /***
439 * {@inheritDoc}.
440 */
441 public Element removeWithWriter(final Object key, final CacheWriterManager writerManager) throws CacheException {
442 Element rv = null;
443 try {
444 rv = executeWithExecutor(new Callable<Element>() {
445 public Element call() throws Exception {
446 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().removeWithWriter(key, writerManager);
447 }
448 });
449 } catch (TimeoutException e) {
450 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().removeWithWriter(key, writerManager);
451 }
452 return rv;
453 }
454
455 /***
456 * {@inheritDoc}.
457 * The timeout used by this method is {@link NonstopConfiguration#getBulkOpsTimeoutMultiplyFactor()} times the timeout value in the
458 * config.
459 */
460 public void removeAll() throws CacheException {
461 try {
462 executeWithExecutor(new Callable<Void>() {
463 public Void call() throws Exception {
464 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().removeAll();
465 return null;
466 }
467 }, nonstopConfiguration.getTimeoutMillis() * nonstopConfiguration.getBulkOpsTimeoutMultiplyFactor());
468 } catch (TimeoutException e) {
469 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().removeAll();
470 }
471 }
472
473 /***
474 * {@inheritDoc}.
475 */
476 public Element putIfAbsent(final Element element) throws NullPointerException {
477 Element rv = null;
478 try {
479 rv = executeWithExecutor(new Callable<Element>() {
480 public Element call() throws Exception {
481 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().putIfAbsent(element);
482 }
483 });
484 } catch (TimeoutException e) {
485 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().putIfAbsent(element);
486 }
487 return rv;
488 }
489
490 /***
491 * {@inheritDoc}.
492 */
493 public Element removeElement(final Element element, final ElementValueComparator comparator) throws NullPointerException {
494 Element rv = null;
495 try {
496 rv = executeWithExecutor(new Callable<Element>() {
497 public Element call() throws Exception {
498 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().removeElement(element, comparator);
499 }
500 });
501 } catch (TimeoutException e) {
502 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().removeElement(element, comparator);
503 }
504 return rv;
505 }
506
507 /***
508 * {@inheritDoc}.
509 */
510 public boolean replace(final Element old, final Element element, final ElementValueComparator comparator) throws NullPointerException,
511 IllegalArgumentException {
512 boolean rv = false;
513 try {
514 rv = executeWithExecutor(new Callable<Boolean>() {
515 public Boolean call() throws Exception {
516 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().replace(old, element, comparator);
517 }
518 });
519 } catch (TimeoutException e) {
520 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().replace(old, element, comparator);
521 }
522 return rv;
523 }
524
525 /***
526 * {@inheritDoc}.
527 */
528 public Element replace(final Element element) throws NullPointerException {
529 Element rv = null;
530 try {
531 rv = executeWithExecutor(new Callable<Element>() {
532 public Element call() throws Exception {
533 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().replace(element);
534 }
535 });
536 } catch (TimeoutException e) {
537 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().replace(element);
538 }
539 return rv;
540 }
541
542 /***
543 * {@inheritDoc}.
544 */
545 public int getSize() {
546 int rv = 0;
547 try {
548 rv = executeWithExecutor(new Callable<Integer>() {
549 public Integer call() throws Exception {
550 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getSize();
551 }
552 });
553 } catch (TimeoutException e) {
554 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getSize();
555 }
556 return rv;
557 }
558
559 /***
560 * {@inheritDoc}.
561 */
562 public int getInMemorySize() {
563 int rv = 0;
564 try {
565 rv = executeWithExecutor(new Callable<Integer>() {
566 public Integer call() throws Exception {
567 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getInMemorySize();
568 }
569 });
570 } catch (TimeoutException e) {
571 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getInMemorySize();
572 }
573 return rv;
574 }
575
576 /***
577 * {@inheritDoc}.
578 */
579 public int getOffHeapSize() {
580 int rv = 0;
581 try {
582 rv = executeWithExecutor(new Callable<Integer>() {
583 public Integer call() throws Exception {
584 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getOffHeapSize();
585 }
586 });
587 } catch (TimeoutException e) {
588 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getOffHeapSize();
589 }
590 return rv;
591 }
592
593 /***
594 * {@inheritDoc}.
595 */
596 public int getOnDiskSize() {
597 int rv = 0;
598 try {
599 rv = executeWithExecutor(new Callable<Integer>() {
600 public Integer call() throws Exception {
601 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getOnDiskSize();
602 }
603 });
604 } catch (TimeoutException e) {
605 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getOnDiskSize();
606 }
607 return rv;
608 }
609
610 /***
611 * {@inheritDoc}.
612 */
613 public int getTerracottaClusteredSize() {
614 int rv = 0;
615 try {
616 rv = executeWithExecutor(new Callable<Integer>() {
617 public Integer call() throws Exception {
618 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getTerracottaClusteredSize();
619 }
620 });
621 } catch (TimeoutException e) {
622 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getTerracottaClusteredSize();
623 }
624 return rv;
625 }
626
627 /***
628 * {@inheritDoc}.
629 */
630 public long getInMemorySizeInBytes() {
631 long rv = 0;
632 try {
633 rv = executeWithExecutor(new Callable<Long>() {
634 public Long call() throws Exception {
635 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getInMemorySizeInBytes();
636 }
637 });
638 } catch (TimeoutException e) {
639 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getInMemorySizeInBytes();
640 }
641 return rv;
642 }
643
644 /***
645 * {@inheritDoc}.
646 */
647 public long getOffHeapSizeInBytes() {
648 long rv = 0;
649 try {
650 rv = executeWithExecutor(new Callable<Long>() {
651 public Long call() throws Exception {
652 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getOffHeapSizeInBytes();
653 }
654 });
655 } catch (TimeoutException e) {
656 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getOffHeapSizeInBytes();
657 }
658 return rv;
659 }
660
661 /***
662 * {@inheritDoc}.
663 */
664 public long getOnDiskSizeInBytes() {
665 long rv = 0;
666 try {
667 rv = executeWithExecutor(new Callable<Long>() {
668 public Long call() throws Exception {
669 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getOnDiskSizeInBytes();
670 }
671 });
672 } catch (TimeoutException e) {
673 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getOnDiskSizeInBytes();
674 }
675 return rv;
676 }
677
678 /***
679 * {@inheritDoc}.
680 */
681 public Status getStatus() {
682 Status rv = null;
683 try {
684 rv = executeWithExecutor(new Callable<Status>() {
685 public Status call() throws Exception {
686 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getStatus();
687 }
688 });
689 } catch (TimeoutException e) {
690 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getStatus();
691 }
692 return rv;
693 }
694
695 /***
696 * {@inheritDoc}.
697 */
698 public boolean containsKey(final Object key) {
699 boolean rv = false;
700 try {
701 rv = executeWithExecutor(new Callable<Boolean>() {
702 public Boolean call() throws Exception {
703 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().containsKey(key);
704 }
705 });
706 } catch (TimeoutException e) {
707 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().containsKey(key);
708 }
709 return rv;
710 }
711
712 /***
713 * {@inheritDoc}.
714 */
715 public boolean containsKeyOnDisk(final Object key) {
716 boolean rv = false;
717 try {
718 rv = executeWithExecutor(new Callable<Boolean>() {
719 public Boolean call() throws Exception {
720 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().containsKeyOnDisk(key);
721 }
722 });
723 } catch (TimeoutException e) {
724 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().containsKeyOnDisk(key);
725 }
726 return rv;
727 }
728
729 /***
730 * {@inheritDoc}.
731 */
732 public boolean containsKeyOffHeap(final Object key) {
733 boolean rv = false;
734 try {
735 rv = executeWithExecutor(new Callable<Boolean>() {
736 public Boolean call() throws Exception {
737 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().containsKeyOffHeap(key);
738 }
739 });
740 } catch (TimeoutException e) {
741 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().containsKeyOffHeap(key);
742 }
743 return rv;
744 }
745
746 /***
747 * {@inheritDoc}.
748 */
749 public boolean containsKeyInMemory(final Object key) {
750 boolean rv = false;
751 try {
752 rv = executeWithExecutor(new Callable<Boolean>() {
753 public Boolean call() throws Exception {
754 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().containsKeyInMemory(key);
755 }
756 });
757 } catch (TimeoutException e) {
758 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().containsKeyInMemory(key);
759 }
760 return rv;
761 }
762
763 /***
764 * {@inheritDoc}.
765 */
766 public void expireElements() {
767 try {
768 executeWithExecutor(new Callable<Void>() {
769 public Void call() throws Exception {
770 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().expireElements();
771 return null;
772 }
773 });
774 } catch (TimeoutException e) {
775 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().expireElements();
776 }
777 }
778
779 /***
780 * {@inheritDoc}.
781 */
782 public void flush() throws IOException {
783 try {
784 executeWithExecutor(new Callable<Void>() {
785 public Void call() throws Exception {
786 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().flush();
787 return null;
788 }
789 });
790 } catch (TimeoutException e) {
791 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().flush();
792 }
793 }
794
795 /***
796 * {@inheritDoc}.
797 */
798 public boolean bufferFull() {
799 boolean rv = false;
800 try {
801 rv = executeWithExecutor(new Callable<Boolean>() {
802 public Boolean call() throws Exception {
803 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().bufferFull();
804 }
805 });
806 } catch (TimeoutException e) {
807 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().bufferFull();
808 }
809 return rv;
810 }
811
812 /***
813 * {@inheritDoc}.
814 */
815 public Policy getInMemoryEvictionPolicy() {
816 Policy rv = null;
817 try {
818 rv = executeWithExecutor(new Callable<Policy>() {
819 public Policy call() throws Exception {
820 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getInMemoryEvictionPolicy();
821 }
822 });
823 } catch (TimeoutException e) {
824 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getInMemoryEvictionPolicy();
825 }
826 return rv;
827 }
828
829 /***
830 * {@inheritDoc}.
831 */
832 public void setInMemoryEvictionPolicy(final Policy policy) {
833 try {
834 executeWithExecutor(new Callable<Void>() {
835 public Void call() throws Exception {
836 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().setInMemoryEvictionPolicy(policy);
837 return null;
838 }
839 });
840 } catch (TimeoutException e) {
841 timeoutBehaviorResolver.resolveTimeoutBehaviorStore().setInMemoryEvictionPolicy(policy);
842 }
843 }
844
845 /***
846 * {@inheritDoc}.
847 */
848 public Object getInternalContext() {
849 Object rv = null;
850 try {
851 rv = executeWithExecutor(new Callable<Object>() {
852 public Object call() throws Exception {
853 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getInternalContext();
854 }
855 });
856 } catch (TimeoutException e) {
857 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getInternalContext();
858 }
859 return rv;
860 }
861
862 /***
863 * {@inheritDoc}.
864 */
865 public boolean isCacheCoherent() {
866 boolean rv = false;
867 try {
868 rv = executeWithExecutor(new Callable<Boolean>() {
869 public Boolean call() throws Exception {
870 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().isCacheCoherent();
871 }
872 });
873 } catch (TimeoutException e) {
874 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().isCacheCoherent();
875 }
876 return rv;
877 }
878
879 /***
880 * {@inheritDoc}.
881 */
882 public boolean isClusterCoherent() {
883 boolean rv = false;
884 try {
885 rv = executeWithExecutor(new Callable<Boolean>() {
886 public Boolean call() throws Exception {
887 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().isClusterCoherent();
888 }
889 });
890 } catch (TimeoutException e) {
891 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().isClusterCoherent();
892 }
893 return rv;
894 }
895
896 /***
897 * {@inheritDoc}.
898 */
899 public boolean isNodeCoherent() {
900 boolean rv = false;
901 try {
902 rv = executeWithExecutor(new Callable<Boolean>() {
903 public Boolean call() throws Exception {
904 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().isNodeCoherent();
905 }
906 });
907 } catch (TimeoutException e) {
908 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().isNodeCoherent();
909 }
910 return rv;
911 }
912
913 /***
914 * {@inheritDoc}.
915 *
916 * @throws InterruptedException
917 */
918 public void waitUntilClusterCoherent() throws UnsupportedOperationException, InterruptedException {
919 final RejoinAwareBlockingOperation<Void> operation = new RejoinAwareBlockingOperation<Void>(this, new Callable<Void>() {
920 public Void call() throws Exception {
921 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().waitUntilClusterCoherent();
922 return null;
923 }
924 });
925 rejoinAwareOperations.add(operation);
926 try {
927 operation.call();
928 } catch (Exception e) {
929 if (e instanceof InterruptedException) {
930 throw (InterruptedException) e;
931 } else {
932 throw new CacheException(e);
933 }
934 } finally {
935 rejoinAwareOperations.remove(operation);
936 }
937 }
938
939 /***
940 * {@inheritDoc}.
941 */
942 public Object getMBean() {
943 Object rv = null;
944 try {
945 rv = executeWithExecutor(new Callable<Object>() {
946 public Object call() throws Exception {
947 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getMBean();
948 }
949 });
950 } catch (TimeoutException e) {
951 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getMBean();
952 }
953 return rv;
954 }
955
956 /***
957 * {@inheritDoc}.
958 */
959 public Results executeQuery(final StoreQuery query) {
960 Results rv = null;
961 try {
962 rv = executeWithExecutor(new Callable<Results>() {
963 public Results call() throws Exception {
964 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().executeQuery(query);
965 }
966 });
967 } catch (TimeoutException e) {
968 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().executeQuery(query);
969 }
970 return rv;
971 }
972
973 /***
974 * {@inheritDoc}.
975 */
976 public <T> Attribute<T> getSearchAttribute(final String attributeName) {
977 try {
978 return executeWithExecutor(new Callable<Attribute<T>>() {
979 public Attribute<T> call() throws Exception {
980 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getSearchAttribute(attributeName);
981 }
982 });
983 } catch (TimeoutException e) {
984 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getSearchAttribute(attributeName);
985 }
986 }
987
988 /***
989 * {@inheritDoc}
990 */
991 public Set getLocalKeys() {
992 try {
993 return executeWithExecutor(new Callable<Set>() {
994 public Set call() throws Exception {
995 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getLocalKeys();
996 }
997 });
998 } catch (TimeoutException e) {
999 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getLocalKeys();
1000 }
1001 }
1002
1003 /***
1004 * {@inheritDoc}
1005 */
1006 public Element unlockedGet(final Object key) {
1007 try {
1008 return executeWithExecutor(new Callable<Element>() {
1009 public Element call() throws Exception {
1010 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().unlockedGet(key);
1011 }
1012 });
1013 } catch (TimeoutException e) {
1014 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().unlockedGet(key);
1015 }
1016 }
1017
1018 /***
1019 * {@inheritDoc}
1020 */
1021 public Element unlockedGetQuiet(final Object key) {
1022 try {
1023 return executeWithExecutor(new Callable<Element>() {
1024 public Element call() throws Exception {
1025 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().unlockedGetQuiet(key);
1026 }
1027 });
1028 } catch (TimeoutException e) {
1029 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().unlockedGetQuiet(key);
1030 }
1031 }
1032
1033 /***
1034 * {@inheritDoc}
1035 */
1036 public Element unsafeGet(final Object key) {
1037 try {
1038 return executeWithExecutor(new Callable<Element>() {
1039 public Element call() throws Exception {
1040 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().unsafeGet(key);
1041 }
1042 });
1043 } catch (TimeoutException e) {
1044 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().unsafeGet(key);
1045 }
1046 }
1047
1048 /***
1049 * {@inheritDoc}
1050 */
1051 public Element unsafeGetQuiet(final Object key) {
1052 try {
1053 return executeWithExecutor(new Callable<Element>() {
1054 public Element call() throws Exception {
1055 return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().unsafeGetQuiet(key);
1056 }
1057 });
1058 } catch (TimeoutException e) {
1059 return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().unsafeGetQuiet(key);
1060 }
1061 }
1062
1063 /***
1064 * {@inheritDoc}
1065 */
1066 public <V> V executeClusterOperation(final ClusterOperation<V> operation) {
1067 try {
1068 return executeWithExecutor(new ClusterOperationCallableImpl<V>(operation));
1069 } catch (TimeoutException e) {
1070 return operation.performClusterOperationTimedOut(this.nonstopConfiguration.getTimeoutBehavior().getTimeoutBehaviorType());
1071 }
1072 }
1073
1074 /***
1075 * Executes the {@link ClusterOperation} parameter, but without any timeout. This call will block until the {@link ClusterOperation}
1076 * completes. The
1077 * {@link ClusterOperation#performClusterOperationTimedOut(net.sf.ehcache.config.TimeoutBehaviorConfiguration.TimeoutBehaviorType)} will
1078 * never be invoked for this
1079 *
1080 * @throws InterruptedException if the executing thread is interrupted before the {@link ClusterOperation} can complete
1081 */
1082 protected <V> V executeClusterOperationNoTimeout(final ClusterOperation<V> operation) throws InterruptedException {
1083 try {
1084 return executeWithExecutor(new ClusterOperationCallableImpl<V>(operation), Integer.MAX_VALUE, true);
1085 } catch (TimeoutException e) {
1086 throw new AssertionError("This should never happen as executed with no-timeout");
1087 } catch (CacheException e) {
1088 Throwable rootCause = getRootCause(e);
1089 if (rootCause instanceof InterruptedException) {
1090 throw (InterruptedException) rootCause;
1091 } else {
1092 throw e;
1093 }
1094 }
1095 }
1096
1097 private Throwable getRootCause(final CacheException exception) {
1098 Throwable e = exception;
1099 while (e.getCause() != null) {
1100 e = e.getCause();
1101 }
1102 return e;
1103 }
1104
1105 /***
1106 * A {@link ClusterTopologyListener} implementation that listens for cluster online/offline events
1107 *
1108 * @author Abhishek Sanoujam
1109 *
1110 */
1111 private static class ClusterStatusListener implements ClusterTopologyListener {
1112
1113 private final ExecutorServiceStore executorServiceStore;
1114 private final CacheCluster cacheCluster;
1115
1116 public ClusterStatusListener(ExecutorServiceStore executorServiceStore, CacheCluster cacheCluster) {
1117 this.executorServiceStore = executorServiceStore;
1118 this.cacheCluster = cacheCluster;
1119 }
1120
1121 /***
1122 * {@inheritDoc}
1123 */
1124 public void clusterOffline(ClusterNode node) {
1125 if (cacheCluster.getCurrentNode().equals(node)) {
1126 executorServiceStore.clusterOffline();
1127 }
1128 }
1129
1130 /***
1131 * {@inheritDoc}
1132 */
1133 public void clusterOnline(ClusterNode node) {
1134 if (cacheCluster.getCurrentNode().equals(node)) {
1135 executorServiceStore.clusterOnline();
1136 }
1137 }
1138
1139 /***
1140 * {@inheritDoc}
1141 */
1142 public void nodeJoined(ClusterNode node) {
1143
1144 }
1145
1146 /***
1147 * {@inheritDoc}
1148 */
1149 public void nodeLeft(ClusterNode node) {
1150
1151 }
1152
1153 /***
1154 * {@inheritDoc}
1155 */
1156 public void clusterRejoined(ClusterNode oldNode, ClusterNode newNode) {
1157
1158 }
1159
1160 }
1161
1162 /***
1163 * {@inheritDoc}
1164 */
1165 public void clusterRejoined() {
1166 for (RejoinAwareBlockingOperation operation : rejoinAwareOperations) {
1167 operation.clusterRejoined();
1168 }
1169 }
1170 }