1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.constructs.nonstop;
18
19 import java.util.concurrent.Callable;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.RejectedExecutionException;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import net.sf.ehcache.CacheException;
30 import net.sf.ehcache.constructs.nonstop.concurrency.InvalidLockStateAfterRejoinException;
31
32 /***
33 * Class used by NonStopCache for executing tasks within a timeout limit.
34 *
35 * @author Abhishek Sanoujam
36 *
37 */
38 public class NonstopExecutorServiceImpl implements NonstopExecutorService {
39
40 private static final Logger LOGGER = LoggerFactory.getLogger(NonstopExecutorServiceImpl.class);
41 private final NonstopThreadPool nonstopThreadPool;
42
43 /***
44 * Constructor accepting a {@link ThreadFactory} that will be used to create threads for the pool
45 *
46 * @param threadFactory
47 */
48 public NonstopExecutorServiceImpl(final ThreadFactory threadFactory) {
49 this.nonstopThreadPool = new NonstopThreadPool(threadFactory);
50 }
51
52 /***
53 * {@inheritDoc}
54 */
55 public <V> V execute(final Callable<V> callable, final long timeoutValueInMillis) throws TimeoutException, CacheException,
56 InterruptedException {
57 int attempt = 0;
58 V result = null;
59 long startTime = System.nanoTime();
60 while (true) {
61 try {
62 attempt++;
63 result = nonstopThreadPool.submit(callable).get(timeoutValueInMillis, TimeUnit.MILLISECONDS);
64 break;
65 } catch (InterruptedException e) {
66
67 throw e;
68 } catch (RejectedExecutionException e) {
69
70 long now = System.nanoTime();
71 if (now - startTime > TimeUnit.NANOSECONDS.convert(timeoutValueInMillis, TimeUnit.MILLISECONDS)) {
72 throw new TaskNotSubmittedTimeoutException(attempt);
73 } else {
74 continue;
75 }
76 } catch (ExecutionException e) {
77 Throwable rootCause = getRootCause(e);
78 if (rootCause.getClass().getSimpleName().equals("TCNotRunningException")) {
79 throw new TimeoutException(rootCause.getMessage());
80 }
81
82 if (rootCause instanceof ThrowTimeoutException) {
83
84 throw new TimeoutException("Callable threw " + rootCause.getClass().getName());
85 }
86
87 if (rootCause instanceof InterruptedException) {
88
89 throw new TimeoutException("Callable threw " + rootCause.getClass().getName());
90 }
91
92 if (e.getCause() instanceof InvalidLockStateAfterRejoinException) {
93 throw new InvalidLockStateAfterRejoinException(e.getCause());
94 }
95
96 throw new CacheException(e.getCause());
97 } catch (TimeoutException e) {
98
99 throw e;
100 }
101 }
102 return result;
103 }
104
105 private Throwable getRootCause(final Throwable exception) {
106 Throwable e = exception;
107 while (e.getCause() != null) {
108 e = e.getCause();
109 }
110 return e;
111 }
112
113 /***
114 * {@inheritDoc}
115 */
116 public void shutdown() {
117 LOGGER.debug("Shutting down NonstopExecutorService");
118 nonstopThreadPool.shutdownNow();
119 }
120 }