View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.constructs.nonstop.store;
18  
19  import java.io.IOException;
20  import java.util.Collection;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ConcurrentMap;
26  
27  import net.sf.ehcache.CacheException;
28  import net.sf.ehcache.Element;
29  import net.sf.ehcache.Status;
30  import net.sf.ehcache.cluster.CacheCluster;
31  import net.sf.ehcache.concurrent.CacheLockProvider;
32  import net.sf.ehcache.config.CacheConfiguration;
33  import net.sf.ehcache.config.NonstopConfiguration;
34  import net.sf.ehcache.config.TimeoutBehaviorConfiguration.TimeoutBehaviorType;
35  import net.sf.ehcache.constructs.nonstop.ClusterOperation;
36  import net.sf.ehcache.constructs.nonstop.NonstopActiveDelegateHolder;
37  import net.sf.ehcache.constructs.nonstop.concurrency.ExplicitLockingContextThreadLocal;
38  import net.sf.ehcache.constructs.nonstop.concurrency.NonstopCacheLockProvider;
39  import net.sf.ehcache.search.Attribute;
40  import net.sf.ehcache.search.Results;
41  import net.sf.ehcache.search.attribute.AttributeExtractor;
42  import net.sf.ehcache.store.ElementValueComparator;
43  import net.sf.ehcache.store.Policy;
44  import net.sf.ehcache.store.StoreListener;
45  import net.sf.ehcache.store.StoreQuery;
46  import net.sf.ehcache.store.TerracottaStore;
47  import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
48  import net.sf.ehcache.writer.CacheWriterManager;
49  
50  /***
51   * A {@link NonstopStore} implementation that does not block calling threads when the cluster goes down.
52   * <p>
53   * Most operations are forwarded to an {@link ExecutorServiceStore}; the remainder are answered directly by the
54   * {@link TerracottaStore} obtained from the {@link NonstopActiveDelegateHolder}.
55   *
56   * @author Abhishek Sanoujam
57   *
58   */
59  public class NonstopStoreImpl implements NonstopTimeoutBehaviorStoreResolver, RejoinAwareNonstopStore {
60  
61      private final NonstopActiveDelegateHolder nonstopActiveDelegateHolder;
62      private final NonstopConfiguration nonstopConfig;
63      private final ConcurrentMap<TimeoutBehaviorType, NonstopStore> timeoutBehaviors;
64      private final ExecutorServiceStore executorServiceStore;
65      private final ExplicitLockingContextThreadLocal explicitLockingContextThreadLocal;
66      private final CacheLockProvider nonstopCacheLockProvider;
67  
68      /***
69       * Builds the store together with the collaborators it forwards to.
70       *
71       * @param nonstopActiveDelegateHolder holder used to reach the underlying {@link TerracottaStore}
72       * @param cacheCluster the {@link CacheCluster} passed on to the executor service store
73       * @param nonstopConfig the {@link NonstopConfiguration} from which the timeout behavior is read
74       * @param transactionalMode selects a {@link TransactionalExecutorServiceStore} when it equals {@code XA_STRICT}
75       * @param transactionManagerLookup passed on to the {@link TransactionalExecutorServiceStore}; not used otherwise
76       */
77      public NonstopStoreImpl(NonstopActiveDelegateHolder nonstopActiveDelegateHolder, CacheCluster cacheCluster,
78              NonstopConfiguration nonstopConfig, CacheConfiguration.TransactionalMode transactionalMode,
79              TransactionManagerLookup transactionManagerLookup) {
80          this.nonstopActiveDelegateHolder = nonstopActiveDelegateHolder;
81          this.nonstopConfig = nonstopConfig;
82          this.explicitLockingContextThreadLocal = new ExplicitLockingContextThreadLocal();
83          this.timeoutBehaviors = new ConcurrentHashMap<TimeoutBehaviorType, NonstopStore>();
84          // note: "this" is passed to the collaborators created below
85          final boolean xaStrict = transactionalMode.equals(CacheConfiguration.TransactionalMode.XA_STRICT);
86          this.executorServiceStore = xaStrict
87                  ? new TransactionalExecutorServiceStore(nonstopActiveDelegateHolder, nonstopConfig, this, cacheCluster,
88                          transactionManagerLookup, explicitLockingContextThreadLocal)
89                  : new ExecutorServiceStore(nonstopActiveDelegateHolder, nonstopConfig, this, cacheCluster,
90                          explicitLockingContextThreadLocal);
91          this.nonstopCacheLockProvider = new NonstopCacheLockProvider(this, nonstopActiveDelegateHolder, explicitLockingContextThreadLocal);
92      }
93  
94      /***
95       * {@inheritDoc}
96       * <p>
97       * Stores are created on demand and cached per {@link TimeoutBehaviorType}.
98       */
99      public NonstopStore resolveTimeoutBehaviorStore() {
100         final TimeoutBehaviorType behaviorType = nonstopConfig.getTimeoutBehavior().getTimeoutBehaviorType();
101         final NonstopStore cached = timeoutBehaviors.get(behaviorType);
102         if (cached != null) {
103             return cached;
104         }
105         // nothing cached for this behavior type yet: create a store, then keep whichever instance was registered first
106         final NonstopStore created = nonstopConfig.getTimeoutBehavior().getNonstopTimeoutBehaviorFactory()
107                 .createNonstopTimeoutBehaviorStore(nonstopActiveDelegateHolder);
108         final NonstopStore registered = timeoutBehaviors.putIfAbsent(behaviorType, created);
109         return registered == null ? created : registered;
110     }
111 
112     /***
113      * Package protected method - used in tests
114      *
115      * @return the underlying {@link TerracottaStore}
116      */
117     TerracottaStore getUnderlyingStore() {
118         return underlyingStore();
119     }
120 
121     /***
122      * Looks up the store to which the direct delegations in this class are sent.
123      *
124      * @return the {@link TerracottaStore} returned by the {@link NonstopActiveDelegateHolder}
125      */
126     private TerracottaStore underlyingStore() {
127         return nonstopActiveDelegateHolder.getUnderlyingTerracottaStore();
128     }
129 
130     /***
131      * {@inheritDoc}
132      * <p>
133      * This implementation returns its {@link CacheLockProvider}.
134      */
135     public Object getInternalContext() {
136         return nonstopCacheLockProvider;
137     }
138 
139     /***
140      * {@inheritDoc}
141      *
142      * @throws InterruptedException propagated from the executor service store
143      */
144     public void waitUntilClusterCoherent() throws InterruptedException {
145         executorServiceStore.waitUntilClusterCoherent();
146     }
147 
148     // ---------------------------------------------------------------------------
149     // The methods below are answered directly by the underlying TerracottaStore
150     // ---------------------------------------------------------------------------
151 
152     /***
153      * {@inheritDoc}
154      */
155     public boolean bufferFull() {
156         return underlyingStore().bufferFull();
157     }
158 
159     /***
160      * {@inheritDoc}
161      */
162     public boolean containsKeyOffHeap(Object key) {
163         return underlyingStore().containsKeyOffHeap(key);
164     }
165 
166     /***
167      * {@inheritDoc}
168      */
169     public boolean containsKeyOnDisk(Object key) {
170         return underlyingStore().containsKeyOnDisk(key);
171     }
172 
173     /***
174      * {@inheritDoc}
175      */
176     public Policy getInMemoryEvictionPolicy() {
177         return underlyingStore().getInMemoryEvictionPolicy();
178     }
179 
180     /***
181      * {@inheritDoc}
182      */
183     public Object getMBean() {
184         return underlyingStore().getMBean();
185     }
186 
187     /***
188      * {@inheritDoc}
189      */
190     public int getOffHeapSize() {
191         return underlyingStore().getOffHeapSize();
192     }
193 
194     /***
195      * {@inheritDoc}
196      */
197     public long getOffHeapSizeInBytes() {
198         return underlyingStore().getOffHeapSizeInBytes();
199     }
200 
201     /***
202      * {@inheritDoc}
203      */
204     public int getOnDiskSize() {
205         return underlyingStore().getOnDiskSize();
206     }
207 
208     /***
209      * {@inheritDoc}
210      */
211     public long getOnDiskSizeInBytes() {
212         return underlyingStore().getOnDiskSizeInBytes();
213     }
214 
215     /***
216      * {@inheritDoc}
217      */
218     public Status getStatus() {
219         return underlyingStore().getStatus();
220     }
221 
222     /***
223      * {@inheritDoc}
224      */
225     public void setInMemoryEvictionPolicy(Policy policy) {
226         underlyingStore().setInMemoryEvictionPolicy(policy);
227     }
228 
229     /***
230      * {@inheritDoc}
231      */
232     public void expireElements() {
233         underlyingStore().expireElements();
234     }
235 
236     // ------------------------------------------------------------------------------
237     // Unless noted otherwise, the methods below delegate to the executorServiceStore
238     // ------------------------------------------------------------------------------
239 
240     /***
241      * {@inheritDoc}
242      */
243     public void addStoreListener(StoreListener listener) {
244         executorServiceStore.addStoreListener(listener);
245     }
246 
247     /***
248      * {@inheritDoc}
249      */
250     public void removeStoreListener(StoreListener listener) {
251         executorServiceStore.removeStoreListener(listener);
252     }
253 
254     /***
255      * {@inheritDoc}
256      */
257     public void setAttributeExtractors(Map<String, AttributeExtractor> extractors) {
258         // this call goes straight through to the underlying store(s) to avoid race conditions (DEV-5751)
259         underlyingStore().setAttributeExtractors(extractors);
260     }
261 
262     /***
263      * {@inheritDoc}
264      */
265     public boolean isCacheCoherent() {
266         return executorServiceStore.isCacheCoherent();
267     }
268 
269     /***
270      * {@inheritDoc}
271      */
272     public boolean isClusterCoherent() {
273         return executorServiceStore.isClusterCoherent();
274     }
275 
276     /***
277      * {@inheritDoc}
278      */
279     public boolean isNodeCoherent() {
280         return executorServiceStore.isNodeCoherent();
281     }
282 
283     /***
284      * {@inheritDoc}
285      */
286     public void setNodeCoherent(boolean coherent) throws UnsupportedOperationException {
287         executorServiceStore.setNodeCoherent(coherent);
288     }
289 
290     /***
291      * {@inheritDoc}
292      */
293     public void dispose() {
294         executorServiceStore.dispose();
295     }
296 
297     /***
298      * {@inheritDoc}
299      */
300     public boolean containsKey(Object key) {
301         return executorServiceStore.containsKey(key);
302     }
303 
304     /***
305      * {@inheritDoc}
306      */
307     public boolean containsKeyInMemory(Object key) {
308         return executorServiceStore.containsKeyInMemory(key);
309     }
310 
311     /***
312      * {@inheritDoc}
313      */
314     public Results executeQuery(StoreQuery query) {
315         return executorServiceStore.executeQuery(query);
316     }
317 
318     /***
319      * {@inheritDoc}
320      */
321     public <T> Attribute<T> getSearchAttribute(String attributeName) {
322         return executorServiceStore.getSearchAttribute(attributeName);
323     }
324 
325     /***
326      * {@inheritDoc}
327      */
328     public void flush() throws IOException {
329         executorServiceStore.flush();
330     }
331 
332     /***
333      * {@inheritDoc}
334      */
335     public Element get(Object key) {
336         return executorServiceStore.get(key);
337     }
338 
339     /***
340      * {@inheritDoc}
341      */
342     public int getInMemorySize() {
343         return executorServiceStore.getInMemorySize();
344     }
345 
346     /***
347      * {@inheritDoc}
348      */
349     public long getInMemorySizeInBytes() {
350         return executorServiceStore.getInMemorySizeInBytes();
351     }
352 
353     /***
354      * {@inheritDoc}
355      */
356     public List getKeys() {
357         return executorServiceStore.getKeys();
358     }
359 
360     /***
361      * {@inheritDoc}
362      */
363     public Element getQuiet(Object key) {
364         return executorServiceStore.getQuiet(key);
365     }
366 
367     /***
368      * {@inheritDoc}
369      */
370     public int getSize() {
371         return executorServiceStore.getSize();
372     }
373 
374     /***
375      * {@inheritDoc}
376      */
377     public int getTerracottaClusteredSize() {
378         return executorServiceStore.getTerracottaClusteredSize();
379     }
380 
381     /***
382      * {@inheritDoc}
383      */
384     public boolean put(Element element) throws CacheException {
385         return executorServiceStore.put(element);
386     }
387 
388     /***
389      * {@inheritDoc}
390      */
391     public void putAll(Collection<Element> elements) throws CacheException {
392         executorServiceStore.putAll(elements);
393     }
394 
395     /***
396      * {@inheritDoc}
397      */
398     public Element putIfAbsent(Element element) throws NullPointerException {
399         return executorServiceStore.putIfAbsent(element);
400     }
401 
402     /***
403      * {@inheritDoc}
404      */
405     public boolean putWithWriter(Element element, CacheWriterManager writerManager) throws CacheException {
406         return executorServiceStore.putWithWriter(element, writerManager);
407     }
408 
409     /***
410      * {@inheritDoc}
411      */
412     public Element remove(Object key) {
413         return executorServiceStore.remove(key);
414     }
415 
416     /***
417      * {@inheritDoc}
418      */
419     public void removeAll(final Collection<Object> keys) {
420         executorServiceStore.removeAll(keys);
421     }
422 
423     /***
424      * {@inheritDoc}
425      */
426     public void removeAll() throws CacheException {
427         executorServiceStore.removeAll();
428     }
429 
430     /***
431      * {@inheritDoc}
432      */
433     public Element removeElement(Element element, ElementValueComparator comparator) throws NullPointerException {
434         return executorServiceStore.removeElement(element, comparator);
435     }
436 
437     /***
438      * {@inheritDoc}
439      */
440     public Element removeWithWriter(Object key, CacheWriterManager writerManager) throws CacheException {
441         return executorServiceStore.removeWithWriter(key, writerManager);
442     }
443 
444     /***
445      * {@inheritDoc}
446      */
447     public boolean replace(Element old, Element element, ElementValueComparator comparator)
448             throws NullPointerException, IllegalArgumentException {
449         return executorServiceStore.replace(old, element, comparator);
450     }
451 
452     /***
453      * {@inheritDoc}
454      */
455     public Element replace(Element element) throws NullPointerException {
456         return executorServiceStore.replace(element);
457     }
458 
459     /***
460      * {@inheritDoc}
461      */
462     public Set getLocalKeys() {
463         return executorServiceStore.getLocalKeys();
464     }
465 
466     /***
467      * {@inheritDoc}
468      */
469     public Element unlockedGet(Object key) {
470         return executorServiceStore.unlockedGet(key);
471     }
472 
473     /***
474      * {@inheritDoc}
475      */
476     public Element unlockedGetQuiet(Object key) {
477         return executorServiceStore.unlockedGetQuiet(key);
478     }
479 
480     /***
481      * {@inheritDoc}
482      */
483     public Element unsafeGet(Object key) {
484         return executorServiceStore.unsafeGet(key);
485     }
486 
487     /***
488      * {@inheritDoc}
489      */
490     public Element unsafeGetQuiet(Object key) {
491         return executorServiceStore.unsafeGetQuiet(key);
492     }
493 
494     /***
495      * {@inheritDoc}
496      */
497     public <V> V executeClusterOperation(ClusterOperation<V> operation) {
498         return executorServiceStore.executeClusterOperation(operation);
499     }
500 
501     /***
502      * {@inheritDoc}
503      */
504     public void clusterRejoined() {
505         executorServiceStore.clusterRejoined();
506     }
507 
508 }