View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.constructs.nonstop;
18  
19  import java.util.concurrent.Callable;
20  import java.util.concurrent.ExecutionException;
21  import java.util.concurrent.RejectedExecutionException;
22  import java.util.concurrent.ThreadFactory;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TimeoutException;
25  
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  
29  import net.sf.ehcache.CacheException;
30  import net.sf.ehcache.constructs.nonstop.concurrency.InvalidLockStateAfterRejoinException;
31  
32  /***
33   * Class used by NonStopCache for executing tasks within a timeout limit.
34   *
35   * @author Abhishek Sanoujam
36   *
37   */
38  public class NonstopExecutorServiceImpl implements NonstopExecutorService {
39  
40      private static final Logger LOGGER = LoggerFactory.getLogger(NonstopExecutorServiceImpl.class);
41      private final NonstopThreadPool nonstopThreadPool;
42  
43      /***
44       * Constructor accepting a {@link ThreadFactory} that will be used to create threads for the pool
45       *
46       * @param threadFactory
47       */
48      public NonstopExecutorServiceImpl(final ThreadFactory threadFactory) {
49          this.nonstopThreadPool = new NonstopThreadPool(threadFactory);
50      }
51  
52      /***
53       * {@inheritDoc}
54       */
55      public <V> V execute(final Callable<V> callable, final long timeoutValueInMillis) throws TimeoutException, CacheException,
56              InterruptedException {
57          int attempt = 0;
58          V result = null;
59          long startTime = System.nanoTime();
60          while (true) {
61              try {
62                  attempt++;
63                  result = nonstopThreadPool.submit(callable).get(timeoutValueInMillis, TimeUnit.MILLISECONDS);
64                  break;
65              } catch (InterruptedException e) {
66                  // XXX: do something here?
67                  throw e;
68              } catch (RejectedExecutionException e) {
69                  // if the executor rejects (too many tasks executing), try until timed out
70                  long now = System.nanoTime();
71                  if (now - startTime > TimeUnit.NANOSECONDS.convert(timeoutValueInMillis, TimeUnit.MILLISECONDS)) {
72                      throw new TaskNotSubmittedTimeoutException(attempt);
73                  } else {
74                      continue;
75                  }
76              } catch (ExecutionException e) {
77                  Throwable rootCause = getRootCause(e);
78                  if (rootCause.getClass().getSimpleName().equals("TCNotRunningException")) {
79                      throw new TimeoutException(rootCause.getMessage());
80                  }
81  
82                  if (rootCause instanceof ThrowTimeoutException) {
83                      // rethrow as TimeoutException
84                      throw new TimeoutException("Callable threw " + rootCause.getClass().getName());
85                  }
86  
87                  if (rootCause instanceof InterruptedException) {
88                      // the executor service itself is shutting down, rethrow as timeout exception
89                      throw new TimeoutException("Callable threw " + rootCause.getClass().getName());
90                  }
91  
92                  if (e.getCause() instanceof InvalidLockStateAfterRejoinException) {
93                      throw new InvalidLockStateAfterRejoinException(e.getCause());
94                  }
95  
96                  throw new CacheException(e.getCause());
97              } catch (TimeoutException e) {
98                  // rethrow timeout exception
99                  throw e;
100             }
101         }
102         return result;
103     }
104 
105     private Throwable getRootCause(final Throwable exception) {
106         Throwable e = exception;
107         while (e.getCause() != null) {
108             e = e.getCause();
109         }
110         return e;
111     }
112 
113     /***
114      * {@inheritDoc}
115      */
116     public void shutdown() {
117         LOGGER.debug("Shutting down NonstopExecutorService");
118         nonstopThreadPool.shutdownNow();
119     }
120 }