View Javadoc

1   package net.sf.ehcache.transaction.xa;
2   
3   import bitronix.tm.BitronixTransaction;
4   import bitronix.tm.TransactionManagerServices;
5   import bitronix.tm.internal.TransactionStatusChangeListener;
6   import junit.framework.AssertionFailedError;
7   import junit.framework.TestCase;
8   import net.sf.ehcache.CacheManager;
9   import net.sf.ehcache.Ehcache;
10  import net.sf.ehcache.Element;
11  import net.sf.ehcache.transaction.TransactionTimeoutException;
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  
15  import javax.transaction.Status;
16  import javax.transaction.TransactionManager;
17  
18  /***
19   * @author Ludovic Orban
20   */
21  public class XATransactionTest extends TestCase {
22  
23      private static final Logger LOG = LoggerFactory.getLogger(XATransactionTest.class);
24  
25      private TransactionManager tm;
26      private CacheManager cacheManager;
27      private Ehcache cache1;
28      private Ehcache cache2;
29  
30      @Override
31      protected void setUp() throws Exception {
32          TransactionManagerServices.getConfiguration().setJournal("null").setGracefulShutdownInterval(0).setBackgroundRecoveryIntervalSeconds(1);
33  
34          tm = TransactionManagerServices.getTransactionManager();
35          cacheManager = new CacheManager(XATransactionTest.class.getResourceAsStream("/ehcache-tx-twopc.xml"));
36  
37          cache1 = cacheManager.getEhcache("txCache1");
38          cache2 = cacheManager.getEhcache("txCache2");
39          tm.begin();
40  
41          cache1.removeAll();
42          cache2.removeAll();
43  
44          tm.commit();
45      }
46  
47      @Override
48      protected void tearDown() throws Exception {
49          if (tm.getTransaction() != null) {
50              tm.rollback();
51          }
52          cacheManager.shutdown();
53          TransactionManagerServices.getTransactionManager().shutdown();
54      }
55  
56      public void testSimple() throws Exception {
57          LOG.info("******* START");
58  
59          tm.begin();
60          cache1.get(1);
61          cache1.put(new Element(1, "one"));
62          tm.commit();
63  
64          tm.begin();
65          Element e = cache1.get(1);
66          assertEquals("one", e.getObjectValue());
67          cache1.remove(1);
68          e = cache1.get(1);
69          assertNull(e);
70          int size = cache1.getSize();
71          assertEquals(0, size);
72          tm.rollback();
73  
74          tm.begin();
75          e = cache1.get(1);
76          assertEquals("one", e.getObjectValue());
77  
78          tm.rollback();
79  
80          LOG.info("******* END");
81      }
82  
83      public void testPutDuring2PC() throws Exception {
84          tm.begin();
85  
86          cache1.put(new Element(1, "one"));
87          // 1PC bypasses 1st phase -> enlist a 2nd resource to prevent it
88          cache2.put(new Element(1, "one"));
89  
90          BitronixTransaction tx = (BitronixTransaction) tm.getTransaction();
91  
92          tx.addTransactionStatusChangeListener(new TransactionStatusChangeListener() {
93              public void statusChanged(int oldStatus, int newStatus) {
94                  if (oldStatus == Status.STATUS_PREPARED) {
95  
96                      TxThread t = new TxThread() {
97                          @Override
98                          public void exec() throws Exception {
99                              tm.setTransactionTimeout(1);
100                             tm.begin();
101 
102                             try {
103                                 cache1.put(new Element(1, "one#2"));
104                                 fail("expected TransactionTimeoutException");
105                             } catch (TransactionTimeoutException e) {
106                                 // expected
107                             }
108                             tm.rollback();
109                         }
110                     };
111                     t.start();
112                     t.joinAndAssertNotFailed();
113                 }
114             }
115         });
116 
117         tm.commit();
118     }
119 
120     private static class TxThread extends Thread {
121         private volatile boolean failed;
122 
123         @Override
124         public final void run() {
125             try {
126                 exec();
127             } catch (Throwable t) {
128                 t.printStackTrace();
129                 failed = true;
130             }
131         }
132 
133         public void exec() throws Exception {
134         }
135 
136         public void joinAndAssertNotFailed() {
137             try {
138                 join();
139             } catch (InterruptedException e) {
140                 // ignore
141             }
142             assertNotFailed();
143         }
144 
145         public void assertNotFailed() {
146             if (failed) {
147                 throw new AssertionFailedError("TxThread failed");
148             }
149         }
150     }
151 }