1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package net.sf.ehcache.transaction.xa;
17
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25
26 import javax.transaction.RollbackException;
27 import javax.transaction.SystemException;
28 import javax.transaction.Transaction;
29 import javax.transaction.xa.XAException;
30
31 import net.sf.ehcache.CacheEntry;
32 import net.sf.ehcache.CacheException;
33 import net.sf.ehcache.Ehcache;
34 import net.sf.ehcache.Element;
35 import net.sf.ehcache.search.attribute.AttributeExtractor;
36 import net.sf.ehcache.store.ElementValueComparator;
37 import net.sf.ehcache.store.Store;
38 import net.sf.ehcache.store.compound.ReadWriteCopyStrategy;
39 import net.sf.ehcache.transaction.AbstractTransactionStore;
40 import net.sf.ehcache.transaction.SoftLock;
41 import net.sf.ehcache.transaction.SoftLockFactory;
42 import net.sf.ehcache.transaction.TransactionAwareAttributeExtractor;
43 import net.sf.ehcache.transaction.TransactionException;
44 import net.sf.ehcache.transaction.TransactionIDFactory;
45 import net.sf.ehcache.transaction.TransactionInterruptedException;
46 import net.sf.ehcache.transaction.TransactionTimeoutException;
47 import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
48 import net.sf.ehcache.transaction.xa.commands.StorePutCommand;
49 import net.sf.ehcache.transaction.xa.commands.StoreRemoveCommand;
50 import net.sf.ehcache.util.LargeSet;
51 import net.sf.ehcache.util.SetWrapperList;
52 import net.sf.ehcache.writer.CacheWriterManager;
53
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
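/**
 * A transactional store bound to JTA (XA) transactions: modifications are recorded as commands in a
 * per-transaction context, and reads consult that context before falling back to the underlying store.
 *
 * @author Ludovic Orban
 */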
60 public class XATransactionStore extends AbstractTransactionStore {
61
62 private static final Logger LOG = LoggerFactory.getLogger(XATransactionStore.class.getName());
63 private static final long MILLISECOND_PER_SECOND = 1000L;
64
65 private final TransactionManagerLookup transactionManagerLookup;
66 private final TransactionIDFactory transactionIdFactory;
67 private final SoftLockFactory softLockFactory;
68 private final Ehcache cache;
69
70 private final ConcurrentHashMap<Transaction, EhcacheXAResource> transactionToXAResourceMap =
71 new ConcurrentHashMap<Transaction, EhcacheXAResource>();
72 private final ConcurrentHashMap<Transaction, Long> transactionToTimeoutMap = new ConcurrentHashMap<Transaction, Long>();
73
74 /***
75 * Constructor
76 * @param transactionManagerLookup the transaction manager lookup implementation
77 * @param softLockFactory the soft lock factory
78 * @param transactionIdFactory the transaction ID factory
79 * @param cache the cache
80 * @param store the underlying store
81 * @param copyStrategy the original copy strategy
82 */
83 public XATransactionStore(TransactionManagerLookup transactionManagerLookup, SoftLockFactory softLockFactory,
84 TransactionIDFactory transactionIdFactory, Ehcache cache, Store store,
85 ReadWriteCopyStrategy<Element> copyStrategy) {
86 super(store, copyStrategy);
87 this.transactionManagerLookup = transactionManagerLookup;
88 this.transactionIdFactory = transactionIdFactory;
89 if (transactionManagerLookup.getTransactionManager() == null) {
90 throw new TransactionException("no JTA transaction manager could be located, cannot bind twopc cache with JTA");
91 }
92 this.softLockFactory = softLockFactory;
93 this.cache = cache;
94 }
95
96 private Transaction getCurrentTransaction() throws SystemException {
97 Transaction transaction = transactionManagerLookup.getTransactionManager().getTransaction();
98 if (transaction == null) {
99 throw new TransactionException("JTA transaction not started");
100 }
101 return transaction;
102 }
103
104 /***
105 * Get or create the XAResource of this XA store
106 * @return the EhcacheXAResource of this store
107 * @throws SystemException when something goes wrong with the transaction manager
108 */
109 public EhcacheXAResourceImpl getOrCreateXAResource() throws SystemException {
110 Transaction transaction = getCurrentTransaction();
111 EhcacheXAResourceImpl xaResource = (EhcacheXAResourceImpl) transactionToXAResourceMap.get(transaction);
112 if (xaResource == null) {
113 LOG.debug("creating new XAResource");
114 xaResource = new EhcacheXAResourceImpl(cache, underlyingStore, transactionManagerLookup,
115 softLockFactory, transactionIdFactory);
116 transactionToXAResourceMap.put(transaction, xaResource);
117 xaResource.addTwoPcExecutionListener(new CleanupXAResource(getCurrentTransaction()));
118 }
119 return xaResource;
120 }
121
122 private XATransactionContext getTransactionContext() {
123 try {
124 Transaction transaction = getCurrentTransaction();
125 EhcacheXAResourceImpl xaResource = (EhcacheXAResourceImpl) transactionToXAResourceMap.get(transaction);
126 if (xaResource == null) {
127 return null;
128 }
129 XATransactionContext transactionContext = xaResource.getCurrentTransactionContext();
130
131 if (transactionContext == null) {
132 transactionManagerLookup.register(xaResource);
133 LOG.debug("creating new XA context");
134 transactionContext = xaResource.createTransactionContext();
135 xaResource.addTwoPcExecutionListener(new UnregisterXAResource());
136 } else {
137 transactionContext = xaResource.getCurrentTransactionContext();
138 }
139
140 LOG.debug("using XA context {}", transactionContext);
141 return transactionContext;
142 } catch (SystemException e) {
143 throw new TransactionException("cannot get the current transaction", e);
144 } catch (RollbackException e) {
145 throw new TransactionException("transaction rolled back", e);
146 }
147 }
148
149 private XATransactionContext getOrCreateTransactionContext() {
150 try {
151 EhcacheXAResourceImpl xaResource = getOrCreateXAResource();
152 XATransactionContext transactionContext = xaResource.getCurrentTransactionContext();
153
154 if (transactionContext == null) {
155 transactionManagerLookup.register(xaResource);
156 LOG.debug("creating new XA context");
157 transactionContext = xaResource.createTransactionContext();
158 xaResource.addTwoPcExecutionListener(new UnregisterXAResource());
159 } else {
160 transactionContext = xaResource.getCurrentTransactionContext();
161 }
162
163 LOG.debug("using XA context {}", transactionContext);
164 return transactionContext;
165 } catch (SystemException e) {
166 throw new TransactionException("cannot get the current transaction", e);
167 } catch (RollbackException e) {
168 throw new TransactionException("transaction rolled back", e);
169 }
170 }
171
172 /***
173 * This class is used to clean up the transactionToXAResourceMap after a transaction
174 * committed or rolled back.
175 */
176 private final class CleanupXAResource implements XAExecutionListener {
177 private final Transaction transaction;
178
179 private CleanupXAResource(Transaction transaction) {
180 this.transaction = transaction;
181 }
182
183 public void beforePrepare(EhcacheXAResource xaResource) {
184 }
185
186 public void afterCommitOrRollback(EhcacheXAResource xaResource) {
187 transactionToXAResourceMap.remove(transaction);
188 transactionToTimeoutMap.remove(transaction);
189 }
190 }
191
192 /***
193 * This class is used to unregister the XAResource after a transaction
194 * committed or rolled back.
195 */
196 private final class UnregisterXAResource implements XAExecutionListener {
197
198 public void beforePrepare(EhcacheXAResource xaResource) {
199 }
200
201 public void afterCommitOrRollback(EhcacheXAResource xaResource) {
202 transactionManagerLookup.unregister(xaResource);
203 }
204 }
205
206
207 /***
208 * @return milliseconds left before timeout
209 */
210 private long assertNotTimedOut() {
211 try {
212 if (Thread.interrupted()) {
213 throw new TransactionInterruptedException("transaction interrupted");
214 }
215
216 Transaction transaction = getCurrentTransaction();
217
218 EhcacheXAResource xaResource = transactionToXAResourceMap.get(transaction);
219 Long timeoutTimestamp = transactionToTimeoutMap.get(transaction);
220 if (xaResource != null && timeoutTimestamp == null) {
221 int xaResourceTimeout = xaResource.getTransactionTimeout();
222
223 timeoutTimestamp = System.currentTimeMillis() + (xaResourceTimeout * MILLISECOND_PER_SECOND);
224 transactionToTimeoutMap.put(transaction, timeoutTimestamp);
225 } else if (timeoutTimestamp == null) {
226 int defaultTransactionTimeout = cache.getCacheManager().getTransactionController().getDefaultTransactionTimeout();
227 timeoutTimestamp = System.currentTimeMillis() + (defaultTransactionTimeout * MILLISECOND_PER_SECOND);
228 transactionToTimeoutMap.put(transaction, timeoutTimestamp);
229 }
230
231 if (timeoutTimestamp <= System.currentTimeMillis()) {
232 throw new TransactionTimeoutException("transaction timed out");
233 }
234
235 return timeoutTimestamp - System.currentTimeMillis();
236 } catch (SystemException e) {
237 throw new TransactionException("cannot get the current transaction", e);
238 } catch (XAException e) {
239 throw new TransactionException("cannot get the XAResource transaction timeout", e);
240 }
241 }
242
243
244
245 /***
246 * {@inheritDoc}
247 */
248 public Element get(Object key) {
249 LOG.debug("cache {} get {}", cache.getName(), key);
250 XATransactionContext context = getTransactionContext();
251 Element element;
252 if (context == null) {
253 element = getFromUnderlyingStore(key);
254 } else {
255 element = context.get(key);
256 if (element == null && !context.isRemoved(key)) {
257 element = getFromUnderlyingStore(key);
258 }
259 }
260 return copyElementForRead(element);
261 }
262
263
264 /***
265 * {@inheritDoc}
266 */
267 public Element getQuiet(Object key) {
268 LOG.debug("cache {} getQuiet {}", cache.getName(), key);
269 XATransactionContext context = getTransactionContext();
270 Element element;
271 if (context == null) {
272 element = getQuietFromUnderlyingStore(key);
273 } else {
274 element = context.get(key);
275 if (element == null && !context.isRemoved(key)) {
276 element = getQuietFromUnderlyingStore(key);
277 }
278 }
279 return copyElementForRead(element);
280 }
281
282 /***
283 * {@inheritDoc}
284 */
285 public int getSize() {
286 LOG.debug("cache {} getSize", cache.getName());
287 XATransactionContext context = getOrCreateTransactionContext();
288 int size = underlyingStore.getSize();
289 return size + context.getSizeModifier();
290 }
291
292 /***
293 * {@inheritDoc}
294 */
295 public int getTerracottaClusteredSize() {
296 try {
297 Transaction transaction = transactionManagerLookup.getTransactionManager().getTransaction();
298 if (transaction == null) {
299 return underlyingStore.getTerracottaClusteredSize();
300 }
301 } catch (SystemException se) {
302 throw new TransactionException("cannot get the current transaction", se);
303 }
304
305 LOG.debug("cache {} getTerracottaClusteredSize", cache.getName());
306 XATransactionContext context = getOrCreateTransactionContext();
307 int size = underlyingStore.getTerracottaClusteredSize();
308 return size + context.getSizeModifier();
309 }
310
311 /***
312 * {@inheritDoc}
313 */
314 public boolean containsKey(Object key) {
315 LOG.debug("cache {} containsKey", cache.getName(), key);
316 XATransactionContext context = getOrCreateTransactionContext();
317 return !context.isRemoved(key) && (context.getAddedKeys().contains(key) || underlyingStore.containsKey(key));
318 }
319
320 /***
321 * {@inheritDoc}
322 */
323 public List getKeys() {
324 LOG.debug("cache {} getKeys", cache.getName());
325 XATransactionContext context = getOrCreateTransactionContext();
326 Set<Object> keys = new LargeSet<Object>() {
327
328 @Override
329 public int sourceSize() {
330 return underlyingStore.getSize();
331 }
332
333 @Override
334 public Iterator<Object> sourceIterator() {
335 return underlyingStore.getKeys().iterator();
336 }
337 };
338 keys.addAll(context.getAddedKeys());
339 keys.removeAll(context.getRemovedKeys());
340 return new SetWrapperList(keys);
341 }
342
343
344 private Element getFromUnderlyingStore(final Object key) {
345 while (true) {
346 long timeLeft = assertNotTimedOut();
347 LOG.debug("cache {} underlying.get key {} not timed out, time left: " + timeLeft, cache.getName(), key);
348
349 Element element = underlyingStore.get(key);
350 if (element == null) {
351 return null;
352 }
353 Object value = element.getObjectValue();
354 if (value instanceof SoftLock) {
355 SoftLock softLock = (SoftLock) value;
356 try {
357 LOG.debug("cache {} key {} soft locked, awaiting unlock...", cache.getName(), key);
358 boolean gotLock = softLock.tryLock(timeLeft);
359 if (gotLock) {
360 softLock.clearTryLock();
361 }
362 } catch (InterruptedException e) {
363 Thread.currentThread().interrupt();
364 }
365 } else {
366 return element;
367 }
368 }
369 }
370
371 private Element getQuietFromUnderlyingStore(final Object key) {
372 while (true) {
373 long timeLeft = assertNotTimedOut();
374 LOG.debug("cache {} underlying.getQuiet key {} not timed out, time left: " + timeLeft, cache.getName(), key);
375
376 Element element = underlyingStore.getQuiet(key);
377 if (element == null) {
378 return null;
379 }
380 Object value = element.getObjectValue();
381 if (value instanceof SoftLock) {
382 SoftLock softLock = (SoftLock) value;
383 try {
384 LOG.debug("cache {} key {} soft locked, awaiting unlock...", cache.getName(), key);
385 boolean gotLock = softLock.tryLock(timeLeft);
386 if (gotLock) {
387 softLock.clearTryLock();
388 }
389 } catch (InterruptedException e) {
390 Thread.currentThread().interrupt();
391 }
392 } else {
393 return element;
394 }
395 }
396 }
397
398 private Element getCurrentElement(final Object key, final XATransactionContext context) {
399 Element previous = context.get(key);
400 if (previous == null && !context.isRemoved(key)) {
401 previous = getQuietFromUnderlyingStore(key);
402 }
403 return previous;
404 }
405
406 /***
407 * {@inheritDoc}
408 */
409 public boolean put(Element element) throws CacheException {
410 LOG.debug("cache {} put {}", cache.getName(), element);
411
412 getOrCreateTransactionContext();
413
414 Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
415 return internalPut(new StorePutCommand(oldElement, copyElementForWrite(element)));
416 }
417
418 /***
419 * {@inheritDoc}
420 */
421 public boolean putWithWriter(Element element, CacheWriterManager writerManager) throws CacheException {
422 LOG.debug("cache {} putWithWriter {}", cache.getName(), element);
423
424 getOrCreateTransactionContext();
425
426 Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
427 if (writerManager != null) {
428 writerManager.put(element);
429 } else {
430 cache.getWriterManager().put(element);
431 }
432 return internalPut(new StorePutCommand(oldElement, copyElementForWrite(element)));
433 }
434
435 private boolean internalPut(final StorePutCommand putCommand) {
436 final Element element = putCommand.getElement();
437 boolean isNull;
438 if (element == null) {
439 return true;
440 }
441 XATransactionContext context = getOrCreateTransactionContext();
442
443 isNull = underlyingStore.get(element.getKey()) == null;
444 if (isNull) {
445 isNull = context.get(element.getKey()) == null;
446 }
447 context.addCommand(putCommand, element);
448 return isNull;
449 }
450
451
452 /***
453 * {@inheritDoc}
454 */
455 public Element remove(Object key) {
456 LOG.debug("cache {} remove {}", cache.getName(), key);
457
458 getOrCreateTransactionContext();
459
460 Element oldElement = getQuietFromUnderlyingStore(key);
461 return removeInternal(new StoreRemoveCommand(key, oldElement));
462 }
463
464 private Element removeInternal(final StoreRemoveCommand command) {
465 Element element = command.getEntry().getElement();
466 getOrCreateTransactionContext().addCommand(command, element);
467 return copyElementForRead(element);
468 }
469
470 /***
471 * {@inheritDoc}
472 */
473 public Element removeWithWriter(Object key, CacheWriterManager writerManager) throws CacheException {
474 LOG.debug("cache {} removeWithWriter {}", cache.getName(), key);
475
476 getOrCreateTransactionContext();
477
478 Element oldElement = getQuietFromUnderlyingStore(key);
479 if (writerManager != null) {
480 writerManager.remove(new CacheEntry(key, null));
481 } else {
482 cache.getWriterManager().remove(new CacheEntry(key, null));
483 }
484 return removeInternal(new StoreRemoveCommand(key, oldElement));
485 }
486
487 /***
488 * {@inheritDoc}
489 */
490 public void removeAll() throws CacheException {
491 LOG.debug("cache {} removeAll", cache.getName());
492 List keys = getKeys();
493 for (Object key : keys) {
494 remove(key);
495 }
496 }
497
498 /***
499 * {@inheritDoc}
500 */
501 public Element putIfAbsent(Element element) throws NullPointerException {
502 LOG.debug("cache {} putIfAbsent {}", cache.getName(), element);
503 XATransactionContext context = getOrCreateTransactionContext();
504 Element previous = getCurrentElement(element.getObjectKey(), context);
505
506 if (previous == null) {
507 Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
508 Element elementForWrite = copyElementForWrite(element);
509 context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
510 }
511
512 return copyElementForRead(previous);
513 }
514
515 /***
516 * {@inheritDoc}
517 */
518 public Element removeElement(Element element, ElementValueComparator comparator) throws NullPointerException {
519 LOG.debug("cache {} removeElement {}", cache.getName(), element);
520 XATransactionContext context = getOrCreateTransactionContext();
521 Element previous = getCurrentElement(element.getKey(), context);
522
523 Element elementForWrite = copyElementForWrite(element);
524 if (previous != null && comparator.equals(previous, elementForWrite)) {
525 Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
526 context.addCommand(new StoreRemoveCommand(element.getObjectKey(), oldElement), elementForWrite);
527 return copyElementForRead(previous);
528 }
529 return null;
530 }
531
532 /***
533 * {@inheritDoc}
534 */
535 public boolean replace(Element old, Element element, ElementValueComparator comparator)
536 throws NullPointerException, IllegalArgumentException {
537 LOG.debug("cache {} replace2 {}", cache.getName(), element);
538 XATransactionContext context = getOrCreateTransactionContext();
539 Element previous = getCurrentElement(element.getKey(), context);
540
541 boolean replaced = false;
542 if (previous != null && comparator.equals(previous, copyElementForWrite(old))) {
543 Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
544 Element elementForWrite = copyElementForWrite(element);
545 context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
546 replaced = true;
547 }
548 return replaced;
549 }
550
551 /***
552 * {@inheritDoc}
553 */
554 public Element replace(Element element) throws NullPointerException {
555 LOG.debug("cache {} replace1 {}", cache.getName(), element);
556 XATransactionContext context = getOrCreateTransactionContext();
557 Element previous = getCurrentElement(element.getKey(), context);
558
559 if (previous != null) {
560 Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
561 Element elementForWrite = copyElementForWrite(element);
562 context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
563 }
564 return copyElementForRead(previous);
565 }
566
567 /***
568 * {@inheritDoc}
569 */
570 @Override
571 public void setAttributeExtractors(Map<String, AttributeExtractor> extractors) {
572 Map<String, AttributeExtractor> wrappedExtractors = new HashMap(extractors.size());
573 for (Entry<String, AttributeExtractor> e : extractors.entrySet()) {
574 wrappedExtractors.put(e.getKey(), new TransactionAwareAttributeExtractor(copyStrategy, e.getValue()));
575 }
576 underlyingStore.setAttributeExtractors(wrappedExtractors);
577 }
578
579 }