1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package net.sf.ehcache.transaction.local;
17
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.concurrent.atomic.AtomicBoolean;
21
22 import javax.transaction.RollbackException;
23 import javax.transaction.Synchronization;
24 import javax.transaction.SystemException;
25 import javax.transaction.Transaction;
26 import javax.transaction.TransactionManager;
27 import javax.transaction.xa.XAException;
28 import javax.transaction.xa.XAResource;
29 import javax.transaction.xa.Xid;
30
31 import net.sf.ehcache.CacheEntry;
32 import net.sf.ehcache.CacheException;
33 import net.sf.ehcache.Ehcache;
34 import net.sf.ehcache.Element;
35 import net.sf.ehcache.TransactionController;
36 import net.sf.ehcache.store.ElementValueComparator;
37 import net.sf.ehcache.store.compound.NullReadWriteCopyStrategy;
38 import net.sf.ehcache.transaction.AbstractTransactionStore;
39 import net.sf.ehcache.transaction.TransactionException;
40 import net.sf.ehcache.transaction.TransactionID;
41 import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
42 import net.sf.ehcache.transaction.xa.EhcacheXAResource;
43 import net.sf.ehcache.transaction.xa.XAExecutionListener;
44 import net.sf.ehcache.transaction.xa.XATransactionContext;
45 import net.sf.ehcache.writer.CacheWriterManager;
46
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 /***
51 * A Store implementation with support for local transactions driven by a JTA transaction manager
52 *
53 * @author Ludovic Orban
54 */
55 public class JtaLocalTransactionStore extends AbstractTransactionStore {
56
// Logger shared by the store and its nested termination helpers.
private static final Logger LOG = LoggerFactory.getLogger(JtaLocalTransactionStore.class.getName());
// System property read on every registration: when "true" an XAResource is enlisted instead of a
// Synchronization being registered. The constructor sets it to "true" when an Atomikos manager is detected.
private static final String ALTERNATIVE_TERMINATION_MODE_SYS_PROPERTY_NAME = "net.sf.ehcache.transaction.xa.alternativeTerminationMode";
// Flipped with compareAndSet so the Atomikos configuration warning is logged only once.
private static final AtomicBoolean ATOMIKOS_WARNING_ISSUED = new AtomicBoolean(false);

// JTA transaction the calling thread bound the store to; set when a local transaction is begun and
// removed by the Synchronization / XAResource that terminates it.
private static final ThreadLocal<Transaction> BOUND_JTA_TRANSACTIONS = new ThreadLocal<Transaction>();

// Lookup used to obtain the transaction manager and to register enlisted XAResources.
private final TransactionManagerLookup transactionManagerLookup;
// Controller driving the local (ehcache) transaction that mirrors the JTA one.
private final TransactionController transactionController;
// JTA transaction manager resolved once at construction time; never null.
private final TransactionManager transactionManager;
// Cache owning the underlying store; used to reach the default writer manager.
private final Ehcache cache;
67
68 /***
69 * Create a new JtaLocalTransactionStore instance
70 * @param underlyingStore the underlying LocalTransactionStore
71 * @param transactionManagerLookup the TransactionManagerLookup
72 * @param transactionController the TransactionController
73 */
74 public JtaLocalTransactionStore(LocalTransactionStore underlyingStore, TransactionManagerLookup transactionManagerLookup,
75 TransactionController transactionController) {
76 super(underlyingStore, new NullReadWriteCopyStrategy());
77 this.transactionManagerLookup = transactionManagerLookup;
78 this.transactionController = transactionController;
79 this.transactionManager = transactionManagerLookup.getTransactionManager();
80 if (this.transactionManager == null) {
81 throw new TransactionException("no JTA transaction manager could be located");
82 }
83 this.cache = underlyingStore.getCache();
84
85 if (transactionManager.getClass().getName().contains("atomikos")) {
86 System.setProperty(ALTERNATIVE_TERMINATION_MODE_SYS_PROPERTY_NAME, "true");
87 if (ATOMIKOS_WARNING_ISSUED.compareAndSet(false, true)) {
88 LOG.warn("Atomikos transaction manager detected, make sure you configured com.atomikos.icatch.threaded_2pc=false");
89 }
90 }
91 }
92
93 private void registerInJtaContext() {
94 try {
95 if (transactionController.getCurrentTransactionContext() != null) {
96
97
98
99 Transaction tx = transactionManager.getTransaction();
100 if (!BOUND_JTA_TRANSACTIONS.get().equals(tx)) {
101 throw new TransactionException("Invalid JTA transaction context, cache was first used in transaction ["
102 + BOUND_JTA_TRANSACTIONS.get() + "]" +
103 " but is now used in transaction [" + tx + "].");
104 }
105 } else {
106 Transaction tx = transactionManager.getTransaction();
107 if (tx == null) {
108 throw new TransactionException("no JTA transaction context started, xa caches cannot be used outside of" +
109 " JTA transactions");
110 }
111 BOUND_JTA_TRANSACTIONS.set(tx);
112
113 transactionController.begin();
114
115
116 if (Boolean.getBoolean(ALTERNATIVE_TERMINATION_MODE_SYS_PROPERTY_NAME)) {
117
118 JtaLocalEhcacheXAResource xaRes = new JtaLocalEhcacheXAResource(transactionController,
119 transactionController.getCurrentTransactionContext().getTransactionId());
120 transactionManagerLookup.register(xaRes);
121 tx.enlistResource(xaRes);
122 } else {
123 tx.registerSynchronization(new JtaLocalEhcacheSynchronization(transactionController,
124 transactionController.getCurrentTransactionContext().getTransactionId()));
125 }
126 }
127 } catch (SystemException e) {
128 throw new TransactionException("internal JTA transaction manager error, cannot bind xa cache with it", e);
129 } catch (RollbackException e) {
130 throw new TransactionException("JTA transaction rolled back, cannot bind xa cache with it", e);
131 }
132 }
133
134 /***
135 * A Synchronization used to terminate the local transaction and clean it up
136 */
137 private static final class JtaLocalEhcacheSynchronization implements Synchronization {
138 private final TransactionController transactionController;
139 private final TransactionID transactionId;
140
141 private JtaLocalEhcacheSynchronization(TransactionController transactionController, TransactionID transactionId) {
142 this.transactionController = transactionController;
143 this.transactionId = transactionId;
144 }
145
146 public void beforeCompletion() {
147
148 }
149
150 public void afterCompletion(int status) {
151 JtaLocalTransactionStore.BOUND_JTA_TRANSACTIONS.remove();
152 if (status == javax.transaction.Status.STATUS_COMMITTED) {
153 transactionController.commit(true);
154 } else if (status == javax.transaction.Status.STATUS_ROLLEDBACK) {
155 transactionController.rollback();
156 } else {
157 transactionController.rollback();
158 LOG.warn("The transaction manager reported UNKNOWN transaction status upon termination." +
159 " The ehcache transaction has been rolled back!");
160 }
161 }
162
163 @Override
164 public String toString() {
165 return "JtaLocalEhcacheSynchronization of transaction [" + transactionId + "]";
166 }
167 }
168
169 /***
170 * A XAResource implementation used to terminate the local transaction and clean it up.
171 *
172 * It should only be used with transaction managers providing a defective Synchronization
173 * mechanism with which rollback/commit cannot be reliably differentiated as it relies
174 * on the fact that the same thread is used to call Ehcache methods as well as XAResource
175 * which isn't guaranteed by the specification.
176 * This mechanism also has a slight performance impact as there is one extra resource
177 * participating in the 2PC.
178 */
179 private static final class JtaLocalEhcacheXAResource implements EhcacheXAResource {
180 private final TransactionController transactionController;
181 private final TransactionID transactionId;
182
183 private JtaLocalEhcacheXAResource(TransactionController transactionController, TransactionID transactionId) {
184 this.transactionController = transactionController;
185 this.transactionId = transactionId;
186 }
187
188 public void commit(Xid xid, boolean onePhase) throws XAException {
189 transactionController.commit(true);
190 JtaLocalTransactionStore.BOUND_JTA_TRANSACTIONS.remove();
191 }
192
193 public void end(Xid xid, int flag) throws XAException {
194
195 }
196
197 public void forget(Xid xid) throws XAException {
198
199 }
200
201 public int getTransactionTimeout() throws XAException {
202 return 0;
203 }
204
205 public boolean isSameRM(XAResource xaResource) throws XAException {
206 return xaResource == this;
207 }
208
209 public int prepare(Xid xid) throws XAException {
210 return XA_OK;
211 }
212
213 public Xid[] recover(int flags) throws XAException {
214 return new Xid[0];
215 }
216
217 public void rollback(Xid xid) throws XAException {
218 transactionController.rollback();
219 JtaLocalTransactionStore.BOUND_JTA_TRANSACTIONS.remove();
220 }
221
222 public boolean setTransactionTimeout(int timeout) throws XAException {
223 return false;
224 }
225
226 public void start(Xid xid, int flag) throws XAException {
227
228 }
229
230 public void addTwoPcExecutionListener(XAExecutionListener listener) {
231 throw new UnsupportedOperationException();
232 }
233
234 public String getCacheName() {
235 return transactionId.toString();
236 }
237
238 public XATransactionContext createTransactionContext() throws SystemException, RollbackException {
239 throw new UnsupportedOperationException();
240 }
241
242 public XATransactionContext getCurrentTransactionContext() {
243 throw new UnsupportedOperationException();
244 }
245
246 @Override
247 public String toString() {
248 return "JtaLocalEhcacheXAResource of transaction [" + transactionId + "]";
249 }
250 }
251
252 private void setRollbackOnly() {
253 try {
254 BOUND_JTA_TRANSACTIONS.get().setRollbackOnly();
255 transactionController.setRollbackOnly();
256 } catch (SystemException e) {
257 LOG.warn("internal JTA transaction manager error", e);
258 }
259 }
260
261
262
263 /***
264 * {@inheritDoc}
265 */
266 public boolean put(Element element) throws CacheException {
267 registerInJtaContext();
268 try {
269 return underlyingStore.put(element);
270 } catch (CacheException e) {
271 setRollbackOnly();
272 throw e;
273 }
274 }
275
276 /***
277 * {@inheritDoc}
278 */
279 public void putAll(Collection<Element> elements) throws CacheException {
280 registerInJtaContext();
281 try {
282 underlyingStore.putAll(elements);
283 } catch (CacheException e) {
284 setRollbackOnly();
285 throw e;
286 }
287 }
288
289 /***
290 * {@inheritDoc}
291 */
292 public boolean putWithWriter(final Element element, final CacheWriterManager writerManager) throws CacheException {
293 registerInJtaContext();
294 try {
295 boolean put = underlyingStore.put(element);
296 transactionManager.getTransaction().registerSynchronization(new Synchronization() {
297 public void beforeCompletion() {
298 if (writerManager != null) {
299 writerManager.put(element);
300 } else {
301 cache.getWriterManager().put(element);
302 }
303 }
304 public void afterCompletion(int status) {
305
306 }
307 });
308 return put;
309 } catch (CacheException e) {
310 setRollbackOnly();
311 throw e;
312 } catch (RollbackException e) {
313 throw new TransactionException("error registering writer synchronization", e);
314 } catch (SystemException e) {
315 throw new TransactionException("error registering writer synchronization", e);
316 }
317 }
318
319 /***
320 * {@inheritDoc}
321 */
322 public Element get(Object key) {
323 registerInJtaContext();
324 try {
325 return underlyingStore.get(key);
326 } catch (CacheException e) {
327 setRollbackOnly();
328 throw e;
329 }
330 }
331
332 /***
333 * {@inheritDoc}
334 */
335 public Element getQuiet(Object key) {
336 registerInJtaContext();
337 try {
338 return underlyingStore.getQuiet(key);
339 } catch (CacheException e) {
340 setRollbackOnly();
341 throw e;
342 }
343 }
344
345 /***
346 * {@inheritDoc}
347 */
348 public List getKeys() {
349 registerInJtaContext();
350 try {
351 return underlyingStore.getKeys();
352 } catch (CacheException e) {
353 setRollbackOnly();
354 throw e;
355 }
356 }
357
358 /***
359 * {@inheritDoc}
360 */
361 public Element remove(Object key) {
362 registerInJtaContext();
363 try {
364 return underlyingStore.remove(key);
365 } catch (CacheException e) {
366 setRollbackOnly();
367 throw e;
368 }
369 }
370
371
372 /***
373 * {@inheritDoc}
374 */
375 public void removeAll(Collection<Object> keys) {
376 registerInJtaContext();
377 try {
378 underlyingStore.removeAll(keys);
379 } catch (CacheException e) {
380 setRollbackOnly();
381 throw e;
382 }
383 }
384
385 /***
386 * {@inheritDoc}
387 */
388 public Element removeWithWriter(final Object key, final CacheWriterManager writerManager) throws CacheException {
389 registerInJtaContext();
390 try {
391 Element removed = underlyingStore.remove(key);
392 final CacheEntry cacheEntry = new CacheEntry(key, getQuiet(key));
393 transactionManager.getTransaction().registerSynchronization(new Synchronization() {
394 public void beforeCompletion() {
395 if (writerManager != null) {
396 writerManager.remove(cacheEntry);
397 } else {
398 cache.getWriterManager().remove(cacheEntry);
399 }
400 }
401 public void afterCompletion(int status) {
402
403 }
404 });
405 return removed;
406 } catch (CacheException e) {
407 setRollbackOnly();
408 throw e;
409 } catch (RollbackException e) {
410 throw new TransactionException("error registering writer synchronization", e);
411 } catch (SystemException e) {
412 throw new TransactionException("error registering writer synchronization", e);
413 }
414 }
415
416 /***
417 * {@inheritDoc}
418 */
419 public void removeAll() throws CacheException {
420 registerInJtaContext();
421 try {
422 underlyingStore.removeAll();
423 } catch (CacheException e) {
424 setRollbackOnly();
425 throw e;
426 }
427 }
428
429 /***
430 * {@inheritDoc}
431 */
432 public Element putIfAbsent(Element element) throws NullPointerException {
433 registerInJtaContext();
434 try {
435 return underlyingStore.putIfAbsent(element);
436 } catch (CacheException e) {
437 setRollbackOnly();
438 throw e;
439 }
440 }
441
442 /***
443 * {@inheritDoc}
444 */
445 public Element removeElement(Element element, ElementValueComparator comparator) throws NullPointerException {
446 registerInJtaContext();
447 try {
448 return underlyingStore.removeElement(element, comparator);
449 } catch (CacheException e) {
450 setRollbackOnly();
451 throw e;
452 }
453 }
454
455 /***
456 * {@inheritDoc}
457 */
458 public boolean replace(Element old, Element element, ElementValueComparator comparator)
459 throws NullPointerException, IllegalArgumentException {
460 registerInJtaContext();
461 try {
462 return underlyingStore.replace(old, element, comparator);
463 } catch (CacheException e) {
464 setRollbackOnly();
465 throw e;
466 }
467 }
468
469 /***
470 * {@inheritDoc}
471 */
472 public Element replace(Element element) throws NullPointerException {
473 registerInJtaContext();
474 try {
475 return underlyingStore.replace(element);
476 } catch (CacheException e) {
477 setRollbackOnly();
478 throw e;
479 }
480 }
481
482 /***
483 * {@inheritDoc}
484 */
485 public int getSize() {
486 registerInJtaContext();
487 try {
488 return underlyingStore.getSize();
489 } catch (CacheException e) {
490 setRollbackOnly();
491 throw e;
492 }
493 }
494
495 /***
496 * {@inheritDoc}
497 */
498 public int getTerracottaClusteredSize() {
499 if (transactionController.getCurrentTransactionContext() == null) {
500 return underlyingStore.getTerracottaClusteredSize();
501 }
502
503 registerInJtaContext();
504 try {
505 return underlyingStore.getTerracottaClusteredSize();
506 } catch (CacheException e) {
507 setRollbackOnly();
508 throw e;
509 }
510 }
511
512 /***
513 * {@inheritDoc}
514 */
515 public boolean containsKey(Object key) {
516 registerInJtaContext();
517 try {
518 return underlyingStore.containsKey(key);
519 } catch (CacheException e) {
520 setRollbackOnly();
521 throw e;
522 }
523 }
524
525
526
527 }
528