View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.terracotta;
18  
19  import net.sf.ehcache.Cache;
20  import net.sf.ehcache.CacheManager;
21  import net.sf.ehcache.CacheStoreHelper;
22  import net.sf.ehcache.Ehcache;
23  import net.sf.ehcache.store.Store;
24  import net.sf.ehcache.store.TerracottaStore;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import java.io.IOException;
29  import java.util.Collection;
30  import java.util.concurrent.ScheduledExecutorService;
31  import java.util.concurrent.ScheduledThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  /***
35   * A class that will snapshot the local keySet of a Terracotta clustered cache to disk
36   *
37   * @author Alex Snaps
38   */
39  class KeySnapshotter implements Runnable {
40  
41      private static final Logger LOG = LoggerFactory.getLogger(KeySnapshotter.class.getName());
42      private static final int POOL_SIZE = Integer.getInteger("net.sf.ehcache.terracotta.KeySnapshotter.threadPoolSize", 10);
43  
44      private static final WeakIdentityConcurrentMap<CacheManager, ScheduledExecutorService> INSTANCES =
45          new WeakIdentityConcurrentMap<CacheManager, ScheduledExecutorService>(
46              new WeakIdentityConcurrentMap.CleanUpTask<ScheduledExecutorService>() {
47                  public void cleanUp(final ScheduledExecutorService executor) {
48                      executor.shutdownNow();
49                  }
50              });
51  
52      private final String cacheName;
53      private final TerracottaStore tcStore;
54      private final RotatingSnapshotFile rotatingWriter;
55      private final Thread thread;
56  
57      /***
58       * Default Constructor
59       *
60       * @param cache                          the Terracotta clustered Cache to snapshot
61       * @param interval                       the interval to do the snapshots on
62       * @param doKeySnapshotOnDedicatedThread whether the snapshots have to be done on a dedicated thread
63       * @param rotatingWriter                 the RotatingSnapshotFile to write to
64       * @throws IllegalArgumentException if interval is less than or equal to zero
65       */
66      KeySnapshotter(final Ehcache cache, final long interval,
67                     final boolean doKeySnapshotOnDedicatedThread,
68                     final RotatingSnapshotFile rotatingWriter)
69          throws IllegalArgumentException {
70          final Store store = new CacheStoreHelper((Cache)cache).getStore();
71          if (!(store instanceof TerracottaStore)) {
72              throw new IllegalArgumentException("Cache '" + cache.getName() + "' isn't backed by a " + TerracottaStore.class.getSimpleName()
73                                                 + " but uses a " + store.getClass().getName() + " instead");
74          }
75  
76          if (interval <= 0) {
77              throw new IllegalArgumentException("Interval needs to be a positive & non-zero value");
78          }
79  
80          if (rotatingWriter == null) {
81              throw new NullPointerException();
82          }
83  
84          this.cacheName = cache.getName();
85          this.rotatingWriter = rotatingWriter;
86          this.tcStore = (TerracottaStore)store;
87  
88          if (doKeySnapshotOnDedicatedThread) {
89              thread = new SnapShottingThread(this, interval, "KeySnapshotter for cache " + cacheName);
90              thread.start();
91          } else {
92              ScheduledExecutorService scheduledExecutorService = INSTANCES.get(cache.getCacheManager());
93              if (scheduledExecutorService == null) {
94                  scheduledExecutorService = new ScheduledThreadPoolExecutor(POOL_SIZE);
95                  final ScheduledExecutorService previous = INSTANCES.putIfAbsent(cache.getCacheManager(), scheduledExecutorService);
96                  if (previous != null) {
97                      scheduledExecutorService.shutdownNow();
98                      scheduledExecutorService = previous;
99                  }
100             }
101             scheduledExecutorService.scheduleWithFixedDelay(this, interval, interval, TimeUnit.SECONDS);
102             thread = null;
103         }
104     }
105 
106     /***
107      * Shuts down the writer thread and cleans up resources
108      *
109      * @param immediately whether to leave the writer finish or shut down immediately
110      */
111     void dispose(boolean immediately) {
112         if (thread != null) {
113             rotatingWriter.setShutdownOnThreadInterrupted(immediately);
114             thread.interrupt();
115             try {
116                 thread.join();
117             } catch (InterruptedException e) {
118                 Thread.currentThread().interrupt();
119             }
120         }
121     }
122 
123     /***
124      * {@inheritDoc}
125      */
126     public void run() {
127         try {
128             INSTANCES.cleanUp();
129             rotatingWriter.writeAll(tcStore.getLocalKeys());
130         } catch (Throwable e) {
131             LOG.error("Couldn't snapshot local keySet for Cache {}", cacheName, e);
132         }
133     }
134 
135     /***
136      * Accessor to all known cacheManagers (which are also bound to a ScheduledExecutorService)
137      *
138      * @return the collection of known CacheManagers
139      */
140     static Collection<CacheManager> getKnownCacheManagers() {
141         return INSTANCES.keySet();
142     }
143 
144     /***
145      * Calling this method will result in a snapshot being taken or wait for the one in progress to finish
146      *
147      * @throws IOException On exception being thrown while doing the snapshot
148      */
149     void doSnapshot() throws IOException {
150         rotatingWriter.snapshotNowOrWaitForCurrentToFinish(tcStore.getLocalKeys());
151     }
152 
153     /***
154      * Returns the name of the underlying cache for which this snapshots
155      * @return The name of the cache
156      */
157     public String getCacheName() {
158         return cacheName;
159     }
160 
161     /***
162      * Thread doing background snapshots of the local key set
163      */
164     private static class SnapShottingThread extends Thread {
165 
166         private long lastRun;
167         private final long interval;
168 
169         public SnapShottingThread(final Runnable runnable, final long interval, final String threadName) {
170             super(runnable, threadName);
171             this.interval = interval;
172             lastRun = System.currentTimeMillis();
173             this.setDaemon(true);
174         }
175 
176         @Override
177         public void run() {
178             while (!isInterrupted()) {
179                 final long nextTime = lastRun + TimeUnit.SECONDS.toMillis(interval);
180                 final long now = System.currentTimeMillis();
181                 if (nextTime <= now) {
182                     super.run();
183                     lastRun = System.currentTimeMillis();
184                 } else {
185                     try {
186                         sleep(nextTime - now);
187                     } catch (InterruptedException e) {
188                         interrupt();
189                     }
190                 }
191             }
192         }
193     }
194 }