View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.Ehcache;
20  import net.sf.ehcache.Element;
21  import net.sf.ehcache.bootstrap.BootstrapCacheLoader;
22  
23  import java.io.Serializable;
24  import java.rmi.RemoteException;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Random;
28  
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  /***
33   * Loads Elements from a random Cache Peer
34   *
35   * @author Greg Luck
36   * @version $Id: RMIBootstrapCacheLoader.html 13146 2011-08-01 17:12:39Z oletizi $
37   */
38  public class RMIBootstrapCacheLoader implements BootstrapCacheLoader, Cloneable {
39  
40      private static final int ONE_SECOND = 1000;
41  
42      private static final Logger LOG = LoggerFactory.getLogger(RMIBootstrapCacheLoader.class.getName());
43  
44      /***
45       * Whether to load asynchronously
46       */
47      protected boolean asynchronous;
48  
49      /***
50       * The maximum serialized size of the elements to request from a remote cache peer during bootstrap.
51       */
52      protected int maximumChunkSizeBytes;
53  
54      /***
55       * Creates a boostrap cache loader that will work with RMI based distribution
56       *
57       * @param asynchronous Whether to load asynchronously
58       */
59      public RMIBootstrapCacheLoader(boolean asynchronous, int maximumChunkSize) {
60          this.asynchronous = asynchronous;
61          this.maximumChunkSizeBytes = maximumChunkSize;
62      }
63  
64  
65      /***
66       * Bootstraps the cache from a random CachePeer. Requests are done in chunks estimated at 5MB Serializable
67       * size. This balances memory use on each end and network performance.
68       *
69       * @throws RemoteCacheException
70       *          if anything goes wrong with the remote call
71       */
72      public void load(Ehcache cache) throws RemoteCacheException {
73          if (asynchronous) {
74              BootstrapThread bootstrapThread = new BootstrapThread(cache);
75              bootstrapThread.start();
76          } else {
77              doLoad(cache);
78          }
79      }
80  
81      /***
82       * @return true if this bootstrap loader is asynchronous
83       */
84      public boolean isAsynchronous() {
85          return asynchronous;
86      }
87  
88  
89      /***
90       * A background daemon thread that asynchronously calls doLoad
91       */
92      private final class BootstrapThread extends Thread {
93          private Ehcache cache;
94  
95          public BootstrapThread(Ehcache cache) {
96              super("Bootstrap Thread for cache " + cache.getName());
97              this.cache = cache;
98              setDaemon(true);
99              setPriority(Thread.NORM_PRIORITY);
100         }
101 
102         /***
103          * RemoteDebugger thread method.
104          */
105         public final void run() {
106             try {
107                 doLoad(cache);
108             } catch (RemoteCacheException e) {
109                 LOG.warn("Error asynchronously performing bootstrap. The cause was: " + e.getMessage(), e);
110             } finally {
111                 cache = null;
112             }
113 
114         }
115 
116     }
117 
118 
119     /***
120      * Bootstraps the cache from a random CachePeer. Requests are done in chunks estimated at 5MB Serializable
121      * size. This balances memory use on each end and network performance.
122      * <p/>
123      * Bootstrapping requires the establishment of a cluster. This can be instantaneous for manually configued
124      * clusters or may take a number of seconds for multicast ones. This method waits up to 11 seconds for a cluster
125      * to form.
126      *
127      * @throws RemoteCacheException
128      *          if anything goes wrong with the remote call
129      */
130     public void doLoad(Ehcache cache) throws RemoteCacheException {
131 
132         List cachePeers = acquireCachePeers(cache);
133         if (cachePeers == null || cachePeers.size() == 0) {
134             LOG.debug("Empty list of cache peers for cache " + cache.getName() + ". No cache peer to bootstrap from.");
135             return;
136         }
137         Random random = new Random();
138         int randomPeerNumber = random.nextInt(cachePeers.size());
139         CachePeer cachePeer = (CachePeer) cachePeers.get(randomPeerNumber);
140         LOG.debug("Bootstrapping " + cache.getName() + " from " + cachePeer);
141 
142         try {
143 
144             //Estimate element size
145             Element sampleElement = null;
146             List keys = cachePeer.getKeys();
147             for (int i = 0; i < keys.size(); i++) {
148                 Serializable key = (Serializable) keys.get(i);
149                 sampleElement = cachePeer.getQuiet(key);
150                 if (sampleElement != null && sampleElement.getSerializedSize() != 0) {
151                     break;
152                 }
153             }
154             if (sampleElement == null) {
155                 LOG.debug("All cache peer elements were either null or empty. Nothing to bootstrap from. Cache was "
156                         + cache.getName() + ". Cache peer was " + cachePeer);
157                 return;
158             }
159             long size = sampleElement.getSerializedSize();
160             int chunkSize = (int) (maximumChunkSizeBytes / size);
161 
162             List requestChunk = new ArrayList();
163             for (int i = 0; i < keys.size(); i++) {
164                 Serializable serializable = (Serializable) keys.get(i);
165                 requestChunk.add(serializable);
166                 if (requestChunk.size() == chunkSize) {
167                     fetchAndPutElements(cache, requestChunk, cachePeer);
168                     requestChunk.clear();
169                 }
170             }
171             //get leftovers
172             fetchAndPutElements(cache, requestChunk, cachePeer);
173             LOG.debug("Bootstrap of " + cache.getName() + " from " + cachePeer + " finished. "
174                     + keys.size() + " keys requested.");
175         } catch (Throwable t) {
176             throw new RemoteCacheException("Error bootstrapping from remote peer. Message was: " + t.getMessage(), t);
177         }
178     }
179 
180     /***
181      * Acquires the cache peers for this cache.
182      *
183      * @param cache
184      */
185     protected List acquireCachePeers(Ehcache cache) {
186 
187         long timeForClusterToForm = 0;
188         CacheManagerPeerProvider cacheManagerPeerProvider = cache.getCacheManager().getCacheManagerPeerProvider("RMI");
189         if (cacheManagerPeerProvider != null) {
190             timeForClusterToForm = cacheManagerPeerProvider.getTimeForClusterToForm();
191         }
192         if (LOG.isDebugEnabled()) {
193             LOG.debug("Attempting to acquire cache peers for cache " + cache.getName()
194                     + " to bootstrap from. Will wait up to " + timeForClusterToForm + "ms for cache to join cluster.");
195         }
196         List cachePeers = null;
197         for (int i = 0; i <= timeForClusterToForm; i = i + ONE_SECOND) {
198             cachePeers = listRemoteCachePeers(cache);
199             if (cachePeers == null) {
200                 break;
201             }
202             if (cachePeers.size() > 0) {
203                 break;
204             }
205             try {
206                 Thread.sleep(ONE_SECOND);
207             } catch (InterruptedException e) {
208                 LOG.debug("doLoad for " + cache.getName() + " interrupted.");
209             }
210         }
211 
212             LOG.debug("cache peers: {}", cachePeers);
213         return cachePeers;
214     }
215 
216     /***
217      * Fetches a chunk of elements from a remote cache peer
218      *
219      * @param cache        the cache to put elements in
220      * @param requestChunk the chunk of keys to request
221      * @param cachePeer    the peer to fetch from
222      * @throws java.rmi.RemoteException
223      */
224     protected void fetchAndPutElements(Ehcache cache, List requestChunk, CachePeer cachePeer) throws RemoteException {
225         List receivedChunk = cachePeer.getElements(requestChunk);
226         for (int i = 0; i < receivedChunk.size(); i++) {
227             Element element = (Element) receivedChunk.get(i);
228             // element could be expired at the peer
229             if (element != null) {
230                 cache.put(element, true);
231             }
232         }
233     }
234 
235     /***
236      * Package protected List of cache peers
237      *
238      * @param cache
239      */
240     protected List listRemoteCachePeers(Ehcache cache) {
241         CacheManagerPeerProvider provider = cache.getCacheManager().getCacheManagerPeerProvider("RMI");
242         if (provider == null) {
243             return null;
244         } else {
245             return provider.listRemoteCachePeers(cache);
246         }
247 
248     }
249 
250     /***
251      * Gets the maximum chunk size
252      */
253     public int getMaximumChunkSizeBytes() {
254         return maximumChunkSizeBytes;
255     }
256 
257     /***
258      * Clones this loader
259      */
260     @Override
261     public Object clone() throws CloneNotSupportedException {
262         //checkstyle
263         return new RMIBootstrapCacheLoader(asynchronous, maximumChunkSizeBytes);
264     }
265 
266 }