001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.util.ArrayList; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Map.Entry; 030import java.util.Set; 031import java.util.concurrent.BlockingQueue; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.FutureTask; 034import java.util.concurrent.LinkedBlockingQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.ThreadFactory; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.activemq.broker.ConnectionContext; 044import org.apache.activemq.broker.region.BaseDestination; 045import org.apache.activemq.broker.scheduler.JobSchedulerStore; 046import org.apache.activemq.command.ActiveMQDestination; 047import org.apache.activemq.command.ActiveMQQueue; 048import org.apache.activemq.command.ActiveMQTempQueue; 049import org.apache.activemq.command.ActiveMQTempTopic; 050import org.apache.activemq.command.ActiveMQTopic; 051import org.apache.activemq.command.Message; 052import org.apache.activemq.command.MessageAck; 053import org.apache.activemq.command.MessageId; 054import org.apache.activemq.command.ProducerId; 055import org.apache.activemq.command.SubscriptionInfo; 056import org.apache.activemq.command.TransactionId; 057import org.apache.activemq.openwire.OpenWireFormat; 058import org.apache.activemq.protobuf.Buffer; 059import org.apache.activemq.store.AbstractMessageStore; 060import org.apache.activemq.store.IndexListener; 061import org.apache.activemq.store.ListenableFuture; 062import org.apache.activemq.store.MessageRecoveryListener; 063import org.apache.activemq.store.MessageStore; 064import org.apache.activemq.store.MessageStoreStatistics; 065import org.apache.activemq.store.PersistenceAdapter; 066import org.apache.activemq.store.TopicMessageStore; 067import org.apache.activemq.store.TransactionIdTransformer; 068import org.apache.activemq.store.TransactionStore; 069import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 070import org.apache.activemq.store.kahadb.data.KahaDestination; 071import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; 072import org.apache.activemq.store.kahadb.data.KahaLocation; 073import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 074import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 075import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 076import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 077import org.apache.activemq.store.kahadb.disk.journal.Location; 078import org.apache.activemq.store.kahadb.disk.page.Transaction; 079import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 080import org.apache.activemq.usage.MemoryUsage; 081import org.apache.activemq.usage.SystemUsage; 082import org.apache.activemq.util.ServiceStopper; 083import org.apache.activemq.util.ThreadPoolUtils; 084import org.apache.activemq.wireformat.WireFormat; 085import org.slf4j.Logger; 086import org.slf4j.LoggerFactory; 087 088public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { 089 static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); 090 private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH; 091 092 public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC"; 093 public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty( 094 PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10); 095 public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS"; 096 private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty( 097 PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);; 098 099 protected ExecutorService queueExecutor; 100 protected ExecutorService topicExecutor; 101 protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 102 protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>(); 103 final WireFormat wireFormat = new OpenWireFormat(); 104 private SystemUsage usageManager; 105 private LinkedBlockingQueue<Runnable> asyncQueueJobQueue; 106 private LinkedBlockingQueue<Runnable> asyncTopicJobQueue; 107 Semaphore globalQueueSemaphore; 108 Semaphore globalTopicSemaphore; 109 private boolean concurrentStoreAndDispatchQueues = true; 110 // when true, message order may be compromised when cache is exhausted if store is out 111 // or order w.r.t cache 112 private boolean concurrentStoreAndDispatchTopics = false; 113 private final boolean concurrentStoreAndDispatchTransactions = false; 114 private int maxAsyncJobs = MAX_ASYNC_JOBS; 115 private final KahaDBTransactionStore transactionStore; 116 private TransactionIdTransformer transactionIdTransformer; 117 118 public KahaDBStore() { 119 this.transactionStore = new KahaDBTransactionStore(this); 120 this.transactionIdTransformer = new TransactionIdTransformer() { 121 @Override 122 public TransactionId transform(TransactionId txid) { 123 return txid; 124 } 125 }; 126 } 127 128 @Override 129 public String toString() { 130 return "KahaDB:[" + directory.getAbsolutePath() + "]"; 131 } 132 133 @Override 134 public void setBrokerName(String brokerName) { 135 } 136 137 @Override 138 public void setUsageManager(SystemUsage usageManager) { 139 this.usageManager = usageManager; 140 } 141 142 public SystemUsage getUsageManager() { 143 return this.usageManager; 144 } 145 146 /** 147 * @return the concurrentStoreAndDispatch 148 */ 149 public boolean isConcurrentStoreAndDispatchQueues() { 150 return this.concurrentStoreAndDispatchQueues; 151 } 152 153 /** 154 * @param concurrentStoreAndDispatch 155 * the concurrentStoreAndDispatch to set 156 */ 157 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 158 this.concurrentStoreAndDispatchQueues = concurrentStoreAndDispatch; 159 } 160 161 /** 162 * @return the concurrentStoreAndDispatch 163 */ 164 public boolean isConcurrentStoreAndDispatchTopics() { 165 return this.concurrentStoreAndDispatchTopics; 166 } 167 168 /** 169 * @param concurrentStoreAndDispatch 170 * the concurrentStoreAndDispatch to set 171 */ 172 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 173 this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch; 174 } 175 176 public boolean isConcurrentStoreAndDispatchTransactions() { 177 return this.concurrentStoreAndDispatchTransactions; 178 } 179 180 /** 181 * @return the maxAsyncJobs 182 */ 183 public int getMaxAsyncJobs() { 184 return this.maxAsyncJobs; 185 } 186 187 /** 188 * @param maxAsyncJobs 189 * the maxAsyncJobs to set 190 */ 191 public void setMaxAsyncJobs(int maxAsyncJobs) { 192 this.maxAsyncJobs = maxAsyncJobs; 193 } 194 195 196 @Override 197 protected void configureMetadata() { 198 if (brokerService != null) { 199 metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); 200 wireFormat.setVersion(metadata.openwireVersion); 201 202 if (LOG.isDebugEnabled()) { 203 LOG.debug("Store OpenWire version configured as: {}", metadata.openwireVersion); 204 } 205 206 } 207 } 208 209 @Override 210 public void doStart() throws Exception { 211 //configure the metadata before start, right now 212 //this is just the open wire version 213 configureMetadata(); 214 215 super.doStart(); 216 217 if (brokerService != null) { 218 // In case the recovered store used a different OpenWire version log a warning 219 // to assist in determining why journal reads fail. 220 if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { 221 LOG.warn("Existing Store uses a different OpenWire version[{}] " + 222 "than the version configured[{}] reverting to the version " + 223 "used by this store, some newer broker features may not work" + 224 "as expected.", 225 metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); 226 227 // Update the broker service instance to the actual version in use. 228 wireFormat.setVersion(metadata.openwireVersion); 229 brokerService.setStoreOpenWireVersion(metadata.openwireVersion); 230 } 231 } 232 233 this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); 234 this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); 235 this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 236 this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs()); 237 this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 238 asyncQueueJobQueue, new ThreadFactory() { 239 @Override 240 public Thread newThread(Runnable runnable) { 241 Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); 242 thread.setDaemon(true); 243 return thread; 244 } 245 }); 246 this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, 247 asyncTopicJobQueue, new ThreadFactory() { 248 @Override 249 public Thread newThread(Runnable runnable) { 250 Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); 251 thread.setDaemon(true); 252 return thread; 253 } 254 }); 255 } 256 257 @Override 258 public void doStop(ServiceStopper stopper) throws Exception { 259 // drain down async jobs 260 LOG.info("Stopping async queue tasks"); 261 if (this.globalQueueSemaphore != null) { 262 this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 263 } 264 synchronized (this.asyncQueueMaps) { 265 for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) { 266 synchronized (m) { 267 for (StoreTask task : m.values()) { 268 task.cancel(); 269 } 270 } 271 } 272 this.asyncQueueMaps.clear(); 273 } 274 LOG.info("Stopping async topic tasks"); 275 if (this.globalTopicSemaphore != null) { 276 this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS); 277 } 278 synchronized (this.asyncTopicMaps) { 279 for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) { 280 synchronized (m) { 281 for (StoreTask task : m.values()) { 282 task.cancel(); 283 } 284 } 285 } 286 this.asyncTopicMaps.clear(); 287 } 288 if (this.globalQueueSemaphore != null) { 289 this.globalQueueSemaphore.drainPermits(); 290 } 291 if (this.globalTopicSemaphore != null) { 292 this.globalTopicSemaphore.drainPermits(); 293 } 294 if (this.queueExecutor != null) { 295 ThreadPoolUtils.shutdownNow(queueExecutor); 296 queueExecutor = null; 297 } 298 if (this.topicExecutor != null) { 299 ThreadPoolUtils.shutdownNow(topicExecutor); 300 topicExecutor = null; 301 } 302 LOG.info("Stopped KahaDB"); 303 super.doStop(stopper); 304 } 305 306 private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { 307 return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { 308 @Override 309 public Location execute(Transaction tx) throws IOException { 310 StoredDestination sd = getStoredDestination(destination, tx); 311 Long sequence = sd.messageIdIndex.get(tx, key); 312 if (sequence == null) { 313 return null; 314 } 315 return sd.orderIndex.get(tx, sequence).location; 316 } 317 }); 318 } 319 320 protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { 321 StoreQueueTask task = null; 322 synchronized (store.asyncTaskMap) { 323 task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 324 } 325 return task; 326 } 327 328 protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { 329 synchronized (store.asyncTaskMap) { 330 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 331 } 332 this.queueExecutor.execute(task); 333 } 334 335 protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { 336 StoreTopicTask task = null; 337 synchronized (store.asyncTaskMap) { 338 task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); 339 } 340 return task; 341 } 342 343 protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { 344 synchronized (store.asyncTaskMap) { 345 store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); 346 } 347 this.topicExecutor.execute(task); 348 } 349 350 @Override 351 public TransactionStore createTransactionStore() throws IOException { 352 return this.transactionStore; 353 } 354 355 public boolean getForceRecoverIndex() { 356 return this.forceRecoverIndex; 357 } 358 359 public void setForceRecoverIndex(boolean forceRecoverIndex) { 360 this.forceRecoverIndex = forceRecoverIndex; 361 } 362 363 public class KahaDBMessageStore extends AbstractMessageStore { 364 protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); 365 protected KahaDestination dest; 366 private final int maxAsyncJobs; 367 private final Semaphore localDestinationSemaphore; 368 369 double doneTasks, canceledTasks = 0; 370 371 public KahaDBMessageStore(ActiveMQDestination destination) { 372 super(destination); 373 this.dest = convert(destination); 374 this.maxAsyncJobs = getMaxAsyncJobs(); 375 this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); 376 } 377 378 @Override 379 public ActiveMQDestination getDestination() { 380 return destination; 381 } 382 383 @Override 384 public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) 385 throws IOException { 386 if (isConcurrentStoreAndDispatchQueues()) { 387 message.beforeMarshall(wireFormat); 388 StoreQueueTask result = new StoreQueueTask(this, context, message); 389 ListenableFuture<Object> future = result.getFuture(); 390 message.getMessageId().setFutureOrSequenceLong(future); 391 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch 392 result.aquireLocks(); 393 addQueueTask(this, result); 394 if (indexListener != null) { 395 indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); 396 } 397 return future; 398 } else { 399 return super.asyncAddQueueMessage(context, message); 400 } 401 } 402 403 @Override 404 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 405 if (isConcurrentStoreAndDispatchQueues()) { 406 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); 407 StoreQueueTask task = null; 408 synchronized (asyncTaskMap) { 409 task = (StoreQueueTask) asyncTaskMap.get(key); 410 } 411 if (task != null) { 412 if (ack.isInTransaction() || !task.cancel()) { 413 try { 414 task.future.get(); 415 } catch (InterruptedException e) { 416 throw new InterruptedIOException(e.toString()); 417 } catch (Exception ignored) { 418 LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored); 419 } 420 removeMessage(context, ack); 421 } else { 422 synchronized (asyncTaskMap) { 423 asyncTaskMap.remove(key); 424 } 425 } 426 } else { 427 removeMessage(context, ack); 428 } 429 } else { 430 removeMessage(context, ack); 431 } 432 } 433 434 @Override 435 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 436 final KahaAddMessageCommand command = new KahaAddMessageCommand(); 437 command.setDestination(dest); 438 command.setMessageId(message.getMessageId().toProducerKey()); 439 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId()))); 440 command.setPriority(message.getPriority()); 441 command.setPrioritySupported(isPrioritizedMessages()); 442 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 443 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 444 store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { 445 // sync add? (for async, future present from getFutureOrSequenceLong) 446 Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); 447 448 @Override 449 public void sequenceAssignedWithIndexLocked(final long sequence) { 450 message.getMessageId().setFutureOrSequenceLong(sequence); 451 if (indexListener != null) { 452 if (possibleFuture == null) { 453 trackPendingAdd(dest, sequence); 454 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 455 @Override 456 public void run() { 457 trackPendingAddComplete(dest, sequence); 458 } 459 })); 460 } 461 } 462 } 463 }, null); 464 } 465 466 @Override 467 public void updateMessage(Message message) throws IOException { 468 if (LOG.isTraceEnabled()) { 469 LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); 470 } 471 KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); 472 KahaAddMessageCommand command = new KahaAddMessageCommand(); 473 command.setDestination(dest); 474 command.setMessageId(message.getMessageId().toProducerKey()); 475 command.setPriority(message.getPriority()); 476 command.setPrioritySupported(prioritizedMessages); 477 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 478 command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 479 updateMessageCommand.setMessage(command); 480 store(updateMessageCommand, isEnableJournalDiskSyncs(), null, null); 481 } 482 483 @Override 484 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 485 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 486 command.setDestination(dest); 487 command.setMessageId(ack.getLastMessageId().toProducerKey()); 488 command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))); 489 490 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 491 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 492 store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null); 493 } 494 495 @Override 496 public void removeAllMessages(ConnectionContext context) throws IOException { 497 KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand(); 498 command.setDestination(dest); 499 store(command, true, null, null); 500 } 501 502 @Override 503 public Message getMessage(MessageId identity) throws IOException { 504 final String key = identity.toProducerKey(); 505 506 // Hopefully one day the page file supports concurrent read 507 // operations... but for now we must 508 // externally synchronize... 509 Location location; 510 indexLock.writeLock().lock(); 511 try { 512 location = findMessageLocation(key, dest); 513 } finally { 514 indexLock.writeLock().unlock(); 515 } 516 if (location == null) { 517 return null; 518 } 519 520 return loadMessage(location); 521 } 522 523 @Override 524 public boolean isEmpty() throws IOException { 525 indexLock.writeLock().lock(); 526 try { 527 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() { 528 @Override 529 public Boolean execute(Transaction tx) throws IOException { 530 // Iterate through all index entries to get a count of 531 // messages in the destination. 532 StoredDestination sd = getStoredDestination(dest, tx); 533 return sd.locationIndex.isEmpty(tx); 534 } 535 }); 536 } finally { 537 indexLock.writeLock().unlock(); 538 } 539 } 540 541 @Override 542 public void recover(final MessageRecoveryListener listener) throws Exception { 543 // recovery may involve expiry which will modify 544 indexLock.writeLock().lock(); 545 try { 546 pageFile.tx().execute(new Transaction.Closure<Exception>() { 547 @Override 548 public void execute(Transaction tx) throws Exception { 549 StoredDestination sd = getStoredDestination(dest, tx); 550 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 551 sd.orderIndex.resetCursorPosition(); 552 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator 553 .hasNext(); ) { 554 Entry<Long, MessageKeys> entry = iterator.next(); 555 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 556 continue; 557 } 558 Message msg = loadMessage(entry.getValue().location); 559 listener.recoverMessage(msg); 560 } 561 } 562 }); 563 } finally { 564 indexLock.writeLock().unlock(); 565 } 566 } 567 568 @Override 569 public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { 570 indexLock.writeLock().lock(); 571 try { 572 pageFile.tx().execute(new Transaction.Closure<Exception>() { 573 @Override 574 public void execute(Transaction tx) throws Exception { 575 StoredDestination sd = getStoredDestination(dest, tx); 576 Entry<Long, MessageKeys> entry = null; 577 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 578 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { 579 entry = iterator.next(); 580 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 581 continue; 582 } 583 Message msg = loadMessage(entry.getValue().location); 584 msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); 585 listener.recoverMessage(msg); 586 counter++; 587 if (counter >= maxReturned) { 588 break; 589 } 590 } 591 sd.orderIndex.stoppedIterating(); 592 } 593 }); 594 } finally { 595 indexLock.writeLock().unlock(); 596 } 597 } 598 599 protected int recoverRolledBackAcks(StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { 600 int counter = 0; 601 String id; 602 for (Iterator<String> iterator = rolledBackAcks.iterator(); iterator.hasNext(); ) { 603 id = iterator.next(); 604 iterator.remove(); 605 Long sequence = sd.messageIdIndex.get(tx, id); 606 if (sequence != null) { 607 if (sd.orderIndex.alreadyDispatched(sequence)) { 608 listener.recoverMessage(loadMessage(sd.orderIndex.get(tx, sequence).location)); 609 counter++; 610 if (counter >= maxReturned) { 611 break; 612 } 613 } else { 614 LOG.info("rolledback ack message {} with seq {} will be picked up in future batch {}", id, sequence, sd.orderIndex.cursor); 615 } 616 } else { 617 LOG.warn("Failed to locate rolled back ack message {} in {}", id, sd); 618 } 619 } 620 return counter; 621 } 622 623 624 @Override 625 public void resetBatching() { 626 if (pageFile.isLoaded()) { 627 indexLock.writeLock().lock(); 628 try { 629 pageFile.tx().execute(new Transaction.Closure<Exception>() { 630 @Override 631 public void execute(Transaction tx) throws Exception { 632 StoredDestination sd = getExistingStoredDestination(dest, tx); 633 if (sd != null) { 634 sd.orderIndex.resetCursorPosition();} 635 } 636 }); 637 } catch (Exception e) { 638 LOG.error("Failed to reset batching",e); 639 } finally { 640 indexLock.writeLock().unlock(); 641 } 642 } 643 } 644 645 @Override 646 public void setBatch(final MessageId identity) throws IOException { 647 indexLock.writeLock().lock(); 648 try { 649 pageFile.tx().execute(new Transaction.Closure<IOException>() { 650 @Override 651 public void execute(Transaction tx) throws IOException { 652 StoredDestination sd = getStoredDestination(dest, tx); 653 Long location = (Long) identity.getFutureOrSequenceLong(); 654 Long pending = sd.orderIndex.minPendingAdd(); 655 if (pending != null) { 656 location = Math.min(location, pending-1); 657 } 658 sd.orderIndex.setBatch(tx, location); 659 } 660 }); 661 } finally { 662 indexLock.writeLock().unlock(); 663 } 664 } 665 666 @Override 667 public void setMemoryUsage(MemoryUsage memoryUsage) { 668 } 669 @Override 670 public void start() throws Exception { 671 super.start(); 672 } 673 @Override 674 public void stop() throws Exception { 675 super.stop(); 676 } 677 678 protected void lockAsyncJobQueue() { 679 try { 680 if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { 681 throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); 682 } 683 } catch (Exception e) { 684 LOG.error("Failed to lock async jobs for " + this.destination, e); 685 } 686 } 687 688 protected void unlockAsyncJobQueue() { 689 this.localDestinationSemaphore.release(this.maxAsyncJobs); 690 } 691 692 protected void acquireLocalAsyncLock() { 693 try { 694 this.localDestinationSemaphore.acquire(); 695 } catch (InterruptedException e) { 696 LOG.error("Failed to aquire async lock for " + this.destination, e); 697 } 698 } 699 700 protected void releaseLocalAsyncLock() { 701 this.localDestinationSemaphore.release(); 702 } 703 704 @Override 705 public String toString(){ 706 return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest)); 707 } 708 709 @Override 710 protected void recoverMessageStoreStatistics() throws IOException { 711 try { 712 MessageStoreStatistics recoveredStatistics; 713 lockAsyncJobQueue(); 714 indexLock.writeLock().lock(); 715 try { 716 recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() { 717 @Override 718 public MessageStoreStatistics execute(Transaction tx) throws IOException { 719 MessageStoreStatistics statistics = new MessageStoreStatistics(); 720 721 // Iterate through all index entries to get the size of each message 722 StoredDestination sd = getStoredDestination(dest, tx); 723 for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { 724 int locationSize = iterator.next().getKey().getSize(); 725 statistics.getMessageCount().increment(); 726 statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); 727 } 728 return statistics; 729 } 730 }); 731 getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); 732 getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); 733 } finally { 734 indexLock.writeLock().unlock(); 735 } 736 } finally { 737 unlockAsyncJobQueue(); 738 } 739 } 740 } 741 742 class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore { 743 private final AtomicInteger subscriptionCount = new AtomicInteger(); 744 public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException { 745 super(destination); 746 this.subscriptionCount.set(getAllSubscriptions().length); 747 if (isConcurrentStoreAndDispatchTopics()) { 748 asyncTopicMaps.add(asyncTaskMap); 749 } 750 } 751 752 @Override 753 public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) 754 throws IOException { 755 if (isConcurrentStoreAndDispatchTopics()) { 756 message.beforeMarshall(wireFormat); 757 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); 758 result.aquireLocks(); 759 addTopicTask(this, result); 760 return result.getFuture(); 761 } else { 762 return super.asyncAddTopicMessage(context, message); 763 } 764 } 765 766 @Override 767 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 768 MessageId messageId, MessageAck ack) throws IOException { 769 String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); 770 if (isConcurrentStoreAndDispatchTopics()) { 771 AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); 772 StoreTopicTask task = null; 773 synchronized (asyncTaskMap) { 774 task = (StoreTopicTask) asyncTaskMap.get(key); 775 } 776 if (task != null) { 777 if (task.addSubscriptionKey(subscriptionKey)) { 778 removeTopicTask(this, messageId); 779 if (task.cancel()) { 780 synchronized (asyncTaskMap) { 781 asyncTaskMap.remove(key); 782 } 783 } 784 } 785 } else { 786 doAcknowledge(context, subscriptionKey, messageId, ack); 787 } 788 } else { 789 doAcknowledge(context, subscriptionKey, messageId, ack); 790 } 791 } 792 793 protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) 794 throws IOException { 795 KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); 796 command.setDestination(dest); 797 command.setSubscriptionKey(subscriptionKey); 798 command.setMessageId(messageId.toProducerKey()); 799 command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null); 800 if (ack != null && ack.isUnmatchedAck()) { 801 command.setAck(UNMATCHED); 802 } else { 803 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack); 804 command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 805 } 806 store(command, false, null, null); 807 } 808 809 @Override 810 public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { 811 String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo 812 .getSubscriptionName()); 813 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 814 command.setDestination(dest); 815 command.setSubscriptionKey(subscriptionKey.toString()); 816 command.setRetroactive(retroactive); 817 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); 818 command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); 819 store(command, isEnableJournalDiskSyncs() && true, null, null); 820 this.subscriptionCount.incrementAndGet(); 821 } 822 823 @Override 824 public void deleteSubscription(String clientId, String subscriptionName) throws IOException { 825 KahaSubscriptionCommand command = new KahaSubscriptionCommand(); 826 command.setDestination(dest); 827 command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); 828 store(command, isEnableJournalDiskSyncs() && true, null, null); 829 this.subscriptionCount.decrementAndGet(); 830 } 831 832 @Override 833 public SubscriptionInfo[] getAllSubscriptions() throws IOException { 834 835 final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>(); 836 indexLock.writeLock().lock(); 837 try { 838 pageFile.tx().execute(new Transaction.Closure<IOException>() { 839 @Override 840 public void execute(Transaction tx) throws IOException { 841 StoredDestination sd = getStoredDestination(dest, tx); 842 for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator 843 .hasNext();) { 844 Entry<String, KahaSubscriptionCommand> entry = iterator.next(); 845 SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry 846 .getValue().getSubscriptionInfo().newInput())); 847 subscriptions.add(info); 848 849 } 850 } 851 }); 852 } finally { 853 indexLock.writeLock().unlock(); 854 } 855 856 SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; 857 subscriptions.toArray(rc); 858 return rc; 859 } 860 861 @Override 862 public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { 863 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 864 indexLock.writeLock().lock(); 865 try { 866 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() { 867 @Override 868 public SubscriptionInfo execute(Transaction tx) throws IOException { 869 StoredDestination sd = getStoredDestination(dest, tx); 870 KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); 871 if (command == null) { 872 return null; 873 } 874 return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command 875 .getSubscriptionInfo().newInput())); 876 } 877 }); 878 } finally { 879 indexLock.writeLock().unlock(); 880 } 881 } 882 883 @Override 884 public int getMessageCount(String clientId, String subscriptionName) throws IOException { 885 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 886 indexLock.writeLock().lock(); 887 try { 888 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 889 @Override 890 public Integer execute(Transaction tx) throws IOException { 891 StoredDestination sd = getStoredDestination(dest, tx); 892 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 893 if (cursorPos == null) { 894 // The subscription might not exist. 895 return 0; 896 } 897 898 return (int) getStoredMessageCount(tx, sd, subscriptionKey); 899 } 900 }); 901 } finally { 902 indexLock.writeLock().unlock(); 903 } 904 } 905 906 907 @Override 908 public long getMessageSize(String clientId, String subscriptionName) throws IOException { 909 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 910 indexLock.writeLock().lock(); 911 try { 912 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() { 913 @Override 914 public Integer execute(Transaction tx) throws IOException { 915 StoredDestination sd = getStoredDestination(dest, tx); 916 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 917 if (cursorPos == null) { 918 // The subscription might not exist. 919 return 0; 920 } 921 922 return (int) getStoredMessageSize(tx, sd, subscriptionKey); 923 } 924 }); 925 } finally { 926 indexLock.writeLock().unlock(); 927 } 928 } 929 930 @Override 931 public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) 932 throws Exception { 933 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 934 @SuppressWarnings("unused") 935 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 936 indexLock.writeLock().lock(); 937 try { 938 pageFile.tx().execute(new Transaction.Closure<Exception>() { 939 @Override 940 public void execute(Transaction tx) throws Exception { 941 StoredDestination sd = getStoredDestination(dest, tx); 942 LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); 943 sd.orderIndex.setBatch(tx, cursorPos); 944 recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener); 945 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator 946 .hasNext();) { 947 Entry<Long, MessageKeys> entry = iterator.next(); 948 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 949 continue; 950 } 951 listener.recoverMessage(loadMessage(entry.getValue().location)); 952 } 953 sd.orderIndex.resetCursorPosition(); 954 } 955 }); 956 } finally { 957 indexLock.writeLock().unlock(); 958 } 959 } 960 961 @Override 962 public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, 963 final MessageRecoveryListener listener) throws Exception { 964 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 965 @SuppressWarnings("unused") 966 final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); 967 indexLock.writeLock().lock(); 968 try { 969 pageFile.tx().execute(new Transaction.Closure<Exception>() { 970 @Override 971 public void execute(Transaction tx) throws Exception { 972 StoredDestination sd = getStoredDestination(dest, tx); 973 sd.orderIndex.resetCursorPosition(); 974 MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); 975 if (moc == null) { 976 LastAck pos = getLastAck(tx, sd, subscriptionKey); 977 if (pos == null) { 978 // sub deleted 979 return; 980 } 981 sd.orderIndex.setBatch(tx, pos); 982 moc = sd.orderIndex.cursor; 983 } else { 984 sd.orderIndex.cursor.sync(moc); 985 } 986 987 Entry<Long, MessageKeys> entry = null; 988 int counter = recoverRolledBackAcks(sd, tx, maxReturned, listener); 989 for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator 990 .hasNext();) { 991 entry = iterator.next(); 992 if (ackedAndPrepared.contains(entry.getValue().messageId)) { 993 continue; 994 } 995 if (listener.recoverMessage(loadMessage(entry.getValue().location))) { 996 counter++; 997 } 998 if (counter >= maxReturned || listener.hasSpace() == false) { 999 break; 1000 } 1001 } 1002 sd.orderIndex.stoppedIterating(); 1003 if (entry != null) { 1004 MessageOrderCursor copy = sd.orderIndex.cursor.copy(); 1005 sd.subscriptionCursors.put(subscriptionKey, copy); 1006 } 1007 } 1008 }); 1009 } finally { 1010 indexLock.writeLock().unlock(); 1011 } 1012 } 1013 1014 @Override 1015 public void resetBatching(String clientId, String subscriptionName) { 1016 try { 1017 final String subscriptionKey = subscriptionKey(clientId, subscriptionName); 1018 indexLock.writeLock().lock(); 1019 try { 1020 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1021 @Override 1022 public void execute(Transaction tx) throws IOException { 1023 StoredDestination sd = getStoredDestination(dest, tx); 1024 sd.subscriptionCursors.remove(subscriptionKey); 1025 } 1026 }); 1027 }finally { 1028 indexLock.writeLock().unlock(); 1029 } 1030 } catch (IOException e) { 1031 throw new RuntimeException(e); 1032 } 1033 } 1034 } 1035 1036 String subscriptionKey(String clientId, String subscriptionName) { 1037 return clientId + ":" + subscriptionName; 1038 } 1039 1040 @Override 1041 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 1042 String key = key(convert(destination)); 1043 MessageStore store = storeCache.get(key(convert(destination))); 1044 if (store == null) { 1045 final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination)); 1046 store = storeCache.putIfAbsent(key, queueStore); 1047 if (store == null) { 1048 store = queueStore; 1049 } 1050 } 1051 1052 return store; 1053 } 1054 1055 @Override 1056 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 1057 String key = key(convert(destination)); 1058 MessageStore store = storeCache.get(key(convert(destination))); 1059 if (store == null) { 1060 final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); 1061 store = storeCache.putIfAbsent(key, topicStore); 1062 if (store == null) { 1063 store = topicStore; 1064 } 1065 } 1066 1067 return (TopicMessageStore) store; 1068 } 1069 1070 /** 1071 * Cleanup method to remove any state associated with the given destination. 1072 * This method does not stop the message store (it might not be cached). 1073 * 1074 * @param destination 1075 * Destination to forget 1076 */ 1077 @Override 1078 public void removeQueueMessageStore(ActiveMQQueue destination) { 1079 } 1080 1081 /** 1082 * Cleanup method to remove any state associated with the given destination 1083 * This method does not stop the message store (it might not be cached). 1084 * 1085 * @param destination 1086 * Destination to forget 1087 */ 1088 @Override 1089 public void removeTopicMessageStore(ActiveMQTopic destination) { 1090 } 1091 1092 @Override 1093 public void deleteAllMessages() throws IOException { 1094 deleteAllMessages = true; 1095 } 1096 1097 @Override 1098 public Set<ActiveMQDestination> getDestinations() { 1099 try { 1100 final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 1101 indexLock.writeLock().lock(); 1102 try { 1103 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1104 @Override 1105 public void execute(Transaction tx) throws IOException { 1106 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator 1107 .hasNext();) { 1108 Entry<String, StoredDestination> entry = iterator.next(); 1109 //Removing isEmpty topic check - see AMQ-5875 1110 rc.add(convert(entry.getKey())); 1111 } 1112 } 1113 }); 1114 }finally { 1115 indexLock.writeLock().unlock(); 1116 } 1117 return rc; 1118 } catch (IOException e) { 1119 throw new RuntimeException(e); 1120 } 1121 } 1122 1123 @Override 1124 public long getLastMessageBrokerSequenceId() throws IOException { 1125 return 0; 1126 } 1127 1128 @Override 1129 public long getLastProducerSequenceId(ProducerId id) { 1130 indexLock.writeLock().lock(); 1131 try { 1132 return metadata.producerSequenceIdTracker.getLastSeqId(id); 1133 } finally { 1134 indexLock.writeLock().unlock(); 1135 } 1136 } 1137 1138 @Override 1139 public long size() { 1140 try { 1141 return journalSize.get() + getPageFile().getDiskSize(); 1142 } catch (IOException e) { 1143 throw new RuntimeException(e); 1144 } 1145 } 1146 1147 @Override 1148 public void beginTransaction(ConnectionContext context) throws IOException { 1149 throw new IOException("Not yet implemented."); 1150 } 1151 @Override 1152 public void commitTransaction(ConnectionContext context) throws IOException { 1153 throw new IOException("Not yet implemented."); 1154 } 1155 @Override 1156 public void rollbackTransaction(ConnectionContext context) throws IOException { 1157 throw new IOException("Not yet implemented."); 1158 } 1159 1160 @Override 1161 public void checkpoint(boolean sync) throws IOException { 1162 super.checkpointCleanup(sync); 1163 } 1164 1165 // ///////////////////////////////////////////////////////////////// 1166 // Internal helper methods. 1167 // ///////////////////////////////////////////////////////////////// 1168 1169 /** 1170 * @param location 1171 * @return 1172 * @throws IOException 1173 */ 1174 Message loadMessage(Location location) throws IOException { 1175 JournalCommand<?> command = load(location); 1176 KahaAddMessageCommand addMessage = null; 1177 switch (command.type()) { 1178 case KAHA_UPDATE_MESSAGE_COMMAND: 1179 addMessage = ((KahaUpdateMessageCommand)command).getMessage(); 1180 break; 1181 default: 1182 addMessage = (KahaAddMessageCommand) command; 1183 } 1184 Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); 1185 return msg; 1186 } 1187 1188 // ///////////////////////////////////////////////////////////////// 1189 // Internal conversion methods. 1190 // ///////////////////////////////////////////////////////////////// 1191 1192 KahaLocation convert(Location location) { 1193 KahaLocation rc = new KahaLocation(); 1194 rc.setLogId(location.getDataFileId()); 1195 rc.setOffset(location.getOffset()); 1196 return rc; 1197 } 1198 1199 KahaDestination convert(ActiveMQDestination dest) { 1200 KahaDestination rc = new KahaDestination(); 1201 rc.setName(dest.getPhysicalName()); 1202 switch (dest.getDestinationType()) { 1203 case ActiveMQDestination.QUEUE_TYPE: 1204 rc.setType(DestinationType.QUEUE); 1205 return rc; 1206 case ActiveMQDestination.TOPIC_TYPE: 1207 rc.setType(DestinationType.TOPIC); 1208 return rc; 1209 case ActiveMQDestination.TEMP_QUEUE_TYPE: 1210 rc.setType(DestinationType.TEMP_QUEUE); 1211 return rc; 1212 case ActiveMQDestination.TEMP_TOPIC_TYPE: 1213 rc.setType(DestinationType.TEMP_TOPIC); 1214 return rc; 1215 default: 1216 return null; 1217 } 1218 } 1219 1220 ActiveMQDestination convert(String dest) { 1221 int p = dest.indexOf(":"); 1222 if (p < 0) { 1223 throw new IllegalArgumentException("Not in the valid destination format"); 1224 } 1225 int type = Integer.parseInt(dest.substring(0, p)); 1226 String name = dest.substring(p + 1); 1227 return convert(type, name); 1228 } 1229 1230 private ActiveMQDestination convert(KahaDestination commandDestination) { 1231 return convert(commandDestination.getType().getNumber(), commandDestination.getName()); 1232 } 1233 1234 private ActiveMQDestination convert(int type, String name) { 1235 switch (KahaDestination.DestinationType.valueOf(type)) { 1236 case QUEUE: 1237 return new ActiveMQQueue(name); 1238 case TOPIC: 1239 return new ActiveMQTopic(name); 1240 case TEMP_QUEUE: 1241 return new ActiveMQTempQueue(name); 1242 case TEMP_TOPIC: 1243 return new ActiveMQTempTopic(name); 1244 default: 1245 throw new IllegalArgumentException("Not in the valid destination format"); 1246 } 1247 } 1248 1249 public TransactionIdTransformer getTransactionIdTransformer() { 1250 return transactionIdTransformer; 1251 } 1252 1253 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 1254 this.transactionIdTransformer = transactionIdTransformer; 1255 } 1256 1257 static class AsyncJobKey { 1258 MessageId id; 1259 ActiveMQDestination destination; 1260 1261 AsyncJobKey(MessageId id, ActiveMQDestination destination) { 1262 this.id = id; 1263 this.destination = destination; 1264 } 1265 1266 @Override 1267 public boolean equals(Object obj) { 1268 if (obj == this) { 1269 return true; 1270 } 1271 return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id) 1272 && destination.equals(((AsyncJobKey) obj).destination); 1273 } 1274 1275 @Override 1276 public int hashCode() { 1277 return id.hashCode() + destination.hashCode(); 1278 } 1279 1280 @Override 1281 public String toString() { 1282 return destination.getPhysicalName() + "-" + id; 1283 } 1284 } 1285 1286 public interface StoreTask { 1287 public boolean cancel(); 1288 1289 public void aquireLocks(); 1290 1291 public void releaseLocks(); 1292 } 1293 1294 class StoreQueueTask implements Runnable, StoreTask { 1295 protected final Message message; 1296 protected final ConnectionContext context; 1297 protected final KahaDBMessageStore store; 1298 protected final InnerFutureTask future; 1299 protected final AtomicBoolean done = new AtomicBoolean(); 1300 protected final AtomicBoolean locked = new AtomicBoolean(); 1301 1302 public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) { 1303 this.store = store; 1304 this.context = context; 1305 this.message = message; 1306 this.future = new InnerFutureTask(this); 1307 } 1308 1309 public ListenableFuture<Object> getFuture() { 1310 return this.future; 1311 } 1312 1313 @Override 1314 public boolean cancel() { 1315 if (this.done.compareAndSet(false, true)) { 1316 return this.future.cancel(false); 1317 } 1318 return false; 1319 } 1320 1321 @Override 1322 public void aquireLocks() { 1323 if (this.locked.compareAndSet(false, true)) { 1324 try { 1325 globalQueueSemaphore.acquire(); 1326 store.acquireLocalAsyncLock(); 1327 message.incrementReferenceCount(); 1328 } catch (InterruptedException e) { 1329 LOG.warn("Failed to aquire lock", e); 1330 } 1331 } 1332 1333 } 1334 1335 @Override 1336 public void releaseLocks() { 1337 if (this.locked.compareAndSet(true, false)) { 1338 store.releaseLocalAsyncLock(); 1339 globalQueueSemaphore.release(); 1340 message.decrementReferenceCount(); 1341 } 1342 } 1343 1344 @Override 1345 public void run() { 1346 this.store.doneTasks++; 1347 try { 1348 if (this.done.compareAndSet(false, true)) { 1349 this.store.addMessage(context, message); 1350 removeQueueTask(this.store, this.message.getMessageId()); 1351 this.future.complete(); 1352 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1353 System.err.println(this.store.dest.getName() + " cancelled: " 1354 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1355 this.store.canceledTasks = this.store.doneTasks = 0; 1356 } 1357 } catch (Exception e) { 1358 this.future.setException(e); 1359 } 1360 } 1361 1362 protected Message getMessage() { 1363 return this.message; 1364 } 1365 1366 private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { 1367 1368 private Runnable listener; 1369 public InnerFutureTask(Runnable runnable) { 1370 super(runnable, null); 1371 1372 } 1373 1374 public void setException(final Exception e) { 1375 super.setException(e); 1376 } 1377 1378 public void complete() { 1379 super.set(null); 1380 } 1381 1382 @Override 1383 public void done() { 1384 fireListener(); 1385 } 1386 1387 @Override 1388 public void addListener(Runnable listener) { 1389 this.listener = listener; 1390 if (isDone()) { 1391 fireListener(); 1392 } 1393 } 1394 1395 private void fireListener() { 1396 if (listener != null) { 1397 try { 1398 listener.run(); 1399 } catch (Exception ignored) { 1400 LOG.warn("Unexpected exception from future {} listener callback {}", this, listener, ignored); 1401 } 1402 } 1403 } 1404 } 1405 } 1406 1407 class StoreTopicTask extends StoreQueueTask { 1408 private final int subscriptionCount; 1409 private final List<String> subscriptionKeys = new ArrayList<String>(1); 1410 private final KahaDBTopicMessageStore topicStore; 1411 public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, 1412 int subscriptionCount) { 1413 super(store, context, message); 1414 this.topicStore = store; 1415 this.subscriptionCount = subscriptionCount; 1416 1417 } 1418 1419 @Override 1420 public void aquireLocks() { 1421 if (this.locked.compareAndSet(false, true)) { 1422 try { 1423 globalTopicSemaphore.acquire(); 1424 store.acquireLocalAsyncLock(); 1425 message.incrementReferenceCount(); 1426 } catch (InterruptedException e) { 1427 LOG.warn("Failed to aquire lock", e); 1428 } 1429 } 1430 } 1431 1432 @Override 1433 public void releaseLocks() { 1434 if (this.locked.compareAndSet(true, false)) { 1435 message.decrementReferenceCount(); 1436 store.releaseLocalAsyncLock(); 1437 globalTopicSemaphore.release(); 1438 } 1439 } 1440 1441 /** 1442 * add a key 1443 * 1444 * @param key 1445 * @return true if all acknowledgements received 1446 */ 1447 public boolean addSubscriptionKey(String key) { 1448 synchronized (this.subscriptionKeys) { 1449 this.subscriptionKeys.add(key); 1450 } 1451 return this.subscriptionKeys.size() >= this.subscriptionCount; 1452 } 1453 1454 @Override 1455 public void run() { 1456 this.store.doneTasks++; 1457 try { 1458 if (this.done.compareAndSet(false, true)) { 1459 this.topicStore.addMessage(context, message); 1460 // apply any acks we have 1461 synchronized (this.subscriptionKeys) { 1462 for (String key : this.subscriptionKeys) { 1463 this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); 1464 1465 } 1466 } 1467 removeTopicTask(this.topicStore, this.message.getMessageId()); 1468 this.future.complete(); 1469 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) { 1470 System.err.println(this.store.dest.getName() + " cancelled: " 1471 + (this.store.canceledTasks / this.store.doneTasks) * 100); 1472 this.store.canceledTasks = this.store.doneTasks = 0; 1473 } 1474 } catch (Exception e) { 1475 this.future.setException(e); 1476 } 1477 } 1478 } 1479 1480 public class StoreTaskExecutor extends ThreadPoolExecutor { 1481 1482 public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) { 1483 super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); 1484 } 1485 1486 @Override 1487 protected void afterExecute(Runnable runnable, Throwable throwable) { 1488 super.afterExecute(runnable, throwable); 1489 1490 if (runnable instanceof StoreTask) { 1491 ((StoreTask)runnable).releaseLocks(); 1492 } 1493 } 1494 } 1495 1496 @Override 1497 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 1498 return new JobSchedulerStoreImpl(); 1499 } 1500}