View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.constructs.nonstop;
18  
19  import java.lang.ref.ReferenceQueue;
20  import java.lang.ref.WeakReference;
21  import java.util.HashSet;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.WeakHashMap;
25  import java.util.Map.Entry;
26  import java.util.concurrent.Callable;
27  import java.util.concurrent.Future;
28  import java.util.concurrent.FutureTask;
29  import java.util.concurrent.RejectedExecutionException;
30  import java.util.concurrent.ThreadFactory;
31  import java.util.concurrent.atomic.AtomicReference;
32  
/**
 * A thread pool that dedicates one worker thread to each requesting (application) thread.
 * <p>
 * The first time a thread calls {@link #submit(Callable)} a worker thread is created for it through
 * the supplied {@link ThreadFactory}; later submissions from the same thread are handed to that same
 * worker. A background reaper thread stops workers whose application thread has been garbage
 * collected or is no longer alive.
 *
 * @author Abhishek Sanoujam
 *
 */
public class NonstopThreadPool {

    /** Maximum time, in milliseconds, the reaper blocks on the reference queue per iteration. */
    private static final long POLL_TIME_MILLIS = 1000;
    /** Number of reaper iterations between two sweeps for application threads that are not alive. */
    private static final long NUM_OF_POLLS_BEFORE_CHECK_THREADS_ALIVE = 100;

    private final ThreadFactory threadFactory;
    /** Application thread to its worker; every access is guarded by {@link #workersLock}. */
    private final Map<Thread, WorkerThreadLocal> workers = new WeakHashMap<Thread, WorkerThreadLocal>();
    private final Object workersLock = new Object();
    private final AtomicReference<State> state = new AtomicReference<State>(State.RUNNING);
    /** Receives the weak references of application threads once they have been garbage collected. */
    private final ReferenceQueue<Thread> gcedThreadsReferenceQueue = new ReferenceQueue<Thread>();
    private final ThreadLocal<WorkerThreadLocal> workerThreadLocal = new ThreadLocal<WorkerThreadLocal>() {
        @Override
        protected WorkerThreadLocal initialValue() {
            // Check before creating anything: constructing a WorkerThreadLocal starts a thread, and
            // a thread started for a rejected request would never be stopped.
            if (state.get() == State.SHUTDOWN) {
                rejectExecutionAfterShutdown();
            }
            WorkerThreadLocal local = new WorkerThreadLocal(threadFactory, gcedThreadsReferenceQueue);
            synchronized (workersLock) {
                if (state.get() == State.SHUTDOWN) {
                    // shutdownNow() won the race; it cannot see this worker, so stop it here.
                    local.shutdownNow();
                    rejectExecutionAfterShutdown();
                }
                workers.put(Thread.currentThread(), local);
            }
            return local;
        }

    };

    /**
     * Creates the pool and starts its reaper thread.
     *
     * @param threadFactory factory used to create one worker thread per requesting thread
     */
    public NonstopThreadPool(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        startReaperThread();
    }

    /**
     * Starts the background thread running {@link ReaperThread}.
     */
    private void startReaperThread() {
        Thread reaperThread = new Thread(new ReaperThread(), "non stop reaper thread");
        // House-keeping only: it must not keep the JVM alive when the pool is never shut down.
        reaperThread.setDaemon(true);
        reaperThread.start();
    }

    /**
     * Stops the workers of application threads that were garbage collected or are no longer alive.
     * Runs until the pool is shut down.
     *
     * @author Raghvendra Singh
     *
     */
    private class ReaperThread implements Runnable {

