View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.constructs.nonstop.store;
18  
19  import net.sf.ehcache.CacheException;
20  import net.sf.ehcache.Element;
21  import net.sf.ehcache.cluster.CacheCluster;
22  import net.sf.ehcache.config.NonstopConfiguration;
23  import net.sf.ehcache.constructs.nonstop.NonstopActiveDelegateHolder;
24  import net.sf.ehcache.constructs.nonstop.concurrency.ExplicitLockingContextThreadLocal;
25  import net.sf.ehcache.store.ElementValueComparator;
26  import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
27  import net.sf.ehcache.writer.CacheWriterManager;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  
31  import javax.transaction.InvalidTransactionException;
32  import javax.transaction.SystemException;
33  import javax.transaction.Transaction;
34  import javax.transaction.TransactionManager;
35  import java.util.List;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.TimeoutException;
38  
39  /***
40   * This implementation is identical to TransactionalExecutorServiceStore except that it ensures the transactional context
41   * gets propagated to the executor thread.
42   * <p/>
43   *
44   * @see ExecutorServiceStore
45   * @author Ludovic Orban
46   *
47   */
48  public class TransactionalExecutorServiceStore extends ExecutorServiceStore {
49  
50      private static final Logger LOG = LoggerFactory.getLogger(TransactionalExecutorServiceStore.class.getName());
51  
52      private final TransactionManager transactionManager;
53  
54      /***
55       * Constructor
56       * @param explicitLockingContextThreadLocal
57       */
58      public TransactionalExecutorServiceStore(final NonstopActiveDelegateHolder nonstopActiveDelegateHolder,
59              final NonstopConfiguration nonstopConfiguration, final NonstopTimeoutBehaviorStoreResolver timeoutBehaviorResolver,
60              CacheCluster cacheCluster, TransactionManagerLookup transactionManagerLookup,
61              ExplicitLockingContextThreadLocal explicitLockingContextThreadLocal) {
62          super(nonstopActiveDelegateHolder, nonstopConfiguration, timeoutBehaviorResolver, cacheCluster, explicitLockingContextThreadLocal);
63          this.transactionManager = transactionManagerLookup.getTransactionManager();
64      }
65  
66      /***
67       * {@inheritDoc}.
68       */
69      @Override
70      public boolean put(final Element element) throws CacheException {
71          boolean rv = false;
72          final Transaction tx = suspendCaller();
73          try {
74              rv = executeWithExecutor(new Callable<Boolean>() {
75                  public Boolean call() throws Exception {
76                      resumeCallee(tx);
77                      try {
78                          return underlyingTerracottaStore().put(element);
79                      } finally {
80                          suspendCallee();
81                      }
82                  }
83              });
84          } catch (TimeoutException e) {
85              return resolveTimeoutBehaviorStore().put(element);
86          } finally {
87              resumeCaller(tx);
88          }
89  
90          return rv;
91      }
92  
93      /***
94       * {@inheritDoc}.
95       */
96      @Override
97      public boolean putWithWriter(final Element element, final CacheWriterManager writerManager) throws CacheException {
98          boolean rv = false;
99          final Transaction tx = suspendCaller();
100         try {
101             rv = executeWithExecutor(new Callable<Boolean>() {
102                 public Boolean call() throws Exception {
103                     resumeCallee(tx);
104                     try {
105                         return underlyingTerracottaStore().putWithWriter(element, writerManager);
106                     } finally {
107                         suspendCallee();
108                     }
109                 }
110             });
111         } catch (TimeoutException e) {
112             return resolveTimeoutBehaviorStore().putWithWriter(element, writerManager);
113         } finally {
114             resumeCaller(tx);
115         }
116         return rv;
117     }
118 
119     /***
120      * {@inheritDoc}.
121      */
122     @Override
123     public Element get(final Object key) {
124         Element rv = null;
125         final Transaction tx = suspendCaller();
126         try {
127             rv = executeWithExecutor(new Callable<Element>() {
128                 public Element call() throws Exception {
129                     resumeCallee(tx);
130                     try {
131                         return underlyingTerracottaStore().get(key);
132                     } finally {
133                         suspendCallee();
134                     }
135                 }
136             });
137         } catch (TimeoutException e) {
138             return resolveTimeoutBehaviorStore().get(key);
139         } finally {
140             resumeCaller(tx);
141         }
142         return rv;
143     }
144 
145     /***
146      * {@inheritDoc}.
147      */
148     @Override
149     public Element getQuiet(final Object key) {
150         Element rv = null;
151         final Transaction tx = suspendCaller();
152         try {
153             rv = executeWithExecutor(new Callable<Element>() {
154                 public Element call() throws Exception {
155                     resumeCallee(tx);
156                     try {
157                         return underlyingTerracottaStore().getQuiet(key);
158                     } finally {
159                         suspendCallee();
160                     }
161                 }
162             });
163         } catch (TimeoutException e) {
164             return resolveTimeoutBehaviorStore().getQuiet(key);
165         } finally {
166             resumeCaller(tx);
167         }
168         return rv;
169     }
170 
171     /***
172      * {@inheritDoc}.
173      */
174     @Override
175     public List getKeys() {
176         List rv = null;
177         final Transaction tx = suspendCaller();
178         try {
179             rv = executeWithExecutor(new Callable<List>() {
180                 public List call() throws Exception {
181                     resumeCallee(tx);
182                     try {
183                         return underlyingTerracottaStore().getKeys();
184                     } finally {
185                         suspendCallee();
186                     }
187                 }
188             });
189         } catch (TimeoutException e) {
190             return resolveTimeoutBehaviorStore().getKeys();
191         } finally {
192             resumeCaller(tx);
193         }
194         return rv;
195     }
196 
197     /***
198      * {@inheritDoc}.
199      */
200     @Override
201     public Element remove(final Object key) {
202         Element rv = null;
203         final Transaction tx = suspendCaller();
204         try {
205             rv = executeWithExecutor(new Callable<Element>() {
206                 public Element call() throws Exception {
207                     resumeCallee(tx);
208                     try {
209                         return underlyingTerracottaStore().remove(key);
210                     } finally {
211                         suspendCallee();
212                     }
213                 }
214             });
215         } catch (TimeoutException e) {
216             return resolveTimeoutBehaviorStore().remove(key);
217         } finally {
218             resumeCaller(tx);
219         }
220         return rv;
221     }
222 
223     /***
224      * {@inheritDoc}.
225      */
226     @Override
227     public Element removeWithWriter(final Object key, final CacheWriterManager writerManager) throws CacheException {
228         Element rv = null;
229         final Transaction tx = suspendCaller();
230         try {
231             rv = executeWithExecutor(new Callable<Element>() {
232                 public Element call() throws Exception {
233                     resumeCallee(tx);
234                     try {
235                         return underlyingTerracottaStore().removeWithWriter(key, writerManager);
236                     } finally {
237                         suspendCallee();
238                     }
239                 }
240             });
241         } catch (TimeoutException e) {
242             return resolveTimeoutBehaviorStore().removeWithWriter(key, writerManager);
243         } finally {
244             resumeCaller(tx);
245         }
246         return rv;
247     }
248 
249     /***
250      * {@inheritDoc}.
251      * The timeout used by this method is {@link net.sf.ehcache.config.NonstopConfiguration#getBulkOpsTimeoutMultiplyFactor()} times the timeout value in the
252      * config.
253      */
254     @Override
255     public void removeAll() throws CacheException {
256         final Transaction tx = suspendCaller();
257         try {
258             executeWithExecutor(new Callable<Void>() {
259                 public Void call() throws Exception {
260                     resumeCallee(tx);
261                     try {
262                         underlyingTerracottaStore().removeAll();
263                         return null;
264                     } finally {
265                         suspendCallee();
266                     }
267                 }
268             }, nonstopConfiguration.getTimeoutMillis() * nonstopConfiguration.getBulkOpsTimeoutMultiplyFactor());
269         } catch (TimeoutException e) {
270             resolveTimeoutBehaviorStore().removeAll();
271         } finally {
272             resumeCaller(tx);
273         }
274     }
275 
276     /***
277      * {@inheritDoc}.
278      */
279     @Override
280     public Element putIfAbsent(final Element element) throws NullPointerException {
281         Element rv = null;
282         final Transaction tx = suspendCaller();
283         try {
284             rv = executeWithExecutor(new Callable<Element>() {
285                 public Element call() throws Exception {
286                     resumeCallee(tx);
287                     try {
288                         return underlyingTerracottaStore().putIfAbsent(element);
289                     } finally {
290                         suspendCallee();
291                     }
292                 }
293             });
294         } catch (TimeoutException e) {
295             return resolveTimeoutBehaviorStore().putIfAbsent(element);
296         } finally {
297             resumeCaller(tx);
298         }
299         return rv;
300     }
301 
302     /***
303      * {@inheritDoc}.
304      */
305     @Override
306     public Element removeElement(final Element element, final ElementValueComparator comparator) throws NullPointerException {
307         Element rv = null;
308         final Transaction tx = suspendCaller();
309         try {
310             rv = executeWithExecutor(new Callable<Element>() {
311                 public Element call() throws Exception {
312                     resumeCallee(tx);
313                     try {
314                         return underlyingTerracottaStore().removeElement(element, comparator);
315                     } finally {
316                         suspendCallee();
317                     }
318                 }
319             });
320         } catch (TimeoutException e) {
321             return resolveTimeoutBehaviorStore().removeElement(element, comparator);
322         } finally {
323             resumeCaller(tx);
324         }
325         return rv;
326     }
327 
328     /***
329      * {@inheritDoc}.
330      */
331     @Override
332     public boolean replace(final Element old, final Element element, final ElementValueComparator comparator) throws NullPointerException,
333             IllegalArgumentException {
334         boolean rv = false;
335         final Transaction tx = suspendCaller();
336         try {
337             rv = executeWithExecutor(new Callable<Boolean>() {
338                 public Boolean call() throws Exception {
339                     resumeCallee(tx);
340                     try {
341                         return underlyingTerracottaStore().replace(old, element, comparator);
342                     } finally {
343                         suspendCallee();
344                     }
345                 }
346             });
347         } catch (TimeoutException e) {
348             return resolveTimeoutBehaviorStore().replace(old, element, comparator);
349         } finally {
350             resumeCaller(tx);
351         }
352         return rv;
353     }
354 
355     /***
356      * {@inheritDoc}.
357      */
358     @Override
359     public Element replace(final Element element) throws NullPointerException {
360         Element rv = null;
361         final Transaction tx = suspendCaller();
362         try {
363             rv = executeWithExecutor(new Callable<Element>() {
364                 public Element call() throws Exception {
365                     resumeCallee(tx);
366                     try {
367                         return underlyingTerracottaStore().replace(element);
368                     } finally {
369                         suspendCallee();
370                     }
371                 }
372             });
373         } catch (TimeoutException e) {
374             return resolveTimeoutBehaviorStore().replace(element);
375         } finally {
376             resumeCaller(tx);
377         }
378         return rv;
379     }
380 
381     /***
382      * {@inheritDoc}.
383      */
384     @Override
385     public int getSize() {
386         int rv = 0;
387         final Transaction tx = suspendCaller();
388         try {
389             rv = executeWithExecutor(new Callable<Integer>() {
390                 public Integer call() throws Exception {
391                     resumeCallee(tx);
392                     try {
393                         return underlyingTerracottaStore().getSize();
394                     } finally {
395                         suspendCallee();
396                     }
397                 }
398             });
399         } catch (TimeoutException e) {
400             return resolveTimeoutBehaviorStore().getSize();
401         } finally {
402             resumeCaller(tx);
403         }
404         return rv;
405     }
406 
407 
408     /***
409      * {@inheritDoc}.
410      */
411     @Override
412     public int getTerracottaClusteredSize() {
413         int rv = 0;
414         final Transaction tx = suspendCaller();
415         try {
416             rv = executeWithExecutor(new Callable<Integer>() {
417                 public Integer call() throws Exception {
418                     resumeCallee(tx);
419                     try {
420                         return underlyingTerracottaStore().getTerracottaClusteredSize();
421                     } finally {
422                         suspendCallee();
423                     }
424                 }
425             });
426         } catch (TimeoutException e) {
427             return resolveTimeoutBehaviorStore().getTerracottaClusteredSize();
428         } finally {
429             resumeCaller(tx);
430         }
431         return rv;
432     }
433 
434     /***
435      * {@inheritDoc}.
436      */
437     @Override
438     public boolean containsKey(final Object key) {
439         boolean rv = false;
440         final Transaction tx = suspendCaller();
441         try {
442             rv = executeWithExecutor(new Callable<Boolean>() {
443                 public Boolean call() throws Exception {
444                     resumeCallee(tx);
445                     try {
446                         return underlyingTerracottaStore().containsKey(key);
447                     } finally {
448                         suspendCallee();
449                     }
450                 }
451             });
452         } catch (TimeoutException e) {
453             return resolveTimeoutBehaviorStore().containsKey(key);
454         } finally {
455             resumeCaller(tx);
456         }
457         return rv;
458     }
459 
460 
461 
462     private void resumeCaller(Transaction tx) {
463         if (tx == null) {
464             return;
465         }
466 
467         try {
468             transactionManager.resume(tx);
469         } catch (IllegalStateException ise) {
470             LOG.warn("error resuming JTA transaction context on caller thread", ise);
471         } catch (InvalidTransactionException ite) {
472             LOG.warn("error resuming JTA transaction context on caller thread", ite);
473         } catch (SystemException se) {
474             LOG.warn("error resuming JTA transaction context on caller thread", se);
475         }
476     }
477 
478     private Transaction suspendCaller() {
479         try {
480             return transactionManager.suspend();
481         } catch (SystemException se) {
482             throw new CacheException("error suspending JTA transaction context", se);
483         }
484     }
485 
486     private void resumeCallee(Transaction tx) {
487         if (tx == null) {
488             return;
489         }
490 
491         try {
492             transactionManager.resume(tx);
493         } catch (IllegalStateException ise) {
494             throw new CacheException("error resuming JTA transaction context on caller thread", ise);
495         } catch (InvalidTransactionException ite) {
496             throw new CacheException("error resuming JTA transaction context on caller thread", ite);
497         } catch (SystemException se) {
498             throw new CacheException("error resuming JTA transaction context on caller thread", se);
499         }
500     }
501 
502     private Transaction suspendCallee() {
503         try {
504             return transactionManager.suspend();
505         } catch (SystemException se) {
506             LOG.warn("error suspending JTA transaction context", se);
507         }
508         return null;
509     }
510 
511 }