1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.constructs.nonstop;
18
19 import java.lang.ref.ReferenceQueue;
20 import java.lang.ref.WeakReference;
21 import java.util.HashSet;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.WeakHashMap;
25 import java.util.Map.Entry;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.FutureTask;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.atomic.AtomicReference;
32
/**
 * A thread pool that creates one dedicated worker thread per requesting (application) thread.
 * <p>
 * The first time a thread calls {@link #submit(Callable)} a worker thread is created for it through the
 * configured {@link ThreadFactory}; every later task submitted by that same thread is handed to the same
 * worker. A background "reaper" thread shuts workers down once their application thread has been garbage
 * collected or is found to be no longer alive.
 *
 * @author Abhishek Sanoujam
 *
 */
public class NonstopThreadPool {

    /** Timeout, in milliseconds, of a single poll of the garbage-collected-threads reference queue. */
    private static final long POLL_TIME_MILLIS = 1000;
    /** Number of reference-queue polls between two scans for application threads that are no longer alive. */
    private static final long NUM_OF_POLLS_BEFORE_CHECK_THREADS_ALIVE = 100;

    private final ThreadFactory threadFactory;
    /** Application thread -> its worker holder. Guarded by {@link #workersLock}. */
    private final Map<Thread, WorkerThreadLocal> workers = new WeakHashMap<Thread, WorkerThreadLocal>();
    private final Object workersLock = new Object();
    private final AtomicReference<State> state = new AtomicReference<State>(State.RUNNING);
    private final ReferenceQueue<Thread> gcedThreadsReferenceQueue = new ReferenceQueue<Thread>();
    private final ThreadLocal<WorkerThreadLocal> workerThreadLocal = new ThreadLocal<WorkerThreadLocal>() {
        /**
         * Lazily creates and registers the worker of the calling thread.
         *
         * @throws RejectedExecutionException if the pool has been shut down
         */
        @Override
        protected WorkerThreadLocal initialValue() {
            // Cheap early check: do not start a worker thread at all when the pool is already shut down.
            if (state.get() == State.SHUTDOWN) {
                rejectExecutionAfterShutdown();
            }
            // The worker thread is started outside the lock so the thread factory is never called under it.
            WorkerThreadLocal local = new WorkerThreadLocal(threadFactory, gcedThreadsReferenceQueue);
            boolean registered = false;
            synchronized (workersLock) {
                // Re-check under the lock: shutdownNow() sets the state before it takes this lock, so a
                // worker registered here while the state is still RUNNING is guaranteed to be seen by it.
                if (state.get() != State.SHUTDOWN) {
                    workers.put(Thread.currentThread(), local);
                    registered = true;
                }
            }
            if (!registered) {
                // Lost the race against shutdownNow(): stop the just-started worker so it does not leak.
                local.shutdownNow();
                rejectExecutionAfterShutdown();
            }
            return local;
        }

    };

    /**
     * Constructor accepting the threadFactory. Also starts the reaper thread of this pool.
     *
     * @param threadFactory factory used to create one worker thread per requesting thread
     */
    public NonstopThreadPool(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        startReaperThread();
    }

    /**
     * Starts the background reaper. It is a daemon thread so that a pool which is never shut down does not
     * keep the JVM alive.
     */
    private void startReaperThread() {
        Thread reaperThread = new Thread(new ReaperThread(), "non stop reaper thread");
        reaperThread.setDaemon(true);
        reaperThread.start();
    }

    /**
     * class which manages the alive non stop threads
     *
     * @author Raghvendra Singh
     *
     */
    private class ReaperThread implements Runnable {

