1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.terracotta;
18
19 import net.sf.ehcache.Cache;
20 import net.sf.ehcache.CacheManager;
21 import net.sf.ehcache.CacheStoreHelper;
22 import net.sf.ehcache.Ehcache;
23 import net.sf.ehcache.store.Store;
24 import net.sf.ehcache.store.TerracottaStore;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import java.io.IOException;
29 import java.util.Collection;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.ScheduledThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33
34 /***
35 * A class that will snapshot the local keySet of a Terracotta clustered cache to disk
36 *
37 * @author Alex Snaps
38 */
39 class KeySnapshotter implements Runnable {
40
41 private static final Logger LOG = LoggerFactory.getLogger(KeySnapshotter.class.getName());
42 private static final int POOL_SIZE = Integer.getInteger("net.sf.ehcache.terracotta.KeySnapshotter.threadPoolSize", 10);
43
44 private static final WeakIdentityConcurrentMap<CacheManager, ScheduledExecutorService> INSTANCES =
45 new WeakIdentityConcurrentMap<CacheManager, ScheduledExecutorService>(
46 new WeakIdentityConcurrentMap.CleanUpTask<ScheduledExecutorService>() {
47 public void cleanUp(final ScheduledExecutorService executor) {
48 executor.shutdownNow();
49 }
50 });
51
52 private final String cacheName;
53 private final TerracottaStore tcStore;
54 private final RotatingSnapshotFile rotatingWriter;
55 private final Thread thread;
56
57 /***
58 * Default Constructor
59 *
60 * @param cache the Terracotta clustered Cache to snapshot
61 * @param interval the interval to do the snapshots on
62 * @param doKeySnapshotOnDedicatedThread whether the snapshots have to be done on a dedicated thread
63 * @param rotatingWriter the RotatingSnapshotFile to write to
64 * @throws IllegalArgumentException if interval is less than or equal to zero
65 */
66 KeySnapshotter(final Ehcache cache, final long interval,
67 final boolean doKeySnapshotOnDedicatedThread,
68 final RotatingSnapshotFile rotatingWriter)
69 throws IllegalArgumentException {
70 final Store store = new CacheStoreHelper((Cache)cache).getStore();
71 if (!(store instanceof TerracottaStore)) {
72 throw new IllegalArgumentException("Cache '" + cache.getName() + "' isn't backed by a " + TerracottaStore.class.getSimpleName()
73 + " but uses a " + store.getClass().getName() + " instead");
74 }
75
76 if (interval <= 0) {
77 throw new IllegalArgumentException("Interval needs to be a positive & non-zero value");
78 }
79
80 if (rotatingWriter == null) {
81 throw new NullPointerException();
82 }
83
84 this.cacheName = cache.getName();
85 this.rotatingWriter = rotatingWriter;
86 this.tcStore = (TerracottaStore)store;
87
88 if (doKeySnapshotOnDedicatedThread) {
89 thread = new SnapShottingThread(this, interval, "KeySnapshotter for cache " + cacheName);
90 thread.start();
91 } else {
92 ScheduledExecutorService scheduledExecutorService = INSTANCES.get(cache.getCacheManager());
93 if (scheduledExecutorService == null) {
94 scheduledExecutorService = new ScheduledThreadPoolExecutor(POOL_SIZE);
95 final ScheduledExecutorService previous = INSTANCES.putIfAbsent(cache.getCacheManager(), scheduledExecutorService);
96 if (previous != null) {
97 scheduledExecutorService.shutdownNow();
98 scheduledExecutorService = previous;
99 }
100 }
101 scheduledExecutorService.scheduleWithFixedDelay(this, interval, interval, TimeUnit.SECONDS);
102 thread = null;
103 }
104 }
105
106 /***
107 * Shuts down the writer thread and cleans up resources
108 *
109 * @param immediately whether to leave the writer finish or shut down immediately
110 */
111 void dispose(boolean immediately) {
112 if (thread != null) {
113 rotatingWriter.setShutdownOnThreadInterrupted(immediately);
114 thread.interrupt();
115 try {
116 thread.join();
117 } catch (InterruptedException e) {
118 Thread.currentThread().interrupt();
119 }
120 }
121 }
122
123 /***
124 * {@inheritDoc}
125 */
126 public void run() {
127 try {
128 INSTANCES.cleanUp();
129 rotatingWriter.writeAll(tcStore.getLocalKeys());
130 } catch (Throwable e) {
131 LOG.error("Couldn't snapshot local keySet for Cache {}", cacheName, e);
132 }
133 }
134
135 /***
136 * Accessor to all known cacheManagers (which are also bound to a ScheduledExecutorService)
137 *
138 * @return the collection of known CacheManagers
139 */
140 static Collection<CacheManager> getKnownCacheManagers() {
141 return INSTANCES.keySet();
142 }
143
144 /***
145 * Calling this method will result in a snapshot being taken or wait for the one in progress to finish
146 *
147 * @throws IOException On exception being thrown while doing the snapshot
148 */
149 void doSnapshot() throws IOException {
150 rotatingWriter.snapshotNowOrWaitForCurrentToFinish(tcStore.getLocalKeys());
151 }
152
153 /***
154 * Returns the name of the underlying cache for which this snapshots
155 * @return The name of the cache
156 */
157 public String getCacheName() {
158 return cacheName;
159 }
160
161 /***
162 * Thread doing background snapshots of the local key set
163 */
164 private static class SnapShottingThread extends Thread {
165
166 private long lastRun;
167 private final long interval;
168
169 public SnapShottingThread(final Runnable runnable, final long interval, final String threadName) {
170 super(runnable, threadName);
171 this.interval = interval;
172 lastRun = System.currentTimeMillis();
173 this.setDaemon(true);
174 }
175
176 @Override
177 public void run() {
178 while (!isInterrupted()) {
179 final long nextTime = lastRun + TimeUnit.SECONDS.toMillis(interval);
180 final long now = System.currentTimeMillis();
181 if (nextTime <= now) {
182 super.run();
183 lastRun = System.currentTimeMillis();
184 } else {
185 try {
186 sleep(nextTime - now);
187 } catch (InterruptedException e) {
188 interrupt();
189 }
190 }
191 }
192 }
193 }
194 }