View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  package net.sf.ehcache.transaction.xa;
17  
18  import net.sf.ehcache.CacheException;
19  import net.sf.ehcache.Ehcache;
20  import net.sf.ehcache.Element;
21  import net.sf.ehcache.statistics.LiveCacheStatisticsWrapper;
22  import net.sf.ehcache.store.ElementValueComparator;
23  import net.sf.ehcache.store.Store;
24  import net.sf.ehcache.store.chm.ConcurrentHashMap;
25  import net.sf.ehcache.transaction.SoftLock;
26  import net.sf.ehcache.transaction.SoftLockFactory;
27  import net.sf.ehcache.transaction.TransactionID;
28  import net.sf.ehcache.transaction.TransactionIDFactory;
29  import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
30  import net.sf.ehcache.transaction.xa.commands.Command;
31  import net.sf.ehcache.transaction.xa.processor.XARequestProcessor;
32  import net.sf.ehcache.transaction.xa.processor.XARequest;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  import javax.transaction.RollbackException;
37  import javax.transaction.SystemException;
38  import javax.transaction.Transaction;
39  import javax.transaction.TransactionManager;
40  import javax.transaction.xa.XAException;
41  import javax.transaction.xa.XAResource;
42  import javax.transaction.xa.Xid;
43  import java.util.ArrayList;
44  import java.util.Arrays;
45  import java.util.Collections;
46  import java.util.HashSet;
47  import java.util.LinkedList;
48  import java.util.List;
49  import java.util.Set;
50  import java.util.concurrent.ConcurrentMap;
51  
52  /***
53   * The EhcacheXAResource implementation
54   *
55   * @author Ludovic Orban
56   */
57  public class EhcacheXAResourceImpl implements EhcacheXAResource {
58  
59      private static final Logger LOG = LoggerFactory.getLogger(EhcacheXAResourceImpl.class.getName());
60      private static final long MILLISECOND_PER_SECOND = 1000L;
61  
62      private final Ehcache cache;
63      private final Store underlyingStore;
64      private final TransactionIDFactory transactionIDFactory;
65      private final TransactionManager txnManager;
66      private final SoftLockFactory softLockFactory;
67      private final ConcurrentMap<Xid, XATransactionContext> xidToContextMap = new ConcurrentHashMap<Xid, XATransactionContext>();
68      private final XARequestProcessor processor;
69      private volatile Xid currentXid;
70      private volatile int transactionTimeout;
71      private final List<XAExecutionListener> listeners = new ArrayList<XAExecutionListener>();
72      private final ElementValueComparator comparator;
73  
74      /***
75       * Constructor
76       * @param cache the cache
77       * @param underlyingStore the underlying store
78       * @param txnManagerLookup the transaction manager lookup
79       * @param softLockFactory the soft lock factory
80       * @param transactionIDFactory the transaction ID factory
81       */
82      public EhcacheXAResourceImpl(Ehcache cache, Store underlyingStore, TransactionManagerLookup txnManagerLookup,
83                                   SoftLockFactory softLockFactory, TransactionIDFactory transactionIDFactory) {
84          this.cache = cache;
85          this.underlyingStore = underlyingStore;
86          this.transactionIDFactory = transactionIDFactory;
87          this.txnManager = txnManagerLookup.getTransactionManager();
88          this.softLockFactory = softLockFactory;
89          this.processor = new XARequestProcessor(this);
90          this.transactionTimeout = cache.getCacheManager().getTransactionController().getDefaultTransactionTimeout();
91          this.comparator = cache.getCacheConfiguration().getElementValueComparatorConfiguration().getElementComparatorInstance();
92      }
93  
94      /***
95       * {@inheritDoc}
96       */
97      public void start(Xid xid, int flag) throws XAException {
98          LOG.debug("start [{}] [{}]", xid, prettyPrintXAResourceFlags(flag));
99  
100         if (currentXid != null) {
101             throw new EhcacheXAException("resource already started on " + currentXid, XAException.XAER_PROTO);
102         }
103 
104         if (flag == TMNOFLAGS) {
105             if (xidToContextMap.containsKey(xid)) {
106                 throw new EhcacheXAException("cannot start with duplicate XID: " + xid, XAException.XAER_DUPID);
107             }
108             currentXid = xid;
109         } else if (flag == TMRESUME) {
110             if (!xidToContextMap.containsKey(xid)) {
111                 throw new EhcacheXAException("cannot resume non-existent XID: " + xid, XAException.XAER_NOTA);
112             }
113             currentXid = xid;
114         } else if (flag == TMJOIN) {
115             currentXid = xid;
116         } else {
117             throw new EhcacheXAException("unsupported flag: " + flag, XAException.XAER_PROTO);
118         }
119     }
120 
121     /***
122      * {@inheritDoc}
123      */
124     public void end(Xid xid, int flag) throws XAException {
125         LOG.debug("end [{}] [{}]", xid, prettyPrintXAResourceFlags(flag));
126 
127         if (currentXid == null) {
128             throw new EhcacheXAException("resource not started on " + xid, XAException.XAER_PROTO);
129         }
130 
131         if (flag == TMSUCCESS || flag == TMSUSPEND) {
132             if (!currentXid.equals(xid)) {
133                 throw new EhcacheXAException("cannot end working on unknown XID " + xid, XAException.XAER_NOTA);
134             }
135             currentXid = null;
136         } else if (flag == TMFAIL) {
137             if (!currentXid.equals(xid)) {
138                 throw new EhcacheXAException("cannot end working on " + xid + " while work on current XID " + currentXid + " hasn't ended",
139                         XAException.XAER_PROTO);
140             }
141             xidToContextMap.remove(xid);
142             currentXid = null;
143         } else {
144             throw new EhcacheXAException("unsupported flag: " + flag, XAException.XAER_PROTO);
145         }
146     }
147 
148     /***
149      * {@inheritDoc}
150      */
151     public void forget(Xid xid) throws XAException {
152         LOG.debug("forget [{}]", xid);
153 
154         processor.process(new XARequest(XARequest.RequestType.FORGET, xid));
155     }
156 
157     /***
158      * The forget implementation
159      * @param xid a XID
160      * @throws XAException when an error occurs
161      */
162     public void forgetInternal(Xid xid) throws XAException {
163         List<Xid> xids = Arrays.asList(recover(TMSTARTRSCAN));
164         if (!xids.contains(xid)) {
165             throw new EhcacheXAException("forget called on in-doubt XID" + xid, XAException.XAER_PROTO);
166         }
167     }
168 
169     /***
170      * {@inheritDoc}
171      */
172     public int getTransactionTimeout() throws XAException {
173         return transactionTimeout;
174     }
175 
176     /***
177      * {@inheritDoc}
178      */
179     public boolean isSameRM(XAResource xaResource) throws XAException {
180         boolean b = xaResource == this;
181         LOG.debug("{} isSameRm {} -> " + b, this, xaResource);
182         return b;
183     }
184 
185     /***
186      * {@inheritDoc}
187      */
188     public int prepare(Xid xid) throws XAException {
189         LOG.debug("prepare [{}]", xid);
190 
191         if (currentXid != null) {
192             throw new EhcacheXAException("prepare called on non-ended XID: " + xid, XAException.XAER_PROTO);
193         }
194         return processor.process(new XARequest(XARequest.RequestType.PREPARE, xid));
195     }
196 
197     /***
198      * The prepare implementation
199      * @param xid a XID
200      * @return XA_OK or XA_RDONLY
201      * @throws XAException when an error occurs
202      */
203     public int prepareInternal(Xid xid) throws XAException {
204         fireBeforePrepare();
205 
206         XATransactionContext twopcTransactionContext = xidToContextMap.get(xid);
207         if (twopcTransactionContext == null) {
208             throw new EhcacheXAException("transaction never started: " + xid, XAException.XAER_NOTA);
209         }
210 
211 
212         XidTransactionID xidTransactionID = transactionIDFactory.createXidTransactionID(xid);
213 
214         List<Command> commands = twopcTransactionContext.getCommands();
215         List<Command> preparedCommands = new LinkedList<Command>();
216 
217         boolean prepareUpdated = false;
218         LOG.debug("preparing {} command(s) for [{}]", commands.size(), xid);
219         for (Command command : commands) {
220             try {
221                 prepareUpdated |= command.prepare(underlyingStore, softLockFactory, xidTransactionID, comparator);
222                 preparedCommands.add(0, command);
223             } catch (OptimisticLockFailureException ie) {
224                 for (Command preparedCommand : preparedCommands) {
225                     preparedCommand.rollback(underlyingStore);
226                 }
227                 preparedCommands.clear();
228                 throw new EhcacheXAException(command + " failed because value changed between execution and 2PC",
229                         XAException.XA_RBINTEGRITY, ie);
230             }
231         }
232 
233         xidToContextMap.remove(xid);
234 
235         if (!prepareUpdated) {
236             rollbackInternal(xid);
237         }
238 
239         LOG.debug("prepared xid [{}] read only? {}", xid, !prepareUpdated);
240         return prepareUpdated ? XA_OK : XA_RDONLY;
241     }
242 
243     /***
244      * {@inheritDoc}
245      */
246     public void commit(Xid xid, boolean onePhase) throws XAException {
247         LOG.debug("commit [{}] [{}]", xid, onePhase);
248 
249         if (currentXid != null) {
250             throw new EhcacheXAException("commit called on non-ended XID: " + xid, XAException.XAER_PROTO);
251         }
252         this.processor.process(new XARequest(XARequest.RequestType.COMMIT, xid, onePhase));
253     }
254 
255     /***
256      * The commit implementation
257      * @param xid a XID
258      * @param onePhase true if onePhase, false otherwise
259      * @throws XAException when an error occurs
260      */
261     public void commitInternal(Xid xid, boolean onePhase) throws XAException {
262         LiveCacheStatisticsWrapper liveCacheStatisticsWrapper = (LiveCacheStatisticsWrapper) cache.getLiveCacheStatistics();
263         liveCacheStatisticsWrapper.xaCommit();
264         if (onePhase) {
265             XATransactionContext twopcTransactionContext = xidToContextMap.get(xid);
266             if (twopcTransactionContext == null) {
267                 throw new EhcacheXAException("cannot call commit(onePhase=true) after prepare", XAException.XAER_PROTO);
268             }
269 
270             int rc = prepareInternal(xid);
271             if (rc == XA_RDONLY) {
272                 return;
273             }
274         }
275 
276         XidTransactionID xidTransactionID = transactionIDFactory.createXidTransactionID(xid);
277         Set<SoftLock> softLocks = softLockFactory.collectAllSoftLocksForTransactionID(xidTransactionID);
278         LOG.debug("committing {} soft lock(s) for [{}]", softLocks.size(), xid);
279         for (SoftLock softLock : softLocks) {
280             if (softLock.isExpired()) {
281                 softLock.lock();
282                 softLock.freeze();
283             }
284         }
285 
286         for (SoftLock softLock : softLocks) {
287             try {
288                 softLock.getTransactionID().markForCommit();
289             } catch (IllegalStateException ise) {
290                 throw new EhcacheXAException("XID already was rolling back: " + xid, XAException.XAER_RMERR);
291             }
292 
293             Element frozenElement = softLock.getFrozenElement();
294 
295             if (frozenElement != null) {
296                 underlyingStore.put(frozenElement);
297             } else {
298                 underlyingStore.remove(softLock.getKey());
299             }
300         }
301 
302         for (SoftLock softLock : softLocks) {
303             softLock.unfreeze();
304             softLock.unlock();
305         }
306 
307 
308         fireAfterCommitOrRollback();
309     }
310 
311     /***
312      * {@inheritDoc}
313      */
314     public Xid[] recover(int flags) throws XAException {
315         LOG.debug("recover [{}]", prettyPrintXAResourceFlags(flags));
316 
317         if ((flags & TMSTARTRSCAN) != TMSTARTRSCAN) {
318             return new Xid[0];
319         }
320 
321         final Set<Xid> xids = Collections.synchronizedSet(new HashSet<Xid>());
322 
323         Thread t = new Thread("ehcache recovery thread") {
324             @Override
325             public void run() {
326                 Set<TransactionID> transactionIDs = softLockFactory.collectExpiredTransactionIDs();
327                 for (TransactionID transactionID : transactionIDs) {
328                     XidTransactionID xidTransactionID = (XidTransactionID) transactionID;
329                     xids.add(xidTransactionID.getXid());
330                 }
331             }
332         };
333         try {
334             t.start();
335             t.join(transactionTimeout * MILLISECOND_PER_SECOND);
336         } catch (InterruptedException e) {
337             // ignore
338         }
339         if (t.isAlive()) {
340             t.interrupt();
341         }
342 
343         return xids.toArray(new Xid[0]);
344     }
345 
346     /***
347      * {@inheritDoc}
348      */
349     public void rollback(Xid xid) throws XAException {
350         LOG.debug("rollback [{}]", xid);
351 
352         this.processor.process(new XARequest(XARequest.RequestType.ROLLBACK, xid));
353     }
354 
355     /***
356      * The rollback implementation
357      * @param xid a XID
358      * @throws XAException when an error occurs
359      */
360     public void rollbackInternal(Xid xid) throws XAException {
361         LiveCacheStatisticsWrapper liveCacheStatisticsWrapper = (LiveCacheStatisticsWrapper) cache.getLiveCacheStatistics();
362         liveCacheStatisticsWrapper.xaRollback();
363         XidTransactionID xidTransactionID = transactionIDFactory.createXidTransactionID(xid);
364         Set<SoftLock> softLocks = softLockFactory.collectAllSoftLocksForTransactionID(xidTransactionID);
365         for (SoftLock softLock : softLocks) {
366             if (softLock.isExpired()) {
367                 softLock.lock();
368                 softLock.freeze();
369             }
370         }
371 
372         for (SoftLock softLock : softLocks) {
373             try {
374                 ((XidTransactionID) softLock.getTransactionID()).markForRollback();
375             } catch (IllegalStateException ise) {
376                 throw new EhcacheXAException("XID already was committing: " + xid, XAException.XAER_RMERR);
377             }
378 
379             Element frozenElement = softLock.getFrozenElement();
380 
381             if (frozenElement != null) {
382                 underlyingStore.put(frozenElement);
383             } else {
384                 underlyingStore.remove(softLock.getKey());
385             }
386         }
387 
388         for (SoftLock softLock : softLocks) {
389             softLock.unfreeze();
390             softLock.unlock();
391         }
392 
393         // in case of a phase 1 rollback, we need to clean the context
394         xidToContextMap.remove(xid);
395 
396         fireAfterCommitOrRollback();
397     }
398 
399     /***
400      * {@inheritDoc}
401      */
402     public boolean setTransactionTimeout(int timeout) throws XAException {
403         if (timeout < 0) {
404             throw new EhcacheXAException("timeout must be >= 0, was: " + timeout, XAException.XAER_INVAL);
405         }
406         if (timeout == 0) {
407             this.transactionTimeout = cache.getCacheManager().getTransactionController().getDefaultTransactionTimeout();
408         } else {
409             this.transactionTimeout = timeout;
410         }
411         return true;
412     }
413 
414     /***
415      * {@inheritDoc}
416      */
417     public void addTwoPcExecutionListener(XAExecutionListener listener) {
418         listeners.add(listener);
419     }
420 
421     private void fireBeforePrepare() {
422         for (XAExecutionListener listener : listeners) {
423             listener.beforePrepare(this);
424         }
425     }
426 
427     private void fireAfterCommitOrRollback() {
428         for (XAExecutionListener listener : listeners) {
429             listener.afterCommitOrRollback(this);
430         }
431     }
432 
433     /***
434      * {@inheritDoc}
435      */
436     public String getCacheName() {
437         return cache.getName();
438     }
439 
440     /***
441      * {@inheritDoc}
442      */
443     public XATransactionContext createTransactionContext() throws SystemException, RollbackException {
444         XATransactionContext ctx = getCurrentTransactionContext();
445         if (ctx != null) {
446             return ctx;
447         }
448 
449         Transaction transaction = txnManager.getTransaction();
450         LOG.debug("enlisting {} in {}", this, transaction);
451         transaction.enlistResource(this);
452 
453         // currentXid is set by a call to start() which itself is called by transaction.enlistResource(this)
454         if (currentXid == null) {
455             throw new CacheException("enlistment of XAResource of cache named '" + getCacheName() +
456                     "' did not end up calling XAResource.start()");
457         }
458 
459         ctx = xidToContextMap.get(currentXid);
460         if (ctx == null) {
461             LOG.debug("creating new context for XID [{}]", currentXid);
462             ctx = new XATransactionContext(underlyingStore);
463             xidToContextMap.put(currentXid, ctx);
464         }
465 
466         return ctx;
467     }
468 
469     /***
470      * {@inheritDoc}
471      */
472     public XATransactionContext getCurrentTransactionContext() {
473         if (currentXid == null) {
474             LOG.debug("getting current TX context of XAResource with current XID [null]: null");
475             return null;
476         }
477         XATransactionContext xaTransactionContext = xidToContextMap.get(currentXid);
478         LOG.debug("getting current TX context of XAResource with current XID [{}]: {}", currentXid, xaTransactionContext);
479         return xaTransactionContext;
480     }
481 
482 
483     private static String prettyPrintXAResourceFlags(int flags) {
484         StringBuilder sb = new StringBuilder();
485 
486 
487         if ((flags & XAResource.TMENDRSCAN) == XAResource.TMENDRSCAN) {
488             if (sb.length() > 0) {
489                 sb.append('|');
490             }
491             sb.append("TMENDRSCAN");
492         }
493         if ((flags & XAResource.TMFAIL) == XAResource.TMFAIL) {
494             if (sb.length() > 0) {
495                 sb.append('|');
496             }
497             sb.append("TMFAIL");
498         }
499         if ((flags & XAResource.TMJOIN) == XAResource.TMJOIN) {
500             if (sb.length() > 0) {
501                 sb.append('|');
502             }
503             sb.append("TMJOIN");
504         }
505         if ((flags & XAResource.TMONEPHASE) == XAResource.TMONEPHASE) {
506             if (sb.length() > 0) {
507                 sb.append('|');
508             }
509             sb.append("TMONEPHASE");
510         }
511         if ((flags & XAResource.TMRESUME) == XAResource.TMRESUME) {
512             if (sb.length() > 0) {
513                 sb.append('|');
514             }
515             sb.append("TMRESUME");
516         }
517         if ((flags & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
518             if (sb.length() > 0) {
519                 sb.append('|');
520             }
521             sb.append("TMSTARTRSCAN");
522         }
523         if ((flags & XAResource.TMSUCCESS) == XAResource.TMSUCCESS) {
524             if (sb.length() > 0) {
525                 sb.append('|');
526             }
527             sb.append("TMSUCCESS");
528         }
529         if ((flags & XAResource.TMSUSPEND) == XAResource.TMSUSPEND) {
530             if (sb.length() > 0) {
531                 sb.append('|');
532             }
533             sb.append("TMSUSPEND");
534         }
535         if (sb.length() == 0 && flags == XAResource.TMNOFLAGS) {
536             sb.append("TMNOFLAGS");
537         }
538         if (sb.length() == 0) {
539             sb.append("unknown flag: ").append(flags);
540         }
541 
542         return sb.toString();
543     }
544 
545     @Override
546     public String toString() {
547         return "EhcacheXAResourceImpl of cache " + cache.getName();
548     }
549 }