        public void run() {
            int pollCount = 0;
            while (state.get() != State.SHUTDOWN) {
                WeakWorkerReference gcedThreadReference = null;
                try {
                    // Blocks for at most POLL_TIME_MILLIS; yields null when nothing was enqueued.
                    gcedThreadReference = (WeakWorkerReference) gcedThreadsReferenceQueue.remove(POLL_TIME_MILLIS);
                    // Every NUM_OF_POLLS_BEFORE_CHECK_THREADS_ALIVE iterations, sweep for
                    // application threads that are not alive and shut their workers down.
                    if (++pollCount == NUM_OF_POLLS_BEFORE_CHECK_THREADS_ALIVE) {
                        pollCount = 0;
                        reapDeadThreads();
                    }
                } catch (InterruptedException e) {
                    // Deliberately ignored: the loop condition decides when the reaper stops, and
                    // restoring the interrupt flag here would make every following remove() fail.
                }
                if (gcedThreadReference != null) {
                    gcedThreadReference.getWorker().shutdownNow();
                }
            }
        }

        /**
         * Shuts down and unregisters the worker of every registered thread that is not alive.
         */
        private void reapDeadThreads() {
            Set<Thread> deadThreads = new HashSet<Thread>();
            synchronized (workersLock) {
                for (Entry<Thread, WorkerThreadLocal> entry : workers.entrySet()) {
                    Thread appThread = entry.getKey();
                    if (!appThread.isAlive()) {
                        entry.getValue().shutdownNow();
                        deadThreads.add(appThread);
                    }
                }
                // Removed in a second pass so the map is not modified while it is iterated.
                for (Thread dead : deadThreads) {
                    workers.remove(dead);
                }
            }
        }
    }

    /**
     * Always throws; used wherever work is requested from a pool that has been shut down.
     *
     * @throws RejectedExecutionException always
     */
    private void rejectExecutionAfterShutdown() {
        throw new RejectedExecutionException("The thread pool has already shut down.");
    }

    /**
     * Submits a callable task to be executed by the worker thread dedicated to the calling thread.
     * The worker is created on the calling thread's first submission.
     *
     * @param <T> result type of the task
     * @param task the task to execute
     * @return Future of the task
     * @throws NullPointerException if {@code task} is null
     * @throws RejectedExecutionException if the pool has been shut down
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) {
            throw new NullPointerException("Task cannot be null");
        }
        // A worker that has been shut down never runs another task, so handing it one would
        // produce a Future that can never complete. Reject instead.
        if (state.get() == State.SHUTDOWN) {
            rejectExecutionAfterShutdown();
        }
        return workerThreadLocal.get().submit(task);
    }

    /**
     * Shuts down the thread pool: marks it as shut down and stops every registered worker,
     * interrupting those that are currently running a task.
     */
    public void shutdownNow() {
        state.set(State.SHUTDOWN);
        synchronized (workersLock) {
            for (WorkerThreadLocal worker : workers.values()) {
                worker.shutdownNow();
            }
        }
    }

    /**
     * Per-application-thread handle: owns the worker and the weak reference used to notice when
     * the application thread has been garbage collected.
     *
     * @author Abhishek Sanoujam
     *
     */
    private static class WorkerThreadLocal {

        private final Worker worker;
        // Never read in this class. NOTE(review): presumably held so the reference object itself
        // stays reachable, which java.lang.ref requires for it to be enqueued - confirm.
        private final WeakWorkerReference appThreadReference;

        /**
         * Creates the worker, starts its thread and registers a weak reference to the calling thread.
         *
         * @param threadFactory factory creating the worker thread
         * @param gcedThreadsReferenceQueue queue the weak reference to the calling thread is registered with
         */
        public WorkerThreadLocal(ThreadFactory threadFactory, ReferenceQueue<Thread> gcedThreadsReferenceQueue) {
            this.worker = new Worker();
            threadFactory.newThread(worker).start();
            this.appThreadReference = new WeakWorkerReference(this.worker, Thread.currentThread(), gcedThreadsReferenceQueue);
        }

        /**
         * Stops the worker owned by this handle.
         */
        public void shutdownNow() {
            worker.shutdownNow();
        }

