1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.Ehcache;
20 import net.sf.ehcache.Element;
21 import net.sf.ehcache.bootstrap.BootstrapCacheLoader;
22
23 import java.io.Serializable;
24 import java.rmi.RemoteException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Random;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /***
33 * Loads Elements from a random Cache Peer
34 *
35 * @author Greg Luck
36 * @version $Id: RMIBootstrapCacheLoader.html 13146 2011-08-01 17:12:39Z oletizi $
37 */
38 public class RMIBootstrapCacheLoader implements BootstrapCacheLoader, Cloneable {
39
40 private static final int ONE_SECOND = 1000;
41
42 private static final Logger LOG = LoggerFactory.getLogger(RMIBootstrapCacheLoader.class.getName());
43
44 /***
45 * Whether to load asynchronously
46 */
47 protected boolean asynchronous;
48
49 /***
50 * The maximum serialized size of the elements to request from a remote cache peer during bootstrap.
51 */
52 protected int maximumChunkSizeBytes;
53
54 /***
55 * Creates a boostrap cache loader that will work with RMI based distribution
56 *
57 * @param asynchronous Whether to load asynchronously
58 */
59 public RMIBootstrapCacheLoader(boolean asynchronous, int maximumChunkSize) {
60 this.asynchronous = asynchronous;
61 this.maximumChunkSizeBytes = maximumChunkSize;
62 }
63
64
65 /***
66 * Bootstraps the cache from a random CachePeer. Requests are done in chunks estimated at 5MB Serializable
67 * size. This balances memory use on each end and network performance.
68 *
69 * @throws RemoteCacheException
70 * if anything goes wrong with the remote call
71 */
72 public void load(Ehcache cache) throws RemoteCacheException {
73 if (asynchronous) {
74 BootstrapThread bootstrapThread = new BootstrapThread(cache);
75 bootstrapThread.start();
76 } else {
77 doLoad(cache);
78 }
79 }
80
81 /***
82 * @return true if this bootstrap loader is asynchronous
83 */
84 public boolean isAsynchronous() {
85 return asynchronous;
86 }
87
88
89 /***
90 * A background daemon thread that asynchronously calls doLoad
91 */
92 private final class BootstrapThread extends Thread {
93 private Ehcache cache;
94
95 public BootstrapThread(Ehcache cache) {
96 super("Bootstrap Thread for cache " + cache.getName());
97 this.cache = cache;
98 setDaemon(true);
99 setPriority(Thread.NORM_PRIORITY);
100 }
101
102 /***
103 * RemoteDebugger thread method.
104 */
105 public final void run() {
106 try {
107 doLoad(cache);
108 } catch (RemoteCacheException e) {
109 LOG.warn("Error asynchronously performing bootstrap. The cause was: " + e.getMessage(), e);
110 } finally {
111 cache = null;
112 }
113
114 }
115
116 }
117
118
119 /***
120 * Bootstraps the cache from a random CachePeer. Requests are done in chunks estimated at 5MB Serializable
121 * size. This balances memory use on each end and network performance.
122 * <p/>
123 * Bootstrapping requires the establishment of a cluster. This can be instantaneous for manually configued
124 * clusters or may take a number of seconds for multicast ones. This method waits up to 11 seconds for a cluster
125 * to form.
126 *
127 * @throws RemoteCacheException
128 * if anything goes wrong with the remote call
129 */
130 public void doLoad(Ehcache cache) throws RemoteCacheException {
131
132 List cachePeers = acquireCachePeers(cache);
133 if (cachePeers == null || cachePeers.size() == 0) {
134 LOG.debug("Empty list of cache peers for cache " + cache.getName() + ". No cache peer to bootstrap from.");
135 return;
136 }
137 Random random = new Random();
138 int randomPeerNumber = random.nextInt(cachePeers.size());
139 CachePeer cachePeer = (CachePeer) cachePeers.get(randomPeerNumber);
140 LOG.debug("Bootstrapping " + cache.getName() + " from " + cachePeer);
141
142 try {
143
144
145 Element sampleElement = null;
146 List keys = cachePeer.getKeys();
147 for (int i = 0; i < keys.size(); i++) {
148 Serializable key = (Serializable) keys.get(i);
149 sampleElement = cachePeer.getQuiet(key);
150 if (sampleElement != null && sampleElement.getSerializedSize() != 0) {
151 break;
152 }
153 }
154 if (sampleElement == null) {
155 LOG.debug("All cache peer elements were either null or empty. Nothing to bootstrap from. Cache was "
156 + cache.getName() + ". Cache peer was " + cachePeer);
157 return;
158 }
159 long size = sampleElement.getSerializedSize();
160 int chunkSize = (int) (maximumChunkSizeBytes / size);
161
162 List requestChunk = new ArrayList();
163 for (int i = 0; i < keys.size(); i++) {
164 Serializable serializable = (Serializable) keys.get(i);
165 requestChunk.add(serializable);
166 if (requestChunk.size() == chunkSize) {
167 fetchAndPutElements(cache, requestChunk, cachePeer);
168 requestChunk.clear();
169 }
170 }
171
172 fetchAndPutElements(cache, requestChunk, cachePeer);
173 LOG.debug("Bootstrap of " + cache.getName() + " from " + cachePeer + " finished. "
174 + keys.size() + " keys requested.");
175 } catch (Throwable t) {
176 throw new RemoteCacheException("Error bootstrapping from remote peer. Message was: " + t.getMessage(), t);
177 }
178 }
179
180 /***
181 * Acquires the cache peers for this cache.
182 *
183 * @param cache
184 */
185 protected List acquireCachePeers(Ehcache cache) {
186
187 long timeForClusterToForm = 0;
188 CacheManagerPeerProvider cacheManagerPeerProvider = cache.getCacheManager().getCacheManagerPeerProvider("RMI");
189 if (cacheManagerPeerProvider != null) {
190 timeForClusterToForm = cacheManagerPeerProvider.getTimeForClusterToForm();
191 }
192 if (LOG.isDebugEnabled()) {
193 LOG.debug("Attempting to acquire cache peers for cache " + cache.getName()
194 + " to bootstrap from. Will wait up to " + timeForClusterToForm + "ms for cache to join cluster.");
195 }
196 List cachePeers = null;
197 for (int i = 0; i <= timeForClusterToForm; i = i + ONE_SECOND) {
198 cachePeers = listRemoteCachePeers(cache);
199 if (cachePeers == null) {
200 break;
201 }
202 if (cachePeers.size() > 0) {
203 break;
204 }
205 try {
206 Thread.sleep(ONE_SECOND);
207 } catch (InterruptedException e) {
208 LOG.debug("doLoad for " + cache.getName() + " interrupted.");
209 }
210 }
211
212 LOG.debug("cache peers: {}", cachePeers);
213 return cachePeers;
214 }
215
216 /***
217 * Fetches a chunk of elements from a remote cache peer
218 *
219 * @param cache the cache to put elements in
220 * @param requestChunk the chunk of keys to request
221 * @param cachePeer the peer to fetch from
222 * @throws java.rmi.RemoteException
223 */
224 protected void fetchAndPutElements(Ehcache cache, List requestChunk, CachePeer cachePeer) throws RemoteException {
225 List receivedChunk = cachePeer.getElements(requestChunk);
226 for (int i = 0; i < receivedChunk.size(); i++) {
227 Element element = (Element) receivedChunk.get(i);
228
229 if (element != null) {
230 cache.put(element, true);
231 }
232 }
233 }
234
235 /***
236 * Package protected List of cache peers
237 *
238 * @param cache
239 */
240 protected List listRemoteCachePeers(Ehcache cache) {
241 CacheManagerPeerProvider provider = cache.getCacheManager().getCacheManagerPeerProvider("RMI");
242 if (provider == null) {
243 return null;
244 } else {
245 return provider.listRemoteCachePeers(cache);
246 }
247
248 }
249
250 /***
251 * Gets the maximum chunk size
252 */
253 public int getMaximumChunkSizeBytes() {
254 return maximumChunkSizeBytes;
255 }
256
257 /***
258 * Clones this loader
259 */
260 @Override
261 public Object clone() throws CloneNotSupportedException {
262
263 return new RMIBootstrapCacheLoader(asynchronous, maximumChunkSizeBytes);
264 }
265
266 }