View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.transaction.xa.processor;
18  
19  import net.sf.ehcache.transaction.xa.EhcacheXAException;
20  import net.sf.ehcache.transaction.xa.EhcacheXAResourceImpl;
21  
22  import javax.transaction.xa.XAException;
23  import javax.transaction.xa.XAResource;
24  import javax.transaction.xa.Xid;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.ConcurrentMap;
28  import java.util.concurrent.ExecutionException;
29  
30  /***
31   * Default implementation for XARequestProcessor.
32   * 
33   * This class ties an Xid to an Executor service. This is necessary so that
34   * locking for 2pc by the same thread.
35   * 
36   * {@link XARequestProcessor xaRequestProcessor}.
37   *  
38   * @author Nabib El-Rahman
39   */
40  public class XARequestProcessor {
41  
42      private static volatile XAThreadPool xaProcessorPool;
43  
44      private final ConcurrentMap<Xid, XAThreadPool.MultiRunner> executorMap =
45              new ConcurrentHashMap<Xid, XAThreadPool.MultiRunner>();
46      private EhcacheXAResourceImpl resourceImpl;
47  
48      /***
49       * Constructor
50       * 
51       * @param resourceImpl The EhcacheXAResourceImpl instance this processor will perform against
52       */
53      public XARequestProcessor(EhcacheXAResourceImpl resourceImpl) {
54          this.resourceImpl = resourceImpl;
55          if (xaProcessorPool == null) {
56              xaProcessorPool = new XAThreadPool();
57          }
58      }
59  
60      /***
61       * Release resources shared by all XARequestProcessors
62       */
63      public static void shutdown() {
64          if (xaProcessorPool != null) {
65              xaProcessorPool.shutdown();
66              xaProcessorPool = null;
67          }
68      }
69  
70      /***
71       * Process a XARequest
72       * @param request the XARequest
73       * @return the XAResource response code
74       * @throws XAException the XAException thrown by the XAResource
75       */
76      public int process(XARequest request) throws XAException {
77          XAThreadPool.MultiRunner multiRunner = getOrCreateThread(request.getXid());
78  
79          XAResponse xaResponse;
80          try {
81              xaResponse = (XAResponse) multiRunner.execute(new XARequestCallable(resourceImpl, request, request.getXid()));
82          } catch (InterruptedException e) {
83              cleanupThread(request.getXid());
84              throw new EhcacheXAException(e.getMessage(), XAException.XAER_RMERR, e);
85          } catch (ExecutionException e) {
86              cleanupThread(request.getXid());
87              throw new EhcacheXAException(e.getMessage(), XAException.XAER_RMERR, e);
88          }
89          if (xaResponse.getXaException() != null) {
90              cleanupThread(request.getXid());
91              throw new EhcacheXAException("XA " + request.getRequestType().toString().toLowerCase() +
92                      " request failed on [" + request.getXid() + "]", xaResponse.getXaException().errorCode,
93                      xaResponse.getXaException());
94          }
95          
96          if (request.getRequestType().equals(XARequest.RequestType.COMMIT) ||
97              request.getRequestType().equals(XARequest.RequestType.ROLLBACK) ||
98              request.getRequestType().equals(XARequest.RequestType.FORGET) ||
99              (request.getRequestType().equals(XARequest.RequestType.PREPARE) && xaResponse.getFlags() == XAResource.XA_RDONLY)) {
100             cleanupThread(request.getXid());
101         }
102 
103         return xaResponse.getFlags();
104     }
105     
106     /***
107      * Gets the executor service for a Transaction, either by creating a new one if none exists, or returning the
108      * existing one
109      * @param xid The Xid of the Transaction
110      * @return the ExecutorService for that Transaction
111      */
112     private XAThreadPool.MultiRunner getOrCreateThread(Xid xid) {
113         XAThreadPool.MultiRunner service = executorMap.get(xid);
114         if (service == null) {
115             service = xaProcessorPool.getMultiRunner();
116             executorMap.put(xid, service);
117         }
118         return service;
119     }
120 
121     /***
122      * Removes the ExecutorService from the map and shuts it down
123      * @param xid The Xid of the Transaction
124      */
125     private void cleanupThread(Xid xid) {
126         XAThreadPool.MultiRunner service = executorMap.remove(xid);
127         service.release();
128     }
129 
130     /***
131      * Class to furnish
132      * @author Nabib El-Rahman
133      *
134      */
135     private static class XARequestCallable implements Callable<XAResponse> {
136         private final EhcacheXAResourceImpl resourceImpl;
137         private final XARequest request;
138         private Xid xid;
139 
140         /***
141          * Constructor
142          * @param resourceImpl the EhcacheXAResourceImpl this Request will be used for
143          * @param request the actual Request
144          * @param xid
145          */
146         public XARequestCallable(EhcacheXAResourceImpl resourceImpl, XARequest request, Xid xid) {
147             this.resourceImpl = resourceImpl;
148             this.request = request;
149             this.xid = xid;
150         }
151              
152         /***
153          * 
154          */
155         public XAResponse call() throws Exception {
156             Thread.currentThread().setName("XA-Request processor Thread Xid [ " + xid + " ]");
157 
158             int returnFlag = XAResource.TMNOFLAGS;
159             XAException xaException = null;
160             try {
161             switch(request.getRequestType()) {
162                 
163                 case FORGET:
164                     resourceImpl.forgetInternal(request.getXid());
165                     break;
166                     
167                 case PREPARE:
168                     returnFlag = resourceImpl.prepareInternal(request.getXid());
169                     break;
170                     
171                 case ROLLBACK:
172                     resourceImpl.rollbackInternal(request.getXid());
173                     break;
174                     
175                 case COMMIT:
176                     resourceImpl.commitInternal(request.getXid(), request.isOnePhase());
177                     break;
178                 
179                 default:
180                     throw new EhcacheXAException("Unknown enum type: " + request.getRequestType(), XAException.XAER_RMERR);
181             }
182             } catch (XAException xaE) {
183                 xaException = xaE;
184             }
185             
186             return new XAResponse(returnFlag, xaException);
187         }
188         
189     }
190     
191     /***
192      * 
193      * @author nelrahma
194      *
195      */
196     private static class XAResponse {
197         
198         private final int flags;
199         private final XAException xaException;
200         
201         /***
202          * Constructor
203          * @param flags flags returned by the actual call against the XAResource
204          * @param xaException Exception thrown by the call, otherwise null
205          */
206         public XAResponse(int flags, XAException xaException) {
207             this.flags = flags;
208             this.xaException = xaException;
209         }
210 
211         /***
212          * Gets the flags returned by the actual call against the XAResource
213          * @return the flags
214          */
215         public int getFlags() {
216             return flags;
217         }
218 
219         /***
220          * Gets the Exception thrown by the actual call against the XAResource
221          * @return the exception, null if none
222          */
223         public XAException getXaException() {
224             return xaException;
225         }   
226         
227     }
228 
229 }