View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  package net.sf.ehcache.transaction.local;
17  
18  import java.util.Collection;
19  import java.util.List;
20  import java.util.concurrent.atomic.AtomicBoolean;
21  
22  import javax.transaction.RollbackException;
23  import javax.transaction.Synchronization;
24  import javax.transaction.SystemException;
25  import javax.transaction.Transaction;
26  import javax.transaction.TransactionManager;
27  import javax.transaction.xa.XAException;
28  import javax.transaction.xa.XAResource;
29  import javax.transaction.xa.Xid;
30  
31  import net.sf.ehcache.CacheEntry;
32  import net.sf.ehcache.CacheException;
33  import net.sf.ehcache.Ehcache;
34  import net.sf.ehcache.Element;
35  import net.sf.ehcache.TransactionController;
36  import net.sf.ehcache.store.ElementValueComparator;
37  import net.sf.ehcache.store.compound.NullReadWriteCopyStrategy;
38  import net.sf.ehcache.transaction.AbstractTransactionStore;
39  import net.sf.ehcache.transaction.TransactionException;
40  import net.sf.ehcache.transaction.TransactionID;
41  import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
42  import net.sf.ehcache.transaction.xa.EhcacheXAResource;
43  import net.sf.ehcache.transaction.xa.XAExecutionListener;
44  import net.sf.ehcache.transaction.xa.XATransactionContext;
45  import net.sf.ehcache.writer.CacheWriterManager;
46  
47  import org.slf4j.Logger;
48  import org.slf4j.LoggerFactory;
49  
50  /***
51   * A Store implementation with support for local transactions driven by a JTA transaction manager
52   *
53   * @author Ludovic Orban
54   */
55  public class JtaLocalTransactionStore extends AbstractTransactionStore {
56  
57      private static final Logger LOG = LoggerFactory.getLogger(JtaLocalTransactionStore.class.getName());
58      private static final String ALTERNATIVE_TERMINATION_MODE_SYS_PROPERTY_NAME = "net.sf.ehcache.transaction.xa.alternativeTerminationMode";
59      private static final AtomicBoolean ATOMIKOS_WARNING_ISSUED = new AtomicBoolean(false);
60  
61      private static final ThreadLocal<Transaction> BOUND_JTA_TRANSACTIONS = new ThreadLocal<Transaction>();
62  
63      private final TransactionManagerLookup transactionManagerLookup;
64      private final TransactionController transactionController;
65      private final TransactionManager transactionManager;
66      private final Ehcache cache;
67  
68      /***
69       * Create a new JtaLocalTransactionStore instance
70       * @param underlyingStore the underlying LocalTransactionStore
71       * @param transactionManagerLookup the TransactionManagerLookup
72       * @param transactionController the TransactionController
73       */
74      public JtaLocalTransactionStore(LocalTransactionStore underlyingStore, TransactionManagerLookup transactionManagerLookup,
75                                      TransactionController transactionController) {
76          super(underlyingStore, new NullReadWriteCopyStrategy());
77          this.transactionManagerLookup = transactionManagerLookup;
78          this.transactionController = transactionController;
79          this.transactionManager = transactionManagerLookup.getTransactionManager();
80          if (this.transactionManager == null) {
81              throw new TransactionException("no JTA transaction manager could be located");
82          }
83          this.cache = underlyingStore.getCache();
84  
85          if (transactionManager.getClass().getName().contains("atomikos")) {
86              System.setProperty(ALTERNATIVE_TERMINATION_MODE_SYS_PROPERTY_NAME, "true");
87              if (ATOMIKOS_WARNING_ISSUED.compareAndSet(false, true)) {
88                  LOG.warn("Atomikos transaction manager detected, make sure you configured com.atomikos.icatch.threaded_2pc=false");
89              }
90          }
91      }
92  
93      private void registerInJtaContext() {
94          try {
95              if (transactionController.getCurrentTransactionContext() != null) {
96                  // already started local TX and registered in JTA
97  
98                  // make sure the JTA transaction hasn't changed (happens when TM.suspend() is called)
99                  Transaction tx = transactionManager.getTransaction();
100                 if (!BOUND_JTA_TRANSACTIONS.get().equals(tx)) {
101                     throw new TransactionException("Invalid JTA transaction context, cache was first used in transaction ["
102                             + BOUND_JTA_TRANSACTIONS.get() + "]" +
103                             " but is now used in transaction [" + tx + "].");
104                 }
105             } else {
106                 Transaction tx = transactionManager.getTransaction();
107                 if (tx == null) {
108                     throw new TransactionException("no JTA transaction context started, xa caches cannot be used outside of" +
109                             " JTA transactions");
110                 }
111                 BOUND_JTA_TRANSACTIONS.set(tx);
112 
113                 transactionController.begin();
114 
115                 // DEV-5376
116                 if (Boolean.getBoolean(ALTERNATIVE_TERMINATION_MODE_SYS_PROPERTY_NAME)) {
117 
118                     JtaLocalEhcacheXAResource xaRes = new JtaLocalEhcacheXAResource(transactionController,
119                             transactionController.getCurrentTransactionContext().getTransactionId());
120                     transactionManagerLookup.register(xaRes);
121                     tx.enlistResource(xaRes);
122                 } else {
123                     tx.registerSynchronization(new JtaLocalEhcacheSynchronization(transactionController,
124                             transactionController.getCurrentTransactionContext().getTransactionId()));
125                 }
126             }
127         } catch (SystemException e) {
128             throw new TransactionException("internal JTA transaction manager error, cannot bind xa cache with it", e);
129         } catch (RollbackException e) {
130             throw new TransactionException("JTA transaction rolled back, cannot bind xa cache with it", e);
131         }
132     }
133 
134     /***
135      * A Synchronization used to terminate the local transaction and clean it up
136      */
137     private static final class JtaLocalEhcacheSynchronization implements Synchronization {
138         private final TransactionController transactionController;
139         private final TransactionID transactionId;
140 
141         private JtaLocalEhcacheSynchronization(TransactionController transactionController, TransactionID transactionId) {
142             this.transactionController = transactionController;
143             this.transactionId = transactionId;
144         }
145 
146         public void beforeCompletion() {
147             //
148         }
149 
150         public void afterCompletion(int status) {
151             JtaLocalTransactionStore.BOUND_JTA_TRANSACTIONS.remove();
152             if (status == javax.transaction.Status.STATUS_COMMITTED) {
153                 transactionController.commit(true);
154             } else if (status == javax.transaction.Status.STATUS_ROLLEDBACK) {
155                 transactionController.rollback();
156             } else {
157                 transactionController.rollback();
158                 LOG.warn("The transaction manager reported UNKNOWN transaction status upon termination." +
159                         " The ehcache transaction has been rolled back!");
160             }
161         }
162 
163         @Override
164         public String toString() {
165             return "JtaLocalEhcacheSynchronization of transaction [" + transactionId + "]";
166         }
167     }
168 
169     /***
170      * A XAResource implementation used to terminate the local transaction and clean it up.
171      *
172      * It should only be used with transaction managers providing a defective Synchronization
173      * mechanism with which rollback/commit cannot be reliably differentiated as it relies
174      * on the fact that the same thread is used to call Ehcache methods as well as XAResource
175      * which isn't guaranteed by the specification.
176      * This mechanism also has a slight performance impact as there is one extra resource
177      * participating in the 2PC.
178      */
179     private static final class JtaLocalEhcacheXAResource implements EhcacheXAResource {
180         private final TransactionController transactionController;
181         private final TransactionID transactionId;
182 
183         private JtaLocalEhcacheXAResource(TransactionController transactionController, TransactionID transactionId) {
184             this.transactionController = transactionController;
185             this.transactionId = transactionId;
186         }
187 
188         public void commit(Xid xid, boolean onePhase) throws XAException {
189             transactionController.commit(true);
190             JtaLocalTransactionStore.BOUND_JTA_TRANSACTIONS.remove();
191         }
192 
193         public void end(Xid xid, int flag) throws XAException {
194             //
195         }
196 
197         public void forget(Xid xid) throws XAException {
198             //
199         }
200 
201         public int getTransactionTimeout() throws XAException {
202             return 0;
203         }
204 
205         public boolean isSameRM(XAResource xaResource) throws XAException {
206             return xaResource == this;
207         }
208 
209         public int prepare(Xid xid) throws XAException {
210             return XA_OK;
211         }
212 
213         public Xid[] recover(int flags) throws XAException {
214             return new Xid[0];
215         }
216 
217         public void rollback(Xid xid) throws XAException {
218             transactionController.rollback();
219             JtaLocalTransactionStore.BOUND_JTA_TRANSACTIONS.remove();
220         }
221 
222         public boolean setTransactionTimeout(int timeout) throws XAException {
223             return false;
224         }
225 
226         public void start(Xid xid, int flag) throws XAException {
227             //
228         }
229 
230         public void addTwoPcExecutionListener(XAExecutionListener listener) {
231             throw new UnsupportedOperationException();
232         }
233 
234         public String getCacheName() {
235             return transactionId.toString();
236         }
237 
238         public XATransactionContext createTransactionContext() throws SystemException, RollbackException {
239             throw new UnsupportedOperationException();
240         }
241 
242         public XATransactionContext getCurrentTransactionContext() {
243             throw new UnsupportedOperationException();
244         }
245 
246         @Override
247         public String toString() {
248             return "JtaLocalEhcacheXAResource of transaction [" + transactionId + "]";
249         }
250     }
251 
252     private void setRollbackOnly() {
253         try {
254             BOUND_JTA_TRANSACTIONS.get().setRollbackOnly();
255             transactionController.setRollbackOnly();
256         } catch (SystemException e) {
257             LOG.warn("internal JTA transaction manager error", e);
258         }
259     }
260 
261     /* transactional methods */
262 
263     /***
264      * {@inheritDoc}
265      */
266     public boolean put(Element element) throws CacheException {
267         registerInJtaContext();
268         try {
269             return underlyingStore.put(element);
270         } catch (CacheException e) {
271             setRollbackOnly();
272             throw e;
273         }
274     }
275 
276     /***
277      * {@inheritDoc}
278      */
279     public void putAll(Collection<Element> elements) throws CacheException {
280         registerInJtaContext();
281         try {
282             underlyingStore.putAll(elements);
283         } catch (CacheException e) {
284             setRollbackOnly();
285             throw e;
286         }
287     }
288 
289     /***
290      * {@inheritDoc}
291      */
292     public boolean putWithWriter(final Element element, final CacheWriterManager writerManager) throws CacheException {
293         registerInJtaContext();
294         try {
295             boolean put = underlyingStore.put(element);
296             transactionManager.getTransaction().registerSynchronization(new Synchronization() {
297                 public void beforeCompletion() {
298                     if (writerManager != null) {
299                         writerManager.put(element);
300                     } else {
301                         cache.getWriterManager().put(element);
302                     }
303                 }
304                 public void afterCompletion(int status) {
305                     //
306                 }
307             });
308             return put;
309         } catch (CacheException e) {
310             setRollbackOnly();
311             throw e;
312         } catch (RollbackException e) {
313             throw new TransactionException("error registering writer synchronization", e);
314         } catch (SystemException e) {
315             throw new TransactionException("error registering writer synchronization", e);
316         }
317     }
318 
319     /***
320      * {@inheritDoc}
321      */
322     public Element get(Object key) {
323         registerInJtaContext();
324         try {
325             return underlyingStore.get(key);
326         } catch (CacheException e) {
327             setRollbackOnly();
328             throw e;
329         }
330     }
331 
332     /***
333      * {@inheritDoc}
334      */
335     public Element getQuiet(Object key) {
336         registerInJtaContext();
337         try {
338             return underlyingStore.getQuiet(key);
339         } catch (CacheException e) {
340             setRollbackOnly();
341             throw e;
342         }
343     }
344 
345     /***
346      * {@inheritDoc}
347      */
348     public List getKeys() {
349         registerInJtaContext();
350         try {
351             return underlyingStore.getKeys();
352         } catch (CacheException e) {
353             setRollbackOnly();
354             throw e;
355         }
356     }
357 
358     /***
359      * {@inheritDoc}
360      */
361     public Element remove(Object key) {
362         registerInJtaContext();
363         try {
364             return underlyingStore.remove(key);
365         } catch (CacheException e) {
366             setRollbackOnly();
367             throw e;
368         }
369     }
370 
371 
372     /***
373      * {@inheritDoc}
374      */
375     public void removeAll(Collection<Object> keys) {
376         registerInJtaContext();
377         try {
378             underlyingStore.removeAll(keys);
379         } catch (CacheException e) {
380             setRollbackOnly();
381             throw e;
382         }
383     }
384 
385     /***
386      * {@inheritDoc}
387      */
388     public Element removeWithWriter(final Object key, final CacheWriterManager writerManager) throws CacheException {
389         registerInJtaContext();
390         try {
391             Element removed = underlyingStore.remove(key);
392             final CacheEntry cacheEntry = new CacheEntry(key, getQuiet(key));
393             transactionManager.getTransaction().registerSynchronization(new Synchronization() {
394                 public void beforeCompletion() {
395                     if (writerManager != null) {
396                         writerManager.remove(cacheEntry);
397                     } else {
398                         cache.getWriterManager().remove(cacheEntry);
399                     }
400                 }
401                 public void afterCompletion(int status) {
402                     //
403                 }
404             });
405             return removed;
406         } catch (CacheException e) {
407             setRollbackOnly();
408             throw e;
409         } catch (RollbackException e) {
410             throw new TransactionException("error registering writer synchronization", e);
411         } catch (SystemException e) {
412             throw new TransactionException("error registering writer synchronization", e);
413         }
414     }
415 
416     /***
417      * {@inheritDoc}
418      */
419     public void removeAll() throws CacheException {
420         registerInJtaContext();
421         try {
422             underlyingStore.removeAll();
423         } catch (CacheException e) {
424             setRollbackOnly();
425             throw e;
426         }
427     }
428 
429     /***
430      * {@inheritDoc}
431      */
432     public Element putIfAbsent(Element element) throws NullPointerException {
433         registerInJtaContext();
434         try {
435             return underlyingStore.putIfAbsent(element);
436         } catch (CacheException e) {
437             setRollbackOnly();
438             throw e;
439         }
440     }
441 
442     /***
443      * {@inheritDoc}
444      */
445     public Element removeElement(Element element, ElementValueComparator comparator) throws NullPointerException {
446         registerInJtaContext();
447         try {
448             return underlyingStore.removeElement(element, comparator);
449         } catch (CacheException e) {
450             setRollbackOnly();
451             throw e;
452         }
453     }
454 
455     /***
456      * {@inheritDoc}
457      */
458     public boolean replace(Element old, Element element, ElementValueComparator comparator)
459             throws NullPointerException, IllegalArgumentException {
460         registerInJtaContext();
461         try {
462             return underlyingStore.replace(old, element, comparator);
463         } catch (CacheException e) {
464             setRollbackOnly();
465             throw e;
466         }
467     }
468 
469     /***
470      * {@inheritDoc}
471      */
472     public Element replace(Element element) throws NullPointerException {
473         registerInJtaContext();
474         try {
475             return underlyingStore.replace(element);
476         } catch (CacheException e) {
477             setRollbackOnly();
478             throw e;
479         }
480     }
481 
482     /***
483      * {@inheritDoc}
484      */
485     public int getSize() {
486         registerInJtaContext();
487         try {
488             return underlyingStore.getSize();
489         } catch (CacheException e) {
490             setRollbackOnly();
491             throw e;
492         }
493     }
494 
495     /***
496      * {@inheritDoc}
497      */
498     public int getTerracottaClusteredSize() {
499         if (transactionController.getCurrentTransactionContext() == null) {
500             return underlyingStore.getTerracottaClusteredSize();
501         }
502 
503         registerInJtaContext();
504         try {
505             return underlyingStore.getTerracottaClusteredSize();
506         } catch (CacheException e) {
507             setRollbackOnly();
508             throw e;
509         }
510     }
511 
512     /***
513      * {@inheritDoc}
514      */
515     public boolean containsKey(Object key) {
516         registerInJtaContext();
517         try {
518             return underlyingStore.containsKey(key);
519         } catch (CacheException e) {
520             setRollbackOnly();
521             throw e;
522         }
523     }
524 
525 
526 
527 }
528