        /**
         * Until the pool is shut down: waits up to {@code POLL_TIME_MILLIS} for a garbage collected
         * application thread and shuts its worker down; every
         * {@code NUM_OF_POLLS_BEFORE_CHECK_THREADS_ALIVE} polls it additionally reaps workers whose
         * application thread is no longer alive.
         */
        public void run() {
            int pollCount = 0;
            while (state.get() != State.SHUTDOWN) {
                WeakWorkerReference gcedThreadReference = null;
                try {
                    gcedThreadReference = (WeakWorkerReference) gcedThreadsReferenceQueue.remove(POLL_TIME_MILLIS);

                    if (++pollCount == NUM_OF_POLLS_BEFORE_CHECK_THREADS_ALIVE) {
                        pollCount = 0;
                        reapDeadThreads();
                    }
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: the loop is terminated through the pool state only.
                }
                if (gcedThreadReference != null) {
                    gcedThreadReference.getWorker().shutdownNow();
                }
            }
            // Shutdown: references that were already enqueued must still be processed, because their
            // workers may no longer be present in the weak 'workers' map that shutdownNow() iterates.
            WeakWorkerReference pending = (WeakWorkerReference) gcedThreadsReferenceQueue.poll();
            while (pending != null) {
                pending.getWorker().shutdownNow();
                pending = (WeakWorkerReference) gcedThreadsReferenceQueue.poll();
            }
        }

        /**
         * Shuts down and unregisters every worker whose application thread is no longer alive.
         */
        private void reapDeadThreads() {
            Set<Thread> deadThreads = new HashSet<Thread>();
            synchronized (workersLock) {
                for (Entry<Thread, WorkerThreadLocal> entry : workers.entrySet()) {
                    if (!entry.getKey().isAlive()) {
                        entry.getValue().shutdownNow();
                        deadThreads.add(entry.getKey());
                    }
                }

                // Removed in a second pass so the map is not modified while it is being iterated.
                for (Thread th : deadThreads) {
                    workers.remove(th);
                }
            }
        }
    }

    private void rejectExecutionAfterShutdown() {
        throw new RejectedExecutionException("The thread pool has already shut down.");
    }

    /**
     * Submit a callable task to be executed by the worker thread dedicated to the calling thread.
     *
     * @param <T> result type of the task
     * @param task the task to execute, must not be null
     * @return Future of the task
     * @throws NullPointerException if task is null
     * @throws RejectedExecutionException if the pool has been shut down
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) {
            throw new NullPointerException("Task cannot be null");
        }
        // Without this check a thread that already owns a (now stopped) worker would get back a Future
        // that never completes, because a stopped worker no longer picks up tasks.
        if (state.get() == State.SHUTDOWN) {
            rejectExecutionAfterShutdown();
        }
        return workerThreadLocal.get().submit(task);
    }

    /**
     * Shuts down the thread pool: marks it as shut down and shuts down every registered worker.
     */
    public void shutdownNow() {
        // The state is published before the lock is taken; see the re-check in initialValue().
        state.set(State.SHUTDOWN);
        synchronized (workersLock) {
            for (WorkerThreadLocal worker : workers.values()) {
                worker.shutdownNow();
            }
        }
    }

    /**
     * Per application thread holder of the worker that executes the tasks of that thread.
     *
     * @author Abhishek Sanoujam
     *
     */
    private static class WorkerThreadLocal {

        private final Worker worker;
        // Never read; NOTE(review): presumably held so the weak reference itself stays reachable and can
        // be enqueued once the application thread is collected - confirm against java.lang.ref docs.
        private final WeakWorkerReference appThreadReference;

        /**
         * Creates the worker, starts its thread and registers a weak reference to the calling thread.
         *
         * @param threadFactory factory creating the worker thread
         * @param gcedThreadsReferenceQueue queue the weak reference to the calling thread is registered with
         */
        public WorkerThreadLocal(ThreadFactory threadFactory, ReferenceQueue<Thread> gcedThreadsReferenceQueue) {
            this.worker = new Worker();
            threadFactory.newThread(worker).start();
            this.appThreadReference = new WeakWorkerReference(this.worker, Thread.currentThread(), gcedThreadsReferenceQueue);
        }

        /**
         * Shuts the underlying worker down.
         */
        public void shutdownNow() {
            worker.shutdownNow();
        }

