1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.transaction.xa.processor;
18
19 import net.sf.ehcache.transaction.xa.EhcacheXAException;
20 import net.sf.ehcache.transaction.xa.EhcacheXAResourceImpl;
21
22 import javax.transaction.xa.XAException;
23 import javax.transaction.xa.XAResource;
24 import javax.transaction.xa.Xid;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ExecutionException;
29
30 /***
31 * Default implementation for XARequestProcessor.
32 *
33 * This class ties an Xid to an Executor service. This is necessary so that
34 * locking for 2pc by the same thread.
35 *
36 * {@link XARequestProcessor xaRequestProcessor}.
37 *
38 * @author Nabib El-Rahman
39 */
40 public class XARequestProcessor {
41
42 private static volatile XAThreadPool xaProcessorPool;
43
44 private final ConcurrentMap<Xid, XAThreadPool.MultiRunner> executorMap =
45 new ConcurrentHashMap<Xid, XAThreadPool.MultiRunner>();
46 private EhcacheXAResourceImpl resourceImpl;
47
48 /***
49 * Constructor
50 *
51 * @param resourceImpl The EhcacheXAResourceImpl instance this processor will perform against
52 */
53 public XARequestProcessor(EhcacheXAResourceImpl resourceImpl) {
54 this.resourceImpl = resourceImpl;
55 if (xaProcessorPool == null) {
56 xaProcessorPool = new XAThreadPool();
57 }
58 }
59
60 /***
61 * Release resources shared by all XARequestProcessors
62 */
63 public static void shutdown() {
64 if (xaProcessorPool != null) {
65 xaProcessorPool.shutdown();
66 xaProcessorPool = null;
67 }
68 }
69
70 /***
71 * Process a XARequest
72 * @param request the XARequest
73 * @return the XAResource response code
74 * @throws XAException the XAException thrown by the XAResource
75 */
76 public int process(XARequest request) throws XAException {
77 XAThreadPool.MultiRunner multiRunner = getOrCreateThread(request.getXid());
78
79 XAResponse xaResponse;
80 try {
81 xaResponse = (XAResponse) multiRunner.execute(new XARequestCallable(resourceImpl, request, request.getXid()));
82 } catch (InterruptedException e) {
83 cleanupThread(request.getXid());
84 throw new EhcacheXAException(e.getMessage(), XAException.XAER_RMERR, e);
85 } catch (ExecutionException e) {
86 cleanupThread(request.getXid());
87 throw new EhcacheXAException(e.getMessage(), XAException.XAER_RMERR, e);
88 }
89 if (xaResponse.getXaException() != null) {
90 cleanupThread(request.getXid());
91 throw new EhcacheXAException("XA " + request.getRequestType().toString().toLowerCase() +
92 " request failed on [" + request.getXid() + "]", xaResponse.getXaException().errorCode,
93 xaResponse.getXaException());
94 }
95
96 if (request.getRequestType().equals(XARequest.RequestType.COMMIT) ||
97 request.getRequestType().equals(XARequest.RequestType.ROLLBACK) ||
98 request.getRequestType().equals(XARequest.RequestType.FORGET) ||
99 (request.getRequestType().equals(XARequest.RequestType.PREPARE) && xaResponse.getFlags() == XAResource.XA_RDONLY)) {
100 cleanupThread(request.getXid());
101 }
102
103 return xaResponse.getFlags();
104 }
105
106 /***
107 * Gets the executor service for a Transaction, either by creating a new one if none exists, or returning the
108 * existing one
109 * @param xid The Xid of the Transaction
110 * @return the ExecutorService for that Transaction
111 */
112 private XAThreadPool.MultiRunner getOrCreateThread(Xid xid) {
113 XAThreadPool.MultiRunner service = executorMap.get(xid);
114 if (service == null) {
115 service = xaProcessorPool.getMultiRunner();
116 executorMap.put(xid, service);
117 }
118 return service;
119 }
120
121 /***
122 * Removes the ExecutorService from the map and shuts it down
123 * @param xid The Xid of the Transaction
124 */
125 private void cleanupThread(Xid xid) {
126 XAThreadPool.MultiRunner service = executorMap.remove(xid);
127 service.release();
128 }
129
130 /***
131 * Class to furnish
132 * @author Nabib El-Rahman
133 *
134 */
135 private static class XARequestCallable implements Callable<XAResponse> {
136 private final EhcacheXAResourceImpl resourceImpl;
137 private final XARequest request;
138 private Xid xid;
139
140 /***
141 * Constructor
142 * @param resourceImpl the EhcacheXAResourceImpl this Request will be used for
143 * @param request the actual Request
144 * @param xid
145 */
146 public XARequestCallable(EhcacheXAResourceImpl resourceImpl, XARequest request, Xid xid) {
147 this.resourceImpl = resourceImpl;
148 this.request = request;
149 this.xid = xid;
150 }
151
152 /***
153 *
154 */
155 public XAResponse call() throws Exception {
156 Thread.currentThread().setName("XA-Request processor Thread Xid [ " + xid + " ]");
157
158 int returnFlag = XAResource.TMNOFLAGS;
159 XAException xaException = null;
160 try {
161 switch(request.getRequestType()) {
162
163 case FORGET:
164 resourceImpl.forgetInternal(request.getXid());
165 break;
166
167 case PREPARE:
168 returnFlag = resourceImpl.prepareInternal(request.getXid());
169 break;
170
171 case ROLLBACK:
172 resourceImpl.rollbackInternal(request.getXid());
173 break;
174
175 case COMMIT:
176 resourceImpl.commitInternal(request.getXid(), request.isOnePhase());
177 break;
178
179 default:
180 throw new EhcacheXAException("Unknown enum type: " + request.getRequestType(), XAException.XAER_RMERR);
181 }
182 } catch (XAException xaE) {
183 xaException = xaE;
184 }
185
186 return new XAResponse(returnFlag, xaException);
187 }
188
189 }
190
191 /***
192 *
193 * @author nelrahma
194 *
195 */
196 private static class XAResponse {
197
198 private final int flags;
199 private final XAException xaException;
200
201 /***
202 * Constructor
203 * @param flags flags returned by the actual call against the XAResource
204 * @param xaException Exception thrown by the call, otherwise null
205 */
206 public XAResponse(int flags, XAException xaException) {
207 this.flags = flags;
208 this.xaException = xaException;
209 }
210
211 /***
212 * Gets the flags returned by the actual call against the XAResource
213 * @return the flags
214 */
215 public int getFlags() {
216 return flags;
217 }
218
219 /***
220 * Gets the Exception thrown by the actual call against the XAResource
221 * @return the exception, null if none
222 */
223 public XAException getXaException() {
224 return xaException;
225 }
226
227 }
228
229 }