View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.constructs.nonstop.store;
18  
19  import java.io.IOException;
20  import java.util.Collection;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.CopyOnWriteArrayList;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.TimeoutException;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  
30  import net.sf.ehcache.CacheException;
31  import net.sf.ehcache.Element;
32  import net.sf.ehcache.Status;
33  import net.sf.ehcache.cluster.CacheCluster;
34  import net.sf.ehcache.cluster.ClusterNode;
35  import net.sf.ehcache.cluster.ClusterTopologyListener;
36  import net.sf.ehcache.config.NonstopConfiguration;
37  import net.sf.ehcache.constructs.nonstop.ClusterOperation;
38  import net.sf.ehcache.constructs.nonstop.NonstopActiveDelegateHolder;
39  import net.sf.ehcache.constructs.nonstop.concurrency.CacheOperationUnderExplicitLockCallable;
40  import net.sf.ehcache.constructs.nonstop.concurrency.ExplicitLockingContextThreadLocal;
41  import net.sf.ehcache.search.Attribute;
42  import net.sf.ehcache.search.Results;
43  import net.sf.ehcache.search.attribute.AttributeExtractor;
44  import net.sf.ehcache.store.ElementValueComparator;
45  import net.sf.ehcache.store.Policy;
46  import net.sf.ehcache.store.StoreListener;
47  import net.sf.ehcache.store.StoreQuery;
48  import net.sf.ehcache.store.TerracottaStore;
49  import net.sf.ehcache.writer.CacheWriterManager;
50  
51  /***
52   * This implementation executes all operations using a NonstopExecutorService. On Timeout, uses the
53   * {@link NonstopTimeoutBehaviorStoreResolver} to
54   * resolve the timeout behavior store and execute it.
55   * <p/>
56   *
57   * @author Abhishek Sanoujam
58   *
59   */
60  public class ExecutorServiceStore implements RejoinAwareNonstopStore {
61  
62      /***
63       * The NonstopConfiguration of the cache using this store
64       */
65      protected final NonstopConfiguration nonstopConfiguration;
66      private final NonstopActiveDelegateHolder nonstopActiveDelegateHolder;
67      private final NonstopTimeoutBehaviorStoreResolver timeoutBehaviorResolver;
68      private final AtomicBoolean clusterOffline = new AtomicBoolean();
69      private final List<RejoinAwareBlockingOperation> rejoinAwareOperations = new CopyOnWriteArrayList<RejoinAwareBlockingOperation>();
70      private final ExplicitLockingContextThreadLocal explicitLockingContextThreadLocal;
71  
72      /***
73       * Constructor accepting the {@link NonstopActiveDelegateHolder}, {@link NonstopConfiguration} and
74       * {@link NonstopTimeoutBehaviorStoreResolver}
75       *
76       * @param explicitLockingContextThreadLocal
77       *
78       */
79      public ExecutorServiceStore(final NonstopActiveDelegateHolder nonstopActiveDelegateHolder,
80              final NonstopConfiguration nonstopConfiguration, final NonstopTimeoutBehaviorStoreResolver timeoutBehaviorResolver,
81              CacheCluster cacheCluster, ExplicitLockingContextThreadLocal explicitLockingContextThreadLocal) {
82          this.nonstopActiveDelegateHolder = nonstopActiveDelegateHolder;
83          this.nonstopConfiguration = nonstopConfiguration;
84          this.timeoutBehaviorResolver = timeoutBehaviorResolver;
85          this.explicitLockingContextThreadLocal = explicitLockingContextThreadLocal;
86          cacheCluster.addTopologyListener(new ClusterStatusListener(this, cacheCluster));
87      }
88  
89      /***
90       * Make the cluster offline as cluster rejoin is beginning
91       */
92      void clusterOffline() {
93          clusterOffline.set(true);
94          synchronized (clusterOffline) {
95              clusterOffline.notifyAll();
96          }
97      }
98  
99      /***
100      * Make the cluster online
101      */
102     void clusterOnline() {
103         clusterOffline.set(false);
104         synchronized (clusterOffline) {
105             clusterOffline.notifyAll();
106         }
107     }
108 
109     private <V> V forceExecuteWithExecutor(final Callable<V> callable) throws CacheException, TimeoutException {
110         return forceExecuteWithExecutor(callable, nonstopConfiguration.getTimeoutMillis());
111     }
112 
113     private <V> V forceExecuteWithExecutor(final Callable<V> callable, final long timeoutMillis) throws CacheException, TimeoutException {
114         return executeWithExecutor(callable, timeoutMillis, true);
115     }
116 
117     /***
118      * Execute call within NonStop executor
119      *
120      * @param callable
121      * @param <V>
122      * @throws CacheException
123      * @throws TimeoutException
124      * @return returns the result of the callable
125      */
126     protected <V> V executeWithExecutor(final Callable<V> callable) throws CacheException, TimeoutException {
127         return executeWithExecutor(callable, nonstopConfiguration.getTimeoutMillis(), false);
128     }
129 
130     /***
131      * Execute call within NonStop executor
132      *
133      * @param callable
134      * @param timeoutMillis
135      * @param <V>
136      * @throws CacheException
137      * @throws TimeoutException
138      * @return the result of the callable
139      */
140     protected <V> V executeWithExecutor(final Callable<V> callable, final long timeoutMillis) throws CacheException, TimeoutException {
141         return executeWithExecutor(callable, timeoutMillis, false);
142     }
143 
144     private <V> V executeWithExecutor(final Callable<V> callable, final long timeOutMills, final boolean force) throws CacheException,
145             TimeoutException {
146         Callable<V> effectiveCallable = callable;
147         final long start = System.nanoTime();
148         if (!force) {
149             checkForClusterOffline(start, timeOutMills);
150         }
151         final boolean operationUnderExplicitLock = explicitLockingContextThreadLocal.areAnyExplicitLocksAcquired();
152         if (operationUnderExplicitLock) {
153             effectiveCallable = new CacheOperationUnderExplicitLockCallable<V>(
154                     explicitLockingContextThreadLocal.getCurrentThreadLockContext(), nonstopConfiguration, callable);
155         }
156         try {
157             final long remaining = timeOutMills - TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
158             return nonstopActiveDelegateHolder.getNonstopExecutorService().execute(effectiveCallable, remaining);
159         } catch (InterruptedException e) {
160             // rethrow as CacheException
161             throw new CacheException(e);
162         }
163     }
164 
165     /***
166      * Get the underlying Terracotta store
167      *
168      * @return the underlying Terracotta store
169      */
170     protected TerracottaStore underlyingTerracottaStore() {
171         return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore();
172     }
173 
174     /***
175      * Get the timeout behavior resolver NonstopStore
176      *
177      * @return the timeout behavior resolver NonstopStore
178      */
179     protected NonstopStore resolveTimeoutBehaviorStore() {
180         return timeoutBehaviorResolver.resolveTimeoutBehaviorStore();
181     }
182 
183     private void checkForClusterOffline(final long start, final long timeoutMills) throws TimeoutException {
184         while (clusterOffline.get()) {
185             if (nonstopConfiguration.isImmediateTimeout()) {
186                 throw new TimeoutException("Cluster is currently offline");
187             }
188             final long remaining = timeoutMills - TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
189             if (remaining <= 0) {
190                 break;
191             }
192             synchronized (clusterOffline) {
193                 try {
194                     clusterOffline.wait(remaining);
195                 } catch (InterruptedException e) {
196                     // rethrow as CacheException
197                     throw new CacheException(e);
198                 }
199             }
200         }
201         if (clusterOffline.get()) {
202             // still cluster offline
203             throw new TimeoutException("Cluster is currently offline");
204         }
205     }
206 
207     // /////////////////////////////////////////////////////////
208     // methods below use the 'force' executeWithExecutor version
209     // these are methods that are used during rejoin, as during rejoin
210     // the cluster is offline and unless force is used, the methods
211     // won't be executed at all
212     // /////////////////////////////////////////////////////////
213 
214     /***
215      * {@inheritDoc}.
216      */
217     public void dispose() {
218         try {
219             // always execute even when cluster offline
220             forceExecuteWithExecutor(new Callable<Void>() {
221                 public Void call() throws Exception {
222                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().dispose();
223                     return null;
224                 }
225             });
226         } catch (TimeoutException e) {
227             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().dispose();
228         }
229     }
230 
231     /***
232      * {@inheritDoc}.
233      * The timeout used by this method is {@link NonstopConfiguration#getBulkOpsTimeoutMultiplyFactor()} times the timeout value in the
234      * config.
235      */
236     public void setNodeCoherent(final boolean coherent) throws UnsupportedOperationException {
237         try {
238             // always execute even when cluster offline
239             forceExecuteWithExecutor(new Callable<Void>() {
240                 public Void call() throws Exception {
241                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().setNodeCoherent(coherent);
242                     return null;
243                 }
244             }, nonstopConfiguration.getTimeoutMillis() * nonstopConfiguration.getBulkOpsTimeoutMultiplyFactor());
245         } catch (TimeoutException e) {
246             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().setNodeCoherent(coherent);
247         }
248     }
249 
250     /***
251      * {@inheritDoc}.
252      */
253     public void setAttributeExtractors(final Map<String, AttributeExtractor> extractors) {
254         try {
255             // always execute even when cluster offline
256             forceExecuteWithExecutor(new Callable<Void>() {
257                 public Void call() throws Exception {
258                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().setAttributeExtractors(extractors);
259                     return null;
260                 }
261             });
262         } catch (TimeoutException e) {
263             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().setAttributeExtractors(extractors);
264         }
265     }
266 
267     /***
268      * {@inheritDoc}.
269      */
270     public void addStoreListener(final StoreListener listener) {
271         try {
272             // always execute even when cluster offline
273             forceExecuteWithExecutor(new Callable<Void>() {
274                 public Void call() throws Exception {
275                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().addStoreListener(listener);
276                     return null;
277                 }
278             });
279         } catch (TimeoutException e) {
280             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().addStoreListener(listener);
281         }
282     }
283 
284     // /////////////////////////////////////////////////////////
285     // methods below use the normal executeWithExecutor version
286     // /////////////////////////////////////////////////////////
287 
288     /***
289      * {@inheritDoc}.
290      */
291     public void removeStoreListener(final StoreListener listener) {
292         try {
293             executeWithExecutor(new Callable<Void>() {
294                 public Void call() throws Exception {
295                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().removeStoreListener(listener);
296                     return null;
297                 }
298             });
299         } catch (TimeoutException e) {
300             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().removeStoreListener(listener);
301         }
302     }
303 
304     /***
305      * {@inheritDoc}.
306      */
307     public boolean put(final Element element) throws CacheException {
308         boolean rv = false;
309         try {
310             rv = executeWithExecutor(new Callable<Boolean>() {
311                 public Boolean call() throws Exception {
312                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().put(element);
313                 }
314             });
315         } catch (TimeoutException e) {
316             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().put(element);
317         }
318         return rv;
319     }
320 
321     /***
322      * {@inheritDoc}.
323      */
324     public void putAll(final Collection<Element> elements) throws CacheException {
325         try {
326             executeWithExecutor(new Callable<Void>() {
327                 public Void call() throws Exception {
328                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().putAll(elements);
329                     return null;
330                 }
331             }, nonstopConfiguration.getTimeoutMillis() * nonstopConfiguration.getBulkOpsTimeoutMultiplyFactor());
332         } catch (TimeoutException e) {
333             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().putAll(elements);
334         }
335     }
336 
337     /***
338      * {@inheritDoc}.
339      */
340     public boolean putWithWriter(final Element element, final CacheWriterManager writerManager) throws CacheException {
341         boolean rv = false;
342         try {
343             rv = executeWithExecutor(new Callable<Boolean>() {
344                 public Boolean call() throws Exception {
345                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().putWithWriter(element, writerManager);
346                 }
347             });
348         } catch (TimeoutException e) {
349             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().putWithWriter(element, writerManager);
350         }
351         return rv;
352     }
353 
354     /***
355      * {@inheritDoc}.
356      */
357     public Element get(final Object key) {
358         Element rv = null;
359         try {
360             rv = executeWithExecutor(new Callable<Element>() {
361                 public Element call() throws Exception {
362                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().get(key);
363                 }
364             });
365         } catch (TimeoutException e) {
366             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().get(key);
367         }
368         return rv;
369     }
370 
371     /***
372      * {@inheritDoc}.
373      */
374     public Element getQuiet(final Object key) {
375         Element rv = null;
376         try {
377             rv = executeWithExecutor(new Callable<Element>() {
378                 public Element call() throws Exception {
379                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getQuiet(key);
380                 }
381             });
382         } catch (TimeoutException e) {
383             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getQuiet(key);
384         }
385         return rv;
386     }
387 
388     /***
389      * {@inheritDoc}.
390      */
391     public List getKeys() {
392         List rv = null;
393         try {
394             rv = executeWithExecutor(new Callable<List>() {
395                 public List call() throws Exception {
396                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getKeys();
397                 }
398             });
399         } catch (TimeoutException e) {
400             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getKeys();
401         }
402         return rv;
403     }
404 
405     /***
406      * {@inheritDoc}.
407      */
408     public Element remove(final Object key) {
409         Element rv = null;
410         try {
411             rv = executeWithExecutor(new Callable<Element>() {
412                 public Element call() throws Exception {
413                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().remove(key);
414                 }
415             });
416         } catch (TimeoutException e) {
417             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().remove(key);
418         }
419         return rv;
420     }
421 
422     /***
423      * {@inheritDoc}.
424      */
425     public void removeAll(final Collection<Object> keys) {
426         try {
427             executeWithExecutor(new Callable<Void>() {
428                 public Void call() throws Exception {
429                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().removeAll(keys);
430                     return null;
431                 }
432             }, nonstopConfiguration.getTimeoutMillis() * nonstopConfiguration.getBulkOpsTimeoutMultiplyFactor());
433         } catch (TimeoutException e) {
434             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().removeAll(keys);
435         }
436     }
437 
438     /***
439      * {@inheritDoc}.
440      */
441     public Element removeWithWriter(final Object key, final CacheWriterManager writerManager) throws CacheException {
442         Element rv = null;
443         try {
444             rv = executeWithExecutor(new Callable<Element>() {
445                 public Element call() throws Exception {
446                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().removeWithWriter(key, writerManager);
447                 }
448             });
449         } catch (TimeoutException e) {
450             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().removeWithWriter(key, writerManager);
451         }
452         return rv;
453     }
454 
455     /***
456      * {@inheritDoc}.
457      * The timeout used by this method is {@link NonstopConfiguration#getBulkOpsTimeoutMultiplyFactor()} times the timeout value in the
458      * config.
459      */
460     public void removeAll() throws CacheException {
461         try {
462             executeWithExecutor(new Callable<Void>() {
463                 public Void call() throws Exception {
464                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().removeAll();
465                     return null;
466                 }
467             }, nonstopConfiguration.getTimeoutMillis() * nonstopConfiguration.getBulkOpsTimeoutMultiplyFactor());
468         } catch (TimeoutException e) {
469             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().removeAll();
470         }
471     }
472 
473     /***
474      * {@inheritDoc}.
475      */
476     public Element putIfAbsent(final Element element) throws NullPointerException {
477         Element rv = null;
478         try {
479             rv = executeWithExecutor(new Callable<Element>() {
480                 public Element call() throws Exception {
481                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().putIfAbsent(element);
482                 }
483             });
484         } catch (TimeoutException e) {
485             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().putIfAbsent(element);
486         }
487         return rv;
488     }
489 
490     /***
491      * {@inheritDoc}.
492      */
493     public Element removeElement(final Element element, final ElementValueComparator comparator) throws NullPointerException {
494         Element rv = null;
495         try {
496             rv = executeWithExecutor(new Callable<Element>() {
497                 public Element call() throws Exception {
498                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().removeElement(element, comparator);
499                 }
500             });
501         } catch (TimeoutException e) {
502             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().removeElement(element, comparator);
503         }
504         return rv;
505     }
506 
507     /***
508      * {@inheritDoc}.
509      */
510     public boolean replace(final Element old, final Element element, final ElementValueComparator comparator) throws NullPointerException,
511             IllegalArgumentException {
512         boolean rv = false;
513         try {
514             rv = executeWithExecutor(new Callable<Boolean>() {
515                 public Boolean call() throws Exception {
516                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().replace(old, element, comparator);
517                 }
518             });
519         } catch (TimeoutException e) {
520             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().replace(old, element, comparator);
521         }
522         return rv;
523     }
524 
525     /***
526      * {@inheritDoc}.
527      */
528     public Element replace(final Element element) throws NullPointerException {
529         Element rv = null;
530         try {
531             rv = executeWithExecutor(new Callable<Element>() {
532                 public Element call() throws Exception {
533                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().replace(element);
534                 }
535             });
536         } catch (TimeoutException e) {
537             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().replace(element);
538         }
539         return rv;
540     }
541 
542     /***
543      * {@inheritDoc}.
544      */
545     public int getSize() {
546         int rv = 0;
547         try {
548             rv = executeWithExecutor(new Callable<Integer>() {
549                 public Integer call() throws Exception {
550                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getSize();
551                 }
552             });
553         } catch (TimeoutException e) {
554             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getSize();
555         }
556         return rv;
557     }
558 
559     /***
560      * {@inheritDoc}.
561      */
562     public int getInMemorySize() {
563         int rv = 0;
564         try {
565             rv = executeWithExecutor(new Callable<Integer>() {
566                 public Integer call() throws Exception {
567                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getInMemorySize();
568                 }
569             });
570         } catch (TimeoutException e) {
571             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getInMemorySize();
572         }
573         return rv;
574     }
575 
576     /***
577      * {@inheritDoc}.
578      */
579     public int getOffHeapSize() {
580         int rv = 0;
581         try {
582             rv = executeWithExecutor(new Callable<Integer>() {
583                 public Integer call() throws Exception {
584                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getOffHeapSize();
585                 }
586             });
587         } catch (TimeoutException e) {
588             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getOffHeapSize();
589         }
590         return rv;
591     }
592 
593     /***
594      * {@inheritDoc}.
595      */
596     public int getOnDiskSize() {
597         int rv = 0;
598         try {
599             rv = executeWithExecutor(new Callable<Integer>() {
600                 public Integer call() throws Exception {
601                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getOnDiskSize();
602                 }
603             });
604         } catch (TimeoutException e) {
605             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getOnDiskSize();
606         }
607         return rv;
608     }
609 
610     /***
611      * {@inheritDoc}.
612      */
613     public int getTerracottaClusteredSize() {
614         int rv = 0;
615         try {
616             rv = executeWithExecutor(new Callable<Integer>() {
617                 public Integer call() throws Exception {
618                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getTerracottaClusteredSize();
619                 }
620             });
621         } catch (TimeoutException e) {
622             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getTerracottaClusteredSize();
623         }
624         return rv;
625     }
626 
627     /***
628      * {@inheritDoc}.
629      */
630     public long getInMemorySizeInBytes() {
631         long rv = 0;
632         try {
633             rv = executeWithExecutor(new Callable<Long>() {
634                 public Long call() throws Exception {
635                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getInMemorySizeInBytes();
636                 }
637             });
638         } catch (TimeoutException e) {
639             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getInMemorySizeInBytes();
640         }
641         return rv;
642     }
643 
644     /***
645      * {@inheritDoc}.
646      */
647     public long getOffHeapSizeInBytes() {
648         long rv = 0;
649         try {
650             rv = executeWithExecutor(new Callable<Long>() {
651                 public Long call() throws Exception {
652                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getOffHeapSizeInBytes();
653                 }
654             });
655         } catch (TimeoutException e) {
656             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getOffHeapSizeInBytes();
657         }
658         return rv;
659     }
660 
661     /***
662      * {@inheritDoc}.
663      */
664     public long getOnDiskSizeInBytes() {
665         long rv = 0;
666         try {
667             rv = executeWithExecutor(new Callable<Long>() {
668                 public Long call() throws Exception {
669                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getOnDiskSizeInBytes();
670                 }
671             });
672         } catch (TimeoutException e) {
673             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getOnDiskSizeInBytes();
674         }
675         return rv;
676     }
677 
678     /***
679      * {@inheritDoc}.
680      */
681     public Status getStatus() {
682         Status rv = null;
683         try {
684             rv = executeWithExecutor(new Callable<Status>() {
685                 public Status call() throws Exception {
686                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getStatus();
687                 }
688             });
689         } catch (TimeoutException e) {
690             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getStatus();
691         }
692         return rv;
693     }
694 
695     /***
696      * {@inheritDoc}.
697      */
698     public boolean containsKey(final Object key) {
699         boolean rv = false;
700         try {
701             rv = executeWithExecutor(new Callable<Boolean>() {
702                 public Boolean call() throws Exception {
703                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().containsKey(key);
704                 }
705             });
706         } catch (TimeoutException e) {
707             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().containsKey(key);
708         }
709         return rv;
710     }
711 
712     /***
713      * {@inheritDoc}.
714      */
715     public boolean containsKeyOnDisk(final Object key) {
716         boolean rv = false;
717         try {
718             rv = executeWithExecutor(new Callable<Boolean>() {
719                 public Boolean call() throws Exception {
720                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().containsKeyOnDisk(key);
721                 }
722             });
723         } catch (TimeoutException e) {
724             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().containsKeyOnDisk(key);
725         }
726         return rv;
727     }
728 
729     /***
730      * {@inheritDoc}.
731      */
732     public boolean containsKeyOffHeap(final Object key) {
733         boolean rv = false;
734         try {
735             rv = executeWithExecutor(new Callable<Boolean>() {
736                 public Boolean call() throws Exception {
737                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().containsKeyOffHeap(key);
738                 }
739             });
740         } catch (TimeoutException e) {
741             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().containsKeyOffHeap(key);
742         }
743         return rv;
744     }
745 
746     /***
747      * {@inheritDoc}.
748      */
749     public boolean containsKeyInMemory(final Object key) {
750         boolean rv = false;
751         try {
752             rv = executeWithExecutor(new Callable<Boolean>() {
753                 public Boolean call() throws Exception {
754                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().containsKeyInMemory(key);
755                 }
756             });
757         } catch (TimeoutException e) {
758             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().containsKeyInMemory(key);
759         }
760         return rv;
761     }
762 
763     /***
764      * {@inheritDoc}.
765      */
766     public void expireElements() {
767         try {
768             executeWithExecutor(new Callable<Void>() {
769                 public Void call() throws Exception {
770                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().expireElements();
771                     return null;
772                 }
773             });
774         } catch (TimeoutException e) {
775             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().expireElements();
776         }
777     }
778 
779     /***
780      * {@inheritDoc}.
781      */
782     public void flush() throws IOException {
783         try {
784             executeWithExecutor(new Callable<Void>() {
785                 public Void call() throws Exception {
786                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().flush();
787                     return null;
788                 }
789             });
790         } catch (TimeoutException e) {
791             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().flush();
792         }
793     }
794 
795     /***
796      * {@inheritDoc}.
797      */
798     public boolean bufferFull() {
799         boolean rv = false;
800         try {
801             rv = executeWithExecutor(new Callable<Boolean>() {
802                 public Boolean call() throws Exception {
803                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().bufferFull();
804                 }
805             });
806         } catch (TimeoutException e) {
807             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().bufferFull();
808         }
809         return rv;
810     }
811 
812     /***
813      * {@inheritDoc}.
814      */
815     public Policy getInMemoryEvictionPolicy() {
816         Policy rv = null;
817         try {
818             rv = executeWithExecutor(new Callable<Policy>() {
819                 public Policy call() throws Exception {
820                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getInMemoryEvictionPolicy();
821                 }
822             });
823         } catch (TimeoutException e) {
824             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getInMemoryEvictionPolicy();
825         }
826         return rv;
827     }
828 
829     /***
830      * {@inheritDoc}.
831      */
832     public void setInMemoryEvictionPolicy(final Policy policy) {
833         try {
834             executeWithExecutor(new Callable<Void>() {
835                 public Void call() throws Exception {
836                     nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().setInMemoryEvictionPolicy(policy);
837                     return null;
838                 }
839             });
840         } catch (TimeoutException e) {
841             timeoutBehaviorResolver.resolveTimeoutBehaviorStore().setInMemoryEvictionPolicy(policy);
842         }
843     }
844 
845     /***
846      * {@inheritDoc}.
847      */
848     public Object getInternalContext() {
849         Object rv = null;
850         try {
851             rv = executeWithExecutor(new Callable<Object>() {
852                 public Object call() throws Exception {
853                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getInternalContext();
854                 }
855             });
856         } catch (TimeoutException e) {
857             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getInternalContext();
858         }
859         return rv;
860     }
861 
862     /***
863      * {@inheritDoc}.
864      */
865     public boolean isCacheCoherent() {
866         boolean rv = false;
867         try {
868             rv = executeWithExecutor(new Callable<Boolean>() {
869                 public Boolean call() throws Exception {
870                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().isCacheCoherent();
871                 }
872             });
873         } catch (TimeoutException e) {
874             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().isCacheCoherent();
875         }
876         return rv;
877     }
878 
879     /***
880      * {@inheritDoc}.
881      */
882     public boolean isClusterCoherent() {
883         boolean rv = false;
884         try {
885             rv = executeWithExecutor(new Callable<Boolean>() {
886                 public Boolean call() throws Exception {
887                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().isClusterCoherent();
888                 }
889             });
890         } catch (TimeoutException e) {
891             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().isClusterCoherent();
892         }
893         return rv;
894     }
895 
896     /***
897      * {@inheritDoc}.
898      */
899     public boolean isNodeCoherent() {
900         boolean rv = false;
901         try {
902             rv = executeWithExecutor(new Callable<Boolean>() {
903                 public Boolean call() throws Exception {
904                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().isNodeCoherent();
905                 }
906             });
907         } catch (TimeoutException e) {
908             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().isNodeCoherent();
909         }
910         return rv;
911     }
912 
913     /***
914      * {@inheritDoc}.
915      *
916      * @throws InterruptedException
917      */
918     public void waitUntilClusterCoherent() throws UnsupportedOperationException, InterruptedException {
919         final RejoinAwareBlockingOperation<Void> operation = new RejoinAwareBlockingOperation<Void>(this, new Callable<Void>() {
920             public Void call() throws Exception {
921                 nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().waitUntilClusterCoherent();
922                 return null;
923             }
924         });
925         rejoinAwareOperations.add(operation);
926         try {
927             operation.call();
928         } catch (Exception e) {
929             if (e instanceof InterruptedException) {
930                 throw (InterruptedException) e;
931             } else {
932                 throw new CacheException(e);
933             }
934         } finally {
935             rejoinAwareOperations.remove(operation);
936         }
937     }
938 
939     /***
940      * {@inheritDoc}.
941      */
942     public Object getMBean() {
943         Object rv = null;
944         try {
945             rv = executeWithExecutor(new Callable<Object>() {
946                 public Object call() throws Exception {
947                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getMBean();
948                 }
949             });
950         } catch (TimeoutException e) {
951             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getMBean();
952         }
953         return rv;
954     }
955 
956     /***
957      * {@inheritDoc}.
958      */
959     public Results executeQuery(final StoreQuery query) {
960         Results rv = null;
961         try {
962             rv = executeWithExecutor(new Callable<Results>() {
963                 public Results call() throws Exception {
964                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().executeQuery(query);
965                 }
966             });
967         } catch (TimeoutException e) {
968             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().executeQuery(query);
969         }
970         return rv;
971     }
972 
973     /***
974      * {@inheritDoc}.
975      */
976     public <T> Attribute<T> getSearchAttribute(final String attributeName) {
977         try {
978             return executeWithExecutor(new Callable<Attribute<T>>() {
979                 public Attribute<T> call() throws Exception {
980                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getSearchAttribute(attributeName);
981                 }
982             });
983         } catch (TimeoutException e) {
984             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getSearchAttribute(attributeName);
985         }
986     }
987 
988     /***
989      * {@inheritDoc}
990      */
991     public Set getLocalKeys() {
992         try {
993             return executeWithExecutor(new Callable<Set>() {
994                 public Set call() throws Exception {
995                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().getLocalKeys();
996                 }
997             });
998         } catch (TimeoutException e) {
999             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().getLocalKeys();
1000         }
1001     }
1002 
1003     /***
1004      * {@inheritDoc}
1005      */
1006     public Element unlockedGet(final Object key) {
1007         try {
1008             return executeWithExecutor(new Callable<Element>() {
1009                 public Element call() throws Exception {
1010                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().unlockedGet(key);
1011                 }
1012             });
1013         } catch (TimeoutException e) {
1014             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().unlockedGet(key);
1015         }
1016     }
1017 
1018     /***
1019      * {@inheritDoc}
1020      */
1021     public Element unlockedGetQuiet(final Object key) {
1022         try {
1023             return executeWithExecutor(new Callable<Element>() {
1024                 public Element call() throws Exception {
1025                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().unlockedGetQuiet(key);
1026                 }
1027             });
1028         } catch (TimeoutException e) {
1029             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().unlockedGetQuiet(key);
1030         }
1031     }
1032 
1033     /***
1034      * {@inheritDoc}
1035      */
1036     public Element unsafeGet(final Object key) {
1037         try {
1038             return executeWithExecutor(new Callable<Element>() {
1039                 public Element call() throws Exception {
1040                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().unsafeGet(key);
1041                 }
1042             });
1043         } catch (TimeoutException e) {
1044             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().unsafeGet(key);
1045         }
1046     }
1047 
1048     /***
1049      * {@inheritDoc}
1050      */
1051     public Element unsafeGetQuiet(final Object key) {
1052         try {
1053             return executeWithExecutor(new Callable<Element>() {
1054                 public Element call() throws Exception {
1055                     return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore().unsafeGetQuiet(key);
1056                 }
1057             });
1058         } catch (TimeoutException e) {
1059             return timeoutBehaviorResolver.resolveTimeoutBehaviorStore().unsafeGetQuiet(key);
1060         }
1061     }
1062 
1063     /***
1064      * {@inheritDoc}
1065      */
1066     public <V> V executeClusterOperation(final ClusterOperation<V> operation) {
1067         try {
1068             return executeWithExecutor(new ClusterOperationCallableImpl<V>(operation));
1069         } catch (TimeoutException e) {
1070             return operation.performClusterOperationTimedOut(this.nonstopConfiguration.getTimeoutBehavior().getTimeoutBehaviorType());
1071         }
1072     }
1073 
1074     /***
1075      * Executes the {@link ClusterOperation} parameter, but without any timeout. This call will block until the {@link ClusterOperation}
1076      * completes. The
1077      * {@link ClusterOperation#performClusterOperationTimedOut(net.sf.ehcache.config.TimeoutBehaviorConfiguration.TimeoutBehaviorType)} will
1078      * never be invoked for this
1079      *
1080      * @throws InterruptedException if the executing thread is interrupted before the {@link ClusterOperation} can complete
1081      */
1082     protected <V> V executeClusterOperationNoTimeout(final ClusterOperation<V> operation) throws InterruptedException {
1083         try {
1084             return executeWithExecutor(new ClusterOperationCallableImpl<V>(operation), Integer.MAX_VALUE, true);
1085         } catch (TimeoutException e) {
1086             throw new AssertionError("This should never happen as executed with no-timeout");
1087         } catch (CacheException e) {
1088             Throwable rootCause = getRootCause(e);
1089             if (rootCause instanceof InterruptedException) {
1090                 throw (InterruptedException) rootCause;
1091             } else {
1092                 throw e;
1093             }
1094         }
1095     }
1096 
1097     private Throwable getRootCause(final CacheException exception) {
1098         Throwable e = exception;
1099         while (e.getCause() != null) {
1100             e = e.getCause();
1101         }
1102         return e;
1103     }
1104 
1105     /***
1106      * A {@link ClusterTopologyListener} implementation that listens for cluster online/offline events
1107      *
1108      * @author Abhishek Sanoujam
1109      *
1110      */
1111     private static class ClusterStatusListener implements ClusterTopologyListener {
1112 
1113         private final ExecutorServiceStore executorServiceStore;
1114         private final CacheCluster cacheCluster;
1115 
1116         public ClusterStatusListener(ExecutorServiceStore executorServiceStore, CacheCluster cacheCluster) {
1117             this.executorServiceStore = executorServiceStore;
1118             this.cacheCluster = cacheCluster;
1119         }
1120 
1121         /***
1122          * {@inheritDoc}
1123          */
1124         public void clusterOffline(ClusterNode node) {
1125             if (cacheCluster.getCurrentNode().equals(node)) {
1126                 executorServiceStore.clusterOffline();
1127             }
1128         }
1129 
1130         /***
1131          * {@inheritDoc}
1132          */
1133         public void clusterOnline(ClusterNode node) {
1134             if (cacheCluster.getCurrentNode().equals(node)) {
1135                 executorServiceStore.clusterOnline();
1136             }
1137         }
1138 
1139         /***
1140          * {@inheritDoc}
1141          */
1142         public void nodeJoined(ClusterNode node) {
1143             // no-op
1144         }
1145 
1146         /***
1147          * {@inheritDoc}
1148          */
1149         public void nodeLeft(ClusterNode node) {
1150             // no-op
1151         }
1152 
1153         /***
1154          * {@inheritDoc}
1155          */
1156         public void clusterRejoined(ClusterNode oldNode, ClusterNode newNode) {
1157             // no-op
1158         }
1159 
1160     }
1161 
1162     /***
1163      * {@inheritDoc}
1164      */
1165     public void clusterRejoined() {
1166         for (RejoinAwareBlockingOperation operation : rejoinAwareOperations) {
1167             operation.clusterRejoined();
1168         }
1169     }
1170 }