001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.command; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.io.OutputStream; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.zip.DeflaterOutputStream; 027 028import javax.jms.JMSException; 029 030import org.apache.activemq.ActiveMQConnection; 031import org.apache.activemq.advisory.AdvisorySupport; 032import org.apache.activemq.broker.region.MessageReference; 033import org.apache.activemq.usage.MemoryUsage; 034import org.apache.activemq.util.ByteArrayInputStream; 035import org.apache.activemq.util.ByteArrayOutputStream; 036import org.apache.activemq.util.ByteSequence; 037import org.apache.activemq.util.MarshallingSupport; 038import org.apache.activemq.wireformat.WireFormat; 039import org.fusesource.hawtbuf.UTF8Buffer; 040 041/** 042 * Represents an ActiveMQ message 043 * 044 * @openwire:marshaller 045 * 046 */ 047public abstract class Message extends BaseCommand implements MarshallAware, MessageReference { 048 public static final String ORIGINAL_EXPIRATION = "originalExpiration"; 049 050 /** 051 * The default minimum amount of memory a message is assumed to use 052 */ 053 public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024; 054 055 protected MessageId messageId; 056 protected ActiveMQDestination originalDestination; 057 protected TransactionId originalTransactionId; 058 059 protected ProducerId producerId; 060 protected ActiveMQDestination destination; 061 protected TransactionId transactionId; 062 063 protected long expiration; 064 protected long timestamp; 065 protected long arrival; 066 protected long brokerInTime; 067 protected long brokerOutTime; 068 protected String correlationId; 069 protected ActiveMQDestination replyTo; 070 protected boolean persistent; 071 protected String type; 072 protected byte priority; 073 protected String groupID; 074 protected int groupSequence; 075 protected ConsumerId targetConsumerId; 076 protected boolean compressed; 077 protected String userID; 078 079 protected ByteSequence content; 080 protected ByteSequence marshalledProperties; 081 protected DataStructure dataStructure; 082 protected int redeliveryCounter; 083 084 protected int size; 085 protected Map<String, Object> properties; 086 protected boolean readOnlyProperties; 087 protected boolean readOnlyBody; 088 protected transient boolean recievedByDFBridge; 089 protected boolean droppable; 090 protected boolean jmsXGroupFirstForConsumer; 091 092 private transient short referenceCount; 093 private transient ActiveMQConnection connection; 094 transient MessageDestination regionDestination; 095 transient MemoryUsage memoryUsage; 096 097 private BrokerId[] brokerPath; 098 private BrokerId[] cluster; 099 100 public static interface MessageDestination { 101 int getMinimumMessageSize(); 102 MemoryUsage getMemoryUsage(); 103 } 104 105 public abstract Message copy(); 106 public abstract void clearBody() throws JMSException; 107 public abstract void storeContent(); 108 public abstract void storeContentAndClear(); 109 110 // useful to reduce the memory footprint of a persisted message 111 public void clearMarshalledState() throws JMSException { 112 properties = null; 113 } 114 115 protected void copy(Message copy) { 116 super.copy(copy); 117 copy.producerId = producerId; 118 copy.transactionId = transactionId; 119 copy.destination = destination; 120 copy.messageId = messageId != null ? messageId.copy() : null; 121 copy.originalDestination = originalDestination; 122 copy.originalTransactionId = originalTransactionId; 123 copy.expiration = expiration; 124 copy.timestamp = timestamp; 125 copy.correlationId = correlationId; 126 copy.replyTo = replyTo; 127 copy.persistent = persistent; 128 copy.redeliveryCounter = redeliveryCounter; 129 copy.type = type; 130 copy.priority = priority; 131 copy.size = size; 132 copy.groupID = groupID; 133 copy.userID = userID; 134 copy.groupSequence = groupSequence; 135 136 if (properties != null) { 137 copy.properties = new HashMap<String, Object>(properties); 138 139 // The new message hasn't expired, so remove this feild. 140 copy.properties.remove(ORIGINAL_EXPIRATION); 141 } else { 142 copy.properties = properties; 143 } 144 145 copy.content = copyByteSequence(content); 146 copy.marshalledProperties = copyByteSequence(marshalledProperties); 147 copy.dataStructure = dataStructure; 148 copy.readOnlyProperties = readOnlyProperties; 149 copy.readOnlyBody = readOnlyBody; 150 copy.compressed = compressed; 151 copy.recievedByDFBridge = recievedByDFBridge; 152 153 copy.arrival = arrival; 154 copy.connection = connection; 155 copy.regionDestination = regionDestination; 156 copy.brokerInTime = brokerInTime; 157 copy.brokerOutTime = brokerOutTime; 158 copy.memoryUsage=this.memoryUsage; 159 copy.brokerPath = brokerPath; 160 copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer; 161 162 // lets not copy the following fields 163 // copy.targetConsumerId = targetConsumerId; 164 // copy.referenceCount = referenceCount; 165 } 166 167 private ByteSequence copyByteSequence(ByteSequence content) { 168 if (content != null) { 169 return new ByteSequence(content.getData(), content.getOffset(), content.getLength()); 170 } 171 return null; 172 } 173 174 public Object getProperty(String name) throws IOException { 175 if (properties == null) { 176 if (marshalledProperties == null) { 177 return null; 178 } 179 properties = unmarsallProperties(marshalledProperties); 180 } 181 Object result = properties.get(name); 182 if (result instanceof UTF8Buffer) { 183 result = result.toString(); 184 } 185 186 return result; 187 } 188 189 @SuppressWarnings("unchecked") 190 public Map<String, Object> getProperties() throws IOException { 191 if (properties == null) { 192 if (marshalledProperties == null) { 193 return Collections.EMPTY_MAP; 194 } 195 properties = unmarsallProperties(marshalledProperties); 196 } 197 return Collections.unmodifiableMap(properties); 198 } 199 200 public void clearProperties() { 201 marshalledProperties = null; 202 properties = null; 203 } 204 205 public void setProperty(String name, Object value) throws IOException { 206 lazyCreateProperties(); 207 properties.put(name, value); 208 } 209 210 public void removeProperty(String name) throws IOException { 211 lazyCreateProperties(); 212 properties.remove(name); 213 } 214 215 protected void lazyCreateProperties() throws IOException { 216 if (properties == null) { 217 if (marshalledProperties == null) { 218 properties = new HashMap<String, Object>(); 219 } else { 220 properties = unmarsallProperties(marshalledProperties); 221 marshalledProperties = null; 222 } 223 } else { 224 marshalledProperties = null; 225 } 226 } 227 228 private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException { 229 return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties))); 230 } 231 232 @Override 233 public void beforeMarshall(WireFormat wireFormat) throws IOException { 234 // Need to marshal the properties. 235 if (marshalledProperties == null && properties != null) { 236 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 237 DataOutputStream os = new DataOutputStream(baos); 238 MarshallingSupport.marshalPrimitiveMap(properties, os); 239 os.close(); 240 marshalledProperties = baos.toByteSequence(); 241 } 242 } 243 244 @Override 245 public void afterMarshall(WireFormat wireFormat) throws IOException { 246 } 247 248 @Override 249 public void beforeUnmarshall(WireFormat wireFormat) throws IOException { 250 } 251 252 @Override 253 public void afterUnmarshall(WireFormat wireFormat) throws IOException { 254 } 255 256 // ///////////////////////////////////////////////////////////////// 257 // 258 // Simple Field accessors 259 // 260 // ///////////////////////////////////////////////////////////////// 261 262 /** 263 * @openwire:property version=1 cache=true 264 */ 265 public ProducerId getProducerId() { 266 return producerId; 267 } 268 269 public void setProducerId(ProducerId producerId) { 270 this.producerId = producerId; 271 } 272 273 /** 274 * @openwire:property version=1 cache=true 275 */ 276 public ActiveMQDestination getDestination() { 277 return destination; 278 } 279 280 public void setDestination(ActiveMQDestination destination) { 281 this.destination = destination; 282 } 283 284 /** 285 * @openwire:property version=1 cache=true 286 */ 287 public TransactionId getTransactionId() { 288 return transactionId; 289 } 290 291 public void setTransactionId(TransactionId transactionId) { 292 this.transactionId = transactionId; 293 } 294 295 public boolean isInTransaction() { 296 return transactionId != null; 297 } 298 299 /** 300 * @openwire:property version=1 cache=true 301 */ 302 public ActiveMQDestination getOriginalDestination() { 303 return originalDestination; 304 } 305 306 public void setOriginalDestination(ActiveMQDestination destination) { 307 this.originalDestination = destination; 308 } 309 310 /** 311 * @openwire:property version=1 312 */ 313 @Override 314 public MessageId getMessageId() { 315 return messageId; 316 } 317 318 public void setMessageId(MessageId messageId) { 319 this.messageId = messageId; 320 } 321 322 /** 323 * @openwire:property version=1 cache=true 324 */ 325 public TransactionId getOriginalTransactionId() { 326 return originalTransactionId; 327 } 328 329 public void setOriginalTransactionId(TransactionId transactionId) { 330 this.originalTransactionId = transactionId; 331 } 332 333 /** 334 * @openwire:property version=1 335 */ 336 @Override 337 public String getGroupID() { 338 return groupID; 339 } 340 341 public void setGroupID(String groupID) { 342 this.groupID = groupID; 343 } 344 345 /** 346 * @openwire:property version=1 347 */ 348 @Override 349 public int getGroupSequence() { 350 return groupSequence; 351 } 352 353 public void setGroupSequence(int groupSequence) { 354 this.groupSequence = groupSequence; 355 } 356 357 /** 358 * @openwire:property version=1 359 */ 360 public String getCorrelationId() { 361 return correlationId; 362 } 363 364 public void setCorrelationId(String correlationId) { 365 this.correlationId = correlationId; 366 } 367 368 /** 369 * @openwire:property version=1 370 */ 371 @Override 372 public boolean isPersistent() { 373 return persistent; 374 } 375 376 public void setPersistent(boolean deliveryMode) { 377 this.persistent = deliveryMode; 378 } 379 380 /** 381 * @openwire:property version=1 382 */ 383 @Override 384 public long getExpiration() { 385 return expiration; 386 } 387 388 public void setExpiration(long expiration) { 389 this.expiration = expiration; 390 } 391 392 /** 393 * @openwire:property version=1 394 */ 395 public byte getPriority() { 396 return priority; 397 } 398 399 public void setPriority(byte priority) { 400 if (priority < 0) { 401 this.priority = 0; 402 } else if (priority > 9) { 403 this.priority = 9; 404 } else { 405 this.priority = priority; 406 } 407 } 408 409 /** 410 * @openwire:property version=1 411 */ 412 public ActiveMQDestination getReplyTo() { 413 return replyTo; 414 } 415 416 public void setReplyTo(ActiveMQDestination replyTo) { 417 this.replyTo = replyTo; 418 } 419 420 /** 421 * @openwire:property version=1 422 */ 423 public long getTimestamp() { 424 return timestamp; 425 } 426 427 public void setTimestamp(long timestamp) { 428 this.timestamp = timestamp; 429 } 430 431 /** 432 * @openwire:property version=1 433 */ 434 public String getType() { 435 return type; 436 } 437 438 public void setType(String type) { 439 this.type = type; 440 } 441 442 /** 443 * @openwire:property version=1 444 */ 445 public ByteSequence getContent() { 446 return content; 447 } 448 449 public void setContent(ByteSequence content) { 450 this.content = content; 451 } 452 453 /** 454 * @openwire:property version=1 455 */ 456 public ByteSequence getMarshalledProperties() { 457 return marshalledProperties; 458 } 459 460 public void setMarshalledProperties(ByteSequence marshalledProperties) { 461 this.marshalledProperties = marshalledProperties; 462 } 463 464 /** 465 * @openwire:property version=1 466 */ 467 public DataStructure getDataStructure() { 468 return dataStructure; 469 } 470 471 public void setDataStructure(DataStructure data) { 472 this.dataStructure = data; 473 } 474 475 /** 476 * Can be used to route the message to a specific consumer. Should be null 477 * to allow the broker use normal JMS routing semantics. If the target 478 * consumer id is an active consumer on the broker, the message is dropped. 479 * Used by the AdvisoryBroker to replay advisory messages to a specific 480 * consumer. 481 * 482 * @openwire:property version=1 cache=true 483 */ 484 @Override 485 public ConsumerId getTargetConsumerId() { 486 return targetConsumerId; 487 } 488 489 public void setTargetConsumerId(ConsumerId targetConsumerId) { 490 this.targetConsumerId = targetConsumerId; 491 } 492 493 @Override 494 public boolean isExpired() { 495 long expireTime = getExpiration(); 496 return expireTime > 0 && System.currentTimeMillis() > expireTime; 497 } 498 499 @Override 500 public boolean isAdvisory() { 501 return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 502 } 503 504 /** 505 * @openwire:property version=1 506 */ 507 public boolean isCompressed() { 508 return compressed; 509 } 510 511 public void setCompressed(boolean compressed) { 512 this.compressed = compressed; 513 } 514 515 public boolean isRedelivered() { 516 return redeliveryCounter > 0; 517 } 518 519 public void setRedelivered(boolean redelivered) { 520 if (redelivered) { 521 if (!isRedelivered()) { 522 setRedeliveryCounter(1); 523 } 524 } else { 525 if (isRedelivered()) { 526 setRedeliveryCounter(0); 527 } 528 } 529 } 530 531 @Override 532 public void incrementRedeliveryCounter() { 533 redeliveryCounter++; 534 } 535 536 /** 537 * @openwire:property version=1 538 */ 539 @Override 540 public int getRedeliveryCounter() { 541 return redeliveryCounter; 542 } 543 544 public void setRedeliveryCounter(int deliveryCounter) { 545 this.redeliveryCounter = deliveryCounter; 546 } 547 548 /** 549 * The route of brokers the command has moved through. 550 * 551 * @openwire:property version=1 cache=true 552 */ 553 public BrokerId[] getBrokerPath() { 554 return brokerPath; 555 } 556 557 public void setBrokerPath(BrokerId[] brokerPath) { 558 this.brokerPath = brokerPath; 559 } 560 561 public boolean isReadOnlyProperties() { 562 return readOnlyProperties; 563 } 564 565 public void setReadOnlyProperties(boolean readOnlyProperties) { 566 this.readOnlyProperties = readOnlyProperties; 567 } 568 569 public boolean isReadOnlyBody() { 570 return readOnlyBody; 571 } 572 573 public void setReadOnlyBody(boolean readOnlyBody) { 574 this.readOnlyBody = readOnlyBody; 575 } 576 577 public ActiveMQConnection getConnection() { 578 return this.connection; 579 } 580 581 public void setConnection(ActiveMQConnection connection) { 582 this.connection = connection; 583 } 584 585 /** 586 * Used to schedule the arrival time of a message to a broker. The broker 587 * will not dispatch a message to a consumer until it's arrival time has 588 * elapsed. 589 * 590 * @openwire:property version=1 591 */ 592 public long getArrival() { 593 return arrival; 594 } 595 596 public void setArrival(long arrival) { 597 this.arrival = arrival; 598 } 599 600 /** 601 * Only set by the broker and defines the userID of the producer connection 602 * who sent this message. This is an optional field, it needs to be enabled 603 * on the broker to have this field populated. 604 * 605 * @openwire:property version=1 606 */ 607 public String getUserID() { 608 return userID; 609 } 610 611 public void setUserID(String jmsxUserID) { 612 this.userID = jmsxUserID; 613 } 614 615 @Override 616 public int getReferenceCount() { 617 return referenceCount; 618 } 619 620 @Override 621 public Message getMessageHardRef() { 622 return this; 623 } 624 625 @Override 626 public Message getMessage() { 627 return this; 628 } 629 630 public void setRegionDestination(MessageDestination destination) { 631 this.regionDestination = destination; 632 if(this.memoryUsage==null) { 633 this.memoryUsage=destination.getMemoryUsage(); 634 } 635 } 636 637 @Override 638 public MessageDestination getRegionDestination() { 639 return regionDestination; 640 } 641 642 public MemoryUsage getMemoryUsage() { 643 return this.memoryUsage; 644 } 645 646 public void setMemoryUsage(MemoryUsage usage) { 647 this.memoryUsage=usage; 648 } 649 650 @Override 651 public boolean isMarshallAware() { 652 return true; 653 } 654 655 @Override 656 public int incrementReferenceCount() { 657 int rc; 658 int size; 659 synchronized (this) { 660 rc = ++referenceCount; 661 size = getSize(); 662 } 663 664 if (rc == 1 && getMemoryUsage() != null) { 665 getMemoryUsage().increaseUsage(size); 666 //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 667 668 } 669 670 //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 671 return rc; 672 } 673 674 @Override 675 public int decrementReferenceCount() { 676 int rc; 677 int size; 678 synchronized (this) { 679 rc = --referenceCount; 680 size = getSize(); 681 } 682 683 if (rc == 0 && getMemoryUsage() != null) { 684 getMemoryUsage().decreaseUsage(size); 685 //Thread.dumpStack(); 686 //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage()); 687 } 688 689 //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc); 690 691 return rc; 692 } 693 694 @Override 695 public int getSize() { 696 int minimumMessageSize = getMinimumMessageSize(); 697 if (size < minimumMessageSize || size == 0) { 698 size = minimumMessageSize; 699 if (marshalledProperties != null) { 700 size += marshalledProperties.getLength(); 701 } 702 if (content != null) { 703 size += content.getLength(); 704 } 705 } 706 return size; 707 } 708 709 protected int getMinimumMessageSize() { 710 int result = DEFAULT_MINIMUM_MESSAGE_SIZE; 711 //let destination override 712 MessageDestination dest = regionDestination; 713 if (dest != null) { 714 result=dest.getMinimumMessageSize(); 715 } 716 return result; 717 } 718 719 /** 720 * @openwire:property version=1 721 * @return Returns the recievedByDFBridge. 722 */ 723 public boolean isRecievedByDFBridge() { 724 return recievedByDFBridge; 725 } 726 727 /** 728 * @param recievedByDFBridge The recievedByDFBridge to set. 729 */ 730 public void setRecievedByDFBridge(boolean recievedByDFBridge) { 731 this.recievedByDFBridge = recievedByDFBridge; 732 } 733 734 public void onMessageRolledBack() { 735 incrementRedeliveryCounter(); 736 } 737 738 /** 739 * @openwire:property version=2 cache=true 740 */ 741 public boolean isDroppable() { 742 return droppable; 743 } 744 745 public void setDroppable(boolean droppable) { 746 this.droppable = droppable; 747 } 748 749 /** 750 * If a message is stored in multiple nodes on a cluster, all the cluster 751 * members will be listed here. Otherwise, it will be null. 752 * 753 * @openwire:property version=3 cache=true 754 */ 755 public BrokerId[] getCluster() { 756 return cluster; 757 } 758 759 public void setCluster(BrokerId[] cluster) { 760 this.cluster = cluster; 761 } 762 763 @Override 764 public boolean isMessage() { 765 return true; 766 } 767 768 /** 769 * @openwire:property version=3 770 */ 771 public long getBrokerInTime() { 772 return this.brokerInTime; 773 } 774 775 public void setBrokerInTime(long brokerInTime) { 776 this.brokerInTime = brokerInTime; 777 } 778 779 /** 780 * @openwire:property version=3 781 */ 782 public long getBrokerOutTime() { 783 return this.brokerOutTime; 784 } 785 786 public void setBrokerOutTime(long brokerOutTime) { 787 this.brokerOutTime = brokerOutTime; 788 } 789 790 @Override 791 public boolean isDropped() { 792 return false; 793 } 794 795 /** 796 * @openwire:property version=10 797 */ 798 public boolean isJMSXGroupFirstForConsumer() { 799 return jmsXGroupFirstForConsumer; 800 } 801 802 public void setJMSXGroupFirstForConsumer(boolean val) { 803 jmsXGroupFirstForConsumer = val; 804 } 805 806 public void compress() throws IOException { 807 if (!isCompressed()) { 808 storeContent(); 809 if (!isCompressed() && getContent() != null) { 810 doCompress(); 811 } 812 } 813 } 814 815 protected void doCompress() throws IOException { 816 compressed = true; 817 ByteSequence bytes = getContent(); 818 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 819 OutputStream os = new DeflaterOutputStream(bytesOut); 820 os.write(bytes.data, bytes.offset, bytes.length); 821 os.close(); 822 setContent(bytesOut.toByteSequence()); 823 } 824 825 @Override 826 public String toString() { 827 return toString(null); 828 } 829 830 @Override 831 public String toString(Map<String, Object>overrideFields) { 832 try { 833 getProperties(); 834 } catch (IOException e) { 835 } 836 return super.toString(overrideFields); 837 } 838}