001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.security.AccessController; 023import java.security.PrivilegedAction; 024import java.util.*; 025import java.util.concurrent.RejectedExecutionHandler; 026 027import javax.jms.Connection; 028import javax.jms.ConnectionFactory; 029import javax.jms.ExceptionListener; 030import javax.jms.JMSException; 031import javax.jms.QueueConnection; 032import javax.jms.QueueConnectionFactory; 033import javax.jms.TopicConnection; 034import javax.jms.TopicConnectionFactory; 035import javax.naming.Context; 036 037import org.apache.activemq.blob.BlobTransferPolicy; 038import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; 039import org.apache.activemq.jndi.JNDIBaseStorable; 040import org.apache.activemq.management.JMSStatsImpl; 041import org.apache.activemq.management.StatsCapable; 042import org.apache.activemq.management.StatsImpl; 043import org.apache.activemq.thread.TaskRunnerFactory; 044import org.apache.activemq.transport.Transport; 045import org.apache.activemq.transport.TransportFactory; 046import org.apache.activemq.transport.TransportListener; 047import org.apache.activemq.util.*; 048import org.apache.activemq.util.URISupport.CompositeData; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * A ConnectionFactory is an an Administered object, and is used for creating 054 * Connections. <p/> This class also implements QueueConnectionFactory and 055 * TopicConnectionFactory. You can use this connection to create both 056 * QueueConnections and TopicConnections. 057 * 058 * 059 * @see javax.jms.ConnectionFactory 060 */ 061public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable { 062 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class); 063 private static final String DEFAULT_BROKER_HOST; 064 private static final int DEFAULT_BROKER_PORT; 065 static{ 066 String host = null; 067 String port = null; 068 try { 069 host = AccessController.doPrivileged(new PrivilegedAction<String>() { 070 @Override 071 public String run() { 072 String result = System.getProperty("org.apache.activemq.AMQ_HOST"); 073 result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_HOST","localhost") : result; 074 return result; 075 } 076 }); 077 port = AccessController.doPrivileged(new PrivilegedAction<String>() { 078 @Override 079 public String run() { 080 String result = System.getProperty("org.apache.activemq.AMQ_PORT"); 081 result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_PORT","61616") : result; 082 return result; 083 } 084 }); 085 }catch(Throwable e){ 086 LOG.debug("Failed to look up System properties for host and port",e); 087 } 088 host = (host == null || host.isEmpty()) ? "localhost" : host; 089 port = (port == null || port.isEmpty()) ? "61616" : port; 090 DEFAULT_BROKER_HOST = host; 091 DEFAULT_BROKER_PORT = Integer.parseInt(port); 092 } 093 094 095 public static final String DEFAULT_BROKER_BIND_URL; 096 097 static{ 098 final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; 099 String bindURL = null; 100 101 try { 102 bindURL = AccessController.doPrivileged(new PrivilegedAction<String>() { 103 @Override 104 public String run() { 105 String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL"); 106 result = (result==null||result.isEmpty()) ? System.getProperty("BROKER_BIND_URL",defaultURL) : result; 107 return result; 108 } 109 }); 110 }catch(Throwable e){ 111 LOG.debug("Failed to look up System properties for host and port",e); 112 } 113 bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL; 114 DEFAULT_BROKER_BIND_URL = bindURL; 115 } 116 117 public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL; 118 public static final String DEFAULT_USER = null; 119 public static final String DEFAULT_PASSWORD = null; 120 public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; 121 122 protected URI brokerURL; 123 protected String userName; 124 protected String password; 125 protected String clientID; 126 protected boolean dispatchAsync=true; 127 protected boolean alwaysSessionAsync=true; 128 129 JMSStatsImpl factoryStats = new JMSStatsImpl(); 130 131 private IdGenerator clientIdGenerator; 132 private String clientIDPrefix; 133 private IdGenerator connectionIdGenerator; 134 private String connectionIDPrefix; 135 136 // client policies 137 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 138 private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); 139 { 140 redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy()); 141 } 142 private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 143 private MessageTransformer transformer; 144 145 private boolean disableTimeStampsByDefault; 146 private boolean optimizedMessageDispatch = true; 147 private long optimizeAcknowledgeTimeOut = 300; 148 private long optimizedAckScheduledAckInterval = 0; 149 private boolean copyMessageOnSend = true; 150 private boolean useCompression; 151 private boolean objectMessageSerializationDefered; 152 private boolean useAsyncSend; 153 private boolean optimizeAcknowledge; 154 private int closeTimeout = 15000; 155 private boolean useRetroactiveConsumer; 156 private boolean exclusiveConsumer; 157 private boolean nestedMapAndListEnabled = true; 158 private boolean alwaysSyncSend; 159 private boolean watchTopicAdvisories = true; 160 private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; 161 private long warnAboutUnstartedConnectionTimeout = 500L; 162 private int sendTimeout = 0; 163 private boolean sendAcksAsync=true; 164 private TransportListener transportListener; 165 private ExceptionListener exceptionListener; 166 private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; 167 private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; 168 private boolean useDedicatedTaskRunner; 169 private long consumerFailoverRedeliveryWaitPeriod = 0; 170 private boolean checkForDuplicates = true; 171 private ClientInternalExceptionListener clientInternalExceptionListener; 172 private boolean messagePrioritySupported = false; 173 private boolean transactedIndividualAck = false; 174 private boolean nonBlockingRedelivery = false; 175 private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE; 176 private TaskRunnerFactory sessionTaskRunner; 177 private RejectedExecutionHandler rejectedTaskHandler = null; 178 protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class 179 private boolean rmIdFromConnectionId = false; 180 private boolean consumerExpiryCheckEnabled = true; 181 private List<String> trustedPackages = Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages); 182 private boolean trustAllPackages = false; 183 184 // ///////////////////////////////////////////// 185 // 186 // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods 187 // 188 // ///////////////////////////////////////////// 189 190 public ActiveMQConnectionFactory() { 191 this(DEFAULT_BROKER_URL); 192 } 193 194 public ActiveMQConnectionFactory(String brokerURL) { 195 this(createURI(brokerURL)); 196 } 197 198 public ActiveMQConnectionFactory(URI brokerURL) { 199 setBrokerURL(brokerURL.toString()); 200 } 201 202 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) { 203 setUserName(userName); 204 setPassword(password); 205 setBrokerURL(brokerURL.toString()); 206 } 207 208 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { 209 setUserName(userName); 210 setPassword(password); 211 setBrokerURL(brokerURL); 212 } 213 214 /** 215 * Returns a copy of the given connection factory 216 */ 217 public ActiveMQConnectionFactory copy() { 218 try { 219 return (ActiveMQConnectionFactory)super.clone(); 220 } catch (CloneNotSupportedException e) { 221 throw new RuntimeException("This should never happen: " + e, e); 222 } 223 } 224 225 /*boolean* 226 * @param brokerURL 227 * @return 228 * @throws URISyntaxException 229 */ 230 private static URI createURI(String brokerURL) { 231 try { 232 return new URI(brokerURL); 233 } catch (URISyntaxException e) { 234 throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e); 235 } 236 } 237 238 /** 239 * @return Returns the Connection. 240 */ 241 @Override 242 public Connection createConnection() throws JMSException { 243 return createActiveMQConnection(); 244 } 245 246 /** 247 * @return Returns the Connection. 248 */ 249 @Override 250 public Connection createConnection(String userName, String password) throws JMSException { 251 return createActiveMQConnection(userName, password); 252 } 253 254 /** 255 * @return Returns the QueueConnection. 256 * @throws JMSException 257 */ 258 @Override 259 public QueueConnection createQueueConnection() throws JMSException { 260 return createActiveMQConnection().enforceQueueOnlyConnection(); 261 } 262 263 /** 264 * @return Returns the QueueConnection. 265 */ 266 @Override 267 public QueueConnection createQueueConnection(String userName, String password) throws JMSException { 268 return createActiveMQConnection(userName, password).enforceQueueOnlyConnection(); 269 } 270 271 /** 272 * @return Returns the TopicConnection. 273 * @throws JMSException 274 */ 275 @Override 276 public TopicConnection createTopicConnection() throws JMSException { 277 return createActiveMQConnection(); 278 } 279 280 /** 281 * @return Returns the TopicConnection. 282 */ 283 @Override 284 public TopicConnection createTopicConnection(String userName, String password) throws JMSException { 285 return createActiveMQConnection(userName, password); 286 } 287 288 /** 289 * @return the StatsImpl associated with this ConnectionFactory. 290 */ 291 @Override 292 public StatsImpl getStats() { 293 return this.factoryStats; 294 } 295 296 // ///////////////////////////////////////////// 297 // 298 // Implementation methods. 299 // 300 // ///////////////////////////////////////////// 301 302 protected ActiveMQConnection createActiveMQConnection() throws JMSException { 303 return createActiveMQConnection(userName, password); 304 } 305 306 /** 307 * Creates a Transport based on this object's connection settings. Separated 308 * from createActiveMQConnection to allow for subclasses to override. 309 * 310 * @return The newly created Transport. 311 * @throws JMSException If unable to create trasnport. 312 */ 313 protected Transport createTransport() throws JMSException { 314 try { 315 URI connectBrokerUL = brokerURL; 316 String scheme = brokerURL.getScheme(); 317 if (scheme == null) { 318 throw new IOException("Transport not scheme specified: [" + brokerURL + "]"); 319 } 320 if (scheme.equals("auto")) { 321 connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp")); 322 } else if (scheme.equals("auto+ssl")) { 323 connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl")); 324 } else if (scheme.equals("auto+nio")) { 325 connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio")); 326 } else if (scheme.equals("auto+nio+ssl")) { 327 connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl")); 328 } 329 330 return TransportFactory.connect(connectBrokerUL); 331 } catch (Exception e) { 332 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); 333 } 334 } 335 336 /** 337 * @return Returns the Connection. 338 */ 339 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { 340 if (brokerURL == null) { 341 throw new ConfigurationException("brokerURL not set."); 342 } 343 ActiveMQConnection connection = null; 344 try { 345 Transport transport = createTransport(); 346 connection = createActiveMQConnection(transport, factoryStats); 347 348 connection.setUserName(userName); 349 connection.setPassword(password); 350 351 configureConnection(connection); 352 353 transport.start(); 354 355 if (clientID != null) { 356 connection.setDefaultClientID(clientID); 357 } 358 359 return connection; 360 } catch (JMSException e) { 361 // Clean up! 362 try { 363 connection.close(); 364 } catch (Throwable ignore) { 365 } 366 throw e; 367 } catch (Exception e) { 368 // Clean up! 369 try { 370 connection.close(); 371 } catch (Throwable ignore) { 372 } 373 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); 374 } 375 } 376 377 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { 378 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), 379 getConnectionIdGenerator(), stats); 380 return connection; 381 } 382 383 protected void configureConnection(ActiveMQConnection connection) throws JMSException { 384 connection.setPrefetchPolicy(getPrefetchPolicy()); 385 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); 386 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); 387 connection.setCopyMessageOnSend(isCopyMessageOnSend()); 388 connection.setUseCompression(isUseCompression()); 389 connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); 390 connection.setDispatchAsync(isDispatchAsync()); 391 connection.setUseAsyncSend(isUseAsyncSend()); 392 connection.setAlwaysSyncSend(isAlwaysSyncSend()); 393 connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); 394 connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); 395 connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut()); 396 connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval()); 397 connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); 398 connection.setExclusiveConsumer(isExclusiveConsumer()); 399 connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap()); 400 connection.setTransformer(getTransformer()); 401 connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); 402 connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); 403 connection.setProducerWindowSize(getProducerWindowSize()); 404 connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); 405 connection.setSendTimeout(getSendTimeout()); 406 connection.setCloseTimeout(getCloseTimeout()); 407 connection.setSendAcksAsync(isSendAcksAsync()); 408 connection.setAuditDepth(getAuditDepth()); 409 connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); 410 connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); 411 connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); 412 connection.setCheckForDuplicates(isCheckForDuplicates()); 413 connection.setMessagePrioritySupported(isMessagePrioritySupported()); 414 connection.setTransactedIndividualAck(isTransactedIndividualAck()); 415 connection.setNonBlockingRedelivery(isNonBlockingRedelivery()); 416 connection.setMaxThreadPoolSize(getMaxThreadPoolSize()); 417 connection.setSessionTaskRunner(getSessionTaskRunner()); 418 connection.setRejectedTaskHandler(getRejectedTaskHandler()); 419 connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled()); 420 connection.setRmIdFromConnectionId(isRmIdFromConnectionId()); 421 connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled()); 422 connection.setTrustedPackages(getTrustedPackages()); 423 connection.setTrustAllPackages(isTrustAllPackages()); 424 if (transportListener != null) { 425 connection.addTransportListener(transportListener); 426 } 427 if (exceptionListener != null) { 428 connection.setExceptionListener(exceptionListener); 429 } 430 if (clientInternalExceptionListener != null) { 431 connection.setClientInternalExceptionListener(clientInternalExceptionListener); 432 } 433 } 434 435 // ///////////////////////////////////////////// 436 // 437 // Property Accessors 438 // 439 // ///////////////////////////////////////////// 440 441 public String getBrokerURL() { 442 return brokerURL == null ? null : brokerURL.toString(); 443 } 444 445 /** 446 * Sets the <a 447 * href="http://activemq.apache.org/configuring-transports.html">connection 448 * URL</a> used to connect to the ActiveMQ broker. 449 */ 450 public void setBrokerURL(String brokerURL) { 451 this.brokerURL = createURI(brokerURL); 452 453 // Use all the properties prefixed with 'jms.' to set the connection 454 // factory 455 // options. 456 if (this.brokerURL.getQuery() != null) { 457 // It might be a standard URI or... 458 try { 459 460 Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery()); 461 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms."); 462 if (buildFromMap(jmsOptionsMap)) { 463 if (!jmsOptionsMap.isEmpty()) { 464 String msg = "There are " + jmsOptionsMap.size() 465 + " jms options that couldn't be set on the ConnectionFactory." 466 + " Check the options are spelled correctly." 467 + " Unknown parameters=[" + jmsOptionsMap + "]." 468 + " This connection factory cannot be started."; 469 throw new IllegalArgumentException(msg); 470 } 471 472 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map); 473 } 474 475 } catch (URISyntaxException e) { 476 } 477 478 } else { 479 480 // It might be a composite URI. 481 try { 482 CompositeData data = URISupport.parseComposite(this.brokerURL); 483 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms."); 484 if (buildFromMap(jmsOptionsMap)) { 485 if (!jmsOptionsMap.isEmpty()) { 486 String msg = "There are " + jmsOptionsMap.size() 487 + " jms options that couldn't be set on the ConnectionFactory." 488 + " Check the options are spelled correctly." 489 + " Unknown parameters=[" + jmsOptionsMap + "]." 490 + " This connection factory cannot be started."; 491 throw new IllegalArgumentException(msg); 492 } 493 494 this.brokerURL = data.toURI(); 495 } 496 } catch (URISyntaxException e) { 497 } 498 } 499 } 500 501 public String getClientID() { 502 return clientID; 503 } 504 505 /** 506 * Sets the JMS clientID to use for the created connection. Note that this 507 * can only be used by one connection at once so generally its a better idea 508 * to set the clientID on a Connection 509 */ 510 public void setClientID(String clientID) { 511 this.clientID = clientID; 512 } 513 514 public boolean isCopyMessageOnSend() { 515 return copyMessageOnSend; 516 } 517 518 /** 519 * Should a JMS message be copied to a new JMS Message object as part of the 520 * send() method in JMS. This is enabled by default to be compliant with the 521 * JMS specification. You can disable it if you do not mutate JMS messages 522 * after they are sent for a performance boost 523 */ 524 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 525 this.copyMessageOnSend = copyMessageOnSend; 526 } 527 528 public boolean isDisableTimeStampsByDefault() { 529 return disableTimeStampsByDefault; 530 } 531 532 /** 533 * Sets whether or not timestamps on messages should be disabled or not. If 534 * you disable them it adds a small performance boost. 535 */ 536 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { 537 this.disableTimeStampsByDefault = disableTimeStampsByDefault; 538 } 539 540 public boolean isOptimizedMessageDispatch() { 541 return optimizedMessageDispatch; 542 } 543 544 /** 545 * If this flag is set then an larger prefetch limit is used - only 546 * applicable for durable topic subscribers. 547 */ 548 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 549 this.optimizedMessageDispatch = optimizedMessageDispatch; 550 } 551 552 public String getPassword() { 553 return password; 554 } 555 556 /** 557 * Sets the JMS password used for connections created from this factory 558 */ 559 public void setPassword(String password) { 560 this.password = password; 561 } 562 563 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 564 return prefetchPolicy; 565 } 566 567 /** 568 * Sets the <a 569 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 570 * policy</a> for consumers created by this connection. 571 */ 572 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 573 this.prefetchPolicy = prefetchPolicy; 574 } 575 576 public boolean isUseAsyncSend() { 577 return useAsyncSend; 578 } 579 580 public BlobTransferPolicy getBlobTransferPolicy() { 581 return blobTransferPolicy; 582 } 583 584 /** 585 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 586 * OBjects) are transferred from producers to brokers to consumers 587 */ 588 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 589 this.blobTransferPolicy = blobTransferPolicy; 590 } 591 592 /** 593 * Forces the use of <a 594 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 595 * adds a massive performance boost; but means that the send() method will 596 * return immediately whether the message has been sent or not which could 597 * lead to message loss. 598 */ 599 public void setUseAsyncSend(boolean useAsyncSend) { 600 this.useAsyncSend = useAsyncSend; 601 } 602 603 public synchronized boolean isWatchTopicAdvisories() { 604 return watchTopicAdvisories; 605 } 606 607 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 608 this.watchTopicAdvisories = watchTopicAdvisories; 609 } 610 611 /** 612 * @return true if always sync send messages 613 */ 614 public boolean isAlwaysSyncSend() { 615 return this.alwaysSyncSend; 616 } 617 618 /** 619 * Set true if always require messages to be sync sent 620 * 621 * @param alwaysSyncSend 622 */ 623 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 624 this.alwaysSyncSend = alwaysSyncSend; 625 } 626 627 public String getUserName() { 628 return userName; 629 } 630 631 /** 632 * Sets the JMS userName used by connections created by this factory 633 */ 634 public void setUserName(String userName) { 635 this.userName = userName; 636 } 637 638 public boolean isUseRetroactiveConsumer() { 639 return useRetroactiveConsumer; 640 } 641 642 /** 643 * Sets whether or not retroactive consumers are enabled. Retroactive 644 * consumers allow non-durable topic subscribers to receive old messages 645 * that were published before the non-durable subscriber started. 646 */ 647 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 648 this.useRetroactiveConsumer = useRetroactiveConsumer; 649 } 650 651 public boolean isExclusiveConsumer() { 652 return exclusiveConsumer; 653 } 654 655 /** 656 * Enables or disables whether or not queue consumers should be exclusive or 657 * not for example to preserve ordering when not using <a 658 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 659 * 660 * @param exclusiveConsumer 661 */ 662 public void setExclusiveConsumer(boolean exclusiveConsumer) { 663 this.exclusiveConsumer = exclusiveConsumer; 664 } 665 666 public RedeliveryPolicy getRedeliveryPolicy() { 667 return redeliveryPolicyMap.getDefaultEntry(); 668 } 669 670 /** 671 * Sets the global default redelivery policy to be used when a message is delivered 672 * but the session is rolled back 673 */ 674 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 675 this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy); 676 } 677 678 public RedeliveryPolicyMap getRedeliveryPolicyMap() { 679 return this.redeliveryPolicyMap; 680 } 681 682 /** 683 * Sets the global redelivery policy mapping to be used when a message is delivered 684 * but the session is rolled back 685 */ 686 public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) { 687 this.redeliveryPolicyMap = redeliveryPolicyMap; 688 } 689 690 public MessageTransformer getTransformer() { 691 return transformer; 692 } 693 694 /** 695 * @return the sendTimeout (in milliseconds) 696 */ 697 public int getSendTimeout() { 698 return sendTimeout; 699 } 700 701 /** 702 * @param sendTimeout the sendTimeout to set (in milliseconds) 703 */ 704 public void setSendTimeout(int sendTimeout) { 705 this.sendTimeout = sendTimeout; 706 } 707 708 /** 709 * @return the sendAcksAsync 710 */ 711 public boolean isSendAcksAsync() { 712 return sendAcksAsync; 713 } 714 715 /** 716 * @param sendAcksAsync the sendAcksAsync to set 717 */ 718 public void setSendAcksAsync(boolean sendAcksAsync) { 719 this.sendAcksAsync = sendAcksAsync; 720 } 721 722 /** 723 * @return the messagePrioritySupported 724 */ 725 public boolean isMessagePrioritySupported() { 726 return this.messagePrioritySupported; 727 } 728 729 /** 730 * @param messagePrioritySupported the messagePrioritySupported to set 731 */ 732 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 733 this.messagePrioritySupported = messagePrioritySupported; 734 } 735 736 737 /** 738 * Sets the transformer used to transform messages before they are sent on 739 * to the JMS bus or when they are received from the bus but before they are 740 * delivered to the JMS client 741 */ 742 public void setTransformer(MessageTransformer transformer) { 743 this.transformer = transformer; 744 } 745 746 @SuppressWarnings({ "unchecked", "rawtypes" }) 747 @Override 748 public void buildFromProperties(Properties properties) { 749 750 if (properties == null) { 751 properties = new Properties(); 752 } 753 754 String temp = properties.getProperty(Context.PROVIDER_URL); 755 if (temp == null || temp.length() == 0) { 756 temp = properties.getProperty("brokerURL"); 757 } 758 if (temp != null && temp.length() > 0) { 759 setBrokerURL(temp); 760 } 761 762 Map<String, Object> p = new HashMap(properties); 763 buildFromMap(p); 764 } 765 766 public boolean buildFromMap(Map<String, Object> properties) { 767 boolean rc = false; 768 769 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy(); 770 if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) { 771 setPrefetchPolicy(p); 772 rc = true; 773 } 774 775 RedeliveryPolicy rp = new RedeliveryPolicy(); 776 if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) { 777 setRedeliveryPolicy(rp); 778 rc = true; 779 } 780 781 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 782 if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) { 783 setBlobTransferPolicy(blobTransferPolicy); 784 rc = true; 785 } 786 787 rc |= IntrospectionSupport.setProperties(this, properties); 788 789 return rc; 790 } 791 792 @Override 793 public void populateProperties(Properties props) { 794 props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync())); 795 796 if (getBrokerURL() != null) { 797 props.setProperty(Context.PROVIDER_URL, getBrokerURL()); 798 props.setProperty("brokerURL", getBrokerURL()); 799 } 800 801 if (getClientID() != null) { 802 props.setProperty("clientID", getClientID()); 803 } 804 805 IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy."); 806 IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy."); 807 IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy."); 808 809 props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend())); 810 props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault())); 811 props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered())); 812 props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch())); 813 814 if (getPassword() != null) { 815 props.setProperty("password", getPassword()); 816 } 817 818 props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); 819 props.setProperty("useCompression", Boolean.toString(isUseCompression())); 820 props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); 821 props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories())); 822 823 if (getUserName() != null) { 824 props.setProperty("userName", getUserName()); 825 } 826 827 props.setProperty("closeTimeout", Integer.toString(getCloseTimeout())); 828 props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync())); 829 props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); 830 props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled())); 831 props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); 832 props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); 833 props.setProperty("sendTimeout", Integer.toString(getSendTimeout())); 834 props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync())); 835 props.setProperty("auditDepth", Integer.toString(getAuditDepth())); 836 props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber())); 837 props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates())); 838 props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported())); 839 props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck())); 840 props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery())); 841 props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize())); 842 props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled())); 843 props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod())); 844 props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId())); 845 props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled())); 846 } 847 848 public boolean isUseCompression() { 849 return useCompression; 850 } 851 852 /** 853 * Enables the use of compression of the message bodies 854 */ 855 public void setUseCompression(boolean useCompression) { 856 this.useCompression = useCompression; 857 } 858 859 public boolean isObjectMessageSerializationDefered() { 860 return objectMessageSerializationDefered; 861 } 862 863 /** 864 * When an object is set on an ObjectMessage, the JMS spec requires the 865 * object to be serialized by that set method. Enabling this flag causes the 866 * object to not get serialized. The object may subsequently get serialized 867 * if the message needs to be sent over a socket or stored to disk. 868 */ 869 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 870 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 871 } 872 873 public boolean isDispatchAsync() { 874 return dispatchAsync; 875 } 876 877 /** 878 * Enables or disables the default setting of whether or not consumers have 879 * their messages <a 880 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 881 * synchronously or asynchronously by the broker</a>. For non-durable 882 * topics for example we typically dispatch synchronously by default to 883 * minimize context switches which boost performance. However sometimes its 884 * better to go slower to ensure that a single blocked consumer socket does 885 * not block delivery to other consumers. 886 * 887 * @param asyncDispatch If true then consumers created on this connection 888 * will default to having their messages dispatched 889 * asynchronously. The default value is true. 890 */ 891 public void setDispatchAsync(boolean asyncDispatch) { 892 this.dispatchAsync = asyncDispatch; 893 } 894 895 /** 896 * @return Returns the closeTimeout. 897 */ 898 public int getCloseTimeout() { 899 return closeTimeout; 900 } 901 902 /** 903 * Sets the timeout before a close is considered complete. Normally a 904 * close() on a connection waits for confirmation from the broker; this 905 * allows that operation to timeout to save the client hanging if there is 906 * no broker 907 */ 908 public void setCloseTimeout(int closeTimeout) { 909 this.closeTimeout = closeTimeout; 910 } 911 912 /** 913 * @return Returns the alwaysSessionAsync. 914 */ 915 public boolean isAlwaysSessionAsync() { 916 return alwaysSessionAsync; 917 } 918 919 /** 920 * If this flag is not set then a separate thread is not used for dispatching messages for each Session in 921 * the Connection. However, a separate thread is always used if there is more than one session, or the session 922 * isn't in auto acknowledge or duplicates ok mode. By default this value is set to true and session dispatch 923 * happens asynchronously. 924 */ 925 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 926 this.alwaysSessionAsync = alwaysSessionAsync; 927 } 928 929 /** 930 * @return Returns the optimizeAcknowledge. 931 */ 932 public boolean isOptimizeAcknowledge() { 933 return optimizeAcknowledge; 934 } 935 936 /** 937 * @param optimizeAcknowledge The optimizeAcknowledge to set. 938 */ 939 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 940 this.optimizeAcknowledge = optimizeAcknowledge; 941 } 942 943 /** 944 * The max time in milliseconds between optimized ack batches 945 * @param optimizeAcknowledgeTimeOut 946 */ 947 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) { 948 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; 949 } 950 951 public long getOptimizeAcknowledgeTimeOut() { 952 return optimizeAcknowledgeTimeOut; 953 } 954 955 public boolean isNestedMapAndListEnabled() { 956 return nestedMapAndListEnabled; 957 } 958 959 /** 960 * Enables/disables whether or not Message properties and MapMessage entries 961 * support <a 962 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 963 * Structures</a> of Map and List objects 964 */ 965 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 966 this.nestedMapAndListEnabled = structuredMapsEnabled; 967 } 968 969 public String getClientIDPrefix() { 970 return clientIDPrefix; 971 } 972 973 /** 974 * Sets the prefix used by autogenerated JMS Client ID values which are used 975 * if the JMS client does not explicitly specify on. 976 * 977 * @param clientIDPrefix 978 */ 979 public void setClientIDPrefix(String clientIDPrefix) { 980 this.clientIDPrefix = clientIDPrefix; 981 } 982 983 protected synchronized IdGenerator getClientIdGenerator() { 984 if (clientIdGenerator == null) { 985 if (clientIDPrefix != null) { 986 clientIdGenerator = new IdGenerator(clientIDPrefix); 987 } else { 988 clientIdGenerator = new IdGenerator(); 989 } 990 } 991 return clientIdGenerator; 992 } 993 994 protected void setClientIdGenerator(IdGenerator clientIdGenerator) { 995 this.clientIdGenerator = clientIdGenerator; 996 } 997 998 /** 999 * Sets the prefix used by connection id generator 1000 * @param connectionIDPrefix 1001 */ 1002 public void setConnectionIDPrefix(String connectionIDPrefix) { 1003 this.connectionIDPrefix = connectionIDPrefix; 1004 } 1005 1006 protected synchronized IdGenerator getConnectionIdGenerator() { 1007 if (connectionIdGenerator == null) { 1008 if (connectionIDPrefix != null) { 1009 connectionIdGenerator = new IdGenerator(connectionIDPrefix); 1010 } else { 1011 connectionIdGenerator = new IdGenerator(); 1012 } 1013 } 1014 return connectionIdGenerator; 1015 } 1016 1017 protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) { 1018 this.connectionIdGenerator = connectionIdGenerator; 1019 } 1020 1021 /** 1022 * @return the statsEnabled 1023 */ 1024 public boolean isStatsEnabled() { 1025 return this.factoryStats.isEnabled(); 1026 } 1027 1028 /** 1029 * @param statsEnabled the statsEnabled to set 1030 */ 1031 public void setStatsEnabled(boolean statsEnabled) { 1032 this.factoryStats.setEnabled(statsEnabled); 1033 } 1034 1035 public synchronized int getProducerWindowSize() { 1036 return producerWindowSize; 1037 } 1038 1039 public synchronized void setProducerWindowSize(int producerWindowSize) { 1040 this.producerWindowSize = producerWindowSize; 1041 } 1042 1043 public long getWarnAboutUnstartedConnectionTimeout() { 1044 return warnAboutUnstartedConnectionTimeout; 1045 } 1046 1047 /** 1048 * Enables the timeout from a connection creation to when a warning is 1049 * generated if the connection is not properly started via 1050 * {@link Connection#start()} and a message is received by a consumer. It is 1051 * a very common gotcha to forget to <a 1052 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 1053 * the connection</a> so this option makes the default case to create a 1054 * warning if the user forgets. To disable the warning just set the value to < 1055 * 0 (say -1). 1056 */ 1057 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 1058 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 1059 } 1060 1061 public TransportListener getTransportListener() { 1062 return transportListener; 1063 } 1064 1065 /** 1066 * Allows a listener to be configured on the ConnectionFactory so that when this factory is used 1067 * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register 1068 * a transport listener. 1069 * 1070 * @param transportListener sets the listener to be registered on all connections 1071 * created by this factory 1072 */ 1073 public void setTransportListener(TransportListener transportListener) { 1074 this.transportListener = transportListener; 1075 } 1076 1077 1078 public ExceptionListener getExceptionListener() { 1079 return exceptionListener; 1080 } 1081 1082 /** 1083 * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory 1084 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 1085 * an exception listener. 1086 * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than 1087 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 1088 * @param exceptionListener sets the exception listener to be registered on all connections 1089 * created by this factory 1090 */ 1091 public void setExceptionListener(ExceptionListener exceptionListener) { 1092 this.exceptionListener = exceptionListener; 1093 } 1094 1095 public int getAuditDepth() { 1096 return auditDepth; 1097 } 1098 1099 public void setAuditDepth(int auditDepth) { 1100 this.auditDepth = auditDepth; 1101 } 1102 1103 public int getAuditMaximumProducerNumber() { 1104 return auditMaximumProducerNumber; 1105 } 1106 1107 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 1108 this.auditMaximumProducerNumber = auditMaximumProducerNumber; 1109 } 1110 1111 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 1112 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 1113 } 1114 1115 public boolean isUseDedicatedTaskRunner() { 1116 return useDedicatedTaskRunner; 1117 } 1118 1119 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 1120 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 1121 } 1122 1123 public long getConsumerFailoverRedeliveryWaitPeriod() { 1124 return consumerFailoverRedeliveryWaitPeriod; 1125 } 1126 1127 public ClientInternalExceptionListener getClientInternalExceptionListener() { 1128 return clientInternalExceptionListener; 1129 } 1130 1131 /** 1132 * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory 1133 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 1134 * an exception listener. 1135 * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than 1136 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 1137 * @param clientInternalExceptionListener sets the exception listener to be registered on all connections 1138 * created by this factory 1139 */ 1140 public void setClientInternalExceptionListener( 1141 ClientInternalExceptionListener clientInternalExceptionListener) { 1142 this.clientInternalExceptionListener = clientInternalExceptionListener; 1143 } 1144 1145 /** 1146 * @return the checkForDuplicates 1147 */ 1148 public boolean isCheckForDuplicates() { 1149 return this.checkForDuplicates; 1150 } 1151 1152 /** 1153 * @param checkForDuplicates the checkForDuplicates to set 1154 */ 1155 public void setCheckForDuplicates(boolean checkForDuplicates) { 1156 this.checkForDuplicates = checkForDuplicates; 1157 } 1158 1159 public boolean isTransactedIndividualAck() { 1160 return transactedIndividualAck; 1161 } 1162 1163 /** 1164 * when true, submit individual transacted acks immediately rather than with transaction completion. 1165 * This allows the acks to represent delivery status which can be persisted on rollback 1166 * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean) true 1167 */ 1168 public void setTransactedIndividualAck(boolean transactedIndividualAck) { 1169 this.transactedIndividualAck = transactedIndividualAck; 1170 } 1171 1172 1173 public boolean isNonBlockingRedelivery() { 1174 return nonBlockingRedelivery; 1175 } 1176 1177 /** 1178 * When true a MessageConsumer will not stop Message delivery before re-delivering Messages 1179 * from a rolled back transaction. This implies that message order will not be preserved and 1180 * also will result in the TransactedIndividualAck option to be enabled. 1181 */ 1182 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { 1183 this.nonBlockingRedelivery = nonBlockingRedelivery; 1184 } 1185 1186 public int getMaxThreadPoolSize() { 1187 return maxThreadPoolSize; 1188 } 1189 1190 public void setMaxThreadPoolSize(int maxThreadPoolSize) { 1191 this.maxThreadPoolSize = maxThreadPoolSize; 1192 } 1193 1194 public TaskRunnerFactory getSessionTaskRunner() { 1195 return sessionTaskRunner; 1196 } 1197 1198 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 1199 this.sessionTaskRunner = sessionTaskRunner; 1200 } 1201 1202 public RejectedExecutionHandler getRejectedTaskHandler() { 1203 return rejectedTaskHandler; 1204 } 1205 1206 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) { 1207 this.rejectedTaskHandler = rejectedTaskHandler; 1208 } 1209 1210 /** 1211 * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled 1212 * to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers 1213 * will not do any background Message acknowledgment. 1214 * 1215 * @return the scheduledOptimizedAckInterval 1216 */ 1217 public long getOptimizedAckScheduledAckInterval() { 1218 return optimizedAckScheduledAckInterval; 1219 } 1220 1221 /** 1222 * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that 1223 * have been configured with optimizeAcknowledge enabled. 1224 * 1225 * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set 1226 */ 1227 public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) { 1228 this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval; 1229 } 1230 1231 1232 public boolean isRmIdFromConnectionId() { 1233 return rmIdFromConnectionId; 1234 } 1235 1236 /** 1237 * uses the connection id as the resource identity for XAResource.isSameRM 1238 * ensuring join will only occur on a single connection 1239 */ 1240 public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) { 1241 this.rmIdFromConnectionId = rmIdFromConnectionId; 1242 } 1243 1244 /** 1245 * @return true if MessageConsumer instance will check for expired messages before dispatch. 1246 */ 1247 public boolean isConsumerExpiryCheckEnabled() { 1248 return consumerExpiryCheckEnabled; 1249 } 1250 1251 /** 1252 * Controls whether message expiration checking is done in each MessageConsumer 1253 * prior to dispatching a message. Disabling this check can lead to consumption 1254 * of expired messages. 1255 * 1256 * @param consumerExpiryCheckEnabled 1257 * controls whether expiration checking is done prior to dispatch. 1258 */ 1259 public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) { 1260 this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; 1261 } 1262 1263 public List<String> getTrustedPackages() { 1264 return trustedPackages; 1265 } 1266 1267 public void setTrustedPackages(List<String> trustedPackages) { 1268 this.trustedPackages = trustedPackages; 1269 } 1270 1271 public boolean isTrustAllPackages() { 1272 return trustAllPackages; 1273 } 1274 1275 public void setTrustAllPackages(boolean trustAllPackages) { 1276 this.trustAllPackages = trustAllPackages; 1277 } 1278}