001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.openwire; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.lang.reflect.Method; 023import java.util.HashMap; 024import java.util.Map; 025 026import org.apache.activemq.command.CommandTypes; 027import org.apache.activemq.command.DataStructure; 028import org.apache.activemq.command.WireFormatInfo; 029import org.apache.activemq.util.ByteSequence; 030import org.apache.activemq.util.ByteSequenceData; 031import org.apache.activemq.util.DataByteArrayInputStream; 032import org.apache.activemq.util.DataByteArrayOutputStream; 033import org.apache.activemq.wireformat.WireFormat; 034 035/** 036 * 037 * 038 */ 039public final class OpenWireFormat implements WireFormat { 040 041 public static final int DEFAULT_STORE_VERSION = CommandTypes.PROTOCOL_STORE_VERSION; 042 public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION; 043 public static final int DEFAULT_LEGACY_VERSION = CommandTypes.PROTOCOL_LEGACY_STORE_VERSION; 044 public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE; 045 046 static final byte NULL_TYPE = CommandTypes.NULL; 047 private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2; 048 private static final int MARSHAL_CACHE_FREE_SPACE = 100; 049 050 private DataStreamMarshaller dataMarshallers[]; 051 private int version; 052 private boolean stackTraceEnabled; 053 private boolean tcpNoDelayEnabled; 054 private boolean cacheEnabled; 055 private boolean tightEncodingEnabled; 056 private boolean sizePrefixDisabled; 057 private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; 058 059 // The following fields are used for value caching 060 private short nextMarshallCacheIndex; 061 private short nextMarshallCacheEvictionIndex; 062 private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>(); 063 private DataStructure marshallCache[] = null; 064 private DataStructure unmarshallCache[] = null; 065 private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); 066 private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); 067 private WireFormatInfo preferedWireFormatInfo; 068 069 public OpenWireFormat() { 070 this(DEFAULT_STORE_VERSION); 071 } 072 073 public OpenWireFormat(int i) { 074 setVersion(i); 075 } 076 077 @Override 078 public int hashCode() { 079 return version ^ (cacheEnabled ? 0x10000000 : 0x20000000) 080 ^ (stackTraceEnabled ? 0x01000000 : 0x02000000) 081 ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000) 082 ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000); 083 } 084 085 public OpenWireFormat copy() { 086 OpenWireFormat answer = new OpenWireFormat(version); 087 answer.stackTraceEnabled = stackTraceEnabled; 088 answer.tcpNoDelayEnabled = tcpNoDelayEnabled; 089 answer.cacheEnabled = cacheEnabled; 090 answer.tightEncodingEnabled = tightEncodingEnabled; 091 answer.sizePrefixDisabled = sizePrefixDisabled; 092 answer.preferedWireFormatInfo = preferedWireFormatInfo; 093 return answer; 094 } 095 096 @Override 097 public boolean equals(Object object) { 098 if (object == null) { 099 return false; 100 } 101 OpenWireFormat o = (OpenWireFormat)object; 102 return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled 103 && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled 104 && o.sizePrefixDisabled == sizePrefixDisabled; 105 } 106 107 108 @Override 109 public String toString() { 110 return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" 111 + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}"; 112 // return "OpenWireFormat{id="+id+", 113 // tightEncodingEnabled="+tightEncodingEnabled+"}"; 114 } 115 116 @Override 117 public int getVersion() { 118 return version; 119 } 120 121 @Override 122 public synchronized ByteSequence marshal(Object command) throws IOException { 123 124 if (cacheEnabled) { 125 runMarshallCacheEvictionSweep(); 126 } 127 128 ByteSequence sequence = null; 129 int size = 1; 130 if (command != null) { 131 132 DataStructure c = (DataStructure)command; 133 byte type = c.getDataStructureType(); 134 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 135 if (dsm == null) { 136 throw new IOException("Unknown data type: " + type); 137 } 138 if (tightEncodingEnabled) { 139 140 BooleanStream bs = new BooleanStream(); 141 size += dsm.tightMarshal1(this, c, bs); 142 size += bs.marshalledSize(); 143 144 bytesOut.restart(size); 145 if (!sizePrefixDisabled) { 146 bytesOut.writeInt(size); 147 } 148 bytesOut.writeByte(type); 149 bs.marshal(bytesOut); 150 dsm.tightMarshal2(this, c, bytesOut, bs); 151 sequence = bytesOut.toByteSequence(); 152 153 } else { 154 bytesOut.restart(); 155 if (!sizePrefixDisabled) { 156 bytesOut.writeInt(0); // we don't know the final size 157 // yet but write this here for 158 // now. 159 } 160 bytesOut.writeByte(type); 161 dsm.looseMarshal(this, c, bytesOut); 162 sequence = bytesOut.toByteSequence(); 163 164 if (!sizePrefixDisabled) { 165 size = sequence.getLength() - 4; 166 int pos = sequence.offset; 167 ByteSequenceData.writeIntBig(sequence, size); 168 sequence.offset = pos; 169 } 170 } 171 172 } else { 173 bytesOut.restart(5); 174 bytesOut.writeInt(size); 175 bytesOut.writeByte(NULL_TYPE); 176 sequence = bytesOut.toByteSequence(); 177 } 178 179 return sequence; 180 } 181 182 @Override 183 public synchronized Object unmarshal(ByteSequence sequence) throws IOException { 184 bytesIn.restart(sequence); 185 // DataInputStream dis = new DataInputStream(new 186 // ByteArrayInputStream(sequence)); 187 188 if (!sizePrefixDisabled) { 189 int size = bytesIn.readInt(); 190 if (sequence.getLength() - 4 != size) { 191 // throw new IOException("Packet size does not match marshaled 192 // size"); 193 } 194 195 if (size > maxFrameSize) { 196 throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); 197 } 198 } 199 200 Object command = doUnmarshal(bytesIn); 201 // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) { 202 // ((MarshallAware) command).setCachedMarshalledForm(this, sequence); 203 // } 204 return command; 205 } 206 207 @Override 208 public synchronized void marshal(Object o, DataOutput dataOut) throws IOException { 209 210 if (cacheEnabled) { 211 runMarshallCacheEvictionSweep(); 212 } 213 214 int size = 1; 215 if (o != null) { 216 217 DataStructure c = (DataStructure)o; 218 byte type = c.getDataStructureType(); 219 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 220 if (dsm == null) { 221 throw new IOException("Unknown data type: " + type); 222 } 223 if (tightEncodingEnabled) { 224 BooleanStream bs = new BooleanStream(); 225 size += dsm.tightMarshal1(this, c, bs); 226 size += bs.marshalledSize(); 227 228 if (!sizePrefixDisabled) { 229 dataOut.writeInt(size); 230 } 231 232 dataOut.writeByte(type); 233 bs.marshal(dataOut); 234 dsm.tightMarshal2(this, c, dataOut, bs); 235 236 } else { 237 DataOutput looseOut = dataOut; 238 239 if (!sizePrefixDisabled) { 240 bytesOut.restart(); 241 looseOut = bytesOut; 242 } 243 244 looseOut.writeByte(type); 245 dsm.looseMarshal(this, c, looseOut); 246 247 if (!sizePrefixDisabled) { 248 ByteSequence sequence = bytesOut.toByteSequence(); 249 dataOut.writeInt(sequence.getLength()); 250 dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength()); 251 } 252 253 } 254 255 } else { 256 if (!sizePrefixDisabled) { 257 dataOut.writeInt(size); 258 } 259 dataOut.writeByte(NULL_TYPE); 260 } 261 } 262 263 @Override 264 public Object unmarshal(DataInput dis) throws IOException { 265 DataInput dataIn = dis; 266 if (!sizePrefixDisabled) { 267 int size = dis.readInt(); 268 if (size > maxFrameSize) { 269 throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); 270 } 271 // int size = dis.readInt(); 272 // byte[] data = new byte[size]; 273 // dis.readFully(data); 274 // bytesIn.restart(data); 275 // dataIn = bytesIn; 276 } 277 return doUnmarshal(dataIn); 278 } 279 280 /** 281 * Used by NIO or AIO transports 282 */ 283 public int tightMarshal1(Object o, BooleanStream bs) throws IOException { 284 int size = 1; 285 if (o != null) { 286 DataStructure c = (DataStructure)o; 287 byte type = c.getDataStructureType(); 288 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 289 if (dsm == null) { 290 throw new IOException("Unknown data type: " + type); 291 } 292 293 size += dsm.tightMarshal1(this, c, bs); 294 size += bs.marshalledSize(); 295 } 296 return size; 297 } 298 299 /** 300 * Used by NIO or AIO transports; note that the size is not written as part 301 * of this method. 302 */ 303 public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException { 304 if (cacheEnabled) { 305 runMarshallCacheEvictionSweep(); 306 } 307 308 if (o != null) { 309 DataStructure c = (DataStructure)o; 310 byte type = c.getDataStructureType(); 311 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 312 if (dsm == null) { 313 throw new IOException("Unknown data type: " + type); 314 } 315 ds.writeByte(type); 316 bs.marshal(ds); 317 dsm.tightMarshal2(this, c, ds, bs); 318 } 319 } 320 321 /** 322 * Allows you to dynamically switch the version of the openwire protocol 323 * being used. 324 * 325 * @param version 326 */ 327 @Override 328 public void setVersion(int version) { 329 String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory"; 330 Class mfClass; 331 try { 332 mfClass = Class.forName(mfName, false, getClass().getClassLoader()); 333 } catch (ClassNotFoundException e) { 334 throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version 335 + ", could not load " + mfName) 336 .initCause(e); 337 } 338 try { 339 Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class}); 340 dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this}); 341 } catch (Throwable e) { 342 throw (IllegalArgumentException)new IllegalArgumentException( 343 "Invalid version: " 344 + version 345 + ", " 346 + mfName 347 + " does not properly implement the createMarshallerMap method.") 348 .initCause(e); 349 } 350 this.version = version; 351 } 352 353 public Object doUnmarshal(DataInput dis) throws IOException { 354 byte dataType = dis.readByte(); 355 if (dataType != NULL_TYPE) { 356 DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF]; 357 if (dsm == null) { 358 throw new IOException("Unknown data type: " + dataType); 359 } 360 Object data = dsm.createObject(); 361 if (this.tightEncodingEnabled) { 362 BooleanStream bs = new BooleanStream(); 363 bs.unmarshal(dis); 364 dsm.tightUnmarshal(this, data, dis, bs); 365 } else { 366 dsm.looseUnmarshal(this, data, dis); 367 } 368 return data; 369 } else { 370 return null; 371 } 372 } 373 374 // public void debug(String msg) { 375 // String t = (Thread.currentThread().getName()+" ").substring(0, 40); 376 // System.out.println(t+": "+msg); 377 // } 378 public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException { 379 bs.writeBoolean(o != null); 380 if (o == null) { 381 return 0; 382 } 383 384 if (o.isMarshallAware()) { 385 // MarshallAware ma = (MarshallAware)o; 386 ByteSequence sequence = null; 387 // sequence=ma.getCachedMarshalledForm(this); 388 bs.writeBoolean(sequence != null); 389 if (sequence != null) { 390 return 1 + sequence.getLength(); 391 } 392 } 393 394 byte type = o.getDataStructureType(); 395 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 396 if (dsm == null) { 397 throw new IOException("Unknown data type: " + type); 398 } 399 return 1 + dsm.tightMarshal1(this, o, bs); 400 } 401 402 public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs) 403 throws IOException { 404 if (!bs.readBoolean()) { 405 return; 406 } 407 408 byte type = o.getDataStructureType(); 409 ds.writeByte(type); 410 411 if (o.isMarshallAware() && bs.readBoolean()) { 412 413 // We should not be doing any caching 414 throw new IOException("Corrupted stream"); 415 // MarshallAware ma = (MarshallAware) o; 416 // ByteSequence sequence=ma.getCachedMarshalledForm(this); 417 // ds.write(sequence.getData(), sequence.getOffset(), 418 // sequence.getLength()); 419 420 } else { 421 422 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 423 if (dsm == null) { 424 throw new IOException("Unknown data type: " + type); 425 } 426 dsm.tightMarshal2(this, o, ds, bs); 427 428 } 429 } 430 431 public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException { 432 if (bs.readBoolean()) { 433 434 byte dataType = dis.readByte(); 435 DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF]; 436 if (dsm == null) { 437 throw new IOException("Unknown data type: " + dataType); 438 } 439 DataStructure data = dsm.createObject(); 440 441 if (data.isMarshallAware() && bs.readBoolean()) { 442 443 dis.readInt(); 444 dis.readByte(); 445 446 BooleanStream bs2 = new BooleanStream(); 447 bs2.unmarshal(dis); 448 dsm.tightUnmarshal(this, data, dis, bs2); 449 450 // TODO: extract the sequence from the dis and associate it. 451 // MarshallAware ma = (MarshallAware)data 452 // ma.setCachedMarshalledForm(this, sequence); 453 454 } else { 455 dsm.tightUnmarshal(this, data, dis, bs); 456 } 457 458 return data; 459 } else { 460 return null; 461 } 462 } 463 464 public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException { 465 if (dis.readBoolean()) { 466 467 byte dataType = dis.readByte(); 468 DataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF]; 469 if (dsm == null) { 470 throw new IOException("Unknown data type: " + dataType); 471 } 472 DataStructure data = dsm.createObject(); 473 dsm.looseUnmarshal(this, data, dis); 474 return data; 475 476 } else { 477 return null; 478 } 479 } 480 481 public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException { 482 dataOut.writeBoolean(o != null); 483 if (o != null) { 484 byte type = o.getDataStructureType(); 485 dataOut.writeByte(type); 486 DataStreamMarshaller dsm = dataMarshallers[type & 0xFF]; 487 if (dsm == null) { 488 throw new IOException("Unknown data type: " + type); 489 } 490 dsm.looseMarshal(this, o, dataOut); 491 } 492 } 493 494 public void runMarshallCacheEvictionSweep() { 495 // Do we need to start evicting?? 496 while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) { 497 498 marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]); 499 marshallCache[nextMarshallCacheEvictionIndex] = null; 500 501 nextMarshallCacheEvictionIndex++; 502 if (nextMarshallCacheEvictionIndex >= marshallCache.length) { 503 nextMarshallCacheEvictionIndex = 0; 504 } 505 506 } 507 } 508 509 public Short getMarshallCacheIndex(DataStructure o) { 510 return marshallCacheMap.get(o); 511 } 512 513 public Short addToMarshallCache(DataStructure o) { 514 short i = nextMarshallCacheIndex++; 515 if (nextMarshallCacheIndex >= marshallCache.length) { 516 nextMarshallCacheIndex = 0; 517 } 518 519 // We can only cache that item if there is space left. 520 if (marshallCacheMap.size() < marshallCache.length) { 521 marshallCache[i] = o; 522 Short index = new Short(i); 523 marshallCacheMap.put(o, index); 524 return index; 525 } else { 526 // Use -1 to indicate that the value was not cached due to cache 527 // being full. 528 return new Short((short)-1); 529 } 530 } 531 532 public void setInUnmarshallCache(short index, DataStructure o) { 533 534 // There was no space left in the cache, so we can't 535 // put this in the cache. 536 if (index == -1) { 537 return; 538 } 539 540 unmarshallCache[index] = o; 541 } 542 543 public DataStructure getFromUnmarshallCache(short index) { 544 return unmarshallCache[index]; 545 } 546 547 public void setStackTraceEnabled(boolean b) { 548 stackTraceEnabled = b; 549 } 550 551 public boolean isStackTraceEnabled() { 552 return stackTraceEnabled; 553 } 554 555 public boolean isTcpNoDelayEnabled() { 556 return tcpNoDelayEnabled; 557 } 558 559 public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) { 560 this.tcpNoDelayEnabled = tcpNoDelayEnabled; 561 } 562 563 public boolean isCacheEnabled() { 564 return cacheEnabled; 565 } 566 567 public void setCacheEnabled(boolean cacheEnabled) { 568 if(cacheEnabled){ 569 marshallCache = new DataStructure[MARSHAL_CACHE_SIZE]; 570 unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE]; 571 } 572 this.cacheEnabled = cacheEnabled; 573 } 574 575 public boolean isTightEncodingEnabled() { 576 return tightEncodingEnabled; 577 } 578 579 public void setTightEncodingEnabled(boolean tightEncodingEnabled) { 580 this.tightEncodingEnabled = tightEncodingEnabled; 581 } 582 583 public boolean isSizePrefixDisabled() { 584 return sizePrefixDisabled; 585 } 586 587 public void setSizePrefixDisabled(boolean prefixPacketSize) { 588 this.sizePrefixDisabled = prefixPacketSize; 589 } 590 591 public void setPreferedWireFormatInfo(WireFormatInfo info) { 592 this.preferedWireFormatInfo = info; 593 } 594 595 public WireFormatInfo getPreferedWireFormatInfo() { 596 return preferedWireFormatInfo; 597 } 598 599 public long getMaxFrameSize() { 600 return maxFrameSize; 601 } 602 603 public void setMaxFrameSize(long maxFrameSize) { 604 this.maxFrameSize = maxFrameSize; 605 } 606 607 public void renegotiateWireFormat(WireFormatInfo info) throws IOException { 608 609 if (preferedWireFormatInfo == null) { 610 throw new IllegalStateException("Wireformat cannot not be renegotiated."); 611 } 612 613 this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion())); 614 info.setVersion(this.getVersion()); 615 616 this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize())); 617 info.setMaxFrameSize(this.getMaxFrameSize()); 618 619 this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); 620 info.setStackTraceEnabled(this.stackTraceEnabled); 621 622 this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled(); 623 info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled); 624 625 this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled(); 626 info.setCacheEnabled(this.cacheEnabled); 627 628 this.tightEncodingEnabled = info.isTightEncodingEnabled() 629 && preferedWireFormatInfo.isTightEncodingEnabled(); 630 info.setTightEncodingEnabled(this.tightEncodingEnabled); 631 632 this.sizePrefixDisabled = info.isSizePrefixDisabled() 633 && preferedWireFormatInfo.isSizePrefixDisabled(); 634 info.setSizePrefixDisabled(this.sizePrefixDisabled); 635 636 if (cacheEnabled) { 637 638 int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize()); 639 info.setCacheSize(size); 640 641 if (size == 0) { 642 size = MARSHAL_CACHE_SIZE; 643 } 644 645 marshallCache = new DataStructure[size]; 646 unmarshallCache = new DataStructure[size]; 647 nextMarshallCacheIndex = 0; 648 nextMarshallCacheEvictionIndex = 0; 649 marshallCacheMap = new HashMap<DataStructure, Short>(); 650 } else { 651 marshallCache = null; 652 unmarshallCache = null; 653 nextMarshallCacheIndex = 0; 654 nextMarshallCacheEvictionIndex = 0; 655 marshallCacheMap = null; 656 } 657 658 } 659 660 protected int min(int version1, int version2) { 661 if (version1 < version2 && version1 > 0 || version2 <= 0) { 662 return version1; 663 } 664 return version2; 665 } 666 667 protected long min(long version1, long version2) { 668 if (version1 < version2 && version1 > 0 || version2 <= 0) { 669 return version1; 670 } 671 return version2; 672 } 673}