View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.terracotta;
18  
19  import org.slf4j.Logger;
20  import org.slf4j.LoggerFactory;
21  
22  import java.io.EOFException;
23  import java.io.File;
24  import java.io.FileInputStream;
25  import java.io.FileOutputStream;
26  import java.io.IOException;
27  import java.io.ObjectInputStream;
28  import java.io.ObjectOutputStream;
29  import java.util.Collections;
30  import java.util.HashSet;
31  import java.util.Set;
32  import java.util.concurrent.locks.Lock;
33  import java.util.concurrent.locks.ReadWriteLock;
34  import java.util.concurrent.locks.ReentrantReadWriteLock;
35  
36  /***
37   * A file will rotate on every write, so to never loose older values in case of a JVM crash
38   *
39   * @author Alex Snaps
40   */
41  class RotatingSnapshotFile {
42  
43      private static final Logger LOG = LoggerFactory.getLogger(RotatingSnapshotFile.class);
44  
45      private static final String SUFFIX_OK = ".keySet";
46      private static final String SUFFIX_PROGRESS = SUFFIX_OK + ".temp";
47      private static final String SUFFIX_MOVE = SUFFIX_OK + ".old";
48  
49      private volatile boolean shutdownOnThreadInterrupted;
50      private final File targetDirectory;
51      private final String baseName;
52  
53      private final Lock readLock;
54      private final Lock writeLock;
55  
56      {
57          ReadWriteLock rwl = new ReentrantReadWriteLock();
58          readLock = rwl.readLock();
59          writeLock = rwl.writeLock();
60      }
61  
62      /***
63       * Constructor
64       *
65       * @param directory the directory to write to
66       * @param baseName  the base name of the files
67       */
68      RotatingSnapshotFile(final String directory, final String baseName) {
69          this.baseName = baseName;
70          targetDirectory = new File(directory);
71          if (targetDirectory.exists() && !targetDirectory.isDirectory()) {
72              throw new IllegalArgumentException("The specified target directory is not a directory: " + directory);
73          }
74          if (!targetDirectory.exists() && !targetDirectory.mkdirs()) {
75              throw new RuntimeException("Couldn't create the target directory: " + directory);
76          }
77      }
78  
79      /***
80       * Writes all values of the iterable to a new file and does the necessary clean up when done
81       *
82       * @param localKeys the iterable of entries to write to disk
83       * @throws IOException If the underlying OutputStream do throw
84       */
85      void writeAll(final Iterable localKeys) throws IOException {
86          writeLock.lock();
87          try {
88              File inProgress = newSnapshotFile();
89  
90              cleanUp(inProgress);
91              if (!inProgress.createNewFile()) {
92                  throw new AssertionError("The file '" + inProgress.getAbsolutePath() + "' exists already!");
93              }
94  
95              final FileOutputStream fileOutputStream = new FileOutputStream(inProgress);
96              final ObjectOutputStream oos = new ObjectOutputStream(fileOutputStream);
97  
98              try {
99                  for (Object localKey : localKeys) {
100                     if (shutdownOnThreadInterrupted && Thread.currentThread().isInterrupted()) {
101                         return;
102                     }
103                     oos.writeObject(localKey);
104                 }
105             } finally {
106                 fileOutputStream.close();
107             }
108 
109             swapForOldWithNewSnapshot(inProgress);
110         } finally {
111             writeLock.unlock();
112         }
113     }
114 
115     /***
116      * Reads all the keys from the file on disk, doing cleanup if required of previously unterminated file written to
117      *
118      * @param <T> the type of the each element
119      * @return the Set of all entries in the latest uncorrupted file on disk
120      * @throws IOException If the underlying FileInputStream does throw
121      */
122     <T> Set<T> readAll() throws IOException {
123 
124         cleanUp();
125 
126         readLock.lock();
127         try {
128 
129             final File currentSnapshot = currentSnapshotFile();
130             if (!currentSnapshot.exists()) {
131                 return Collections.emptySet();
132             }
133 
134             final Set<T> values = new HashSet<T>();
135             FileInputStream fis = new FileInputStream(currentSnapshot);
136             try {
137                 ObjectInputStream ois = new ObjectInputStream(fis);
138                 boolean eof = false;
139                 while (!eof) {
140                     try {
141                         values.add((T)ois.readObject());
142                     } catch (Exception e) {
143                         if (e instanceof EOFException) {
144                             eof = true;
145                         }
146                         // Ignore all other errors, and keep on trying to load keys
147                     }
148                 }
149                 try {
150                     ois.close();
151                 } catch (IOException e) {
152                     LOG.error("Error closing ObjectInputStream", e);
153                     closeAndDeleteAssociatedFileOnFailure(fis, currentSnapshot);
154                 }
155 
156             } catch (IOException e) {
157                 closeAndDeleteAssociatedFileOnFailure(fis, currentSnapshot);
158             }
159             return Collections.unmodifiableSet(values);
160         } finally {
161             readLock.unlock();
162         }
163     }
164 
165     private void cleanUp() {
166         if (requiresCleanUp()) {
167             writeLock.lock();
168             try {
169                 cleanUp(newSnapshotFile());
170             } finally {
171                 writeLock.unlock();
172             }
173         }
174     }
175 
176     private void cleanUp(final File inProgress) {
177         if (requiresCleanUp()) {
178             final File dest = currentSnapshotFile();
179             if (dest.exists() && !inProgress.delete()) {
180                 throw new RuntimeException("Couldn't cleanup old file " + inProgress.getAbsolutePath());
181             } else {
182                 final File tempFile = tempSnapshotFile();
183                 if (tempFile.exists() && !tempFile.delete()) {
184                     throw new RuntimeException("Couldn't cleanup temp file " + tempFile.getAbsolutePath());
185                 }
186                 if (inProgress.exists() && !inProgress.renameTo(dest)) {
187                     throw new RuntimeException("Couldn't rename new snapshot: " + dest.getAbsolutePath());
188                 }
189             }
190         }
191     }
192 
193     private boolean requiresCleanUp() {
194         return newSnapshotFile().exists();
195     }
196 
197     private void swapForOldWithNewSnapshot(final File inProgress) {
198         File currentSnapshot = currentSnapshotFile();
199         final File tempFile = tempSnapshotFile();
200         if (currentSnapshot.exists() && !currentSnapshot.renameTo(tempFile)) {
201             throw new RuntimeException("Couldn't rename previous snapshot: " + currentSnapshot.getAbsolutePath());
202         }
203         if (!inProgress.renameTo(currentSnapshot)) {
204             throw new RuntimeException("Couldn't rename new snapshot: " + currentSnapshot.getAbsolutePath());
205         }
206         if (tempFile.exists() && !tempFile.delete()) {
207             throw new RuntimeException("Couldn't delete temp file " + tempFile.getAbsolutePath());
208         }
209     }
210 
211     /***
212      * Creates a File representing the uncorrupted file on disk
213      *
214      * @return the file to read from
215      */
216     File currentSnapshotFile() {
217         return new File(targetDirectory, baseName + SUFFIX_OK);
218     }
219 
220     /***
221      * Creates a File representing the one to write new entries to
222      *
223      * @return the File to write to
224      */
225     File newSnapshotFile() {
226         return new File(targetDirectory, baseName + SUFFIX_PROGRESS);
227     }
228 
229     /***
230      * Creates a File representing the old uncorrupted file, when the new one has successfully been written to disk
231      *
232      * @return the File representing the previous successful snapshot (temp file to be deleted)
233      */
234     File tempSnapshotFile() {
235         return new File(targetDirectory, baseName + SUFFIX_MOVE);
236     }
237 
238     /***
239      * Whether to shutdown as soon as the writer Thread is interrupted, or to let all keys be written to disk first
240      *
241      * @param shutdownOnThreadInterrupted true, if shutdown needs to happen in the middle of a write
242      */
243     void setShutdownOnThreadInterrupted(final boolean shutdownOnThreadInterrupted) {
244         this.shutdownOnThreadInterrupted = shutdownOnThreadInterrupted;
245     }
246 
247     private void closeAndDeleteAssociatedFileOnFailure(final FileInputStream fis, final File associatedFile) {
248         try {
249             fis.close();
250         } catch (IOException e) {
251             LOG.error("Couldn't close FileInputStream on {}, deleting the file!", associatedFile.getAbsolutePath(), e);
252             if (associatedFile.exists() && !associatedFile.delete()) {
253                 LOG.error("Couldn't delete file {}", associatedFile.getAbsolutePath(), e);
254             }
255         }
256     }
257 
258     /***
259      * Calling this method will result in writing all keys to be written to disk
260      * or wait for the one in progress to finish
261      *
262      * @param localKeys the latest current local set
263      * @throws IOException On exception being thrown while doing the snapshot
264      */
265     void snapshotNowOrWaitForCurrentToFinish(final Set localKeys) throws IOException {
266         if (writeLock.tryLock()) {
267             try {
268                 writeAll(localKeys);
269             } finally {
270                 writeLock.unlock();
271             }
272         } else {
273             writeLock.lock();
274             writeLock.unlock();
275         }
276     }
277 }