View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.store.disk;
18  
19  import net.sf.ehcache.CacheException;
20  import net.sf.ehcache.Ehcache;
21  import net.sf.ehcache.Element;
22  import net.sf.ehcache.concurrent.ConcurrencyUtil;
23  import net.sf.ehcache.config.CacheConfiguration;
24  import net.sf.ehcache.config.PinningConfiguration;
25  import net.sf.ehcache.event.RegisteredEventListeners;
26  import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
27  import net.sf.ehcache.store.disk.ods.FileAllocationTree;
28  import net.sf.ehcache.store.disk.ods.Region;
29  import net.sf.ehcache.util.MemoryEfficientByteArrayOutputStream;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import java.io.ByteArrayInputStream;
34  import java.io.EOFException;
35  import java.io.File;
36  import java.io.FileInputStream;
37  import java.io.FileNotFoundException;
38  import java.io.FileOutputStream;
39  import java.io.IOException;
40  import java.io.ObjectInputStream;
41  import java.io.ObjectOutputStream;
42  import java.io.ObjectStreamClass;
43  import java.io.RandomAccessFile;
44  import java.io.Serializable;
45  import java.util.ConcurrentModificationException;
46  import java.util.List;
47  import java.util.concurrent.BlockingQueue;
48  import java.util.concurrent.Callable;
49  import java.util.concurrent.Future;
50  import java.util.concurrent.ScheduledThreadPoolExecutor;
51  import java.util.concurrent.ThreadFactory;
52  import java.util.concurrent.TimeUnit;
53  import java.util.concurrent.atomic.AtomicInteger;
54  import java.util.concurrent.locks.Lock;
55  
56  import static java.util.concurrent.TimeUnit.MILLISECONDS;
57  
58  /***
 * A mock-up of an on-disk element proxy factory.
60   *
61   * @author Chris Dennis
62   * @author Ludovic Orban
63   */
64  @IgnoreSizeOf
65  public class DiskStorageFactory {
66  
67      /***
68       * Path stub used to create unique ehcache directories.
69       */
70      protected static final String AUTO_DISK_PATH_DIRECTORY_PREFIX = "ehcache_auto_created";
71      private static final int SERIALIZATION_CONCURRENCY_DELAY = 250;
72      private static final int SHUTDOWN_GRACE_PERIOD = 60;
73      private static final int MEGABYTE = 1024 * 1024;
74      private static final int MAX_EVICT = 5;
75      private static final int SAMPLE_SIZE = 30;
76  
77      private static final Logger LOG = LoggerFactory.getLogger(DiskStorageFactory.class.getName());
78  
79      /***
80       * The store bound to this factory.
81       */
82      protected volatile DiskStore                  store;
83  
84      private final BlockingQueue<Runnable> diskQueue;
85      /***
86       * Executor service used to write elements to disk
87       */
88      private final ScheduledThreadPoolExecutor diskWriter;
89  
90      private final long queueCapacity;
91  
92      private final File             file;
93      private final RandomAccessFile[] dataAccess;
94  
95      private final FileAllocationTree allocator;
96  
97      private final RegisteredEventListeners eventService;
98  
99      private volatile int elementSize;
100 
101     private final ElementSubstituteFilter onDiskFilter = new OnDiskFilter();
102 
103     private final AtomicInteger onDisk = new AtomicInteger();
104 
105     private final File indexFile;
106 
107     private final IndexWriteTask flushTask;
108 
109     private volatile int diskCapacity;
110 
111     private volatile boolean pinningEnabled;
112 
113     private final RegisteredEventListeners cacheEventNotificationService;
114     private final boolean diskPersistent;
115 
116     /***
117      * Constructs an disk persistent factory for the given cache and disk path.
118      *
119      * @param cache cache that fronts this factory
120      * @param diskPath path to store data in
121      * @param cacheEventNotificationService the notification service
122      */
123     public DiskStorageFactory(Ehcache cache, String diskPath, RegisteredEventListeners cacheEventNotificationService) {
124         this.file = getDataFile(diskPath, cache);
125         this.indexFile = new File(getDataFile().getParentFile(), getIndexFileName(cache));
126         this.pinningEnabled = determineCachePinned(cache.getCacheConfiguration());
127 
128         diskPersistent = cache.getCacheConfiguration().isDiskPersistent();
129         if (!diskPersistent) {
130             deleteFile(file);
131             deleteFile(indexFile);
132         }
133 
134         try {
135             dataAccess = allocateRandomAccessFiles(file, cache.getCacheConfiguration().getDiskAccessStripes());
136         } catch (FileNotFoundException e) {
137             throw new CacheException(e);
138         }
139         this.allocator = new FileAllocationTree(Long.MAX_VALUE, dataAccess[0]);
140 
141         diskWriter = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
142             public Thread newThread(Runnable r) {
143                 Thread t = new Thread(r, file.getName());
144                 t.setDaemon(false);
145                 return t;
146             }
147         });
148         this.diskQueue = diskWriter.getQueue();
149         this.eventService = cache.getCacheEventNotificationService();
150         this.queueCapacity = cache.getCacheConfiguration().getDiskSpoolBufferSizeMB() * MEGABYTE;
151         this.diskCapacity = cache.getCacheConfiguration().getMaxElementsOnDisk();
152 
153         diskWriter.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
154         diskWriter.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
155         long expiryInterval = cache.getCacheConfiguration().getDiskExpiryThreadIntervalSeconds();
156         diskWriter.scheduleWithFixedDelay(new DiskExpiryTask(), expiryInterval, expiryInterval, TimeUnit.SECONDS);
157 
158         this.cacheEventNotificationService = cacheEventNotificationService;
159 
160         flushTask = new IndexWriteTask(indexFile, cache.getCacheConfiguration().isClearOnFlush());
161 
162         if (!getDataFile().exists() || (getDataFile().length() == 0)) {
163             LOG.debug("Matching data file missing (or empty) for index file. Deleting index file " + indexFile);
164             deleteFile(indexFile);
165         } else if (getDataFile().exists() && indexFile.exists()) {
166             if (getDataFile().lastModified() > (indexFile.lastModified() + TimeUnit.SECONDS.toMillis(1))) {
167                 LOG.warn("The index for data file {} is out of date, probably due to an unclean shutdown. "
168                         + "Deleting index file {}", getDataFile(), indexFile);
169                 deleteFile(indexFile);
170             }
171         }
172     }
173 
174     private boolean determineCachePinned(CacheConfiguration cacheConfiguration) {
175         PinningConfiguration pinningConfiguration = cacheConfiguration.getPinningConfiguration();
176         if (pinningConfiguration == null) {
177             return false;
178         }
179 
180         switch (pinningConfiguration.getStore()) {
181             case LOCALHEAP:
182             case LOCALMEMORY:
183                 return false;
184 
185             case INCACHE:
186                 return true;
187 
188             default:
189                 throw new IllegalArgumentException();
190         }
191     }
192 
193     private static RandomAccessFile[] allocateRandomAccessFiles(File f, int stripes) throws FileNotFoundException {
194         int roundedStripes = stripes;
195         while ((roundedStripes & (roundedStripes - 1)) != 0) {
196             ++roundedStripes;
197         }
198 
199         RandomAccessFile [] result = new RandomAccessFile[roundedStripes];
200         for (int i = 0; i < result.length; ++i) {
201             result[i] = new RandomAccessFile(f, "rw");
202         }
203 
204         return result;
205     }
206 
207     private RandomAccessFile getDataAccess(Object key) {
208         return this.dataAccess[ConcurrencyUtil.selectLock(key, dataAccess.length)];
209     }
210 
211     /***
212      * Return this size in bytes of this factory
213      *
214      * @return this size in bytes of this factory
215      */
216     public long getOnDiskSizeInBytes() {
217         synchronized (dataAccess[0]) {
218             try {
219                 return dataAccess[0].length();
220             } catch (IOException e) {
221                 LOG.warn("Exception trying to determine store size", e);
222                 return 0;
223             }
224         }
225     }
226 
227     /***
228      * Bind a store instance to this factory.
229      *
230      * @param store store to bind
231      */
232     public void bind(DiskStore store) {
233         this.store = store;
234         loadIndex();
235     }
236 
237     /***
238      * Free any manually managed resources used by this {@link DiskSubstitute}.
239      *
240      * @param lock the lock protecting the DiskSubstitute
241      * @param substitute DiskSubstitute being freed.
242      */
243     public void free(Lock lock, DiskSubstitute substitute) {
244         free(lock, substitute, false);
245     }
246 
247     /***
248      * Free any manually managed resources used by this {@link DiskSubstitute}.
249      *
250      * @param lock the lock protecting the DiskSubstitute
251      * @param substitute DiskSubstitute being freed.
252      * @param faultFailure true if this DiskSubstitute should be freed because of a disk failure
253      */
254     public void free(Lock lock, DiskSubstitute substitute, boolean faultFailure) {
255         if (substitute instanceof DiskStorageFactory.DiskMarker) {
256             if (!faultFailure) {
257                 onDisk.decrementAndGet();
258             }
259             //free done asynchronously under the relevant segment lock...
260             DiskFreeTask free = new DiskFreeTask(lock, (DiskMarker) substitute);
261             if (lock.tryLock()) {
262                 try {
263                     free.call();
264                 } finally {
265                     lock.unlock();
266                 }
267             } else {
268                 schedule(free);
269             }
270         }
271     }
272 
273     /***
274      * Mark this on-disk marker as used (hooks into the file space allocation structure).
275      *
276      * @param marker on-disk marker to mark as used
277      */
278     protected void markUsed(DiskMarker marker) {
279         allocator.mark(new Region(marker.getPosition(), marker.getPosition() + marker.getSize() - 1));
280     }
281 
282     /***
283      * Shrink this store's data file down to a minimal size for its contents.
284      */
285     protected void shrinkDataFile() {
286         synchronized (dataAccess[0]) {
287             try {
288                 dataAccess[0].setLength(allocator.getFileSize());
289             } catch (IOException e) {
290                 LOG.error("Exception trying to shrink data file to size", e);
291             }
292         }
293     }
294     /***
295      * Shuts down this disk factory.
296      * <p>
297      * This shuts down the executor and then waits for its termination, before closing the data file.
298      * @throws java.io.IOException if an IO error occurred
299      */
300     protected void shutdown() throws IOException {
301         diskWriter.shutdown();
302         for (int i = 0; i < SHUTDOWN_GRACE_PERIOD; i++) {
303             try {
304                 if (diskWriter.awaitTermination(1, TimeUnit.SECONDS)) {
305                     break;
306                 } else {
307                     LOG.info("Waited " + (i + 1) + " seconds for shutdown of [" + file.getName() + "]");
308                 }
309             } catch (InterruptedException e) {
310                 LOG.warn("Received exception while waiting for shutdown", e);
311             }
312         }
313 
314         for (final RandomAccessFile raf : dataAccess) {
315             synchronized (raf) {
316                 raf.close();
317             }
318         }
319 
320         if (!diskPersistent) {
321             deleteFile(file);
322             deleteFile(indexFile);
323         }
324     }
325 
326     /***
327      * Deletes the data file for this factory.
328      */
329     protected void delete() {
330         deleteFile(file);
331         allocator.clear();
332         if (file.getAbsolutePath().contains(AUTO_DISK_PATH_DIRECTORY_PREFIX)) {
333             //try to delete the auto_createtimestamp directory. Will work when the last Disk Store deletes
334             //the last files and the directory becomes empty.
335             File dataDirectory = file.getParentFile();
336             if (dataDirectory != null && dataDirectory.exists()) {
337                 if (dataDirectory.delete()) {
338                     LOG.debug("Deleted directory " + dataDirectory.getName());
339                 }
340             }
341 
342         }
343     }
344 
345     /***
346      * Schedule to given task on the disk writer executor service.
347      *
348      * @param <U> return type of the callable
349      * @param call callable to call
350      * @return Future representing the return of this call
351      */
352     protected <U> Future<U> schedule(Callable<U> call) {
353         return diskWriter.submit(call);
354     }
355 
356     /***
357      * Read the data at the given marker, and return the associated deserialized Element.
358      *
359      * @param marker marker to read
360      * @return deserialized Element
361      * @throws java.io.IOException on read error
362      * @throws ClassNotFoundException on deserialization error
363      */
364     protected Element read(DiskMarker marker) throws IOException, ClassNotFoundException {
365         final byte[] buffer = new byte[marker.getSize()];
366         final RandomAccessFile data = getDataAccess(marker.getKey());
367         synchronized (data) {
368             // Load the element
369             data.seek(marker.getPosition());
370             data.readFully(buffer);
371         }
372 
373         ObjectInputStream objstr = new ObjectInputStream(new ByteArrayInputStream(buffer)) {
374             /***
375              * Overridden because of:
376              * Bug 1324221 ehcache DiskStore has issues when used in Tomcat
377              */
378             @Override
379             protected Class resolveClass(ObjectStreamClass clazz) throws ClassNotFoundException, IOException {
380                 try {
381                     ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
382                     return Class.forName(clazz.getName(), false, classLoader);
383                 } catch (ClassNotFoundException e) {
384                     // Use the default as a fallback because of
385                     // bug 1517565 - DiskStore loadElementFromDiskElement
386                     return super.resolveClass(clazz);
387                 }
388             }
389         };
390 
391         try {
392             return (Element) objstr.readObject();
393         } finally {
394             objstr.close();
395         }
396     }
397 
398     /***
399      * Write the given element to disk, and return the associated marker.
400      *
401      * @param element to write
402      * @return marker representing the element
403      * @throws java.io.IOException on write error
404      */
405     protected DiskMarker write(Element element) throws IOException {
406         MemoryEfficientByteArrayOutputStream buffer = serializeElement(element);
407         int bufferLength = buffer.size();
408         elementSize = bufferLength;
409         DiskMarker marker = alloc(element, bufferLength);
410         // Write the record
411         final RandomAccessFile data = getDataAccess(element.getObjectKey());
412         synchronized (data) {
413             data.seek(marker.getPosition());
414             data.write(buffer.toByteArray(), 0, bufferLength);
415         }
416         return marker;
417     }
418 
419     private MemoryEfficientByteArrayOutputStream serializeElement(Element element) throws IOException {
420         // try two times to Serialize. A ConcurrentModificationException can occur because Java's serialization
421         // mechanism is not threadsafe and POJOs are seldom implemented in a threadsafe way.
422         // e.g. we are serializing an ArrayList field while another thread somewhere in the application is appending to it.
423         // The best we can do is try again and then give up.
424         ConcurrentModificationException exception = null;
425         for (int retryCount = 0; retryCount < 2; retryCount++) {
426             try {
427                 return MemoryEfficientByteArrayOutputStream.serialize(element);
428             } catch (ConcurrentModificationException e) {
429                 exception = e;
430                 try {
431                     // wait for the other thread(s) to finish
432                     MILLISECONDS.sleep(SERIALIZATION_CONCURRENCY_DELAY);
433                 } catch (InterruptedException e1) {
434                     //no-op
435                 }
436             }
437         }
438         throw exception;
439     }
440 
441     private DiskMarker alloc(Element element, int size) throws IOException {
442         //check for a matching chunk
443         Region r = allocator.alloc(size);
444         return createMarker(r.start(), size, element);
445     }
446 
447     /***
448      * Free the given marker to be used by a subsequent write.
449      *
450      * @param marker marker to be free'd
451      */
452     protected void free(DiskMarker marker) {
453         allocator.free(new Region(marker.getPosition(), marker.getPosition() + marker.getSize() - 1));
454     }
455 
456     /***
457      * Return {@code true} if the disk write queue is full.
458      *
459      * @return {@code true} if the disk write queue is full.
460      */
461     public boolean bufferFull() {
462         return (diskQueue.size() * elementSize) > queueCapacity;
463     }
464 
465     /***
466      * Return a reference to the data file backing this factory.
467      *
468      * @return a reference to the data file backing this factory.
469      */
470     public File getDataFile() {
471         return file;
472     }
473 
474     /***
475      * DiskWriteTasks are used to serialize elements
476      * to disk and fault in the resultant DiskMarker
477      * instance.
478      */
479     abstract class DiskWriteTask implements Callable<DiskMarker> {
480 
481         private final Placeholder placeholder;
482 
483         /***
484          * Create a disk-write task for the given placeholder.
485          *
486          * @param p a disk-write task for the given placeholder.
487          */
488         DiskWriteTask(Placeholder p) {
489             this.placeholder = p;
490         }
491 
492         /***
493          * Return the placeholder that this task will write.
494          *
495          * @return the placeholder that this task will write.
496          */
497         Placeholder getPlaceholder() {
498             return placeholder;
499         }
500 
501         /***
502          * {@inheritDoc}
503          */
504         public DiskMarker call() {
505             try {
506                 DiskMarker marker = write(placeholder.getElement());
507                 if (marker != null && store.fault(placeholder.getKey(), placeholder, marker)) {
508                     return marker;
509                 } else {
510                     return null;
511                 }
512             } catch (Throwable e) {
513                 LOG.error("Disk Write of " + placeholder.getKey() + " failed (it will be evicted instead): ", e);
514                 store.evict(placeholder.getKey(), placeholder);
515                 return null;
516             }
517         }
518     }
519 
520     /***
521      * Disk free tasks are used to asynchronously free DiskMarker instances under the correct
522      * exclusive write lock.  This ensure markers are not free'd until no more readers can be
523      * holding references to them.
524      */
525     private final class DiskFreeTask implements Callable<Void> {
526         private final Lock lock;
527         private final DiskMarker marker;
528 
529         private DiskFreeTask(Lock lock, DiskMarker marker) {
530             this.lock = lock;
531             this.marker = marker;
532         }
533 
534         /***
535          * {@inheritDoc}
536          */
537         public Void call() {
538             lock.lock();
539             try {
540                 DiskStorageFactory.this.free(marker);
541             } finally {
542                 lock.unlock();
543             }
544             return null;
545         }
546     }
547 
548     /***
549      * Abstract superclass for all disk substitutes.
550      */
551     public abstract static class DiskSubstitute {
552 
553         @IgnoreSizeOf
554         private transient volatile DiskStorageFactory factory;
555 
556         /***
557          * Create a disk substitute bound to no factory.  This constructor is used during
558          * de-serialization.
559          */
560         public DiskSubstitute() {
561             this.factory = null;
562         }
563 
564         /***
565          * Create a disk substitute bound to the given factory.
566          *
567          * @param factory the factory to bind to.
568          */
569         DiskSubstitute(DiskStorageFactory factory) {
570             this.factory = factory;
571         }
572 
573         /***
574          * Return the key to which this marker is (or should be) mapped.
575          *
576          * @return the key to which this marker is (or should be) mapped.
577          */
578         abstract Object getKey();
579 
580         /***
581          * Return the total number of hits on this marker
582          *
583          * @return the total number of hits on this marker
584          */
585         abstract long getHitCount();
586 
587         /***
588          * Return the time at which this marker expires.
589          *
590          * @return the time at which this marker expires.
591          */
592         abstract long getExpirationTime();
593 
594         /***
595          * Mark the disk substitute as installed
596          */
597         abstract void installed();
598 
599         /***
600          * Returns the {@link DiskStorageFactory} instance that generated this <code>DiskSubstitute</code>
601          *
602          * @return an <code>ElementProxyFactory</code>
603          */
604         public final DiskStorageFactory getFactory() {
605             return factory;
606         }
607 
608         /***
609          * Bind this marker to a given factory.
610          * <p>
611          * Used during deserialization of markers to associate them with the deserializing factory.
612          * @param factory the factory to bind to
613          */
614         void bindFactory(DiskStorageFactory factory) {
615             this.factory = factory;
616         }
617     }
618 
619     /***
620      * Placeholder instances are put in place to prevent
621      * duplicate write requests while Elements are being
622      * written to disk.
623      */
624     final class Placeholder extends DiskSubstitute {
625         @IgnoreSizeOf
626         private final Object key;
627         private final Element element;
628 
629         /***
630          * Create a Placeholder wrapping the given element and key.
631          *
632          * @param element the element to wrap
633          */
634         Placeholder(Element element) {
635             super(DiskStorageFactory.this);
636             this.key = element.getObjectKey();
637             this.element = element;
638         }
639 
640 
641         /***
642          * {@inheritDoc}
643          */
644         @Override
645         public void installed() {
646             DiskStorageFactory.this.schedule(new PersistentDiskWriteTask(this));
647         }
648 
649         /***
650          * {@inheritDoc}
651          */
652         @Override
653         Object getKey() {
654             return key;
655         }
656 
657         /***
658          * {@inheritDoc}
659          */
660         @Override
661         long getHitCount() {
662             return getElement().getHitCount();
663         }
664 
665         @Override
666         long getExpirationTime() {
667             return getElement().getExpirationTime();
668         }
669 
670         /***
671          * Return the element that this Placeholder is wrapping.
672          * @return the element that this Placeholder is wrapping.
673          */
674         Element getElement() {
675             return element;
676         }
677     }
678 
679     /***
680      * DiskMarker instances point to the location of their
681      * associated serialized Element instance.
682      */
683     public static class DiskMarker extends DiskSubstitute implements Serializable {
684 
685         @IgnoreSizeOf
686         private final Object key;
687 
688         private final long position;
689         private final int size;
690 
691         private volatile long hitCount;
692 
693         private volatile long expiry;
694 
695         /***
696          * Create a new marker tied to the given factory instance.
697          *
698          * @param factory factory responsible for this marker
699          * @param position position on disk where the element will be stored
700          * @param size size of the serialized element
701          * @param element element being stored
702          */
703         DiskMarker(DiskStorageFactory factory, long position, int size, Element element) {
704             super(factory);
705             this.position = position;
706             this.size = size;
707 
708             this.key = element.getObjectKey();
709             this.hitCount = element.getHitCount();
710             this.expiry = element.getExpirationTime();
711         }
712 
713         /***
714          * Create a new marker tied to the given factory instance.
715          *
716          * @param factory factory responsible for this marker
717          * @param position position on disk where the element will be stored
718          * @param size size of the serialized element
719          * @param key key to which this element is mapped
720          * @param hits hit count for this element
721          */
722         DiskMarker(DiskStorageFactory factory, long position, int size, Object key, long hits) {
723             super(factory);
724             this.position = position;
725             this.size = size;
726 
727             this.key = key;
728             this.hitCount = hits;
729         }
730 
731         /***
732          * Key to which this Element is mapped.
733          *
734          * @return key for this Element
735          */
736         @Override
737         Object getKey() {
738             return key;
739         }
740 
741         /***
742          * Number of hits on this Element.
743          */
744         @Override
745         long getHitCount() {
746             return hitCount;
747         }
748 
749         /***
750          * Disk offset at which this element is stored.
751          *
752          * @return disk offset
753          */
754         private long getPosition() {
755             return position;
756         }
757 
758         /***
759          * Returns the size of the currently occupying element.
760          *
761          * @return size of the stored element
762          */
763         public int getSize() {
764             return size;
765         }
766 
767         /***
768          * {@inheritDoc}
769          * <p>
770          * A No-Op
771          */
772         @Override
773         public void installed() {
774             //no-op
775         }
776 
777         /***
778          * {@inheritDoc}
779          */
780         @Override
781         long getExpirationTime() {
782             return expiry;
783         }
784 
785         /***
786          * Increment statistic associated with a hit on this cache.
787          *
788          * @param e element deserialized from disk
789          */
790         void hit(Element e) {
791             hitCount++;
792             expiry = e.getExpirationTime();
793         }
794     }
795 
796 
797     /***
798      * Remove elements created by this factory if they have expired.
799      */
800     public void expireElements() {
801         new DiskExpiryTask().run();
802     }
803 
804     /***
805      * Causes removal of all expired elements (and fires the relevant events).
806      */
807     private final class DiskExpiryTask implements Runnable {
808 
809         /***
810          * {@inheritDoc}
811          */
812         public void run() {
813             long now = System.currentTimeMillis();
814             for (Object key : store.keySet()) {
815                 Object value = store.unretrievedGet(key);
816                 if (created(value) && value instanceof DiskStorageFactory.DiskMarker) {
817                     checkExpiry((DiskMarker) value, now);
818                 }
819             }
820         }
821 
822         private void checkExpiry(DiskMarker marker, long now) {
823             if (marker.getExpirationTime() < now) {
824                 if (eventService.hasCacheEventListeners()) {
825                     try {
826                         Element element = read(marker);
827                         if (store.evict(marker.getKey(), marker)) {
828                             eventService.notifyElementExpiry(element, false);
829                         }
830                     } catch (Exception e) {
831                         // ignore
832                     }
833                 } else {
834                     store.evict(marker.getKey(), marker);
835                 }
836             }
837         }
838     }
839 
840     /***
841      * Attempt to delete the corresponding file and log an error on failure.
842      * @param f the file to delete
843      */
844     protected static void deleteFile(File f) {
845         if (!f.delete()) {
846             LOG.debug("Failed to delete file {}", f.getName());
847         }
848     }
849 
850 
851     private static File getDataFile(String diskPath, Ehcache cache) {
852         if (diskPath == null) {
853             throw new CacheException(cache.getName() + " Cache: Could not create disk store. "
854                     + "This CacheManager configuration does not allow creation of DiskStores. "
855                     + "If you wish to create DiskStores, please configure a diskStore path.");
856         }
857 
858         final File diskDir = new File(diskPath);
859         // Make sure the cache directory exists
860         if (diskDir.exists() && !diskDir.isDirectory()) {
861             throw new CacheException("Store directory \"" + diskDir.getAbsolutePath() + "\" exists and is not a directory.");
862         }
863         if (!diskDir.exists() && !diskDir.mkdirs()) {
864             throw new CacheException("Could not create cache directory \"" + diskDir.getAbsolutePath() + "\".");
865         }
866 
867         File data = new File(diskDir, getDataFileName(cache));
868 
869         //if diskpath contains auto generated string
870         if (diskPath.contains(AUTO_DISK_PATH_DIRECTORY_PREFIX)) {
871             LOG.warn("Data in persistent disk stores is ignored for stores from automatically created directories"
872                     + " (they start with " + AUTO_DISK_PATH_DIRECTORY_PREFIX + ").\n"
873                     + "Remove diskPersistent or resolve the conflicting disk paths in cache configuration.\n"
874                     + "Deleting data file " + data.getAbsolutePath());
875             deleteFile(data);
876         }
877 
878         return data;
879     }
880 
881     private static String getDataFileName(Ehcache cache) {
882         String safeName = cache.getName().replace('/', '_');
883         return safeName + ".data";
884     }
885 
886     private static String getIndexFileName(Ehcache cache) {
887         String safeName = cache.getName().replace('/', '_');
888         return safeName + ".index";
889     }
890 
891     /***
892      * Create a disk substitute for an element
893      *
894      * @param element the element to create a disk substitute for
895      * @return The substitute element
896      * @throws IllegalArgumentException if element cannot be substituted
897      */
898     public DiskSubstitute create(Element element) throws IllegalArgumentException {
899         return new Placeholder(element);
900     }
901 
902     /***
903      * Decodes the supplied {@link DiskSubstitute}.
904      *
905      * @param object ElementSubstitute to decode
906      * @return the decoded element
907      */
908     public Element retrieve(DiskSubstitute object) {
909         if (object instanceof DiskMarker) {
910             try {
911                 DiskMarker marker = (DiskMarker) object;
912                 return read(marker);
913             } catch (IOException e) {
914                 throw new CacheException(e);
915             } catch (ClassNotFoundException e) {
916                 throw new CacheException(e);
917             }
918         } else if (object instanceof Placeholder) {
919             return ((Placeholder) object).getElement();
920         } else {
921             return null;
922         }
923     }
924 
925     /***
926      * Decodes the supplied {@link DiskSubstitute}, updating statistics.
927      *
928      * @param object ElementSubstitute to decode
929      * @return the decoded element
930      */
931     public Element retrieve(DiskSubstitute object, Segment segment) {
932         if (object instanceof DiskMarker) {
933             try {
934                 DiskMarker marker = (DiskMarker) object;
935                 segment.diskHit();
936                 Element e = read(marker);
937                 marker.hit(e);
938                 return e;
939             } catch (IOException e) {
940                 throw new CacheException(e);
941             } catch (ClassNotFoundException e) {
942                 throw new CacheException(e);
943             }
944         } else if (object instanceof DiskStorageFactory.Placeholder) {
945             segment.diskHit();
946             return ((Placeholder) object).getElement();
947         } else {
948             segment.miss();
949             return null;
950         }
951     }
952 
953     /***
954      * Returns <code>true</code> if this factory created the given object.
955      *
956      * @param object object to check
957      * @return <code>true</code> if object created by this factory
958      */
959     public boolean created(Object object) {
960         if (object instanceof DiskSubstitute) {
961             return ((DiskSubstitute) object).getFactory() == this;
962         } else {
963             return false;
964         }
965     }
966 
967     /***
968      * Unbinds a store instance from this factory
969      */
970     public void unbind() {
971         try {
972             flushTask.call();
973         } catch (Throwable t) {
974             LOG.error("Could not flush disk cache. Initial cause was " + t.getMessage(), t);
975         }
976 
977         try {
978             shutdown();
979             if (getDataFile().getAbsolutePath().contains(AUTO_DISK_PATH_DIRECTORY_PREFIX)) {
980                 deleteFile(indexFile);
981                 delete();
982             }
983         } catch (IOException e) {
984             LOG.error("Could not shut down disk cache. Initial cause was " + e.getMessage(), e);
985         }
986     }
987 
988     /***
989      * Schedule a flush (index write) for this factory.
990      * @return a Future
991      */
992     public Future<Void> flush() {
993         return schedule(flushTask);
994     }
995 
996     private DiskMarker createMarker(long position, int size, Element element) {
997         return new DiskMarker(this, position, size, element);
998     }
999 
1000     private boolean isPinningEnabled() {
1001         return pinningEnabled;
1002     }
1003 
1004     /***
1005      * Evict some elements, if possible
1006      *
1007      * @param count the number of elements to evict
1008      * @return the number of elements actually evicted
1009      */
1010     int evict(int count) {
1011         // see void onDiskEvict(int size, Object keyHint)
1012 
1013         if (isPinningEnabled()) {
1014             return 0;
1015         }
1016 
1017         int evicted = 0;
1018         for (int i = 0; i < count; i++) {
1019             DiskSubstitute target = this.getDiskEvictionTarget(null, count);
1020             if (target != null) {
1021                 Element evictedElement = store.evictElement(target.getKey(), null);
1022                 if (evictedElement != null && cacheEventNotificationService != null) {
1023                     evicted++;
1024                     cacheEventNotificationService.notifyElementEvicted(evictedElement, false);
1025                 }
1026             }
1027         }
1028         return evicted;
1029     }
1030 
1031     /***
1032      * Filters for on-disk elements created by this factory
1033      */
1034     private class OnDiskFilter implements ElementSubstituteFilter {
1035 
1036         /***
1037          * {@inheritDoc}
1038          */
1039         public boolean allows(Object object) {
1040             if (!created(object)) {
1041                 return false;
1042             }
1043 
1044             return object instanceof DiskMarker;
1045         }
1046     }
1047 
1048     /***
1049      * Return the number of on-disk elements
1050      *
1051      * @return the number of on-disk elements
1052      */
1053     public int getOnDiskSize() {
1054         return onDisk.get();
1055     }
1056 
1057     /***
1058      * Set the maximum on-disk capacity for this factory.
1059      *
1060      * @param capacity the maximum on-disk capacity for this factory.
1061      */
1062     public void setOnDiskCapacity(int capacity) {
1063         diskCapacity = capacity;
1064     }
1065 
1066     private void onDiskEvict(int size, Object keyHint) {
1067         if (diskCapacity > 0 && !isPinningEnabled()) {
1068             int overflow = size - diskCapacity;
1069             for (int i = 0; i < Math.min(MAX_EVICT, overflow); i++) {
1070                 DiskSubstitute target = getDiskEvictionTarget(keyHint, size);
1071                 if (target != null) {
1072                     if (store.evict(target.getKey(), target) && (onDisk.get() <= diskCapacity)) {
1073                         break;
1074                     }
1075                 }
1076             }
1077         }
1078     }
1079 
1080     private DiskSubstitute getDiskEvictionTarget(Object keyHint, int size) {
1081         List<DiskSubstitute> sample = store.getRandomSample(onDiskFilter, Math.min(SAMPLE_SIZE, size), keyHint);
1082         DiskSubstitute target = null;
1083         DiskSubstitute hintTarget = null;
1084         for (DiskSubstitute substitute : sample) {
1085             if ((target == null) || (substitute.getHitCount() < target.getHitCount())) {
1086                 if (substitute.getKey().equals(keyHint)) {
1087                     hintTarget = substitute;
1088                 } else {
1089                     target = substitute;
1090                 }
1091             }
1092         }
1093         return target != null ? target : hintTarget;
1094     }
1095 
1096     /***
1097      * Disk write task implementation for disk persistent stores.
1098      */
1099     private final class PersistentDiskWriteTask extends DiskWriteTask {
1100 
1101         /***
1102          * Create a disk persistent disk-write task for this placeholder.
1103          *
1104          * @param p the placeholder
1105          */
1106         PersistentDiskWriteTask(Placeholder p) {
1107             super(p);
1108         }
1109 
1110         /***
1111          * {@inheritDoc}
1112          */
1113         @Override
1114         public DiskMarker call() {
1115             DiskMarker result = super.call();
1116             if (result != null) {
1117                 int disk = onDisk.incrementAndGet();
1118                 onDiskEvict(disk, getPlaceholder().getKey());
1119             }
1120             return result;
1121         }
1122     }
1123 
1124     /***
1125      * Task that writes the index file for this factory.
1126      */
1127     class IndexWriteTask implements Callable<Void> {
1128 
1129         private final File index;
1130         private final boolean clearOnFlush;
1131 
1132         /***
1133          * Create a disk flush task that writes to the given file.
1134          *
1135          * @param index the file to write the index to
1136          * @param clear clear on flush flag
1137          */
1138         IndexWriteTask(File index, boolean clear) {
1139             this.index = index;
1140             this.clearOnFlush = clear;
1141         }
1142 
1143         /***
1144          * {@inheritDoc}
1145          */
1146         public synchronized Void call() throws IOException, InterruptedException {
1147             ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(index));
1148             try {
1149                 for (Object key : store.keySet()) {
1150                     Object o = store.unretrievedGet(key);
1151                     if (o instanceof Placeholder) {
1152                         o = new PersistentDiskWriteTask((Placeholder) o).call();
1153                         if (o == null) {
1154                             o = store.unretrievedGet(key);
1155                         }
1156                     }
1157 
1158                     if (o instanceof DiskMarker) {
1159                         DiskMarker marker = (DiskMarker) o;
1160                         oos.writeObject(key);
1161                         oos.writeObject(marker);
1162                     }
1163                 }
1164             } finally {
1165                 oos.close();
1166             }
1167             return null;
1168         }
1169 
1170     }
1171 
1172     private void loadIndex() {
1173         if (!indexFile.exists()) {
1174             return;
1175         }
1176 
1177         try {
1178             ObjectInputStream ois = new ObjectInputStream(new FileInputStream(indexFile));
1179             try {
1180                 Object key = ois.readObject();
1181                 Object value = ois.readObject();
1182 
1183                 DiskMarker marker = (DiskMarker) value;
1184                 while (true) {
1185                     marker.bindFactory(this);
1186                     markUsed(marker);
1187                     if (store.putRawIfAbsent(key, marker)) {
1188                         onDisk.incrementAndGet();
1189                     } else {
1190                         // the disk pool is full
1191                         return;
1192                     }
1193                     key = ois.readObject();
1194                     marker = (DiskMarker) ois.readObject();
1195                 }
1196             } finally {
1197                 ois.close();
1198             }
1199         } catch (EOFException e) {
1200             // end of file reached, stop processing
1201         } catch (Exception e) {
1202             LOG.warn("Index file {} is corrupt, deleting and ignoring it : {}", indexFile, e);
1203             e.printStackTrace();
1204             store.removeAll();
1205             deleteFile(indexFile);
1206         } finally {
1207             shrinkDataFile();
1208         }
1209     }
1210 
1211     /***
1212      * Return the index file for this store.
1213      * @return the index file
1214      */
1215     public File getIndexFile() {
1216         return indexFile;
1217     }
1218 }