View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.constructs.nonstop.store;
18  
19  import java.util.concurrent.Callable;
20  
21  import net.sf.ehcache.config.TimeoutBehaviorConfiguration.TimeoutBehaviorType;
22  import net.sf.ehcache.constructs.nonstop.ClusterOperation;
23  
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  
27  /***
28   * A {@link Callable} implementation that accepts another callable delegate for executing it in nonstop+rejoin context.
29   * Executing the {@link #call()} operation will execute the delegate callable and block until it returns. On rejoin, the delegate callable
30   * is executed again.
31   *
32   * @author Abhishek Sanoujam
33   *
34   * @param <V>
35   */
36  public class RejoinAwareBlockingOperation<V> implements Callable<V> {
37  
38      private static final Logger LOGGER = LoggerFactory.getLogger(RejoinAwareBlockingOperation.class);
39  
40      private final Callable<V> delegateCallable;
41      private final ExecutorServiceStore executorServiceStore;
42      private volatile Thread executingThread;
43      private volatile boolean rejoinHappened;
44  
45      /***
46       * Public constructor
47       *
48       * @param executorServiceStore
49       * @param callable
50       */
51      public RejoinAwareBlockingOperation(ExecutorServiceStore executorServiceStore, Callable<V> callable) {
52          this.executorServiceStore = executorServiceStore;
53          this.delegateCallable = callable;
54      }
55  
56      /***
57       * {@inheritDoc}.
58       * <p />
59       * Throws {@link InterruptedException} if the executing thread is interrupted before the call returns
60       */
61      public V call() throws Exception {
62          executingThread = Thread.currentThread();
63          return executeUntilComplete();
64      }
65  
66      private V executeUntilComplete() throws Exception {
67          while (true) {
68              try {
69                  rejoinHappened = false;
70                  executorServiceStore.executeClusterOperationNoTimeout(new ClusterOperation<V>() {
71  
72                      public V performClusterOperation() throws Exception {
73                          return delegateCallable.call();
74                      }
75  
76                      public V performClusterOperationTimedOut(TimeoutBehaviorType configuredTimeoutBehavior) {
77                          throw new AssertionError("This should never happen as executed with no-timeout");
78                      }
79  
80                  });
81                  return delegateCallable.call();
82              } catch (InterruptedException e) {
83                  if (rejoinHappened) {
84                      LOGGER.debug("Caught InterruptedException caused by rejoin. Executing callable again.");
85                      continue;
86                  } else {
87                      throw e;
88                  }
89  
90              }
91          }
92      }
93  
94      /***
95       * Called when cluster rejoin happens
96       */
97      public void clusterRejoined() {
98          rejoinHappened = true;
99          // interrupt the executing thread so that it can retry again
100         if (executingThread != null) {
101             LOGGER.debug("Interrupting executing thread (id=" + executingThread.getId() + ", name='" + executingThread.getName()
102                     + "') as rejoin happened");
103             executingThread.interrupt();
104         }
105     }
106 
107 }