1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.terracotta;
18
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 import java.io.EOFException;
23 import java.io.File;
24 import java.io.FileInputStream;
25 import java.io.FileOutputStream;
26 import java.io.IOException;
27 import java.io.ObjectInputStream;
28 import java.io.ObjectOutputStream;
29 import java.util.Collections;
30 import java.util.HashSet;
31 import java.util.Set;
32 import java.util.concurrent.locks.Lock;
33 import java.util.concurrent.locks.ReadWriteLock;
34 import java.util.concurrent.locks.ReentrantReadWriteLock;
35
36 /***
37 * A file will rotate on every write, so to never loose older values in case of a JVM crash
38 *
39 * @author Alex Snaps
40 */
41 class RotatingSnapshotFile {
42
43 private static final Logger LOG = LoggerFactory.getLogger(RotatingSnapshotFile.class);
44
45 private static final String SUFFIX_OK = ".keySet";
46 private static final String SUFFIX_PROGRESS = SUFFIX_OK + ".temp";
47 private static final String SUFFIX_MOVE = SUFFIX_OK + ".old";
48
49 private volatile boolean shutdownOnThreadInterrupted;
50 private final File targetDirectory;
51 private final String baseName;
52
53 private final Lock readLock;
54 private final Lock writeLock;
55
56 {
57 ReadWriteLock rwl = new ReentrantReadWriteLock();
58 readLock = rwl.readLock();
59 writeLock = rwl.writeLock();
60 }
61
62 /***
63 * Constructor
64 *
65 * @param directory the directory to write to
66 * @param baseName the base name of the files
67 */
68 RotatingSnapshotFile(final String directory, final String baseName) {
69 this.baseName = baseName;
70 targetDirectory = new File(directory);
71 if (targetDirectory.exists() && !targetDirectory.isDirectory()) {
72 throw new IllegalArgumentException("The specified target directory is not a directory: " + directory);
73 }
74 if (!targetDirectory.exists() && !targetDirectory.mkdirs()) {
75 throw new RuntimeException("Couldn't create the target directory: " + directory);
76 }
77 }
78
79 /***
80 * Writes all values of the iterable to a new file and does the necessary clean up when done
81 *
82 * @param localKeys the iterable of entries to write to disk
83 * @throws IOException If the underlying OutputStream do throw
84 */
85 void writeAll(final Iterable localKeys) throws IOException {
86 writeLock.lock();
87 try {
88 File inProgress = newSnapshotFile();
89
90 cleanUp(inProgress);
91 if (!inProgress.createNewFile()) {
92 throw new AssertionError("The file '" + inProgress.getAbsolutePath() + "' exists already!");
93 }
94
95 final FileOutputStream fileOutputStream = new FileOutputStream(inProgress);
96 final ObjectOutputStream oos = new ObjectOutputStream(fileOutputStream);
97
98 try {
99 for (Object localKey : localKeys) {
100 if (shutdownOnThreadInterrupted && Thread.currentThread().isInterrupted()) {
101 return;
102 }
103 oos.writeObject(localKey);
104 }
105 } finally {
106 fileOutputStream.close();
107 }
108
109 swapForOldWithNewSnapshot(inProgress);
110 } finally {
111 writeLock.unlock();
112 }
113 }
114
115 /***
116 * Reads all the keys from the file on disk, doing cleanup if required of previously unterminated file written to
117 *
118 * @param <T> the type of the each element
119 * @return the Set of all entries in the latest uncorrupted file on disk
120 * @throws IOException If the underlying FileInputStream does throw
121 */
122 <T> Set<T> readAll() throws IOException {
123
124 cleanUp();
125
126 readLock.lock();
127 try {
128
129 final File currentSnapshot = currentSnapshotFile();
130 if (!currentSnapshot.exists()) {
131 return Collections.emptySet();
132 }
133
134 final Set<T> values = new HashSet<T>();
135 FileInputStream fis = new FileInputStream(currentSnapshot);
136 try {
137 ObjectInputStream ois = new ObjectInputStream(fis);
138 boolean eof = false;
139 while (!eof) {
140 try {
141 values.add((T)ois.readObject());
142 } catch (Exception e) {
143 if (e instanceof EOFException) {
144 eof = true;
145 }
146
147 }
148 }
149 try {
150 ois.close();
151 } catch (IOException e) {
152 LOG.error("Error closing ObjectInputStream", e);
153 closeAndDeleteAssociatedFileOnFailure(fis, currentSnapshot);
154 }
155
156 } catch (IOException e) {
157 closeAndDeleteAssociatedFileOnFailure(fis, currentSnapshot);
158 }
159 return Collections.unmodifiableSet(values);
160 } finally {
161 readLock.unlock();
162 }
163 }
164
165 private void cleanUp() {
166 if (requiresCleanUp()) {
167 writeLock.lock();
168 try {
169 cleanUp(newSnapshotFile());
170 } finally {
171 writeLock.unlock();
172 }
173 }
174 }
175
176 private void cleanUp(final File inProgress) {
177 if (requiresCleanUp()) {
178 final File dest = currentSnapshotFile();
179 if (dest.exists() && !inProgress.delete()) {
180 throw new RuntimeException("Couldn't cleanup old file " + inProgress.getAbsolutePath());
181 } else {
182 final File tempFile = tempSnapshotFile();
183 if (tempFile.exists() && !tempFile.delete()) {
184 throw new RuntimeException("Couldn't cleanup temp file " + tempFile.getAbsolutePath());
185 }
186 if (inProgress.exists() && !inProgress.renameTo(dest)) {
187 throw new RuntimeException("Couldn't rename new snapshot: " + dest.getAbsolutePath());
188 }
189 }
190 }
191 }
192
193 private boolean requiresCleanUp() {
194 return newSnapshotFile().exists();
195 }
196
197 private void swapForOldWithNewSnapshot(final File inProgress) {
198 File currentSnapshot = currentSnapshotFile();
199 final File tempFile = tempSnapshotFile();
200 if (currentSnapshot.exists() && !currentSnapshot.renameTo(tempFile)) {
201 throw new RuntimeException("Couldn't rename previous snapshot: " + currentSnapshot.getAbsolutePath());
202 }
203 if (!inProgress.renameTo(currentSnapshot)) {
204 throw new RuntimeException("Couldn't rename new snapshot: " + currentSnapshot.getAbsolutePath());
205 }
206 if (tempFile.exists() && !tempFile.delete()) {
207 throw new RuntimeException("Couldn't delete temp file " + tempFile.getAbsolutePath());
208 }
209 }
210
211 /***
212 * Creates a File representing the uncorrupted file on disk
213 *
214 * @return the file to read from
215 */
216 File currentSnapshotFile() {
217 return new File(targetDirectory, baseName + SUFFIX_OK);
218 }
219
220 /***
221 * Creates a File representing the one to write new entries to
222 *
223 * @return the File to write to
224 */
225 File newSnapshotFile() {
226 return new File(targetDirectory, baseName + SUFFIX_PROGRESS);
227 }
228
229 /***
230 * Creates a File representing the old uncorrupted file, when the new one has successfully been written to disk
231 *
232 * @return the File representing the previous successful snapshot (temp file to be deleted)
233 */
234 File tempSnapshotFile() {
235 return new File(targetDirectory, baseName + SUFFIX_MOVE);
236 }
237
238 /***
239 * Whether to shutdown as soon as the writer Thread is interrupted, or to let all keys be written to disk first
240 *
241 * @param shutdownOnThreadInterrupted true, if shutdown needs to happen in the middle of a write
242 */
243 void setShutdownOnThreadInterrupted(final boolean shutdownOnThreadInterrupted) {
244 this.shutdownOnThreadInterrupted = shutdownOnThreadInterrupted;
245 }
246
247 private void closeAndDeleteAssociatedFileOnFailure(final FileInputStream fis, final File associatedFile) {
248 try {
249 fis.close();
250 } catch (IOException e) {
251 LOG.error("Couldn't close FileInputStream on {}, deleting the file!", associatedFile.getAbsolutePath(), e);
252 if (associatedFile.exists() && !associatedFile.delete()) {
253 LOG.error("Couldn't delete file {}", associatedFile.getAbsolutePath(), e);
254 }
255 }
256 }
257
258 /***
259 * Calling this method will result in writing all keys to be written to disk
260 * or wait for the one in progress to finish
261 *
262 * @param localKeys the latest current local set
263 * @throws IOException On exception being thrown while doing the snapshot
264 */
265 void snapshotNowOrWaitForCurrentToFinish(final Set localKeys) throws IOException {
266 if (writeLock.tryLock()) {
267 try {
268 writeAll(localKeys);
269 } finally {
270 writeLock.unlock();
271 }
272 } else {
273 writeLock.lock();
274 writeLock.unlock();
275 }
276 }
277 }