001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.disk.util; 018 019import java.io.DataOutput; 020import java.io.IOException; 021import java.io.OutputStream; 022import java.io.UTFDataFormatException; 023 024import org.apache.activemq.store.kahadb.disk.page.PageFile; 025import org.apache.activemq.util.ByteSequence; 026 027/** 028 * Optimized ByteArrayOutputStream 029 */ 030public class DataByteArrayOutputStream extends OutputStream implements DataOutput, AutoCloseable { 031 private static final int DEFAULT_SIZE = PageFile.DEFAULT_PAGE_SIZE; 032 protected byte buf[]; 033 protected int pos; 034 035 /** 036 * Creates a new byte array output stream, with a buffer capacity of the 037 * specified size, in bytes. 038 * 039 * @param size the initial size. 040 * @exception IllegalArgumentException if size is negative. 041 */ 042 public DataByteArrayOutputStream(int size) { 043 if (size < 0) { 044 throw new IllegalArgumentException("Invalid size: " + size); 045 } 046 buf = new byte[size]; 047 } 048 049 /** 050 * Creates a new byte array output stream. 051 */ 052 public DataByteArrayOutputStream() { 053 this(DEFAULT_SIZE); 054 } 055 056 /** 057 * start using a fresh byte array 058 * 059 * @param size 060 */ 061 public void restart(int size) { 062 buf = new byte[size]; 063 pos = 0; 064 } 065 066 /** 067 * start using a fresh byte array 068 */ 069 public void restart() { 070 restart(DEFAULT_SIZE); 071 } 072 073 /** 074 * Get a ByteSequence from the stream 075 * 076 * @return the byte sequence 077 */ 078 public ByteSequence toByteSequence() { 079 return new ByteSequence(buf, 0, pos); 080 } 081 082 /** 083 * Writes the specified byte to this byte array output stream. 084 * 085 * @param b the byte to be written. 086 * @throws IOException 087 */ 088 @Override 089 public void write(int b) throws IOException { 090 int newcount = pos + 1; 091 ensureEnoughBuffer(newcount); 092 buf[pos] = (byte)b; 093 pos = newcount; 094 onWrite(); 095 } 096 097 /** 098 * Writes <code>len</code> bytes from the specified byte array starting at 099 * offset <code>off</code> to this byte array output stream. 100 * 101 * @param b the data. 102 * @param off the start offset in the data. 103 * @param len the number of bytes to write. 104 * @throws IOException 105 */ 106 @Override 107 public void write(byte b[], int off, int len) throws IOException { 108 if (len == 0) { 109 return; 110 } 111 int newcount = pos + len; 112 ensureEnoughBuffer(newcount); 113 System.arraycopy(b, off, buf, pos, len); 114 pos = newcount; 115 onWrite(); 116 } 117 118 /** 119 * @return the underlying byte[] buffer 120 */ 121 public byte[] getData() { 122 return buf; 123 } 124 125 /** 126 * reset the output stream 127 */ 128 public void reset() { 129 pos = 0; 130 } 131 132 /** 133 * Set the current position for writing 134 * 135 * @param offset 136 * @throws IOException 137 */ 138 public void position(int offset) throws IOException { 139 ensureEnoughBuffer(offset); 140 pos = offset; 141 onWrite(); 142 } 143 144 public int size() { 145 return pos; 146 } 147 148 @Override 149 public void writeBoolean(boolean v) throws IOException { 150 ensureEnoughBuffer(pos + 1); 151 buf[pos++] = (byte)(v ? 1 : 0); 152 onWrite(); 153 } 154 155 @Override 156 public void writeByte(int v) throws IOException { 157 ensureEnoughBuffer(pos + 1); 158 buf[pos++] = (byte)(v >>> 0); 159 onWrite(); 160 } 161 162 @Override 163 public void writeShort(int v) throws IOException { 164 ensureEnoughBuffer(pos + 2); 165 buf[pos++] = (byte)(v >>> 8); 166 buf[pos++] = (byte)(v >>> 0); 167 onWrite(); 168 } 169 170 @Override 171 public void writeChar(int v) throws IOException { 172 ensureEnoughBuffer(pos + 2); 173 buf[pos++] = (byte)(v >>> 8); 174 buf[pos++] = (byte)(v >>> 0); 175 onWrite(); 176 } 177 178 @Override 179 public void writeInt(int v) throws IOException { 180 ensureEnoughBuffer(pos + 4); 181 buf[pos++] = (byte)(v >>> 24); 182 buf[pos++] = (byte)(v >>> 16); 183 buf[pos++] = (byte)(v >>> 8); 184 buf[pos++] = (byte)(v >>> 0); 185 onWrite(); 186 } 187 188 @Override 189 public void writeLong(long v) throws IOException { 190 ensureEnoughBuffer(pos + 8); 191 buf[pos++] = (byte)(v >>> 56); 192 buf[pos++] = (byte)(v >>> 48); 193 buf[pos++] = (byte)(v >>> 40); 194 buf[pos++] = (byte)(v >>> 32); 195 buf[pos++] = (byte)(v >>> 24); 196 buf[pos++] = (byte)(v >>> 16); 197 buf[pos++] = (byte)(v >>> 8); 198 buf[pos++] = (byte)(v >>> 0); 199 onWrite(); 200 } 201 202 @Override 203 public void writeFloat(float v) throws IOException { 204 writeInt(Float.floatToIntBits(v)); 205 } 206 207 @Override 208 public void writeDouble(double v) throws IOException { 209 writeLong(Double.doubleToLongBits(v)); 210 } 211 212 @Override 213 public void writeBytes(String s) throws IOException { 214 int length = s.length(); 215 for (int i = 0; i < length; i++) { 216 write((byte)s.charAt(i)); 217 } 218 } 219 220 @Override 221 public void writeChars(String s) throws IOException { 222 int length = s.length(); 223 for (int i = 0; i < length; i++) { 224 int c = s.charAt(i); 225 write((c >>> 8) & 0xFF); 226 write((c >>> 0) & 0xFF); 227 } 228 } 229 230 @Override 231 public void writeUTF(String str) throws IOException { 232 int strlen = str.length(); 233 int encodedsize = 0; 234 int c; 235 for (int i = 0; i < strlen; i++) { 236 c = str.charAt(i); 237 if ((c >= 0x0001) && (c <= 0x007F)) { 238 encodedsize++; 239 } else if (c > 0x07FF) { 240 encodedsize += 3; 241 } else { 242 encodedsize += 2; 243 } 244 } 245 if (encodedsize > 65535) { 246 throw new UTFDataFormatException("encoded string too long: " + encodedsize + " bytes"); 247 } 248 ensureEnoughBuffer(pos + encodedsize + 2); 249 writeShort(encodedsize); 250 for (int i = 0; i < strlen; i++) { 251 int charValue = str.charAt(i); 252 if (charValue > 0 && charValue <= 127) { 253 buf[pos++] = (byte) charValue; 254 } else if (charValue <= 2047) { 255 buf[pos++] = (byte) (0xc0 | (0x1f & (charValue >> 6))); 256 buf[pos++] = (byte) (0x80 | (0x3f & charValue)); 257 } else { 258 buf[pos++] = (byte) (0xe0 | (0x0f & (charValue >> 12))); 259 buf[pos++] = (byte) (0x80 | (0x3f & (charValue >> 6))); 260 buf[pos++] = (byte) (0x80 | (0x3f & charValue)); 261 } 262 } 263 onWrite(); 264 } 265 266 private void ensureEnoughBuffer(int newcount) { 267 if (newcount > buf.length) { 268 byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)]; 269 System.arraycopy(buf, 0, newbuf, 0, pos); 270 buf = newbuf; 271 } 272 } 273 274 /** 275 * This method is called after each write to the buffer. This should allow subclasses 276 * to take some action based on the writes, for example flushing data to an external system based on size. 277 */ 278 protected void onWrite() throws IOException { 279 } 280 281 public void skip(int size) throws IOException { 282 ensureEnoughBuffer(pos + size); 283 pos+=size; 284 onWrite(); 285 } 286 287 public ByteSequence getByteSequence() { 288 return new ByteSequence(buf, 0, pos); 289 } 290}