        /**
         * Wraps the task in a {@link FutureTask} and hands it to the worker.
         *
         * @param <T> result type of the task
         * @param task the task to execute
         * @return Future of the task
         */
        public <T> Future<T> submit(Callable<T> task) {
            FutureTask<T> ftask = new FutureTask<T>(task);
            worker.addTask(ftask);
            return ftask;
        }
    }

    /**
     * Worker class: runs, on its own thread, the tasks handed over through {@link #addTask(Runnable)}.
     *
     * @author Abhishek Sanoujam
     *
     */
    private static class Worker implements Runnable {
        private final WorkerTaskHolder workerTaskHolder;
        private volatile boolean shutdown;
        private volatile Thread workerThread;
        /** True while a task is executing; read by shutdownNow() to decide whether to interrupt. */
        private volatile boolean runningTask;

        public Worker() {
            this.workerTaskHolder = new WorkerTaskHolder();
        }

        /**
         * Executes pending tasks one after the other until the worker is shut down.
         */
        public void run() {
            workerThread = Thread.currentThread();
            while (!shutdown) {
                waitUntilTaskAvailable();
                if (shutdown) {
                    break;
                }
                Runnable task = workerTaskHolder.consumeTask();
                if (task == null) {
                    continue;
                }
                synchronized (this) {
                    if (shutdown) {
                        // A task consumed but not yet started when shutdown is observed is not run.
                        break;
                    }
                    runningTask = true;
                }
                try {
                    task.run();
                } finally {
                    // Reset even when the task throws, so the flag never claims a task is still running.
                    synchronized (this) {
                        runningTask = false;
                    }
                }
            }
        }

        /**
         * Stops the worker: wakes it up if it is waiting for a task and interrupts it if it is running one.
         */
        public void shutdownNow() {
            shutdown = true;
            synchronized (this) {
                this.notifyAll();
                if (runningTask) {
                    // runningTask is only set by run(), after workerThread has been assigned.
                    workerThread.interrupt();
                }
            }
        }

        /**
         * Makes the given task the pending task and wakes the worker thread up.
         *
         * @param runnable the task to execute
         */
        public void addTask(Runnable runnable) {
            synchronized (this) {
                workerTaskHolder.addTask(runnable);
                this.notifyAll();
            }
        }

        /**
         * Blocks until a task is pending or the worker has been shut down.
         */
        private void waitUntilTaskAvailable() {
            synchronized (this) {
                while (!workerTaskHolder.isTaskAvailable()) {
                    if (shutdown) {
                        return;
                    }
                    try {
                        this.wait();
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop re-checks the shutdown flag and the pending task.
                    }
                }
            }
        }
    }

    /**
     * Private class maintaining single pending task
     *
     * @author Abhishek Sanoujam
     *
     */
    private static class WorkerTaskHolder {
        private Runnable task;

        /**
         * Stores the task, replacing any task that is still pending.
         *
         * @param runnable the new pending task
         */
        public synchronized void addTask(Runnable runnable) {
            this.task = runnable;
        }

        /**
         * Removes and returns the pending task.
         *
         * @return the pending task, or null if there is none
         */
        public synchronized Runnable consumeTask() {
            Runnable rv = task;
            task = null;
            return rv;
        }

        /**
         * @return true if a task is pending
         */
        public synchronized boolean isTaskAvailable() {
            return task != null;
        }
    }

    /**
     * private class maintaining the app thread and its corresponding worker thread
     *
     * @author Raghvendra Singh
     */
    private static class WeakWorkerReference extends WeakReference<Thread> {

        private final Worker worker;

        /**
         * @param worker the worker serving the referent thread
         * @param referent the application thread, referenced weakly
         * @param q the queue this reference is registered with
         */
        public WeakWorkerReference(Worker worker, Thread referent, ReferenceQueue<? super Thread> q) {
            super(referent, q);
            this.worker = worker;
        }

        /**
         * @return the worker serving the referenced application thread
         */
        public Worker getWorker() {
            return this.worker;
        }

    }

    /**
     * Private enum maintaining state of the pool
     *
     * @author Abhishek Sanoujam
     *
     */
    private enum State {
        RUNNING, SHUTDOWN;
    }

}