View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  package net.sf.ehcache.transaction.xa;
17  
18  import java.util.HashMap;
19  import java.util.Iterator;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.Map.Entry;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentHashMap;
25  
26  import javax.transaction.RollbackException;
27  import javax.transaction.SystemException;
28  import javax.transaction.Transaction;
29  import javax.transaction.xa.XAException;
30  
31  import net.sf.ehcache.CacheEntry;
32  import net.sf.ehcache.CacheException;
33  import net.sf.ehcache.Ehcache;
34  import net.sf.ehcache.Element;
35  import net.sf.ehcache.search.attribute.AttributeExtractor;
36  import net.sf.ehcache.store.ElementValueComparator;
37  import net.sf.ehcache.store.Store;
38  import net.sf.ehcache.store.compound.ReadWriteCopyStrategy;
39  import net.sf.ehcache.transaction.AbstractTransactionStore;
40  import net.sf.ehcache.transaction.SoftLock;
41  import net.sf.ehcache.transaction.SoftLockFactory;
42  import net.sf.ehcache.transaction.TransactionAwareAttributeExtractor;
43  import net.sf.ehcache.transaction.TransactionException;
44  import net.sf.ehcache.transaction.TransactionIDFactory;
45  import net.sf.ehcache.transaction.TransactionInterruptedException;
46  import net.sf.ehcache.transaction.TransactionTimeoutException;
47  import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
48  import net.sf.ehcache.transaction.xa.commands.StorePutCommand;
49  import net.sf.ehcache.transaction.xa.commands.StoreRemoveCommand;
50  import net.sf.ehcache.util.LargeSet;
51  import net.sf.ehcache.util.SetWrapperList;
52  import net.sf.ehcache.writer.CacheWriterManager;
53  
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  
57  /***
58   * @author Ludovic Orban
59   */
60  public class XATransactionStore extends AbstractTransactionStore {
61  
62      private static final Logger LOG = LoggerFactory.getLogger(XATransactionStore.class.getName());
63      private static final long MILLISECOND_PER_SECOND = 1000L;
64  
65      private final TransactionManagerLookup transactionManagerLookup;
66      private final TransactionIDFactory transactionIdFactory;
67      private final SoftLockFactory softLockFactory;
68      private final Ehcache cache;
69  
70      private final ConcurrentHashMap<Transaction, EhcacheXAResource> transactionToXAResourceMap =
71              new ConcurrentHashMap<Transaction, EhcacheXAResource>();
72      private final ConcurrentHashMap<Transaction, Long> transactionToTimeoutMap = new ConcurrentHashMap<Transaction, Long>();
73  
74      /***
75       * Constructor
76       * @param transactionManagerLookup the transaction manager lookup implementation
77       * @param softLockFactory the soft lock factory
78       * @param transactionIdFactory the transaction ID factory
79       * @param cache the cache
80       * @param store the underlying store
81       * @param copyStrategy the original copy strategy
82       */
83      public XATransactionStore(TransactionManagerLookup transactionManagerLookup, SoftLockFactory softLockFactory,
84                                TransactionIDFactory transactionIdFactory, Ehcache cache, Store store,
85                                ReadWriteCopyStrategy<Element> copyStrategy) {
86          super(store, copyStrategy);
87          this.transactionManagerLookup = transactionManagerLookup;
88          this.transactionIdFactory = transactionIdFactory;
89          if (transactionManagerLookup.getTransactionManager() == null) {
90              throw new TransactionException("no JTA transaction manager could be located, cannot bind twopc cache with JTA");
91          }
92          this.softLockFactory = softLockFactory;
93          this.cache = cache;
94      }
95  
96      private Transaction getCurrentTransaction() throws SystemException {
97          Transaction transaction = transactionManagerLookup.getTransactionManager().getTransaction();
98          if (transaction == null) {
99              throw new TransactionException("JTA transaction not started");
100         }
101         return transaction;
102     }
103 
104     /***
105      * Get or create the XAResource of this XA store
106      * @return the EhcacheXAResource of this store
107      * @throws SystemException when something goes wrong with the transaction manager
108      */
109     public EhcacheXAResourceImpl getOrCreateXAResource() throws SystemException {
110         Transaction transaction = getCurrentTransaction();
111         EhcacheXAResourceImpl xaResource = (EhcacheXAResourceImpl) transactionToXAResourceMap.get(transaction);
112         if (xaResource == null) {
113             LOG.debug("creating new XAResource");
114             xaResource = new EhcacheXAResourceImpl(cache, underlyingStore, transactionManagerLookup,
115                     softLockFactory, transactionIdFactory);
116             transactionToXAResourceMap.put(transaction, xaResource);
117             xaResource.addTwoPcExecutionListener(new CleanupXAResource(getCurrentTransaction()));
118         }
119         return xaResource;
120     }
121 
122     private XATransactionContext getTransactionContext() {
123         try {
124             Transaction transaction = getCurrentTransaction();
125             EhcacheXAResourceImpl xaResource = (EhcacheXAResourceImpl) transactionToXAResourceMap.get(transaction);
126             if (xaResource == null) {
127                 return null;
128             }
129             XATransactionContext transactionContext = xaResource.getCurrentTransactionContext();
130 
131             if (transactionContext == null) {
132                 transactionManagerLookup.register(xaResource);
133                 LOG.debug("creating new XA context");
134                 transactionContext = xaResource.createTransactionContext();
135                 xaResource.addTwoPcExecutionListener(new UnregisterXAResource());
136             } else {
137                 transactionContext = xaResource.getCurrentTransactionContext();
138             }
139 
140             LOG.debug("using XA context {}", transactionContext);
141             return transactionContext;
142         } catch (SystemException e) {
143             throw new TransactionException("cannot get the current transaction", e);
144         } catch (RollbackException e) {
145             throw new TransactionException("transaction rolled back", e);
146         }
147     }
148 
149     private XATransactionContext getOrCreateTransactionContext() {
150         try {
151             EhcacheXAResourceImpl xaResource = getOrCreateXAResource();
152             XATransactionContext transactionContext = xaResource.getCurrentTransactionContext();
153 
154             if (transactionContext == null) {
155                 transactionManagerLookup.register(xaResource);
156                 LOG.debug("creating new XA context");
157                 transactionContext = xaResource.createTransactionContext();
158                 xaResource.addTwoPcExecutionListener(new UnregisterXAResource());
159             } else {
160                 transactionContext = xaResource.getCurrentTransactionContext();
161             }
162 
163             LOG.debug("using XA context {}", transactionContext);
164             return transactionContext;
165         } catch (SystemException e) {
166             throw new TransactionException("cannot get the current transaction", e);
167         } catch (RollbackException e) {
168             throw new TransactionException("transaction rolled back", e);
169         }
170     }
171 
172     /***
173      * This class is used to clean up the transactionToXAResourceMap after a transaction
174      * committed or rolled back.
175      */
176     private final class CleanupXAResource implements XAExecutionListener {
177         private final Transaction transaction;
178 
179         private CleanupXAResource(Transaction transaction) {
180             this.transaction = transaction;
181         }
182 
183         public void beforePrepare(EhcacheXAResource xaResource) {
184         }
185 
186         public void afterCommitOrRollback(EhcacheXAResource xaResource) {
187             transactionToXAResourceMap.remove(transaction);
188             transactionToTimeoutMap.remove(transaction);
189         }
190     }
191 
192     /***
193      * This class is used to unregister the XAResource after a transaction
194      * committed or rolled back.
195      */
196     private final class UnregisterXAResource implements XAExecutionListener {
197 
198         public void beforePrepare(EhcacheXAResource xaResource) {
199         }
200 
201         public void afterCommitOrRollback(EhcacheXAResource xaResource) {
202             transactionManagerLookup.unregister(xaResource);
203         }
204     }
205 
206 
207     /***
208      * @return milliseconds left before timeout
209      */
210     private long assertNotTimedOut() {
211         try {
212             if (Thread.interrupted()) {
213                 throw new TransactionInterruptedException("transaction interrupted");
214             }
215 
216             Transaction transaction = getCurrentTransaction();
217 
218             EhcacheXAResource xaResource = transactionToXAResourceMap.get(transaction);
219             Long timeoutTimestamp = transactionToTimeoutMap.get(transaction);
220             if (xaResource != null && timeoutTimestamp == null) {
221                 int xaResourceTimeout = xaResource.getTransactionTimeout();
222 
223                 timeoutTimestamp = System.currentTimeMillis() + (xaResourceTimeout * MILLISECOND_PER_SECOND);
224                 transactionToTimeoutMap.put(transaction, timeoutTimestamp);
225             } else if (timeoutTimestamp == null) {
226                 int defaultTransactionTimeout = cache.getCacheManager().getTransactionController().getDefaultTransactionTimeout();
227                 timeoutTimestamp = System.currentTimeMillis() + (defaultTransactionTimeout * MILLISECOND_PER_SECOND);
228                 transactionToTimeoutMap.put(transaction, timeoutTimestamp);
229             }
230 
231             if (timeoutTimestamp <= System.currentTimeMillis()) {
232                 throw new TransactionTimeoutException("transaction timed out");
233             }
234 
235             return timeoutTimestamp - System.currentTimeMillis();
236         } catch (SystemException e) {
237             throw new TransactionException("cannot get the current transaction", e);
238         } catch (XAException e) {
239             throw new TransactionException("cannot get the XAResource transaction timeout", e);
240         }
241     }
242 
243     /* transactional methods */
244 
245     /***
246      * {@inheritDoc}
247      */
248     public Element get(Object key) {
249         LOG.debug("cache {} get {}", cache.getName(), key);
250         XATransactionContext context = getTransactionContext();
251         Element element;
252         if (context == null) {
253             element = getFromUnderlyingStore(key);
254         } else {
255             element = context.get(key);
256             if (element == null && !context.isRemoved(key)) {
257                 element = getFromUnderlyingStore(key);
258             }
259         }
260         return copyElementForRead(element);
261     }
262 
263 
264     /***
265      * {@inheritDoc}
266      */
267     public Element getQuiet(Object key) {
268         LOG.debug("cache {} getQuiet {}", cache.getName(), key);
269         XATransactionContext context = getTransactionContext();
270         Element element;
271         if (context == null) {
272             element = getQuietFromUnderlyingStore(key);
273         } else {
274             element = context.get(key);
275             if (element == null && !context.isRemoved(key)) {
276                 element = getQuietFromUnderlyingStore(key);
277             }
278         }
279         return copyElementForRead(element);
280     }
281 
282     /***
283      * {@inheritDoc}
284      */
285     public int getSize() {
286         LOG.debug("cache {} getSize", cache.getName());
287         XATransactionContext context = getOrCreateTransactionContext();
288         int size = underlyingStore.getSize();
289         return size + context.getSizeModifier();
290     }
291 
292     /***
293      * {@inheritDoc}
294      */
295     public int getTerracottaClusteredSize() {
296         try {
297             Transaction transaction = transactionManagerLookup.getTransactionManager().getTransaction();
298             if (transaction == null) {
299                 return underlyingStore.getTerracottaClusteredSize();
300             }
301         } catch (SystemException se) {
302             throw new TransactionException("cannot get the current transaction", se);
303         }
304 
305         LOG.debug("cache {} getTerracottaClusteredSize", cache.getName());
306         XATransactionContext context = getOrCreateTransactionContext();
307         int size = underlyingStore.getTerracottaClusteredSize();
308         return size + context.getSizeModifier();
309     }
310 
311     /***
312      * {@inheritDoc}
313      */
314     public boolean containsKey(Object key) {
315         LOG.debug("cache {} containsKey", cache.getName(), key);
316         XATransactionContext context = getOrCreateTransactionContext();
317         return !context.isRemoved(key) && (context.getAddedKeys().contains(key) || underlyingStore.containsKey(key));
318     }
319 
320     /***
321      * {@inheritDoc}
322      */
323     public List getKeys() {
324         LOG.debug("cache {} getKeys", cache.getName());
325         XATransactionContext context = getOrCreateTransactionContext();
326         Set<Object> keys = new LargeSet<Object>() {
327 
328             @Override
329             public int sourceSize() {
330                 return underlyingStore.getSize();
331             }
332 
333             @Override
334             public Iterator<Object> sourceIterator() {
335                 return underlyingStore.getKeys().iterator();
336             }
337         };
338         keys.addAll(context.getAddedKeys());
339         keys.removeAll(context.getRemovedKeys());
340         return new SetWrapperList(keys);
341     }
342 
343 
344     private Element getFromUnderlyingStore(final Object key) {
345         while (true) {
346             long timeLeft = assertNotTimedOut();
347             LOG.debug("cache {} underlying.get key {} not timed out, time left: " + timeLeft, cache.getName(), key);
348 
349             Element element = underlyingStore.get(key);
350             if (element == null) {
351                 return null;
352             }
353             Object value = element.getObjectValue();
354             if (value instanceof SoftLock) {
355                 SoftLock softLock = (SoftLock) value;
356                 try {
357                     LOG.debug("cache {} key {} soft locked, awaiting unlock...", cache.getName(), key);
358                     boolean gotLock = softLock.tryLock(timeLeft);
359                     if (gotLock) {
360                         softLock.clearTryLock();
361                     }
362                 } catch (InterruptedException e) {
363                     Thread.currentThread().interrupt();
364                 }
365             } else {
366                 return element;
367             }
368         }
369     }
370 
371     private Element getQuietFromUnderlyingStore(final Object key) {
372         while (true) {
373             long timeLeft = assertNotTimedOut();
374             LOG.debug("cache {} underlying.getQuiet key {} not timed out, time left: " + timeLeft, cache.getName(), key);
375 
376             Element element = underlyingStore.getQuiet(key);
377             if (element == null) {
378                 return null;
379             }
380             Object value = element.getObjectValue();
381             if (value instanceof SoftLock) {
382                 SoftLock softLock = (SoftLock) value;
383                 try {
384                     LOG.debug("cache {} key {} soft locked, awaiting unlock...", cache.getName(), key);
385                     boolean gotLock = softLock.tryLock(timeLeft);
386                     if (gotLock) {
387                         softLock.clearTryLock();
388                     }
389                 } catch (InterruptedException e) {
390                     Thread.currentThread().interrupt();
391                 }
392             } else {
393                 return element;
394             }
395         }
396     }
397 
398     private Element getCurrentElement(final Object key, final XATransactionContext context) {
399         Element previous = context.get(key);
400         if (previous == null && !context.isRemoved(key)) {
401             previous = getQuietFromUnderlyingStore(key);
402         }
403         return previous;
404     }
405 
406     /***
407      * {@inheritDoc}
408      */
409     public boolean put(Element element) throws CacheException {
410         LOG.debug("cache {} put {}", cache.getName(), element);
411         // this forces enlistment so the XA transaction timeout can be propagated to the XA resource
412         getOrCreateTransactionContext();
413 
414         Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
415         return internalPut(new StorePutCommand(oldElement, copyElementForWrite(element)));
416     }
417 
418     /***
419      * {@inheritDoc}
420      */
421     public boolean putWithWriter(Element element, CacheWriterManager writerManager) throws CacheException {
422         LOG.debug("cache {} putWithWriter {}", cache.getName(), element);
423         // this forces enlistment so the XA transaction timeout can be propagated to the XA resource
424         getOrCreateTransactionContext();
425 
426         Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
427         if (writerManager != null) {
428             writerManager.put(element);
429         } else {
430             cache.getWriterManager().put(element);
431         }
432         return internalPut(new StorePutCommand(oldElement, copyElementForWrite(element)));
433     }
434 
435     private boolean internalPut(final StorePutCommand putCommand) {
436         final Element element = putCommand.getElement();
437         boolean isNull;
438         if (element == null) {
439             return true;
440         }
441         XATransactionContext context = getOrCreateTransactionContext();
442         // In case this key is currently being updated...
443         isNull = underlyingStore.get(element.getKey()) == null;
444         if (isNull) {
445             isNull = context.get(element.getKey()) == null;
446         }
447         context.addCommand(putCommand, element);
448         return isNull;
449     }
450 
451 
452     /***
453      * {@inheritDoc}
454      */
455     public Element remove(Object key) {
456         LOG.debug("cache {} remove {}", cache.getName(), key);
457         // this forces enlistment so the XA transaction timeout can be propagated to the XA resource
458         getOrCreateTransactionContext();
459 
460         Element oldElement = getQuietFromUnderlyingStore(key);
461         return removeInternal(new StoreRemoveCommand(key, oldElement));
462     }
463 
464     private Element removeInternal(final StoreRemoveCommand command) {
465         Element element = command.getEntry().getElement();
466         getOrCreateTransactionContext().addCommand(command, element);
467         return copyElementForRead(element);
468     }
469 
470     /***
471      * {@inheritDoc}
472      */
473     public Element removeWithWriter(Object key, CacheWriterManager writerManager) throws CacheException {
474         LOG.debug("cache {} removeWithWriter {}", cache.getName(), key);
475         // this forces enlistment so the XA transaction timeout can be propagated to the XA resource
476         getOrCreateTransactionContext();
477 
478         Element oldElement = getQuietFromUnderlyingStore(key);
479         if (writerManager != null) {
480             writerManager.remove(new CacheEntry(key, null));
481         } else {
482             cache.getWriterManager().remove(new CacheEntry(key, null));
483         }
484         return removeInternal(new StoreRemoveCommand(key, oldElement));
485     }
486 
487     /***
488      * {@inheritDoc}
489      */
490     public void removeAll() throws CacheException {
491         LOG.debug("cache {} removeAll", cache.getName());
492         List keys = getKeys();
493         for (Object key : keys) {
494             remove(key);
495         }
496     }
497 
498     /***
499      * {@inheritDoc}
500      */
501     public Element putIfAbsent(Element element) throws NullPointerException {
502         LOG.debug("cache {} putIfAbsent {}", cache.getName(), element);
503         XATransactionContext context = getOrCreateTransactionContext();
504         Element previous = getCurrentElement(element.getObjectKey(), context);
505 
506         if (previous == null) {
507             Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
508             Element elementForWrite = copyElementForWrite(element);
509             context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
510         }
511 
512         return copyElementForRead(previous);
513     }
514 
515     /***
516      * {@inheritDoc}
517      */
518     public Element removeElement(Element element, ElementValueComparator comparator) throws NullPointerException {
519         LOG.debug("cache {} removeElement {}", cache.getName(), element);
520         XATransactionContext context = getOrCreateTransactionContext();
521         Element previous = getCurrentElement(element.getKey(), context);
522 
523         Element elementForWrite = copyElementForWrite(element);
524         if (previous != null && comparator.equals(previous, elementForWrite)) {
525             Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
526             context.addCommand(new StoreRemoveCommand(element.getObjectKey(), oldElement), elementForWrite);
527             return copyElementForRead(previous);
528         }
529         return null;
530     }
531 
532     /***
533      * {@inheritDoc}
534      */
535     public boolean replace(Element old, Element element, ElementValueComparator comparator)
536             throws NullPointerException, IllegalArgumentException {
537         LOG.debug("cache {} replace2 {}", cache.getName(), element);
538         XATransactionContext context = getOrCreateTransactionContext();
539         Element previous = getCurrentElement(element.getKey(), context);
540 
541         boolean replaced = false;
542         if (previous != null && comparator.equals(previous, copyElementForWrite(old))) {
543             Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
544             Element elementForWrite = copyElementForWrite(element);
545             context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
546             replaced = true;
547         }
548         return replaced;
549     }
550 
551     /***
552      * {@inheritDoc}
553      */
554     public Element replace(Element element) throws NullPointerException {
555         LOG.debug("cache {} replace1 {}", cache.getName(), element);
556         XATransactionContext context = getOrCreateTransactionContext();
557         Element previous = getCurrentElement(element.getKey(), context);
558 
559         if (previous != null) {
560             Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
561             Element elementForWrite = copyElementForWrite(element);
562             context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
563         }
564         return copyElementForRead(previous);
565     }
566 
567     /***
568      * {@inheritDoc}
569      */
570     @Override
571     public void setAttributeExtractors(Map<String, AttributeExtractor> extractors) {
572         Map<String, AttributeExtractor> wrappedExtractors = new HashMap(extractors.size());
573         for (Entry<String, AttributeExtractor> e : extractors.entrySet()) {
574             wrappedExtractors.put(e.getKey(), new TransactionAwareAttributeExtractor(copyStrategy, e.getValue()));
575         }
576         underlyingStore.setAttributeExtractors(wrappedExtractors);
577     }
578 
579 }