View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.writer.writebehind;
18  
19  import net.sf.ehcache.CacheEntry;
20  import net.sf.ehcache.CacheException;
21  import net.sf.ehcache.Element;
22  import net.sf.ehcache.config.CacheConfiguration;
23  import net.sf.ehcache.config.CacheWriterConfiguration;
24  import net.sf.ehcache.writer.CacheWriter;
25  
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.concurrent.locks.ReentrantReadWriteLock;
29  
30  /***
31   * @author Alex Snaps
32   */
33  public class WriteBehindQueueManager implements WriteBehind {
34  
35      private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
36      private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
37      private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
38  
39      private List<WriteBehindQueue> queues = new ArrayList<WriteBehindQueue>();
40  
41      /***
42       * Create a new write behind queue manager. Which in turn will create as many queues as
43       * required by the {@link net.sf.ehcache.config.CacheWriterConfiguration#getWriteBehindConcurrency}
44       *
45       * @param config the configuration for the queue
46       */
47      public WriteBehindQueueManager(CacheConfiguration config) {
48          CacheWriterConfiguration cacheWriterConfiguration = config.getCacheWriterConfiguration();
49          int writeBehindConcurrency = cacheWriterConfiguration.getWriteBehindConcurrency();
50          for (int i = 0; i < writeBehindConcurrency; i++) {
51              this.queues.add(new WriteBehindQueue(config));
52          }
53      }
54  
55      /***
56       * {@inheritDoc}
57       */
58      public void start(final CacheWriter writer) throws CacheException {
59          writeLock.lock();
60          try {
61              for (WriteBehindQueue queue : queues) {
62                  queue.start(writer);
63              }
64          } finally {
65              writeLock.unlock();
66          }
67      }
68  
69      /***
70       * {@inheritDoc}
71       */
72      public void write(final Element element) {
73          readLock.lock();
74          try {
75              getQueue(element.getKey()).write(element);
76          } finally {
77              readLock.unlock();
78          }
79      }
80  
81      private WriteBehindQueue getQueue(final Object key) {
82          return queues.get(Math.abs(key.hashCode() % queues.size()));
83      }
84  
85      /***
86       * {@inheritDoc}
87       */
88      public void delete(final CacheEntry entry) {
89          readLock.lock();
90          try {
91              getQueue(entry.getKey()).delete(entry);
92          } finally {
93              readLock.unlock();
94          }
95      }
96  
97      /***
98       * {@inheritDoc}
99       */
100     public void setOperationsFilter(final OperationsFilter filter) {
101         readLock.lock();
102         try {
103             for (WriteBehindQueue queue : queues) {
104                 queue.setOperationsFilter(filter);
105             }
106         } finally {
107             readLock.unlock();
108         }
109     }
110 
111     /***
112      * {@inheritDoc}
113      */
114     public void stop() throws CacheException {
115         writeLock.lock();
116         try {
117             for (WriteBehindQueue queue : queues) {
118                 queue.stop();
119             }
120         } finally {
121             writeLock.unlock();
122         }
123     }
124 
125     /***
126      * {@inheritDoc}
127      */
128     public long getQueueSize() {
129         int size = 0;
130         readLock.lock();
131         try {
132             for (WriteBehindQueue queue : queues) {
133                 size += queue.getQueueSize();
134             }
135         } finally {
136             readLock.unlock();
137         }
138         return size;
139     }
140 }