1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.constructs.nonstop.store;
18
19 import java.util.concurrent.Callable;
20
21 import net.sf.ehcache.config.TimeoutBehaviorConfiguration.TimeoutBehaviorType;
22 import net.sf.ehcache.constructs.nonstop.ClusterOperation;
23
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /***
28 * A {@link Callable} implementation that accepts another callable delegate for executing it in nonstop+rejoin context.
29 * Executing the {@link #call()} operation will execute the delegate callable and block until it returns. On rejoin, the delegate callable
30 * is executed again.
31 *
32 * @author Abhishek Sanoujam
33 *
34 * @param <V>
35 */
36 public class RejoinAwareBlockingOperation<V> implements Callable<V> {
37
38 private static final Logger LOGGER = LoggerFactory.getLogger(RejoinAwareBlockingOperation.class);
39
40 private final Callable<V> delegateCallable;
41 private final ExecutorServiceStore executorServiceStore;
42 private volatile Thread executingThread;
43 private volatile boolean rejoinHappened;
44
45 /***
46 * Public constructor
47 *
48 * @param executorServiceStore
49 * @param callable
50 */
51 public RejoinAwareBlockingOperation(ExecutorServiceStore executorServiceStore, Callable<V> callable) {
52 this.executorServiceStore = executorServiceStore;
53 this.delegateCallable = callable;
54 }
55
56 /***
57 * {@inheritDoc}.
58 * <p />
59 * Throws {@link InterruptedException} if the executing thread is interrupted before the call returns
60 */
61 public V call() throws Exception {
62 executingThread = Thread.currentThread();
63 return executeUntilComplete();
64 }
65
66 private V executeUntilComplete() throws Exception {
67 while (true) {
68 try {
69 rejoinHappened = false;
70 executorServiceStore.executeClusterOperationNoTimeout(new ClusterOperation<V>() {
71
72 public V performClusterOperation() throws Exception {
73 return delegateCallable.call();
74 }
75
76 public V performClusterOperationTimedOut(TimeoutBehaviorType configuredTimeoutBehavior) {
77 throw new AssertionError("This should never happen as executed with no-timeout");
78 }
79
80 });
81 return delegateCallable.call();
82 } catch (InterruptedException e) {
83 if (rejoinHappened) {
84 LOGGER.debug("Caught InterruptedException caused by rejoin. Executing callable again.");
85 continue;
86 } else {
87 throw e;
88 }
89
90 }
91 }
92 }
93
94 /***
95 * Called when cluster rejoin happens
96 */
97 public void clusterRejoined() {
98 rejoinHappened = true;
99
100 if (executingThread != null) {
101 LOGGER.debug("Interrupting executing thread (id=" + executingThread.getId() + ", name='" + executingThread.getName()
102 + "') as rejoin happened");
103 executingThread.interrupt();
104 }
105 }
106
107 }