        /**
         * Wraps the task in a {@link FutureTask} and hands it to the worker.
         *
         * @param <T> result type of the task
         * @param task the task to execute
         * @return Future of the task
         */
        public <T> Future<T> submit(Callable<T> task) {
            FutureTask<T> ftask = new FutureTask<T>(task);
            worker.addTask(ftask);
            return ftask;
        }
    }

    /**
     * Runnable executed by a worker thread: waits for a task, runs it, and repeats until shut down.
     *
     * @author Abhishek Sanoujam
     *
     */
    private static class Worker implements Runnable {
        private final WorkerTaskHolder workerTaskHolder;
        private volatile boolean shutdown;
        private volatile Thread workerThread;
        private volatile boolean runningTask;

        public Worker() {
            this.workerTaskHolder = new WorkerTaskHolder();
        }

        public void run() {
            workerThread = Thread.currentThread();
            while (!shutdown) {
                waitUntilTaskAvailable();
                if (shutdown) {
                    break;
                }
                Runnable task = workerTaskHolder.consumeTask();
                if (task == null) {
                    continue;
                }
                synchronized (this) {
                    // Set under the monitor shutdownNow() also takes, so a concurrent shutdown
                    // either is seen here or sees runningTask and interrupts this thread.
                    runningTask = true;
                    if (shutdown) {
                        break;
                    }
                }
                try {
                    task.run();
                } finally {
                    synchronized (this) {
                        runningTask = false;
                    }
                }
            }
        }

        /**
         * Asks the worker to stop: wakes it if it is waiting for a task and interrupts it if it is
         * running one.
         */
        public void shutdownNow() {
            shutdown = true;
            synchronized (this) {
                this.notifyAll();
                if (runningTask) {
                    // interrupt if running task already
                    workerThread.interrupt();
                }
            }
        }

        /**
         * Stores the task as the pending one and wakes the worker thread.
         *
         * @param runnable the task to run next
         */
        public void addTask(Runnable runnable) {
            synchronized (this) {
                workerTaskHolder.addTask(runnable);
                this.notifyAll();
            }
        }

        /**
         * Blocks until a task is pending or the worker has been shut down.
         */
        private void waitUntilTaskAvailable() {
            synchronized (this) {
                while (!workerTaskHolder.isTaskAvailable()) {
                    if (shutdown) {
                        return;
                    }
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        // ignore: the loop re-checks both the pending task and the shutdown flag
                    }
                }
            }
        }
    }

    /**
     * Holds at most one pending task; adding a task replaces any task that was not consumed yet.
     *
     * @author Abhishek Sanoujam
     *
     */
    private static class WorkerTaskHolder {
        private Runnable task;

        /**
         * Makes the given task the pending one, overwriting a previous pending task if present.
         *
         * @param runnable the new pending task
         */
        public synchronized void addTask(Runnable runnable) {
            // keep only 1 pending task
            this.task = runnable;
        }

        /**
         * Removes and returns the pending task.
         *
         * @return the pending task, or null when there is none
         */
        public synchronized Runnable consumeTask() {
            Runnable rv = task;
            task = null;
            return rv;
        }

        /**
         * @return true when a task is pending
         */
        public synchronized boolean isTaskAvailable() {
            return task != null;
        }
    }

    /**
     * Weak reference to an application thread that also remembers the worker serving it, so the
     * worker can be stopped once the reference shows up in the reference queue.
     *
     * @author Raghvendra Singh
     */
    private static class WeakWorkerReference extends WeakReference<Thread> {

        private final Worker worker;

        public WeakWorkerReference(Worker worker, Thread referent, ReferenceQueue<? super Thread> q) {
            super(referent, q);
            this.worker = worker;
        }

        /**
         * @return the worker serving the referenced application thread
         */
        public Worker getWorker() {
            return this.worker;
        }

    }

    /**
     * Life-cycle state of the pool.
     *
     * @author Abhishek Sanoujam
     *
     */
    private static enum State {
        RUNNING, SHUTDOWN;
    }

}