001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.policy; 018 019import java.util.Set; 020 021import org.apache.activemq.ActiveMQPrefetchPolicy; 022import org.apache.activemq.broker.Broker; 023import org.apache.activemq.broker.region.BaseDestination; 024import org.apache.activemq.broker.region.Destination; 025import org.apache.activemq.broker.region.DurableTopicSubscription; 026import org.apache.activemq.broker.region.Queue; 027import org.apache.activemq.broker.region.QueueBrowserSubscription; 028import org.apache.activemq.broker.region.QueueSubscription; 029import org.apache.activemq.broker.region.Subscription; 030import org.apache.activemq.broker.region.Topic; 031import org.apache.activemq.broker.region.TopicSubscription; 032import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 033import org.apache.activemq.broker.region.group.GroupFactoryFinder; 034import org.apache.activemq.broker.region.group.MessageGroupMapFactory; 035import org.apache.activemq.filter.DestinationMapEntry; 036import org.apache.activemq.network.NetworkBridgeFilterFactory; 037import org.apache.activemq.usage.SystemUsage; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * Represents an entry in a {@link PolicyMap} for assigning policies to a 043 * specific destination or a hierarchical wildcard area of destinations. 044 * 045 * @org.apache.xbean.XBean 046 * 047 */ 048public class PolicyEntry extends DestinationMapEntry { 049 050 private static final Logger LOG = LoggerFactory.getLogger(PolicyEntry.class); 051 private DispatchPolicy dispatchPolicy; 052 private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; 053 private boolean sendAdvisoryIfNoConsumers; 054 private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY; 055 private PendingMessageLimitStrategy pendingMessageLimitStrategy; 056 private MessageEvictionStrategy messageEvictionStrategy; 057 private long memoryLimit; 058 private String messageGroupMapFactoryType = "cached"; 059 private MessageGroupMapFactory messageGroupMapFactory; 060 private PendingQueueMessageStoragePolicy pendingQueuePolicy; 061 private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy; 062 private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy; 063 private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT; 064 private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; 065 private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH; 066 private boolean enableAudit=true; 067 private boolean producerFlowControl = true; 068 private boolean alwaysRetroactive = false; 069 private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 070 private boolean optimizedDispatch=false; 071 private int maxPageSize=BaseDestination.MAX_PAGE_SIZE; 072 private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE; 073 private boolean useCache=true; 074 private long minimumMessageSize=1024; 075 private boolean useConsumerPriority=true; 076 private boolean strictOrderDispatch=false; 077 private boolean lazyDispatch=false; 078 private int timeBeforeDispatchStarts = 0; 079 private int consumersBeforeDispatchStarts = 0; 080 private boolean advisoryForSlowConsumers; 081 private boolean advisoryForFastProducers; 082 private boolean advisoryForDiscardingMessages; 083 private boolean advisoryWhenFull; 084 private boolean advisoryForDelivery; 085 private boolean advisoryForConsumed; 086 private boolean includeBodyForAdvisory; 087 private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD; 088 private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE; 089 private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; 090 private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH; 091 private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH; 092 private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; 093 private boolean usePrefetchExtension = true; 094 private int cursorMemoryHighWaterMark = 70; 095 private int storeUsageHighWaterMark = 100; 096 private SlowConsumerStrategy slowConsumerStrategy; 097 private boolean prioritizedMessages; 098 private boolean allConsumersExclusiveByDefault; 099 private boolean gcInactiveDestinations; 100 private boolean gcWithNetworkConsumers; 101 private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 102 private boolean reduceMemoryFootprint; 103 private NetworkBridgeFilterFactory networkBridgeFilterFactory; 104 private boolean doOptimzeMessageStorage = true; 105 private int maxDestinations = -1; 106 107 /* 108 * percentage of in-flight messages above which optimize message store is disabled 109 */ 110 private int optimizeMessageStoreInFlightLimit = 10; 111 private boolean persistJMSRedelivered = false; 112 113 114 public void configure(Broker broker,Queue queue) { 115 baseConfiguration(broker,queue); 116 if (dispatchPolicy != null) { 117 queue.setDispatchPolicy(dispatchPolicy); 118 } 119 queue.setDeadLetterStrategy(getDeadLetterStrategy()); 120 queue.setMessageGroupMapFactory(getMessageGroupMapFactory()); 121 if (memoryLimit > 0) { 122 queue.getMemoryUsage().setLimit(memoryLimit); 123 } 124 if (pendingQueuePolicy != null) { 125 PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue); 126 queue.setMessages(messages); 127 } 128 129 queue.setUseConsumerPriority(isUseConsumerPriority()); 130 queue.setStrictOrderDispatch(isStrictOrderDispatch()); 131 queue.setOptimizedDispatch(isOptimizedDispatch()); 132 queue.setLazyDispatch(isLazyDispatch()); 133 queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); 134 queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); 135 queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); 136 queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); 137 } 138 139 public void update(Queue queue) { 140 update(queue, null); 141 } 142 143 /** 144 * Update a queue with this policy. Only apply properties that 145 * match the includedProperties list. Not all properties are eligible 146 * to be updated. 147 * 148 * If includedProperties is null then all of the properties will be set as 149 * isUpdate will return true 150 * @param baseDestination 151 * @param includedProperties 152 */ 153 public void update(Queue queue, Set<String> includedProperties) { 154 baseUpdate(queue, includedProperties); 155 if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) { 156 queue.getMemoryUsage().setLimit(memoryLimit); 157 } 158 if (isUpdate("useConsumerPriority", includedProperties)) { 159 queue.setUseConsumerPriority(isUseConsumerPriority()); 160 } 161 if (isUpdate("strictOrderDispatch", includedProperties)) { 162 queue.setStrictOrderDispatch(isStrictOrderDispatch()); 163 } 164 if (isUpdate("optimizedDispatch", includedProperties)) { 165 queue.setOptimizedDispatch(isOptimizedDispatch()); 166 } 167 if (isUpdate("lazyDispatch", includedProperties)) { 168 queue.setLazyDispatch(isLazyDispatch()); 169 } 170 if (isUpdate("timeBeforeDispatchStarts", includedProperties)) { 171 queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts()); 172 } 173 if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) { 174 queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts()); 175 } 176 if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) { 177 queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault()); 178 } 179 if (isUpdate("persistJMSRedelivered", includedProperties)) { 180 queue.setPersistJMSRedelivered(isPersistJMSRedelivered()); 181 } 182 } 183 184 public void configure(Broker broker,Topic topic) { 185 baseConfiguration(broker,topic); 186 if (dispatchPolicy != null) { 187 topic.setDispatchPolicy(dispatchPolicy); 188 } 189 topic.setDeadLetterStrategy(getDeadLetterStrategy()); 190 if (subscriptionRecoveryPolicy != null) { 191 SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy(); 192 srp.setBroker(broker); 193 topic.setSubscriptionRecoveryPolicy(srp); 194 } 195 if (memoryLimit > 0) { 196 topic.getMemoryUsage().setLimit(memoryLimit); 197 } 198 topic.setLazyDispatch(isLazyDispatch()); 199 } 200 201 public void update(Topic topic) { 202 update(topic, null); 203 } 204 205 //If includedProperties is null then all of the properties will be set as 206 //isUpdate will return true 207 public void update(Topic topic, Set<String> includedProperties) { 208 baseUpdate(topic, includedProperties); 209 if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) { 210 topic.getMemoryUsage().setLimit(memoryLimit); 211 } 212 if (isUpdate("lazyDispatch", includedProperties)) { 213 topic.setLazyDispatch(isLazyDispatch()); 214 } 215 } 216 217 // attributes that can change on the fly 218 public void baseUpdate(BaseDestination destination) { 219 baseUpdate(destination, null); 220 } 221 222 // attributes that can change on the fly 223 //If includedProperties is null then all of the properties will be set as 224 //isUpdate will return true 225 public void baseUpdate(BaseDestination destination, Set<String> includedProperties) { 226 if (isUpdate("producerFlowControl", includedProperties)) { 227 destination.setProducerFlowControl(isProducerFlowControl()); 228 } 229 if (isUpdate("alwaysRetroactive", includedProperties)) { 230 destination.setAlwaysRetroactive(isAlwaysRetroactive()); 231 } 232 if (isUpdate("blockedProducerWarningInterval", includedProperties)) { 233 destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval()); 234 } 235 if (isUpdate("maxPageSize", includedProperties)) { 236 destination.setMaxPageSize(getMaxPageSize()); 237 } 238 if (isUpdate("maxBrowsePageSize", includedProperties)) { 239 destination.setMaxBrowsePageSize(getMaxBrowsePageSize()); 240 } 241 242 if (isUpdate("minimumMessageSize", includedProperties)) { 243 destination.setMinimumMessageSize((int) getMinimumMessageSize()); 244 } 245 if (isUpdate("maxExpirePageSize", includedProperties)) { 246 destination.setMaxExpirePageSize(getMaxExpirePageSize()); 247 } 248 if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) { 249 destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 250 } 251 if (isUpdate("storeUsageHighWaterMark", includedProperties)) { 252 destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); 253 } 254 if (isUpdate("gcInactiveDestinations", includedProperties)) { 255 destination.setGcIfInactive(isGcInactiveDestinations()); 256 } 257 if (isUpdate("gcWithNetworkConsumers", includedProperties)) { 258 destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); 259 } 260 if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) { 261 destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC()); 262 } 263 if (isUpdate("reduceMemoryFootprint", includedProperties)) { 264 destination.setReduceMemoryFootprint(isReduceMemoryFootprint()); 265 } 266 if (isUpdate("doOptimizeMessageStore", includedProperties)) { 267 destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage()); 268 } 269 if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) { 270 destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit()); 271 } 272 if (isUpdate("advisoryForConsumed", includedProperties)) { 273 destination.setAdvisoryForConsumed(isAdvisoryForConsumed()); 274 } 275 if (isUpdate("advisoryForDelivery", includedProperties)) { 276 destination.setAdvisoryForDelivery(isAdvisoryForDelivery()); 277 } 278 if (isUpdate("advisoryForDiscardingMessages", includedProperties)) { 279 destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages()); 280 } 281 if (isUpdate("advisoryForSlowConsumers", includedProperties)) { 282 destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers()); 283 } 284 if (isUpdate("advisoryForFastProducers", includedProperties)) { 285 destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers()); 286 } 287 if (isUpdate("advisoryWhenFull", includedProperties)) { 288 destination.setAdvisoryWhenFull(isAdvisoryWhenFull()); 289 } 290 if (isUpdate("includeBodyForAdvisory", includedProperties)) { 291 destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory()); 292 } 293 if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) { 294 destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers()); 295 } 296 } 297 298 public void baseConfiguration(Broker broker, BaseDestination destination) { 299 baseUpdate(destination); 300 destination.setEnableAudit(isEnableAudit()); 301 destination.setMaxAuditDepth(getMaxQueueAuditDepth()); 302 destination.setMaxProducersToAudit(getMaxProducersToAudit()); 303 destination.setUseCache(isUseCache()); 304 destination.setExpireMessagesPeriod(getExpireMessagesPeriod()); 305 SlowConsumerStrategy scs = getSlowConsumerStrategy(); 306 if (scs != null) { 307 scs.setBrokerService(broker); 308 scs.addDestination(destination); 309 } 310 destination.setSlowConsumerStrategy(scs); 311 destination.setPrioritizedMessages(isPrioritizedMessages()); 312 } 313 314 public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { 315 configurePrefetch(subscription); 316 if (pendingMessageLimitStrategy != null) { 317 int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); 318 int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit(); 319 if (consumerLimit > 0) { 320 if (value < 0 || consumerLimit < value) { 321 value = consumerLimit; 322 } 323 } 324 if (value >= 0) { 325 LOG.debug("Setting the maximumPendingMessages size to: {} for consumer: {}", value, subscription.getInfo().getConsumerId()); 326 subscription.setMaximumPendingMessages(value); 327 } 328 } 329 if (messageEvictionStrategy != null) { 330 subscription.setMessageEvictionStrategy(messageEvictionStrategy); 331 } 332 if (pendingSubscriberPolicy != null) { 333 String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId(); 334 int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize(); 335 subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription)); 336 } 337 if (enableAudit) { 338 subscription.setEnableAudit(enableAudit); 339 subscription.setMaxProducersToAudit(maxProducersToAudit); 340 subscription.setMaxAuditDepth(maxAuditDepth); 341 } 342 } 343 344 public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) { 345 String clientId = sub.getSubscriptionKey().getClientId(); 346 String subName = sub.getSubscriptionKey().getSubscriptionName(); 347 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 348 configurePrefetch(sub); 349 if (pendingDurableSubscriberPolicy != null) { 350 PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,sub.getPrefetchSize(),sub); 351 cursor.setSystemUsage(memoryManager); 352 sub.setPending(cursor); 353 } 354 int auditDepth = getMaxAuditDepth(); 355 if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) { 356 sub.setMaxAuditDepth(auditDepth * 10); 357 } else { 358 sub.setMaxAuditDepth(auditDepth); 359 } 360 sub.setMaxProducersToAudit(getMaxProducersToAudit()); 361 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 362 } 363 364 public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) { 365 configurePrefetch(sub); 366 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 367 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 368 369 // TODO 370 // We currently need an infinite audit because of the way that browser dispatch 371 // is done. We should refactor the browsers to better handle message dispatch so 372 // we can remove this and perform a more efficient dispatch. 373 sub.setMaxProducersToAudit(Integer.MAX_VALUE); 374 sub.setMaxAuditDepth(Short.MAX_VALUE); 375 376 // part solution - dispatching to browsers needs to be restricted 377 sub.setMaxMessages(getMaxBrowsePageSize()); 378 } 379 380 public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { 381 configurePrefetch(sub); 382 sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 383 sub.setUsePrefetchExtension(isUsePrefetchExtension()); 384 sub.setMaxProducersToAudit(getMaxProducersToAudit()); 385 } 386 387 public void configurePrefetch(Subscription subscription) { 388 389 final int currentPrefetch = subscription.getConsumerInfo().getPrefetchSize(); 390 if (subscription instanceof QueueBrowserSubscription) { 391 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH) { 392 ((QueueBrowserSubscription) subscription).setPrefetchSize(getQueueBrowserPrefetch()); 393 } 394 } else if (subscription instanceof QueueSubscription) { 395 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH) { 396 ((QueueSubscription) subscription).setPrefetchSize(getQueuePrefetch()); 397 } 398 } else if (subscription instanceof DurableTopicSubscription) { 399 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH || 400 subscription.getConsumerInfo().getPrefetchSize() == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH) { 401 ((DurableTopicSubscription)subscription).setPrefetchSize(getDurableTopicPrefetch()); 402 } 403 } else if (subscription instanceof TopicSubscription) { 404 if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH) { 405 ((TopicSubscription) subscription).setPrefetchSize(getTopicPrefetch()); 406 } 407 } 408 if (currentPrefetch != 0 && subscription.getPrefetchSize() == 0) { 409 // tell the sub so that it can issue a pull request 410 subscription.updateConsumerPrefetch(0); 411 } 412 } 413 414 private boolean isUpdate(String property, Set<String> includedProperties) { 415 return includedProperties == null || includedProperties.contains(property); 416 } 417 // Properties 418 // ------------------------------------------------------------------------- 419 public DispatchPolicy getDispatchPolicy() { 420 return dispatchPolicy; 421 } 422 423 public void setDispatchPolicy(DispatchPolicy policy) { 424 this.dispatchPolicy = policy; 425 } 426 427 public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() { 428 return subscriptionRecoveryPolicy; 429 } 430 431 public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) { 432 this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy; 433 } 434 435 public boolean isSendAdvisoryIfNoConsumers() { 436 return sendAdvisoryIfNoConsumers; 437 } 438 439 /** 440 * Sends an advisory message if a non-persistent message is sent and there 441 * are no active consumers 442 */ 443 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 444 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 445 } 446 447 public DeadLetterStrategy getDeadLetterStrategy() { 448 return deadLetterStrategy; 449 } 450 451 /** 452 * Sets the policy used to determine which dead letter queue destination 453 * should be used 454 */ 455 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 456 this.deadLetterStrategy = deadLetterStrategy; 457 } 458 459 public PendingMessageLimitStrategy getPendingMessageLimitStrategy() { 460 return pendingMessageLimitStrategy; 461 } 462 463 /** 464 * Sets the strategy to calculate the maximum number of messages that are 465 * allowed to be pending on consumers (in addition to their prefetch sizes). 466 * Once the limit is reached, non-durable topics can then start discarding 467 * old messages. This allows us to keep dispatching messages to slow 468 * consumers while not blocking fast consumers and discarding the messages 469 * oldest first. 470 */ 471 public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) { 472 this.pendingMessageLimitStrategy = pendingMessageLimitStrategy; 473 } 474 475 public MessageEvictionStrategy getMessageEvictionStrategy() { 476 return messageEvictionStrategy; 477 } 478 479 /** 480 * Sets the eviction strategy used to decide which message to evict when the 481 * slow consumer needs to discard messages 482 */ 483 public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) { 484 this.messageEvictionStrategy = messageEvictionStrategy; 485 } 486 487 public long getMemoryLimit() { 488 return memoryLimit; 489 } 490 491 /** 492 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 493 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 494 */ 495 public void setMemoryLimit(long memoryLimit) { 496 this.memoryLimit = memoryLimit; 497 } 498 499 public MessageGroupMapFactory getMessageGroupMapFactory() { 500 if (messageGroupMapFactory == null) { 501 try { 502 messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType()); 503 }catch(Exception e){ 504 LOG.error("Failed to create message group Factory ",e); 505 } 506 } 507 return messageGroupMapFactory; 508 } 509 510 /** 511 * Sets the factory used to create new instances of {MessageGroupMap} used 512 * to implement the <a 513 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 514 * functionality. 515 */ 516 public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) { 517 this.messageGroupMapFactory = messageGroupMapFactory; 518 } 519 520 521 public String getMessageGroupMapFactoryType() { 522 return messageGroupMapFactoryType; 523 } 524 525 public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) { 526 this.messageGroupMapFactoryType = messageGroupMapFactoryType; 527 } 528 529 530 /** 531 * @return the pendingDurableSubscriberPolicy 532 */ 533 public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() { 534 return this.pendingDurableSubscriberPolicy; 535 } 536 537 /** 538 * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy 539 * to set 540 */ 541 public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) { 542 this.pendingDurableSubscriberPolicy = pendingDurableSubscriberPolicy; 543 } 544 545 /** 546 * @return the pendingQueuePolicy 547 */ 548 public PendingQueueMessageStoragePolicy getPendingQueuePolicy() { 549 return this.pendingQueuePolicy; 550 } 551 552 /** 553 * @param pendingQueuePolicy the pendingQueuePolicy to set 554 */ 555 public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy) { 556 this.pendingQueuePolicy = pendingQueuePolicy; 557 } 558 559 /** 560 * @return the pendingSubscriberPolicy 561 */ 562 public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy() { 563 return this.pendingSubscriberPolicy; 564 } 565 566 /** 567 * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set 568 */ 569 public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) { 570 this.pendingSubscriberPolicy = pendingSubscriberPolicy; 571 } 572 573 /** 574 * @return true if producer flow control enabled 575 */ 576 public boolean isProducerFlowControl() { 577 return producerFlowControl; 578 } 579 580 /** 581 * @param producerFlowControl 582 */ 583 public void setProducerFlowControl(boolean producerFlowControl) { 584 this.producerFlowControl = producerFlowControl; 585 } 586 587 /** 588 * @return true if topic is always retroactive 589 */ 590 public boolean isAlwaysRetroactive() { 591 return alwaysRetroactive; 592 } 593 594 /** 595 * @param alwaysRetroactive 596 */ 597 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 598 this.alwaysRetroactive = alwaysRetroactive; 599 } 600 601 602 /** 603 * Set's the interval at which warnings about producers being blocked by 604 * resource usage will be triggered. Values of 0 or less will disable 605 * warnings 606 * 607 * @param blockedProducerWarningInterval the interval at which warning about 608 * blocked producers will be triggered. 609 */ 610 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 611 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 612 } 613 614 /** 615 * 616 * @return the interval at which warning about blocked producers will be 617 * triggered. 618 */ 619 public long getBlockedProducerWarningInterval() { 620 return blockedProducerWarningInterval; 621 } 622 623 /** 624 * @return the maxProducersToAudit 625 */ 626 public int getMaxProducersToAudit() { 627 return maxProducersToAudit; 628 } 629 630 /** 631 * @param maxProducersToAudit the maxProducersToAudit to set 632 */ 633 public void setMaxProducersToAudit(int maxProducersToAudit) { 634 this.maxProducersToAudit = maxProducersToAudit; 635 } 636 637 /** 638 * @return the maxAuditDepth 639 */ 640 public int getMaxAuditDepth() { 641 return maxAuditDepth; 642 } 643 644 /** 645 * @param maxAuditDepth the maxAuditDepth to set 646 */ 647 public void setMaxAuditDepth(int maxAuditDepth) { 648 this.maxAuditDepth = maxAuditDepth; 649 } 650 651 /** 652 * @return the enableAudit 653 */ 654 public boolean isEnableAudit() { 655 return enableAudit; 656 } 657 658 /** 659 * @param enableAudit the enableAudit to set 660 */ 661 public void setEnableAudit(boolean enableAudit) { 662 this.enableAudit = enableAudit; 663 } 664 665 public int getMaxQueueAuditDepth() { 666 return maxQueueAuditDepth; 667 } 668 669 public void setMaxQueueAuditDepth(int maxQueueAuditDepth) { 670 this.maxQueueAuditDepth = maxQueueAuditDepth; 671 } 672 673 public boolean isOptimizedDispatch() { 674 return optimizedDispatch; 675 } 676 677 public void setOptimizedDispatch(boolean optimizedDispatch) { 678 this.optimizedDispatch = optimizedDispatch; 679 } 680 681 public int getMaxPageSize() { 682 return maxPageSize; 683 } 684 685 public void setMaxPageSize(int maxPageSize) { 686 this.maxPageSize = maxPageSize; 687 } 688 689 public int getMaxBrowsePageSize() { 690 return maxBrowsePageSize; 691 } 692 693 public void setMaxBrowsePageSize(int maxPageSize) { 694 this.maxBrowsePageSize = maxPageSize; 695 } 696 697 public boolean isUseCache() { 698 return useCache; 699 } 700 701 public void setUseCache(boolean useCache) { 702 this.useCache = useCache; 703 } 704 705 public long getMinimumMessageSize() { 706 return minimumMessageSize; 707 } 708 709 public void setMinimumMessageSize(long minimumMessageSize) { 710 this.minimumMessageSize = minimumMessageSize; 711 } 712 713 public boolean isUseConsumerPriority() { 714 return useConsumerPriority; 715 } 716 717 public void setUseConsumerPriority(boolean useConsumerPriority) { 718 this.useConsumerPriority = useConsumerPriority; 719 } 720 721 public boolean isStrictOrderDispatch() { 722 return strictOrderDispatch; 723 } 724 725 public void setStrictOrderDispatch(boolean strictOrderDispatch) { 726 this.strictOrderDispatch = strictOrderDispatch; 727 } 728 729 public boolean isLazyDispatch() { 730 return lazyDispatch; 731 } 732 733 public void setLazyDispatch(boolean lazyDispatch) { 734 this.lazyDispatch = lazyDispatch; 735 } 736 737 public int getTimeBeforeDispatchStarts() { 738 return timeBeforeDispatchStarts; 739 } 740 741 public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) { 742 this.timeBeforeDispatchStarts = timeBeforeDispatchStarts; 743 } 744 745 public int getConsumersBeforeDispatchStarts() { 746 return consumersBeforeDispatchStarts; 747 } 748 749 public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { 750 this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; 751 } 752 753 /** 754 * @return the advisoryForSlowConsumers 755 */ 756 public boolean isAdvisoryForSlowConsumers() { 757 return advisoryForSlowConsumers; 758 } 759 760 /** 761 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 762 */ 763 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 764 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 765 } 766 767 /** 768 * @return the advisoryForDiscardingMessages 769 */ 770 public boolean isAdvisoryForDiscardingMessages() { 771 return advisoryForDiscardingMessages; 772 } 773 774 /** 775 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set 776 */ 777 public void setAdvisoryForDiscardingMessages( 778 boolean advisoryForDiscardingMessages) { 779 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 780 } 781 782 /** 783 * @return the advisoryWhenFull 784 */ 785 public boolean isAdvisoryWhenFull() { 786 return advisoryWhenFull; 787 } 788 789 /** 790 * @param advisoryWhenFull the advisoryWhenFull to set 791 */ 792 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 793 this.advisoryWhenFull = advisoryWhenFull; 794 } 795 796 /** 797 * @return the advisoryForDelivery 798 */ 799 public boolean isAdvisoryForDelivery() { 800 return advisoryForDelivery; 801 } 802 803 /** 804 * @param advisoryForDelivery the advisoryForDelivery to set 805 */ 806 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 807 this.advisoryForDelivery = advisoryForDelivery; 808 } 809 810 /** 811 * @return the advisoryForConsumed 812 */ 813 public boolean isAdvisoryForConsumed() { 814 return advisoryForConsumed; 815 } 816 817 /** 818 * @param advisoryForConsumed the advisoryForConsumed to set 819 */ 820 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 821 this.advisoryForConsumed = advisoryForConsumed; 822 } 823 824 /** 825 * @return the advisdoryForFastProducers 826 */ 827 public boolean isAdvisoryForFastProducers() { 828 return advisoryForFastProducers; 829 } 830 831 /** 832 * @param advisoryForFastProducers the advisdoryForFastProducers to set 833 */ 834 public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { 835 this.advisoryForFastProducers = advisoryForFastProducers; 836 } 837 838 /** 839 * Returns true if the original message body should be included when applicable 840 * for advisory messages 841 * 842 * @return 843 */ 844 public boolean isIncludeBodyForAdvisory() { 845 return includeBodyForAdvisory; 846 } 847 848 /** 849 * Sets if the original message body should be included when applicable 850 * for advisory messages 851 * 852 * @param includeBodyForAdvisory 853 */ 854 public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) { 855 this.includeBodyForAdvisory = includeBodyForAdvisory; 856 } 857 858 public void setMaxExpirePageSize(int maxExpirePageSize) { 859 this.maxExpirePageSize = maxExpirePageSize; 860 } 861 862 public int getMaxExpirePageSize() { 863 return maxExpirePageSize; 864 } 865 866 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 867 this.expireMessagesPeriod = expireMessagesPeriod; 868 } 869 870 public long getExpireMessagesPeriod() { 871 return expireMessagesPeriod; 872 } 873 874 /** 875 * Get the queuePrefetch 876 * @return the queuePrefetch 877 */ 878 public int getQueuePrefetch() { 879 return this.queuePrefetch; 880 } 881 882 /** 883 * Set the queuePrefetch 884 * @param queuePrefetch the queuePrefetch to set 885 */ 886 public void setQueuePrefetch(int queuePrefetch) { 887 this.queuePrefetch = queuePrefetch; 888 } 889 890 /** 891 * Get the queueBrowserPrefetch 892 * @return the queueBrowserPrefetch 893 */ 894 public int getQueueBrowserPrefetch() { 895 return this.queueBrowserPrefetch; 896 } 897 898 /** 899 * Set the queueBrowserPrefetch 900 * @param queueBrowserPrefetch the queueBrowserPrefetch to set 901 */ 902 public void setQueueBrowserPrefetch(int queueBrowserPrefetch) { 903 this.queueBrowserPrefetch = queueBrowserPrefetch; 904 } 905 906 /** 907 * Get the topicPrefetch 908 * @return the topicPrefetch 909 */ 910 public int getTopicPrefetch() { 911 return this.topicPrefetch; 912 } 913 914 /** 915 * Set the topicPrefetch 916 * @param topicPrefetch the topicPrefetch to set 917 */ 918 public void setTopicPrefetch(int topicPrefetch) { 919 this.topicPrefetch = topicPrefetch; 920 } 921 922 /** 923 * Get the durableTopicPrefetch 924 * @return the durableTopicPrefetch 925 */ 926 public int getDurableTopicPrefetch() { 927 return this.durableTopicPrefetch; 928 } 929 930 /** 931 * Set the durableTopicPrefetch 932 * @param durableTopicPrefetch the durableTopicPrefetch to set 933 */ 934 public void setDurableTopicPrefetch(int durableTopicPrefetch) { 935 this.durableTopicPrefetch = durableTopicPrefetch; 936 } 937 938 public boolean isUsePrefetchExtension() { 939 return this.usePrefetchExtension; 940 } 941 942 public void setUsePrefetchExtension(boolean usePrefetchExtension) { 943 this.usePrefetchExtension = usePrefetchExtension; 944 } 945 946 public int getCursorMemoryHighWaterMark() { 947 return this.cursorMemoryHighWaterMark; 948 } 949 950 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 951 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 952 } 953 954 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 955 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 956 } 957 958 public int getStoreUsageHighWaterMark() { 959 return storeUsageHighWaterMark; 960 } 961 962 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 963 this.slowConsumerStrategy = slowConsumerStrategy; 964 } 965 966 public SlowConsumerStrategy getSlowConsumerStrategy() { 967 return this.slowConsumerStrategy; 968 } 969 970 971 public boolean isPrioritizedMessages() { 972 return this.prioritizedMessages; 973 } 974 975 public void setPrioritizedMessages(boolean prioritizedMessages) { 976 this.prioritizedMessages = prioritizedMessages; 977 } 978 979 public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) { 980 this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault; 981 } 982 983 public boolean isAllConsumersExclusiveByDefault() { 984 return allConsumersExclusiveByDefault; 985 } 986 987 public boolean isGcInactiveDestinations() { 988 return this.gcInactiveDestinations; 989 } 990 991 public void setGcInactiveDestinations(boolean gcInactiveDestinations) { 992 this.gcInactiveDestinations = gcInactiveDestinations; 993 } 994 995 /** 996 * @return the amount of time spent inactive before GC of the destination kicks in. 997 * 998 * @deprecated use getInactiveTimeoutBeforeGC instead. 999 */ 1000 @Deprecated 1001 public long getInactiveTimoutBeforeGC() { 1002 return getInactiveTimeoutBeforeGC(); 1003 } 1004 1005 /** 1006 * Sets the amount of time a destination is inactive before it is marked for GC 1007 * 1008 * @param inactiveTimoutBeforeGC 1009 * time in milliseconds to configure as the inactive timeout. 1010 * 1011 * @deprecated use getInactiveTimeoutBeforeGC instead. 1012 */ 1013 @Deprecated 1014 public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) { 1015 setInactiveTimeoutBeforeGC(inactiveTimoutBeforeGC); 1016 } 1017 1018 /** 1019 * @return the amount of time spent inactive before GC of the destination kicks in. 1020 */ 1021 public long getInactiveTimeoutBeforeGC() { 1022 return this.inactiveTimeoutBeforeGC; 1023 } 1024 1025 /** 1026 * Sets the amount of time a destination is inactive before it is marked for GC 1027 * 1028 * @param inactiveTimoutBeforeGC 1029 * time in milliseconds to configure as the inactive timeout. 1030 */ 1031 public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { 1032 this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; 1033 } 1034 1035 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { 1036 this.gcWithNetworkConsumers = gcWithNetworkConsumers; 1037 } 1038 1039 public boolean isGcWithNetworkConsumers() { 1040 return gcWithNetworkConsumers; 1041 } 1042 1043 public boolean isReduceMemoryFootprint() { 1044 return reduceMemoryFootprint; 1045 } 1046 1047 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 1048 this.reduceMemoryFootprint = reduceMemoryFootprint; 1049 } 1050 1051 public void setNetworkBridgeFilterFactory(NetworkBridgeFilterFactory networkBridgeFilterFactory) { 1052 this.networkBridgeFilterFactory = networkBridgeFilterFactory; 1053 } 1054 1055 public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() { 1056 return networkBridgeFilterFactory; 1057 } 1058 1059 public boolean isDoOptimzeMessageStorage() { 1060 return doOptimzeMessageStorage; 1061 } 1062 1063 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 1064 this.doOptimzeMessageStorage = doOptimzeMessageStorage; 1065 } 1066 1067 public int getOptimizeMessageStoreInFlightLimit() { 1068 return optimizeMessageStoreInFlightLimit; 1069 } 1070 1071 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { 1072 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; 1073 } 1074 1075 public void setPersistJMSRedelivered(boolean val) { 1076 this.persistJMSRedelivered = val; 1077 } 1078 1079 public boolean isPersistJMSRedelivered() { 1080 return persistJMSRedelivered; 1081 } 1082 1083 public int getMaxDestinations() { 1084 return maxDestinations; 1085 } 1086 1087 /** 1088 * Sets the maximum number of destinations that can be created 1089 * 1090 * @param maxDestinations 1091 * maximum number of destinations 1092 */ 1093 public void setMaxDestinations(int maxDestinations) { 1094 this.maxDestinations = maxDestinations; 1095 } 1096 1097 @Override 1098 public String toString() { 1099 return "PolicyEntry [" + destination + "]"; 1100 } 1101}