1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.constructs.nonstop.store;
18
19 import net.sf.ehcache.CacheException;
20 import net.sf.ehcache.Element;
21 import net.sf.ehcache.cluster.CacheCluster;
22 import net.sf.ehcache.config.NonstopConfiguration;
23 import net.sf.ehcache.constructs.nonstop.NonstopActiveDelegateHolder;
24 import net.sf.ehcache.constructs.nonstop.concurrency.ExplicitLockingContextThreadLocal;
25 import net.sf.ehcache.store.ElementValueComparator;
26 import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
27 import net.sf.ehcache.writer.CacheWriterManager;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import javax.transaction.InvalidTransactionException;
32 import javax.transaction.SystemException;
33 import javax.transaction.Transaction;
34 import javax.transaction.TransactionManager;
35 import java.util.List;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.TimeoutException;
38
39 /***
40 * This implementation is identical to TransactionalExecutorServiceStore except that it ensures the transactional context
41 * gets propagated to the executor thread.
42 * <p/>
43 *
44 * @see ExecutorServiceStore
45 * @author Ludovic Orban
46 *
47 */
48 public class TransactionalExecutorServiceStore extends ExecutorServiceStore {
49
50 private static final Logger LOG = LoggerFactory.getLogger(TransactionalExecutorServiceStore.class.getName());
51
52 private final TransactionManager transactionManager;
53
54 /***
55 * Constructor
56 * @param explicitLockingContextThreadLocal
57 */
58 public TransactionalExecutorServiceStore(final NonstopActiveDelegateHolder nonstopActiveDelegateHolder,
59 final NonstopConfiguration nonstopConfiguration, final NonstopTimeoutBehaviorStoreResolver timeoutBehaviorResolver,
60 CacheCluster cacheCluster, TransactionManagerLookup transactionManagerLookup,
61 ExplicitLockingContextThreadLocal explicitLockingContextThreadLocal) {
62 super(nonstopActiveDelegateHolder, nonstopConfiguration, timeoutBehaviorResolver, cacheCluster, explicitLockingContextThreadLocal);
63 this.transactionManager = transactionManagerLookup.getTransactionManager();
64 }
65
66 /***
67 * {@inheritDoc}.
68 */
69 @Override
70 public boolean put(final Element element) throws CacheException {
71 boolean rv = false;
72 final Transaction tx = suspendCaller();
73 try {
74 rv = executeWithExecutor(new Callable<Boolean>() {
75 public Boolean call() throws Exception {
76 resumeCallee(tx);
77 try {
78 return underlyingTerracottaStore().put(element);
79 } finally {
80 suspendCallee();
81 }
82 }
83 });
84 } catch (TimeoutException e) {
85 return resolveTimeoutBehaviorStore().put(element);
86 } finally {
87 resumeCaller(tx);
88 }
89
90 return rv;
91 }
92
93 /***
94 * {@inheritDoc}.
95 */
96 @Override
97 public boolean putWithWriter(final Element element, final CacheWriterManager writerManager) throws CacheException {
98 boolean rv = false;
99 final Transaction tx = suspendCaller();
100 try {
101 rv = executeWithExecutor(new Callable<Boolean>() {
102 public Boolean call() throws Exception {
103 resumeCallee(tx);
104 try {
105 return underlyingTerracottaStore().putWithWriter(element, writerManager);
106 } finally {
107 suspendCallee();
108 }
109 }
110 });
111 } catch (TimeoutException e) {
112 return resolveTimeoutBehaviorStore().putWithWriter(element, writerManager);
113 } finally {
114 resumeCaller(tx);
115 }
116 return rv;
117 }
118
119 /***
120 * {@inheritDoc}.
121 */
122 @Override
123 public Element get(final Object key) {
124 Element rv = null;
125 final Transaction tx = suspendCaller();
126 try {
127 rv = executeWithExecutor(new Callable<Element>() {
128 public Element call() throws Exception {
129 resumeCallee(tx);
130 try {
131 return underlyingTerracottaStore().get(key);
132 } finally {
133 suspendCallee();
134 }
135 }
136 });
137 } catch (TimeoutException e) {
138 return resolveTimeoutBehaviorStore().get(key);
139 } finally {
140 resumeCaller(tx);
141 }
142 return rv;
143 }
144
145 /***
146 * {@inheritDoc}.
147 */
148 @Override
149 public Element getQuiet(final Object key) {
150 Element rv = null;
151 final Transaction tx = suspendCaller();
152 try {
153 rv = executeWithExecutor(new Callable<Element>() {
154 public Element call() throws Exception {
155 resumeCallee(tx);
156 try {
157 return underlyingTerracottaStore().getQuiet(key);
158 } finally {
159 suspendCallee();
160 }
161 }
162 });
163 } catch (TimeoutException e) {
164 return resolveTimeoutBehaviorStore().getQuiet(key);
165 } finally {
166 resumeCaller(tx);
167 }
168 return rv;
169 }
170
171 /***
172 * {@inheritDoc}.
173 */
174 @Override
175 public List getKeys() {
176 List rv = null;
177 final Transaction tx = suspendCaller();
178 try {
179 rv = executeWithExecutor(new Callable<List>() {
180 public List call() throws Exception {
181 resumeCallee(tx);
182 try {
183 return underlyingTerracottaStore().getKeys();
184 } finally {
185 suspendCallee();
186 }
187 }
188 });
189 } catch (TimeoutException e) {
190 return resolveTimeoutBehaviorStore().getKeys();
191 } finally {
192 resumeCaller(tx);
193 }
194 return rv;
195 }
196
197 /***
198 * {@inheritDoc}.
199 */
200 @Override
201 public Element remove(final Object key) {
202 Element rv = null;
203 final Transaction tx = suspendCaller();
204 try {
205 rv = executeWithExecutor(new Callable<Element>() {
206 public Element call() throws Exception {
207 resumeCallee(tx);
208 try {
209 return underlyingTerracottaStore().remove(key);
210 } finally {
211 suspendCallee();
212 }
213 }
214 });
215 } catch (TimeoutException e) {
216 return resolveTimeoutBehaviorStore().remove(key);
217 } finally {
218 resumeCaller(tx);
219 }
220 return rv;
221 }
222
223 /***
224 * {@inheritDoc}.
225 */
226 @Override
227 public Element removeWithWriter(final Object key, final CacheWriterManager writerManager) throws CacheException {
228 Element rv = null;
229 final Transaction tx = suspendCaller();
230 try {
231 rv = executeWithExecutor(new Callable<Element>() {
232 public Element call() throws Exception {
233 resumeCallee(tx);
234 try {
235 return underlyingTerracottaStore().removeWithWriter(key, writerManager);
236 } finally {
237 suspendCallee();
238 }
239 }
240 });
241 } catch (TimeoutException e) {
242 return resolveTimeoutBehaviorStore().removeWithWriter(key, writerManager);
243 } finally {
244 resumeCaller(tx);
245 }
246 return rv;
247 }
248
249 /***
250 * {@inheritDoc}.
251 * The timeout used by this method is {@link net.sf.ehcache.config.NonstopConfiguration#getBulkOpsTimeoutMultiplyFactor()} times the timeout value in the
252 * config.
253 */
254 @Override
255 public void removeAll() throws CacheException {
256 final Transaction tx = suspendCaller();
257 try {
258 executeWithExecutor(new Callable<Void>() {
259 public Void call() throws Exception {
260 resumeCallee(tx);
261 try {
262 underlyingTerracottaStore().removeAll();
263 return null;
264 } finally {
265 suspendCallee();
266 }
267 }
268 }, nonstopConfiguration.getTimeoutMillis() * nonstopConfiguration.getBulkOpsTimeoutMultiplyFactor());
269 } catch (TimeoutException e) {
270 resolveTimeoutBehaviorStore().removeAll();
271 } finally {
272 resumeCaller(tx);
273 }
274 }
275
276 /***
277 * {@inheritDoc}.
278 */
279 @Override
280 public Element putIfAbsent(final Element element) throws NullPointerException {
281 Element rv = null;
282 final Transaction tx = suspendCaller();
283 try {
284 rv = executeWithExecutor(new Callable<Element>() {
285 public Element call() throws Exception {
286 resumeCallee(tx);
287 try {
288 return underlyingTerracottaStore().putIfAbsent(element);
289 } finally {
290 suspendCallee();
291 }
292 }
293 });
294 } catch (TimeoutException e) {
295 return resolveTimeoutBehaviorStore().putIfAbsent(element);
296 } finally {
297 resumeCaller(tx);
298 }
299 return rv;
300 }
301
302 /***
303 * {@inheritDoc}.
304 */
305 @Override
306 public Element removeElement(final Element element, final ElementValueComparator comparator) throws NullPointerException {
307 Element rv = null;
308 final Transaction tx = suspendCaller();
309 try {
310 rv = executeWithExecutor(new Callable<Element>() {
311 public Element call() throws Exception {
312 resumeCallee(tx);
313 try {
314 return underlyingTerracottaStore().removeElement(element, comparator);
315 } finally {
316 suspendCallee();
317 }
318 }
319 });
320 } catch (TimeoutException e) {
321 return resolveTimeoutBehaviorStore().removeElement(element, comparator);
322 } finally {
323 resumeCaller(tx);
324 }
325 return rv;
326 }
327
328 /***
329 * {@inheritDoc}.
330 */
331 @Override
332 public boolean replace(final Element old, final Element element, final ElementValueComparator comparator) throws NullPointerException,
333 IllegalArgumentException {
334 boolean rv = false;
335 final Transaction tx = suspendCaller();
336 try {
337 rv = executeWithExecutor(new Callable<Boolean>() {
338 public Boolean call() throws Exception {
339 resumeCallee(tx);
340 try {
341 return underlyingTerracottaStore().replace(old, element, comparator);
342 } finally {
343 suspendCallee();
344 }
345 }
346 });
347 } catch (TimeoutException e) {
348 return resolveTimeoutBehaviorStore().replace(old, element, comparator);
349 } finally {
350 resumeCaller(tx);
351 }
352 return rv;
353 }
354
355 /***
356 * {@inheritDoc}.
357 */
358 @Override
359 public Element replace(final Element element) throws NullPointerException {
360 Element rv = null;
361 final Transaction tx = suspendCaller();
362 try {
363 rv = executeWithExecutor(new Callable<Element>() {
364 public Element call() throws Exception {
365 resumeCallee(tx);
366 try {
367 return underlyingTerracottaStore().replace(element);
368 } finally {
369 suspendCallee();
370 }
371 }
372 });
373 } catch (TimeoutException e) {
374 return resolveTimeoutBehaviorStore().replace(element);
375 } finally {
376 resumeCaller(tx);
377 }
378 return rv;
379 }
380
381 /***
382 * {@inheritDoc}.
383 */
384 @Override
385 public int getSize() {
386 int rv = 0;
387 final Transaction tx = suspendCaller();
388 try {
389 rv = executeWithExecutor(new Callable<Integer>() {
390 public Integer call() throws Exception {
391 resumeCallee(tx);
392 try {
393 return underlyingTerracottaStore().getSize();
394 } finally {
395 suspendCallee();
396 }
397 }
398 });
399 } catch (TimeoutException e) {
400 return resolveTimeoutBehaviorStore().getSize();
401 } finally {
402 resumeCaller(tx);
403 }
404 return rv;
405 }
406
407
408 /***
409 * {@inheritDoc}.
410 */
411 @Override
412 public int getTerracottaClusteredSize() {
413 int rv = 0;
414 final Transaction tx = suspendCaller();
415 try {
416 rv = executeWithExecutor(new Callable<Integer>() {
417 public Integer call() throws Exception {
418 resumeCallee(tx);
419 try {
420 return underlyingTerracottaStore().getTerracottaClusteredSize();
421 } finally {
422 suspendCallee();
423 }
424 }
425 });
426 } catch (TimeoutException e) {
427 return resolveTimeoutBehaviorStore().getTerracottaClusteredSize();
428 } finally {
429 resumeCaller(tx);
430 }
431 return rv;
432 }
433
434 /***
435 * {@inheritDoc}.
436 */
437 @Override
438 public boolean containsKey(final Object key) {
439 boolean rv = false;
440 final Transaction tx = suspendCaller();
441 try {
442 rv = executeWithExecutor(new Callable<Boolean>() {
443 public Boolean call() throws Exception {
444 resumeCallee(tx);
445 try {
446 return underlyingTerracottaStore().containsKey(key);
447 } finally {
448 suspendCallee();
449 }
450 }
451 });
452 } catch (TimeoutException e) {
453 return resolveTimeoutBehaviorStore().containsKey(key);
454 } finally {
455 resumeCaller(tx);
456 }
457 return rv;
458 }
459
460
461
462 private void resumeCaller(Transaction tx) {
463 if (tx == null) {
464 return;
465 }
466
467 try {
468 transactionManager.resume(tx);
469 } catch (IllegalStateException ise) {
470 LOG.warn("error resuming JTA transaction context on caller thread", ise);
471 } catch (InvalidTransactionException ite) {
472 LOG.warn("error resuming JTA transaction context on caller thread", ite);
473 } catch (SystemException se) {
474 LOG.warn("error resuming JTA transaction context on caller thread", se);
475 }
476 }
477
478 private Transaction suspendCaller() {
479 try {
480 return transactionManager.suspend();
481 } catch (SystemException se) {
482 throw new CacheException("error suspending JTA transaction context", se);
483 }
484 }
485
486 private void resumeCallee(Transaction tx) {
487 if (tx == null) {
488 return;
489 }
490
491 try {
492 transactionManager.resume(tx);
493 } catch (IllegalStateException ise) {
494 throw new CacheException("error resuming JTA transaction context on caller thread", ise);
495 } catch (InvalidTransactionException ite) {
496 throw new CacheException("error resuming JTA transaction context on caller thread", ite);
497 } catch (SystemException se) {
498 throw new CacheException("error resuming JTA transaction context on caller thread", se);
499 }
500 }
501
502 private Transaction suspendCallee() {
503 try {
504 return transactionManager.suspend();
505 } catch (SystemException se) {
506 LOG.warn("error suspending JTA transaction context", se);
507 }
508 return null;
509 }
510
511 }