1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.IOException;
22 import java.rmi.RemoteException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.List;
27
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import java.util.zip.GZIPInputStream;
31 import java.util.zip.GZIPOutputStream;
32
33 /***
34 * This class provides utility methods for assembling and disassembling a heartbeat payload.
35 * <p/>
36 * Care is taken to fit the payload into the MTU of ethernet, which is 1500 bytes. The algorithms in this class are capable of creating
37 * payloads for CacheManagers containing approximately 500 cache peers to be replicated.
38 *
39 * @author <a href="mailto:gluck@thoughtworks.com">Greg Luck</a>
40 * @version $Id: PayloadUtil.html 13146 2011-08-01 17:12:39Z oletizi $
41 */
42 final class PayloadUtil {
43
44 /***
45 * The maximum transmission unit. This varies by link layer. For ethernet, fast ethernet and
46 * gigabit ethernet it is 1500 bytes, the value chosen.
47 * <p/>
48 * Payloads are limited to this so that there is no fragmentation and no necessity for a complex reassembly protocol.
49 */
50 public static final int MTU = 1500;
51
52 /***
53 * Delmits URLS sent via heartbeats over sockets
54 */
55 public static final String URL_DELIMITER = "|";
56
57 /***
58 * {@link #URL_DELIMITER} as a regular expression. Package protected, used in tests only
59 */
60 static final String URL_DELIMITER_REGEXP = "//|";
61
62 private static final Logger LOG = LoggerFactory.getLogger(PayloadUtil.class.getName());
63
64 /***
65 * Utility class therefore precent construction
66 */
67 private PayloadUtil() {
68
69 }
70
71 /***
72 * Creates a list of compressed (using gzip) url list. Breaks up the list of urlList such that size of each compressed entry in the list
73 * does not exceed the {@link #MTU} and the number of url's in each compressed entry does not exceed the maximumPeersPerSend parameter
74 *
75 * @param localCachePeers
76 * List containing the peers
77 * @param maximumPeersPerSend
78 * The maximum number of peers that can be present in one compressed entry
79 * @return List of compressed entries containing the peers urlList
80 */
81 public static List<byte[]> createCompressedPayloadList(final List<CachePeer> localCachePeers, final int maximumPeersPerSend) {
82 List<byte[]> rv = new ArrayList<byte[]>();
83 int iters = (int) Math.ceil((double) localCachePeers.size() / maximumPeersPerSend);
84 for (int i = 0; i < iters; i++) {
85 int fromIndex = maximumPeersPerSend * i;
86 int toIndex = Math.min(maximumPeersPerSend * (i + 1), localCachePeers.size());
87 List<CachePeer> subList = localCachePeers.subList(fromIndex, toIndex);
88 rv.addAll(createCompressedPayload(subList, MTU));
89 }
90 return rv;
91 }
92
93 /***
94 * Generates a list of compressed urlList's for the input CachePeers list. Each compressed payload is limited by size by the
95 * maxSizePerPayload parameter and will break up into multiple payloads if necessary to limit the payload size
96 *
97 * @param list The list of CachePeers whose payload needs to be generated
98 * @param maxSizePerPayload The maximum size each payload can have
99 * @return A list of compressed urlList's, each compressed entry not exceeding maxSizePerPayload
100 */
101 private static List<byte[]> createCompressedPayload(final List<CachePeer> list, final int maxSizePerPayload) {
102 List<byte[]> rv = new ArrayList<byte[]>();
103 byte[] compressed = gzip(assembleUrlList(list));
104 if (compressed.length <= maxSizePerPayload) {
105
106 rv.add(compressed);
107 } else {
108
109 if (list.size() == 1) {
110
111 String url = null;
112 try {
113 url = list.get(0).getUrl();
114 } catch (RemoteException e) {
115 LOG.error("This should never be thrown as it is called locally");
116 }
117 LOG.error("The replicated cache url is too long. Unless configured with a smaller name, " +
118 "heartbeat won't work for this cache. " +
119 "Compressed url size: " + compressed.length + " MTU: " + maxSizePerPayload + " URL: " + url);
120 return Collections.EMPTY_LIST;
121 }
122 List<CachePeer> list1 = list.subList(0, list.size() / 2);
123 List<CachePeer> list2 = list.subList(list.size() / 2, list.size());
124 rv.addAll(createCompressedPayload(list1, maxSizePerPayload));
125 rv.addAll(createCompressedPayload(list2, maxSizePerPayload));
126 }
127 return rv;
128 }
129
130 /***
131 * Assembles a list of URLs
132 *
133 * @param localCachePeers
134 * @return an uncompressed payload with catenated rmiUrls.
135 */
136 public static byte[] assembleUrlList(List localCachePeers) {
137 StringBuilder sb = new StringBuilder();
138 for (int i = 0; i < localCachePeers.size(); i++) {
139 CachePeer cachePeer = (CachePeer) localCachePeers.get(i);
140 String rmiUrl = null;
141 try {
142 rmiUrl = cachePeer.getUrl();
143 } catch (RemoteException e) {
144 LOG.error("This should never be thrown as it is called locally");
145 }
146 if (i != localCachePeers.size() - 1) {
147 sb.append(rmiUrl).append(URL_DELIMITER);
148 } else {
149 sb.append(rmiUrl);
150 }
151 }
152
153 LOG.debug("Cache peers for this CacheManager to be advertised: {}", sb);
154 return sb.toString().getBytes();
155 }
156
157 /***
158 * Gzips a byte[]. For text, approximately 10:1 compression is achieved.
159 *
160 * @param ungzipped
161 * the bytes to be gzipped
162 * @return gzipped bytes
163 */
164 public static byte[] gzip(byte[] ungzipped) {
165 final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
166 try {
167 final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(bytes);
168 gzipOutputStream.write(ungzipped);
169 gzipOutputStream.close();
170 } catch (IOException e) {
171 LOG.error("Could not gzip " + Arrays.toString(ungzipped));
172 }
173 return bytes.toByteArray();
174 }
175
176 /***
177 * The fastest Ungzip implementation. See PageInfoTest in ehcache-constructs.
178 * A high performance implementation, although not as fast as gunzip3.
179 * gunzips 100000 of ungzipped content in 9ms on the reference machine.
180 * It does not use a fixed size buffer and is therefore suitable for arbitrary
181 * length arrays.
182 *
183 * @param gzipped
184 * @return a plain, uncompressed byte[]
185 */
186 public static byte[] ungzip(final byte[] gzipped) {
187 byte[] ungzipped = new byte[0];
188 try {
189 final GZIPInputStream inputStream = new GZIPInputStream(new ByteArrayInputStream(gzipped));
190 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(gzipped.length);
191 final byte[] buffer = new byte[PayloadUtil.MTU];
192 int bytesRead = 0;
193 while (bytesRead != -1) {
194 bytesRead = inputStream.read(buffer, 0, PayloadUtil.MTU);
195 if (bytesRead != -1) {
196 byteArrayOutputStream.write(buffer, 0, bytesRead);
197 }
198 }
199 ungzipped = byteArrayOutputStream.toByteArray();
200 inputStream.close();
201 byteArrayOutputStream.close();
202 } catch (IOException e) {
203 LOG.error("Could not ungzip. Heartbeat will not be working. " + e.getMessage());
204 }
205 return ungzipped;
206 }
207
208 }