1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package net.sf.ehcache.transaction.xa;
17
18 import net.sf.ehcache.CacheException;
19 import net.sf.ehcache.Ehcache;
20 import net.sf.ehcache.Element;
21 import net.sf.ehcache.statistics.LiveCacheStatisticsWrapper;
22 import net.sf.ehcache.store.ElementValueComparator;
23 import net.sf.ehcache.store.Store;
24 import net.sf.ehcache.store.chm.ConcurrentHashMap;
25 import net.sf.ehcache.transaction.SoftLock;
26 import net.sf.ehcache.transaction.SoftLockFactory;
27 import net.sf.ehcache.transaction.TransactionID;
28 import net.sf.ehcache.transaction.TransactionIDFactory;
29 import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
30 import net.sf.ehcache.transaction.xa.commands.Command;
31 import net.sf.ehcache.transaction.xa.processor.XARequestProcessor;
32 import net.sf.ehcache.transaction.xa.processor.XARequest;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import javax.transaction.RollbackException;
37 import javax.transaction.SystemException;
38 import javax.transaction.Transaction;
39 import javax.transaction.TransactionManager;
40 import javax.transaction.xa.XAException;
41 import javax.transaction.xa.XAResource;
42 import javax.transaction.xa.Xid;
43 import java.util.ArrayList;
44 import java.util.Arrays;
45 import java.util.Collections;
46 import java.util.HashSet;
47 import java.util.LinkedList;
48 import java.util.List;
49 import java.util.Set;
50 import java.util.concurrent.ConcurrentMap;
51
52 /***
53 * The EhcacheXAResource implementation
54 *
55 * @author Ludovic Orban
56 */
57 public class EhcacheXAResourceImpl implements EhcacheXAResource {
58
// Logger for this class.
private static final Logger LOG = LoggerFactory.getLogger(EhcacheXAResourceImpl.class.getName());
// Conversion factor applied to transactionTimeout before it is handed to Thread.join() in recover().
private static final long MILLISECOND_PER_SECOND = 1000L;

// The cache this resource belongs to; source of the cache name, live statistics and configuration.
private final Ehcache cache;
// Store that commands are prepared against and that committed / rolled back elements are written to.
private final Store underlyingStore;
// Creates the XidTransactionID matching a given Xid.
private final TransactionIDFactory transactionIDFactory;
// Transaction manager used to look up the current transaction when enlisting this resource.
private final TransactionManager txnManager;
// Source of the soft locks held by a transaction and of the expired transaction IDs reported by recover().
private final SoftLockFactory softLockFactory;
// Transaction contexts by XID: entries are added by createTransactionContext() and removed by
// end(TMFAIL), prepareInternal() and rollbackInternal().
private final ConcurrentMap<Xid, XATransactionContext> xidToContextMap = new ConcurrentHashMap<Xid, XATransactionContext>();
// forget / prepare / commit / rollback requests are submitted through this processor.
private final XARequestProcessor processor;
// XID this resource is currently started on: set by start(), reset to null by end().
private volatile Xid currentXid;
// Transaction timeout; treated as a number of seconds by recover().
private volatile int transactionTimeout;
// Listeners notified before prepare and after commit or rollback.
private final List<XAExecutionListener> listeners = new ArrayList<XAExecutionListener>();
// Comparator handed to every Command.prepare() call.
private final ElementValueComparator comparator;
73
/**
 * Builds the XA resource of a cache.
 * <p>
 * The transaction manager is resolved from the lookup right away, a request processor bound to this
 * resource is created, the initial transaction timeout is the transaction controller's default and the
 * element value comparator is taken from the cache configuration.
 *
 * @param cache the cache
 * @param underlyingStore the underlying store
 * @param txnManagerLookup the transaction manager lookup
 * @param softLockFactory the soft lock factory
 * @param transactionIDFactory the transaction ID factory
 */
public EhcacheXAResourceImpl(Ehcache cache, Store underlyingStore, TransactionManagerLookup txnManagerLookup,
                             SoftLockFactory softLockFactory, TransactionIDFactory transactionIDFactory) {
    // plain collaborators first
    this.cache = cache;
    this.underlyingStore = underlyingStore;
    this.softLockFactory = softLockFactory;
    this.transactionIDFactory = transactionIDFactory;

    // collaborators that have to be looked up or created
    this.txnManager = txnManagerLookup.getTransactionManager();
    this.processor = new XARequestProcessor(this);

    // settings derived from the cache
    this.transactionTimeout = cache.getCacheManager()
            .getTransactionController()
            .getDefaultTransactionTimeout();
    this.comparator = cache.getCacheConfiguration()
            .getElementValueComparatorConfiguration()
            .getElementComparatorInstance();
}
93
94 /***
95 * {@inheritDoc}
96 */
97 public void start(Xid xid, int flag) throws XAException {
98 LOG.debug("start [{}] [{}]", xid, prettyPrintXAResourceFlags(flag));
99
100 if (currentXid != null) {
101 throw new EhcacheXAException("resource already started on " + currentXid, XAException.XAER_PROTO);
102 }
103
104 if (flag == TMNOFLAGS) {
105 if (xidToContextMap.containsKey(xid)) {
106 throw new EhcacheXAException("cannot start with duplicate XID: " + xid, XAException.XAER_DUPID);
107 }
108 currentXid = xid;
109 } else if (flag == TMRESUME) {
110 if (!xidToContextMap.containsKey(xid)) {
111 throw new EhcacheXAException("cannot resume non-existent XID: " + xid, XAException.XAER_NOTA);
112 }
113 currentXid = xid;
114 } else if (flag == TMJOIN) {
115 currentXid = xid;
116 } else {
117 throw new EhcacheXAException("unsupported flag: " + flag, XAException.XAER_PROTO);
118 }
119 }
120
121 /***
122 * {@inheritDoc}
123 */
124 public void end(Xid xid, int flag) throws XAException {
125 LOG.debug("end [{}] [{}]", xid, prettyPrintXAResourceFlags(flag));
126
127 if (currentXid == null) {
128 throw new EhcacheXAException("resource not started on " + xid, XAException.XAER_PROTO);
129 }
130
131 if (flag == TMSUCCESS || flag == TMSUSPEND) {
132 if (!currentXid.equals(xid)) {
133 throw new EhcacheXAException("cannot end working on unknown XID " + xid, XAException.XAER_NOTA);
134 }
135 currentXid = null;
136 } else if (flag == TMFAIL) {
137 if (!currentXid.equals(xid)) {
138 throw new EhcacheXAException("cannot end working on " + xid + " while work on current XID " + currentXid + " hasn't ended",
139 XAException.XAER_PROTO);
140 }
141 xidToContextMap.remove(xid);
142 currentXid = null;
143 } else {
144 throw new EhcacheXAException("unsupported flag: " + flag, XAException.XAER_PROTO);
145 }
146 }
147
148 /***
149 * {@inheritDoc}
150 */
151 public void forget(Xid xid) throws XAException {
152 LOG.debug("forget [{}]", xid);
153
154 processor.process(new XARequest(XARequest.RequestType.FORGET, xid));
155 }
156
157 /***
158 * The forget implementation
159 * @param xid a XID
160 * @throws XAException when an error occurs
161 */
162 public void forgetInternal(Xid xid) throws XAException {
163 List<Xid> xids = Arrays.asList(recover(TMSTARTRSCAN));
164 if (!xids.contains(xid)) {
165 throw new EhcacheXAException("forget called on in-doubt XID" + xid, XAException.XAER_PROTO);
166 }
167 }
168
169 /***
170 * {@inheritDoc}
171 */
172 public int getTransactionTimeout() throws XAException {
173 return transactionTimeout;
174 }
175
176 /***
177 * {@inheritDoc}
178 */
179 public boolean isSameRM(XAResource xaResource) throws XAException {
180 boolean b = xaResource == this;
181 LOG.debug("{} isSameRm {} -> " + b, this, xaResource);
182 return b;
183 }
184
185 /***
186 * {@inheritDoc}
187 */
188 public int prepare(Xid xid) throws XAException {
189 LOG.debug("prepare [{}]", xid);
190
191 if (currentXid != null) {
192 throw new EhcacheXAException("prepare called on non-ended XID: " + xid, XAException.XAER_PROTO);
193 }
194 return processor.process(new XARequest(XARequest.RequestType.PREPARE, xid));
195 }
196
197 /***
198 * The prepare implementation
199 * @param xid a XID
200 * @return XA_OK or XA_RDONLY
201 * @throws XAException when an error occurs
202 */
203 public int prepareInternal(Xid xid) throws XAException {
204 fireBeforePrepare();
205
206 XATransactionContext twopcTransactionContext = xidToContextMap.get(xid);
207 if (twopcTransactionContext == null) {
208 throw new EhcacheXAException("transaction never started: " + xid, XAException.XAER_NOTA);
209 }
210
211
212 XidTransactionID xidTransactionID = transactionIDFactory.createXidTransactionID(xid);
213
214 List<Command> commands = twopcTransactionContext.getCommands();
215 List<Command> preparedCommands = new LinkedList<Command>();
216
217 boolean prepareUpdated = false;
218 LOG.debug("preparing {} command(s) for [{}]", commands.size(), xid);
219 for (Command command : commands) {
220 try {
221 prepareUpdated |= command.prepare(underlyingStore, softLockFactory, xidTransactionID, comparator);
222 preparedCommands.add(0, command);
223 } catch (OptimisticLockFailureException ie) {
224 for (Command preparedCommand : preparedCommands) {
225 preparedCommand.rollback(underlyingStore);
226 }
227 preparedCommands.clear();
228 throw new EhcacheXAException(command + " failed because value changed between execution and 2PC",
229 XAException.XA_RBINTEGRITY, ie);
230 }
231 }
232
233 xidToContextMap.remove(xid);
234
235 if (!prepareUpdated) {
236 rollbackInternal(xid);
237 }
238
239 LOG.debug("prepared xid [{}] read only? {}", xid, !prepareUpdated);
240 return prepareUpdated ? XA_OK : XA_RDONLY;
241 }
242
243 /***
244 * {@inheritDoc}
245 */
246 public void commit(Xid xid, boolean onePhase) throws XAException {
247 LOG.debug("commit [{}] [{}]", xid, onePhase);
248
249 if (currentXid != null) {
250 throw new EhcacheXAException("commit called on non-ended XID: " + xid, XAException.XAER_PROTO);
251 }
252 this.processor.process(new XARequest(XARequest.RequestType.COMMIT, xid, onePhase));
253 }
254
255 /***
256 * The commit implementation
257 * @param xid a XID
258 * @param onePhase true if onePhase, false otherwise
259 * @throws XAException when an error occurs
260 */
261 public void commitInternal(Xid xid, boolean onePhase) throws XAException {
262 LiveCacheStatisticsWrapper liveCacheStatisticsWrapper = (LiveCacheStatisticsWrapper) cache.getLiveCacheStatistics();
263 liveCacheStatisticsWrapper.xaCommit();
264 if (onePhase) {
265 XATransactionContext twopcTransactionContext = xidToContextMap.get(xid);
266 if (twopcTransactionContext == null) {
267 throw new EhcacheXAException("cannot call commit(onePhase=true) after prepare", XAException.XAER_PROTO);
268 }
269
270 int rc = prepareInternal(xid);
271 if (rc == XA_RDONLY) {
272 return;
273 }
274 }
275
276 XidTransactionID xidTransactionID = transactionIDFactory.createXidTransactionID(xid);
277 Set<SoftLock> softLocks = softLockFactory.collectAllSoftLocksForTransactionID(xidTransactionID);
278 LOG.debug("committing {} soft lock(s) for [{}]", softLocks.size(), xid);
279 for (SoftLock softLock : softLocks) {
280 if (softLock.isExpired()) {
281 softLock.lock();
282 softLock.freeze();
283 }
284 }
285
286 for (SoftLock softLock : softLocks) {
287 try {
288 softLock.getTransactionID().markForCommit();
289 } catch (IllegalStateException ise) {
290 throw new EhcacheXAException("XID already was rolling back: " + xid, XAException.XAER_RMERR);
291 }
292
293 Element frozenElement = softLock.getFrozenElement();
294
295 if (frozenElement != null) {
296 underlyingStore.put(frozenElement);
297 } else {
298 underlyingStore.remove(softLock.getKey());
299 }
300 }
301
302 for (SoftLock softLock : softLocks) {
303 softLock.unfreeze();
304 softLock.unlock();
305 }
306
307
308 fireAfterCommitOrRollback();
309 }
310
311 /***
312 * {@inheritDoc}
313 */
314 public Xid[] recover(int flags) throws XAException {
315 LOG.debug("recover [{}]", prettyPrintXAResourceFlags(flags));
316
317 if ((flags & TMSTARTRSCAN) != TMSTARTRSCAN) {
318 return new Xid[0];
319 }
320
321 final Set<Xid> xids = Collections.synchronizedSet(new HashSet<Xid>());
322
323 Thread t = new Thread("ehcache recovery thread") {
324 @Override
325 public void run() {
326 Set<TransactionID> transactionIDs = softLockFactory.collectExpiredTransactionIDs();
327 for (TransactionID transactionID : transactionIDs) {
328 XidTransactionID xidTransactionID = (XidTransactionID) transactionID;
329 xids.add(xidTransactionID.getXid());
330 }
331 }
332 };
333 try {
334 t.start();
335 t.join(transactionTimeout * MILLISECOND_PER_SECOND);
336 } catch (InterruptedException e) {
337
338 }
339 if (t.isAlive()) {
340 t.interrupt();
341 }
342
343 return xids.toArray(new Xid[0]);
344 }
345
346 /***
347 * {@inheritDoc}
348 */
349 public void rollback(Xid xid) throws XAException {
350 LOG.debug("rollback [{}]", xid);
351
352 this.processor.process(new XARequest(XARequest.RequestType.ROLLBACK, xid));
353 }
354
355 /***
356 * The rollback implementation
357 * @param xid a XID
358 * @throws XAException when an error occurs
359 */
360 public void rollbackInternal(Xid xid) throws XAException {
361 LiveCacheStatisticsWrapper liveCacheStatisticsWrapper = (LiveCacheStatisticsWrapper) cache.getLiveCacheStatistics();
362 liveCacheStatisticsWrapper.xaRollback();
363 XidTransactionID xidTransactionID = transactionIDFactory.createXidTransactionID(xid);
364 Set<SoftLock> softLocks = softLockFactory.collectAllSoftLocksForTransactionID(xidTransactionID);
365 for (SoftLock softLock : softLocks) {
366 if (softLock.isExpired()) {
367 softLock.lock();
368 softLock.freeze();
369 }
370 }
371
372 for (SoftLock softLock : softLocks) {
373 try {
374 ((XidTransactionID) softLock.getTransactionID()).markForRollback();
375 } catch (IllegalStateException ise) {
376 throw new EhcacheXAException("XID already was committing: " + xid, XAException.XAER_RMERR);
377 }
378
379 Element frozenElement = softLock.getFrozenElement();
380
381 if (frozenElement != null) {
382 underlyingStore.put(frozenElement);
383 } else {
384 underlyingStore.remove(softLock.getKey());
385 }
386 }
387
388 for (SoftLock softLock : softLocks) {
389 softLock.unfreeze();
390 softLock.unlock();
391 }
392
393
394 xidToContextMap.remove(xid);
395
396 fireAfterCommitOrRollback();
397 }
398
399 /***
400 * {@inheritDoc}
401 */
402 public boolean setTransactionTimeout(int timeout) throws XAException {
403 if (timeout < 0) {
404 throw new EhcacheXAException("timeout must be >= 0, was: " + timeout, XAException.XAER_INVAL);
405 }
406 if (timeout == 0) {
407 this.transactionTimeout = cache.getCacheManager().getTransactionController().getDefaultTransactionTimeout();
408 } else {
409 this.transactionTimeout = timeout;
410 }
411 return true;
412 }
413
414 /***
415 * {@inheritDoc}
416 */
417 public void addTwoPcExecutionListener(XAExecutionListener listener) {
418 listeners.add(listener);
419 }
420
421 private void fireBeforePrepare() {
422 for (XAExecutionListener listener : listeners) {
423 listener.beforePrepare(this);
424 }
425 }
426
427 private void fireAfterCommitOrRollback() {
428 for (XAExecutionListener listener : listeners) {
429 listener.afterCommitOrRollback(this);
430 }
431 }
432
433 /***
434 * {@inheritDoc}
435 */
436 public String getCacheName() {
437 return cache.getName();
438 }
439
440 /***
441 * {@inheritDoc}
442 */
443 public XATransactionContext createTransactionContext() throws SystemException, RollbackException {
444 XATransactionContext ctx = getCurrentTransactionContext();
445 if (ctx != null) {
446 return ctx;
447 }
448
449 Transaction transaction = txnManager.getTransaction();
450 LOG.debug("enlisting {} in {}", this, transaction);
451 transaction.enlistResource(this);
452
453
454 if (currentXid == null) {
455 throw new CacheException("enlistment of XAResource of cache named '" + getCacheName() +
456 "' did not end up calling XAResource.start()");
457 }
458
459 ctx = xidToContextMap.get(currentXid);
460 if (ctx == null) {
461 LOG.debug("creating new context for XID [{}]", currentXid);
462 ctx = new XATransactionContext(underlyingStore);
463 xidToContextMap.put(currentXid, ctx);
464 }
465
466 return ctx;
467 }
468
469 /***
470 * {@inheritDoc}
471 */
472 public XATransactionContext getCurrentTransactionContext() {
473 if (currentXid == null) {
474 LOG.debug("getting current TX context of XAResource with current XID [null]: null");
475 return null;
476 }
477 XATransactionContext xaTransactionContext = xidToContextMap.get(currentXid);
478 LOG.debug("getting current TX context of XAResource with current XID [{}]: {}", currentXid, xaTransactionContext);
479 return xaTransactionContext;
480 }
481
482
483 private static String prettyPrintXAResourceFlags(int flags) {
484 StringBuilder sb = new StringBuilder();
485
486
487 if ((flags & XAResource.TMENDRSCAN) == XAResource.TMENDRSCAN) {
488 if (sb.length() > 0) {
489 sb.append('|');
490 }
491 sb.append("TMENDRSCAN");
492 }
493 if ((flags & XAResource.TMFAIL) == XAResource.TMFAIL) {
494 if (sb.length() > 0) {
495 sb.append('|');
496 }
497 sb.append("TMFAIL");
498 }
499 if ((flags & XAResource.TMJOIN) == XAResource.TMJOIN) {
500 if (sb.length() > 0) {
501 sb.append('|');
502 }
503 sb.append("TMJOIN");
504 }
505 if ((flags & XAResource.TMONEPHASE) == XAResource.TMONEPHASE) {
506 if (sb.length() > 0) {
507 sb.append('|');
508 }
509 sb.append("TMONEPHASE");
510 }
511 if ((flags & XAResource.TMRESUME) == XAResource.TMRESUME) {
512 if (sb.length() > 0) {
513 sb.append('|');
514 }
515 sb.append("TMRESUME");
516 }
517 if ((flags & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) {
518 if (sb.length() > 0) {
519 sb.append('|');
520 }
521 sb.append("TMSTARTRSCAN");
522 }
523 if ((flags & XAResource.TMSUCCESS) == XAResource.TMSUCCESS) {
524 if (sb.length() > 0) {
525 sb.append('|');
526 }
527 sb.append("TMSUCCESS");
528 }
529 if ((flags & XAResource.TMSUSPEND) == XAResource.TMSUSPEND) {
530 if (sb.length() > 0) {
531 sb.append('|');
532 }
533 sb.append("TMSUSPEND");
534 }
535 if (sb.length() == 0 && flags == XAResource.TMNOFLAGS) {
536 sb.append("TMNOFLAGS");
537 }
538 if (sb.length() == 0) {
539 sb.append("unknown flag: ").append(flags);
540 }
541
542 return sb.toString();
543 }
544
545 @Override
546 public String toString() {
547 return "EhcacheXAResourceImpl of cache " + cache.getName();
548 }
549 }