001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.ByteArrayInputStream; 020import java.io.ByteArrayOutputStream; 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.EOFException; 024import java.io.File; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.InterruptedIOException; 028import java.io.ObjectInputStream; 029import java.io.ObjectOutputStream; 030import java.io.OutputStream; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Collection; 034import java.util.Collections; 035import java.util.Date; 036import java.util.HashMap; 037import java.util.HashSet; 038import java.util.Iterator; 039import java.util.LinkedHashMap; 040import java.util.LinkedHashSet; 041import java.util.LinkedList; 042import java.util.List; 043import java.util.Map; 044import java.util.Map.Entry; 045import java.util.Set; 046import java.util.SortedSet; 047import java.util.TreeMap; 048import java.util.TreeSet; 049import java.util.concurrent.ConcurrentHashMap; 050import java.util.concurrent.ConcurrentMap; 051import java.util.concurrent.Executors; 052import java.util.concurrent.ScheduledExecutorService; 053import java.util.concurrent.ThreadFactory; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.atomic.AtomicBoolean; 056import java.util.concurrent.atomic.AtomicLong; 057import java.util.concurrent.locks.ReentrantReadWriteLock; 058 059import org.apache.activemq.ActiveMQMessageAuditNoSync; 060import org.apache.activemq.broker.BrokerService; 061import org.apache.activemq.broker.BrokerServiceAware; 062import org.apache.activemq.broker.region.Destination; 063import org.apache.activemq.broker.region.Queue; 064import org.apache.activemq.broker.region.Topic; 065import org.apache.activemq.command.MessageAck; 066import org.apache.activemq.command.TransactionId; 067import org.apache.activemq.openwire.OpenWireFormat; 068import org.apache.activemq.protobuf.Buffer; 069import org.apache.activemq.store.MessageStore; 070import org.apache.activemq.store.MessageStoreStatistics; 071import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; 072import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 073import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 074import org.apache.activemq.store.kahadb.data.KahaDestination; 075import org.apache.activemq.store.kahadb.data.KahaEntryType; 076import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 077import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; 078import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 079import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 080import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand; 081import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 082import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 083import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 084import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 085import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 086import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 087import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; 088import org.apache.activemq.store.kahadb.disk.index.ListIndex; 089import org.apache.activemq.store.kahadb.disk.journal.DataFile; 090import org.apache.activemq.store.kahadb.disk.journal.Journal; 091import org.apache.activemq.store.kahadb.disk.journal.Location; 092import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender; 093import org.apache.activemq.store.kahadb.disk.page.Page; 094import org.apache.activemq.store.kahadb.disk.page.PageFile; 095import org.apache.activemq.store.kahadb.disk.page.Transaction; 096import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 097import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; 098import org.apache.activemq.store.kahadb.disk.util.Marshaller; 099import org.apache.activemq.store.kahadb.disk.util.Sequence; 100import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 101import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 102import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 103import org.apache.activemq.util.ByteSequence; 104import org.apache.activemq.util.DataByteArrayInputStream; 105import org.apache.activemq.util.DataByteArrayOutputStream; 106import org.apache.activemq.util.IOExceptionSupport; 107import org.apache.activemq.util.IOHelper; 108import org.apache.activemq.util.ServiceStopper; 109import org.apache.activemq.util.ServiceSupport; 110import org.apache.activemq.util.ThreadPoolUtils; 111import org.slf4j.Logger; 112import org.slf4j.LoggerFactory; 113 114public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { 115 116 protected BrokerService brokerService; 117 118 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 119 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); 120 public static final File DEFAULT_DIRECTORY = new File("KahaDB"); 121 protected static final Buffer UNMATCHED; 122 static { 123 UNMATCHED = new Buffer(new byte[]{}); 124 } 125 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); 126 127 static final int CLOSED_STATE = 1; 128 static final int OPEN_STATE = 2; 129 static final long NOT_ACKED = -1; 130 131 static final int VERSION = 6; 132 133 static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1; 134 135 protected class Metadata { 136 protected Page<Metadata> page; 137 protected int state; 138 protected BTreeIndex<String, StoredDestination> destinations; 139 protected Location lastUpdate; 140 protected Location firstInProgressTransactionLocation; 141 protected Location producerSequenceIdTrackerLocation = null; 142 protected Location ackMessageFileMapLocation = null; 143 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); 144 protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>(); 145 protected int version = VERSION; 146 protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; 147 148 public void read(DataInput is) throws IOException { 149 state = is.readInt(); 150 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong()); 151 if (is.readBoolean()) { 152 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); 153 } else { 154 lastUpdate = null; 155 } 156 if (is.readBoolean()) { 157 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); 158 } else { 159 firstInProgressTransactionLocation = null; 160 } 161 try { 162 if (is.readBoolean()) { 163 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); 164 } else { 165 producerSequenceIdTrackerLocation = null; 166 } 167 } catch (EOFException expectedOnUpgrade) { 168 } 169 try { 170 version = is.readInt(); 171 } catch (EOFException expectedOnUpgrade) { 172 version = 1; 173 } 174 if (version >= 5 && is.readBoolean()) { 175 ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is); 176 } else { 177 ackMessageFileMapLocation = null; 178 } 179 try { 180 openwireVersion = is.readInt(); 181 } catch (EOFException expectedOnUpgrade) { 182 openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION; 183 } 184 LOG.info("KahaDB is version " + version); 185 } 186 187 public void write(DataOutput os) throws IOException { 188 os.writeInt(state); 189 os.writeLong(destinations.getPageId()); 190 191 if (lastUpdate != null) { 192 os.writeBoolean(true); 193 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); 194 } else { 195 os.writeBoolean(false); 196 } 197 198 if (firstInProgressTransactionLocation != null) { 199 os.writeBoolean(true); 200 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); 201 } else { 202 os.writeBoolean(false); 203 } 204 205 if (producerSequenceIdTrackerLocation != null) { 206 os.writeBoolean(true); 207 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); 208 } else { 209 os.writeBoolean(false); 210 } 211 os.writeInt(VERSION); 212 if (ackMessageFileMapLocation != null) { 213 os.writeBoolean(true); 214 LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os); 215 } else { 216 os.writeBoolean(false); 217 } 218 os.writeInt(this.openwireVersion); 219 } 220 } 221 222 class MetadataMarshaller extends VariableMarshaller<Metadata> { 223 @Override 224 public Metadata readPayload(DataInput dataIn) throws IOException { 225 Metadata rc = createMetadata(); 226 rc.read(dataIn); 227 return rc; 228 } 229 230 @Override 231 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 232 object.write(dataOut); 233 } 234 } 235 236 protected PageFile pageFile; 237 protected Journal journal; 238 protected Metadata metadata = new Metadata(); 239 240 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); 241 242 protected boolean failIfDatabaseIsLocked; 243 244 protected boolean deleteAllMessages; 245 protected File directory = DEFAULT_DIRECTORY; 246 protected File indexDirectory = null; 247 protected ScheduledExecutorService scheduler; 248 private final Object schedulerLock = new Object(); 249 250 protected boolean enableJournalDiskSyncs = true; 251 protected boolean archiveDataLogs; 252 protected File directoryArchive; 253 protected AtomicLong journalSize = new AtomicLong(0); 254 long checkpointInterval = 5*1000; 255 long cleanupInterval = 30*1000; 256 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 257 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 258 boolean enableIndexWriteAsync = false; 259 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 260 private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name(); 261 private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name(); 262 263 protected AtomicBoolean opened = new AtomicBoolean(); 264 private boolean ignoreMissingJournalfiles = false; 265 private int indexCacheSize = 10000; 266 private boolean checkForCorruptJournalFiles = false; 267 private boolean checksumJournalFiles = true; 268 protected boolean forceRecoverIndex = false; 269 private boolean archiveCorruptedIndex = false; 270 private boolean useIndexLFRUEviction = false; 271 private float indexLFUEvictionFactor = 0.2f; 272 private boolean enableIndexDiskSyncs = true; 273 private boolean enableIndexRecoveryFile = true; 274 private boolean enableIndexPageCaching = true; 275 ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); 276 277 private boolean enableAckCompaction = false; 278 private int compactAcksAfterNoGC = 10; 279 private boolean compactAcksIgnoresStoreGrowth = false; 280 private int checkPointCyclesWithNoGC; 281 private int journalLogOnLastCompactionCheck; 282 283 @Override 284 public void doStart() throws Exception { 285 load(); 286 } 287 288 @Override 289 public void doStop(ServiceStopper stopper) throws Exception { 290 unload(); 291 } 292 293 private void loadPageFile() throws IOException { 294 this.indexLock.writeLock().lock(); 295 try { 296 final PageFile pageFile = getPageFile(); 297 pageFile.load(); 298 pageFile.tx().execute(new Transaction.Closure<IOException>() { 299 @Override 300 public void execute(Transaction tx) throws IOException { 301 if (pageFile.getPageCount() == 0) { 302 // First time this is created.. Initialize the metadata 303 Page<Metadata> page = tx.allocate(); 304 assert page.getPageId() == 0; 305 page.set(metadata); 306 metadata.page = page; 307 metadata.state = CLOSED_STATE; 308 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 309 310 tx.store(metadata.page, metadataMarshaller, true); 311 } else { 312 Page<Metadata> page = tx.load(0, metadataMarshaller); 313 metadata = page.get(); 314 metadata.page = page; 315 } 316 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 317 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); 318 metadata.destinations.load(tx); 319 } 320 }); 321 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 322 // Perhaps we should just keep an index of file 323 storedDestinations.clear(); 324 pageFile.tx().execute(new Transaction.Closure<IOException>() { 325 @Override 326 public void execute(Transaction tx) throws IOException { 327 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { 328 Entry<String, StoredDestination> entry = iterator.next(); 329 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); 330 storedDestinations.put(entry.getKey(), sd); 331 332 if (checkForCorruptJournalFiles) { 333 // sanity check the index also 334 if (!entry.getValue().locationIndex.isEmpty(tx)) { 335 if (entry.getValue().orderIndex.nextMessageId <= 0) { 336 throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); 337 } 338 } 339 } 340 } 341 } 342 }); 343 pageFile.flush(); 344 } finally { 345 this.indexLock.writeLock().unlock(); 346 } 347 } 348 349 private void startCheckpoint() { 350 if (checkpointInterval == 0 && cleanupInterval == 0) { 351 LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart"); 352 return; 353 } 354 synchronized (schedulerLock) { 355 if (scheduler == null) { 356 scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { 357 358 @Override 359 public Thread newThread(Runnable r) { 360 Thread schedulerThread = new Thread(r); 361 362 schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); 363 schedulerThread.setDaemon(true); 364 365 return schedulerThread; 366 } 367 }); 368 369 // Short intervals for check-point and cleanups 370 long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); 371 372 scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS); 373 } 374 } 375 } 376 377 private final class CheckpointRunner implements Runnable { 378 379 private long lastCheckpoint = System.currentTimeMillis(); 380 private long lastCleanup = System.currentTimeMillis(); 381 382 @Override 383 public void run() { 384 try { 385 // Decide on cleanup vs full checkpoint here. 386 if (opened.get()) { 387 long now = System.currentTimeMillis(); 388 if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) { 389 checkpointCleanup(true); 390 lastCleanup = now; 391 lastCheckpoint = now; 392 } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) { 393 checkpointCleanup(false); 394 lastCheckpoint = now; 395 } 396 } 397 } catch (IOException ioe) { 398 LOG.error("Checkpoint failed", ioe); 399 brokerService.handleIOException(ioe); 400 } catch (Throwable e) { 401 LOG.error("Checkpoint failed", e); 402 brokerService.handleIOException(IOExceptionSupport.create(e)); 403 } 404 } 405 } 406 407 public void open() throws IOException { 408 if( opened.compareAndSet(false, true) ) { 409 getJournal().start(); 410 try { 411 loadPageFile(); 412 } catch (Throwable t) { 413 LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t); 414 if (LOG.isDebugEnabled()) { 415 LOG.debug("Index load failure", t); 416 } 417 // try to recover index 418 try { 419 pageFile.unload(); 420 } catch (Exception ignore) {} 421 if (archiveCorruptedIndex) { 422 pageFile.archive(); 423 } else { 424 pageFile.delete(); 425 } 426 metadata = createMetadata(); 427 //The metadata was recreated after a detect corruption so we need to 428 //reconfigure anything that was configured on the old metadata on startup 429 configureMetadata(); 430 pageFile = null; 431 loadPageFile(); 432 } 433 startCheckpoint(); 434 recover(); 435 } 436 } 437 438 public void load() throws IOException { 439 this.indexLock.writeLock().lock(); 440 IOHelper.mkdirs(directory); 441 try { 442 if (deleteAllMessages) { 443 getJournal().start(); 444 getJournal().delete(); 445 getJournal().close(); 446 journal = null; 447 getPageFile().delete(); 448 LOG.info("Persistence store purged."); 449 deleteAllMessages = false; 450 } 451 452 open(); 453 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 454 } finally { 455 this.indexLock.writeLock().unlock(); 456 } 457 } 458 459 public void close() throws IOException, InterruptedException { 460 if( opened.compareAndSet(true, false)) { 461 checkpointLock.writeLock().lock(); 462 try { 463 if (metadata.page != null) { 464 checkpointUpdate(true); 465 } 466 pageFile.unload(); 467 metadata = createMetadata(); 468 } finally { 469 checkpointLock.writeLock().unlock(); 470 } 471 journal.close(); 472 ThreadPoolUtils.shutdownGraceful(scheduler, -1); 473 // clear the cache and journalSize on shutdown of the store 474 storeCache.clear(); 475 journalSize.set(0); 476 } 477 } 478 479 public void unload() throws IOException, InterruptedException { 480 this.indexLock.writeLock().lock(); 481 try { 482 if( pageFile != null && pageFile.isLoaded() ) { 483 metadata.state = CLOSED_STATE; 484 metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; 485 486 if (metadata.page != null) { 487 pageFile.tx().execute(new Transaction.Closure<IOException>() { 488 @Override 489 public void execute(Transaction tx) throws IOException { 490 tx.store(metadata.page, metadataMarshaller, true); 491 } 492 }); 493 } 494 } 495 } finally { 496 this.indexLock.writeLock().unlock(); 497 } 498 close(); 499 } 500 501 // public for testing 502 @SuppressWarnings("rawtypes") 503 public Location[] getInProgressTxLocationRange() { 504 Location[] range = new Location[]{null, null}; 505 synchronized (inflightTransactions) { 506 if (!inflightTransactions.isEmpty()) { 507 for (List<Operation> ops : inflightTransactions.values()) { 508 if (!ops.isEmpty()) { 509 trackMaxAndMin(range, ops); 510 } 511 } 512 } 513 if (!preparedTransactions.isEmpty()) { 514 for (List<Operation> ops : preparedTransactions.values()) { 515 if (!ops.isEmpty()) { 516 trackMaxAndMin(range, ops); 517 } 518 } 519 } 520 } 521 return range; 522 } 523 524 @SuppressWarnings("rawtypes") 525 private void trackMaxAndMin(Location[] range, List<Operation> ops) { 526 Location t = ops.get(0).getLocation(); 527 if (range[0] == null || t.compareTo(range[0]) <= 0) { 528 range[0] = t; 529 } 530 t = ops.get(ops.size() -1).getLocation(); 531 if (range[1] == null || t.compareTo(range[1]) >= 0) { 532 range[1] = t; 533 } 534 } 535 536 class TranInfo { 537 TransactionId id; 538 Location location; 539 540 class opCount { 541 int add; 542 int remove; 543 } 544 HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>(); 545 546 @SuppressWarnings("rawtypes") 547 public void track(Operation operation) { 548 if (location == null ) { 549 location = operation.getLocation(); 550 } 551 KahaDestination destination; 552 boolean isAdd = false; 553 if (operation instanceof AddOperation) { 554 AddOperation add = (AddOperation) operation; 555 destination = add.getCommand().getDestination(); 556 isAdd = true; 557 } else { 558 RemoveOperation removeOpperation = (RemoveOperation) operation; 559 destination = removeOpperation.getCommand().getDestination(); 560 } 561 opCount opCount = destinationOpCount.get(destination); 562 if (opCount == null) { 563 opCount = new opCount(); 564 destinationOpCount.put(destination, opCount); 565 } 566 if (isAdd) { 567 opCount.add++; 568 } else { 569 opCount.remove++; 570 } 571 } 572 573 @Override 574 public String toString() { 575 StringBuffer buffer = new StringBuffer(); 576 buffer.append(location).append(";").append(id).append(";\n"); 577 for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) { 578 buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); 579 } 580 return buffer.toString(); 581 } 582 } 583 584 @SuppressWarnings("rawtypes") 585 public String getTransactions() { 586 587 ArrayList<TranInfo> infos = new ArrayList<TranInfo>(); 588 synchronized (inflightTransactions) { 589 if (!inflightTransactions.isEmpty()) { 590 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) { 591 TranInfo info = new TranInfo(); 592 info.id = entry.getKey(); 593 for (Operation operation : entry.getValue()) { 594 info.track(operation); 595 } 596 infos.add(info); 597 } 598 } 599 } 600 synchronized (preparedTransactions) { 601 if (!preparedTransactions.isEmpty()) { 602 for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) { 603 TranInfo info = new TranInfo(); 604 info.id = entry.getKey(); 605 for (Operation operation : entry.getValue()) { 606 info.track(operation); 607 } 608 infos.add(info); 609 } 610 } 611 } 612 return infos.toString(); 613 } 614 615 /** 616 * Move all the messages that were in the journal into long term storage. We 617 * just replay and do a checkpoint. 618 * 619 * @throws IOException 620 * @throws IOException 621 * @throws IllegalStateException 622 */ 623 private void recover() throws IllegalStateException, IOException { 624 this.indexLock.writeLock().lock(); 625 try { 626 627 long start = System.currentTimeMillis(); 628 Location producerAuditPosition = recoverProducerAudit(); 629 Location ackMessageFileLocation = recoverAckMessageFileMap(); 630 Location lastIndoubtPosition = getRecoveryPosition(); 631 632 Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation); 633 recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); 634 635 if (recoveryPosition != null) { 636 int redoCounter = 0; 637 LOG.info("Recovering from the journal @" + recoveryPosition); 638 while (recoveryPosition != null) { 639 try { 640 JournalCommand<?> message = load(recoveryPosition); 641 metadata.lastUpdate = recoveryPosition; 642 process(message, recoveryPosition, lastIndoubtPosition); 643 redoCounter++; 644 } catch (IOException failedRecovery) { 645 if (isIgnoreMissingJournalfiles()) { 646 LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); 647 // track this dud location 648 journal.corruptRecoveryLocation(recoveryPosition); 649 } else { 650 throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery); 651 } 652 } 653 recoveryPosition = journal.getNextLocation(recoveryPosition); 654 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { 655 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); 656 } 657 } 658 if (LOG.isInfoEnabled()) { 659 long end = System.currentTimeMillis(); 660 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); 661 } 662 } 663 664 // We may have to undo some index updates. 665 pageFile.tx().execute(new Transaction.Closure<IOException>() { 666 @Override 667 public void execute(Transaction tx) throws IOException { 668 recoverIndex(tx); 669 } 670 }); 671 672 // rollback any recovered inflight local transactions, and discard any inflight XA transactions. 673 Set<TransactionId> toRollback = new HashSet<TransactionId>(); 674 Set<TransactionId> toDiscard = new HashSet<TransactionId>(); 675 synchronized (inflightTransactions) { 676 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { 677 TransactionId id = it.next(); 678 if (id.isLocalTransaction()) { 679 toRollback.add(id); 680 } else { 681 toDiscard.add(id); 682 } 683 } 684 for (TransactionId tx: toRollback) { 685 if (LOG.isDebugEnabled()) { 686 LOG.debug("rolling back recovered indoubt local transaction " + tx); 687 } 688 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); 689 } 690 for (TransactionId tx: toDiscard) { 691 if (LOG.isDebugEnabled()) { 692 LOG.debug("discarding recovered in-flight XA transaction " + tx); 693 } 694 inflightTransactions.remove(tx); 695 } 696 } 697 698 synchronized (preparedTransactions) { 699 for (TransactionId txId : preparedTransactions.keySet()) { 700 LOG.warn("Recovered prepared XA TX: [{}]", txId); 701 } 702 } 703 704 } finally { 705 this.indexLock.writeLock().unlock(); 706 } 707 } 708 709 @SuppressWarnings("unused") 710 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { 711 return TransactionIdConversion.convertToLocal(tx); 712 } 713 714 private Location minimum(Location producerAuditPosition, 715 Location lastIndoubtPosition) { 716 Location min = null; 717 if (producerAuditPosition != null) { 718 min = producerAuditPosition; 719 if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { 720 min = lastIndoubtPosition; 721 } 722 } else { 723 min = lastIndoubtPosition; 724 } 725 return min; 726 } 727 728 private Location recoverProducerAudit() throws IOException { 729 if (metadata.producerSequenceIdTrackerLocation != null) { 730 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); 731 try { 732 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); 733 int maxNumProducers = getMaxFailoverProducersToTrack(); 734 int maxAuditDepth = getFailoverProducersAuditDepth(); 735 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); 736 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); 737 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); 738 return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); 739 } catch (Exception e) { 740 LOG.warn("Cannot recover message audit", e); 741 return journal.getNextLocation(null); 742 } 743 } else { 744 // got no audit stored so got to recreate via replay from start of the journal 745 return journal.getNextLocation(null); 746 } 747 } 748 749 @SuppressWarnings("unchecked") 750 private Location recoverAckMessageFileMap() throws IOException { 751 if (metadata.ackMessageFileMapLocation != null) { 752 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); 753 try { 754 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); 755 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); 756 return journal.getNextLocation(metadata.ackMessageFileMapLocation); 757 } catch (Exception e) { 758 LOG.warn("Cannot recover ackMessageFileMap", e); 759 return journal.getNextLocation(null); 760 } 761 } else { 762 // got no ackMessageFileMap stored so got to recreate via replay from start of the journal 763 return journal.getNextLocation(null); 764 } 765 } 766 767 protected void recoverIndex(Transaction tx) throws IOException { 768 long start = System.currentTimeMillis(); 769 // It is possible index updates got applied before the journal updates.. 770 // in that case we need to removed references to messages that are not in the journal 771 final Location lastAppendLocation = journal.getLastAppendLocation(); 772 long undoCounter=0; 773 774 // Go through all the destinations to see if they have messages past the lastAppendLocation 775 for (String key : storedDestinations.keySet()) { 776 StoredDestination sd = storedDestinations.get(key); 777 778 final ArrayList<Long> matches = new ArrayList<Long>(); 779 // Find all the Locations that are >= than the last Append Location. 780 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 781 @Override 782 protected void matched(Location key, Long value) { 783 matches.add(value); 784 } 785 }); 786 787 for (Long sequenceId : matches) { 788 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 789 if (keys != null) { 790 sd.locationIndex.remove(tx, keys.location); 791 sd.messageIdIndex.remove(tx, keys.messageId); 792 metadata.producerSequenceIdTracker.rollback(keys.messageId); 793 undoCounter++; 794 decrementAndSubSizeToStoreStat(key, keys.location.getSize()); 795 // TODO: do we need to modify the ack positions for the pub sub case? 796 } 797 } 798 } 799 800 if (undoCounter > 0) { 801 // The rolledback operations are basically in flight journal writes. To avoid getting 802 // these the end user should do sync writes to the journal. 803 if (LOG.isInfoEnabled()) { 804 long end = System.currentTimeMillis(); 805 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 806 } 807 } 808 809 undoCounter = 0; 810 start = System.currentTimeMillis(); 811 812 // Lets be extra paranoid here and verify that all the datafiles being referenced 813 // by the indexes still exists. 814 815 final SequenceSet ss = new SequenceSet(); 816 for (StoredDestination sd : storedDestinations.values()) { 817 // Use a visitor to cut down the number of pages that we load 818 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 819 int last=-1; 820 821 @Override 822 public boolean isInterestedInKeysBetween(Location first, Location second) { 823 if( first==null ) { 824 return !ss.contains(0, second.getDataFileId()); 825 } else if( second==null ) { 826 return true; 827 } else { 828 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 829 } 830 } 831 832 @Override 833 public void visit(List<Location> keys, List<Long> values) { 834 for (Location l : keys) { 835 int fileId = l.getDataFileId(); 836 if( last != fileId ) { 837 ss.add(fileId); 838 last = fileId; 839 } 840 } 841 } 842 843 }); 844 } 845 HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); 846 while (!ss.isEmpty()) { 847 missingJournalFiles.add((int) ss.removeFirst()); 848 } 849 850 for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) { 851 missingJournalFiles.add(entry.getKey()); 852 for (Integer i : entry.getValue()) { 853 missingJournalFiles.add(i); 854 } 855 } 856 857 missingJournalFiles.removeAll(journal.getFileMap().keySet()); 858 859 if (!missingJournalFiles.isEmpty()) { 860 LOG.warn("Some journal files are missing: " + missingJournalFiles); 861 } 862 863 ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>(); 864 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); 865 for (Integer missing : missingJournalFiles) { 866 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0))); 867 } 868 869 if (checkForCorruptJournalFiles) { 870 Collection<DataFile> dataFiles = journal.getFileMap().values(); 871 for (DataFile dataFile : dataFiles) { 872 int id = dataFile.getDataFileId(); 873 // eof to next file id 874 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0))); 875 Sequence seq = dataFile.getCorruptedBlocks().getHead(); 876 while (seq != null) { 877 BTreeVisitor.BetweenVisitor<Location, Long> visitor = 878 new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)); 879 missingPredicates.add(visitor); 880 knownCorruption.add(visitor); 881 seq = seq.getNext(); 882 } 883 } 884 } 885 886 if (!missingPredicates.isEmpty()) { 887 for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) { 888 final StoredDestination sd = sdEntry.getValue(); 889 final ArrayList<Long> matches = new ArrayList<Long>(); 890 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { 891 @Override 892 protected void matched(Location key, Long value) { 893 matches.add(value); 894 } 895 }); 896 897 // If some message references are affected by the missing data files... 898 if (!matches.isEmpty()) { 899 900 // We either 'gracefully' recover dropping the missing messages or 901 // we error out. 902 if( ignoreMissingJournalfiles ) { 903 // Update the index to remove the references to the missing data 904 for (Long sequenceId : matches) { 905 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 906 sd.locationIndex.remove(tx, keys.location); 907 sd.messageIdIndex.remove(tx, keys.messageId); 908 LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); 909 undoCounter++; 910 decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize()); 911 // TODO: do we need to modify the ack positions for the pub sub case? 912 } 913 } else { 914 LOG.error("[" + sdEntry.getKey() + "] references corrupt locations. " + matches.size() + " messages affected."); 915 throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected."); 916 } 917 } 918 } 919 } 920 921 if (!ignoreMissingJournalfiles) { 922 if (!knownCorruption.isEmpty()) { 923 LOG.error("Detected corrupt journal files. " + knownCorruption); 924 throw new IOException("Detected corrupt journal files. " + knownCorruption); 925 } 926 927 if (!missingJournalFiles.isEmpty()) { 928 LOG.error("Detected missing journal files. " + missingJournalFiles); 929 throw new IOException("Detected missing journal files. " + missingJournalFiles); 930 } 931 } 932 933 if (undoCounter > 0) { 934 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 935 // should do sync writes to the journal. 936 if (LOG.isInfoEnabled()) { 937 long end = System.currentTimeMillis(); 938 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 939 } 940 } 941 } 942 943 private Location nextRecoveryPosition; 944 private Location lastRecoveryPosition; 945 946 public void incrementalRecover() throws IOException { 947 this.indexLock.writeLock().lock(); 948 try { 949 if( nextRecoveryPosition == null ) { 950 if( lastRecoveryPosition==null ) { 951 nextRecoveryPosition = getRecoveryPosition(); 952 } else { 953 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 954 } 955 } 956 while (nextRecoveryPosition != null) { 957 lastRecoveryPosition = nextRecoveryPosition; 958 metadata.lastUpdate = lastRecoveryPosition; 959 JournalCommand<?> message = load(lastRecoveryPosition); 960 process(message, lastRecoveryPosition, (IndexAware) null); 961 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 962 } 963 } finally { 964 this.indexLock.writeLock().unlock(); 965 } 966 } 967 968 public Location getLastUpdatePosition() throws IOException { 969 return metadata.lastUpdate; 970 } 971 972 private Location getRecoveryPosition() throws IOException { 973 974 if (!this.forceRecoverIndex) { 975 976 // If we need to recover the transactions.. 977 if (metadata.firstInProgressTransactionLocation != null) { 978 return metadata.firstInProgressTransactionLocation; 979 } 980 981 // Perhaps there were no transactions... 982 if( metadata.lastUpdate!=null) { 983 // Start replay at the record after the last one recorded in the index file. 984 return journal.getNextLocation(metadata.lastUpdate); 985 } 986 } 987 // This loads the first position. 988 return journal.getNextLocation(null); 989 } 990 991 protected void checkpointCleanup(final boolean cleanup) throws IOException { 992 long start; 993 this.indexLock.writeLock().lock(); 994 try { 995 start = System.currentTimeMillis(); 996 if( !opened.get() ) { 997 return; 998 } 999 } finally { 1000 this.indexLock.writeLock().unlock(); 1001 } 1002 checkpointUpdate(cleanup); 1003 long end = System.currentTimeMillis(); 1004 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1005 if (LOG.isInfoEnabled()) { 1006 LOG.info("Slow KahaDB access: cleanup took " + (end - start)); 1007 } 1008 } 1009 } 1010 1011 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { 1012 int size = data.serializedSizeFramed(); 1013 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 1014 os.writeByte(data.type().getNumber()); 1015 data.writeFramed(os); 1016 return os.toByteSequence(); 1017 } 1018 1019 // ///////////////////////////////////////////////////////////////// 1020 // Methods call by the broker to update and query the store. 1021 // ///////////////////////////////////////////////////////////////// 1022 public Location store(JournalCommand<?> data) throws IOException { 1023 return store(data, false, null,null); 1024 } 1025 1026 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException { 1027 return store(data, false, null, null, onJournalStoreComplete); 1028 } 1029 1030 public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException { 1031 return store(data, sync, before, after, null); 1032 } 1033 1034 /** 1035 * All updated are are funneled through this method. The updates are converted 1036 * to a JournalMessage which is logged to the journal and then the data from 1037 * the JournalMessage is used to update the index just like it would be done 1038 * during a recovery process. 1039 */ 1040 public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { 1041 try { 1042 ByteSequence sequence = toByteSequence(data); 1043 Location location; 1044 1045 checkpointLock.readLock().lock(); 1046 try { 1047 1048 long start = System.currentTimeMillis(); 1049 location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; 1050 long start2 = System.currentTimeMillis(); 1051 process(data, location, before); 1052 1053 long end = System.currentTimeMillis(); 1054 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1055 if (LOG.isInfoEnabled()) { 1056 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); 1057 } 1058 } 1059 } finally { 1060 checkpointLock.readLock().unlock(); 1061 } 1062 1063 if (after != null) { 1064 after.run(); 1065 } 1066 1067 if (scheduler == null && opened.get()) { 1068 startCheckpoint(); 1069 } 1070 return location; 1071 } catch (IOException ioe) { 1072 LOG.error("KahaDB failed to store to Journal", ioe); 1073 brokerService.handleIOException(ioe); 1074 throw ioe; 1075 } 1076 } 1077 1078 /** 1079 * Loads a previously stored JournalMessage 1080 * 1081 * @param location 1082 * @return 1083 * @throws IOException 1084 */ 1085 public JournalCommand<?> load(Location location) throws IOException { 1086 long start = System.currentTimeMillis(); 1087 ByteSequence data = journal.read(location); 1088 long end = System.currentTimeMillis(); 1089 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 1090 if (LOG.isInfoEnabled()) { 1091 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); 1092 } 1093 } 1094 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 1095 byte readByte = is.readByte(); 1096 KahaEntryType type = KahaEntryType.valueOf(readByte); 1097 if( type == null ) { 1098 try { 1099 is.close(); 1100 } catch (IOException e) {} 1101 throw new IOException("Could not load journal record. Invalid location: "+location); 1102 } 1103 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 1104 message.mergeFramed(is); 1105 return message; 1106 } 1107 1108 /** 1109 * do minimal recovery till we reach the last inDoubtLocation 1110 * @param data 1111 * @param location 1112 * @param inDoubtlocation 1113 * @throws IOException 1114 */ 1115 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { 1116 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { 1117 process(data, location, (IndexAware) null); 1118 } else { 1119 // just recover producer audit 1120 data.visit(new Visitor() { 1121 @Override 1122 public void visit(KahaAddMessageCommand command) throws IOException { 1123 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1124 } 1125 }); 1126 } 1127 } 1128 1129 // ///////////////////////////////////////////////////////////////// 1130 // Journaled record processing methods. Once the record is journaled, 1131 // these methods handle applying the index updates. These may be called 1132 // from the recovery method too so they need to be idempotent 1133 // ///////////////////////////////////////////////////////////////// 1134 1135 void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException { 1136 data.visit(new Visitor() { 1137 @Override 1138 public void visit(KahaAddMessageCommand command) throws IOException { 1139 process(command, location, onSequenceAssignedCallback); 1140 } 1141 1142 @Override 1143 public void visit(KahaRemoveMessageCommand command) throws IOException { 1144 process(command, location); 1145 } 1146 1147 @Override 1148 public void visit(KahaPrepareCommand command) throws IOException { 1149 process(command, location); 1150 } 1151 1152 @Override 1153 public void visit(KahaCommitCommand command) throws IOException { 1154 process(command, location, onSequenceAssignedCallback); 1155 } 1156 1157 @Override 1158 public void visit(KahaRollbackCommand command) throws IOException { 1159 process(command, location); 1160 } 1161 1162 @Override 1163 public void visit(KahaRemoveDestinationCommand command) throws IOException { 1164 process(command, location); 1165 } 1166 1167 @Override 1168 public void visit(KahaSubscriptionCommand command) throws IOException { 1169 process(command, location); 1170 } 1171 1172 @Override 1173 public void visit(KahaProducerAuditCommand command) throws IOException { 1174 processLocation(location); 1175 } 1176 1177 @Override 1178 public void visit(KahaAckMessageFileMapCommand command) throws IOException { 1179 processLocation(location); 1180 } 1181 1182 @Override 1183 public void visit(KahaTraceCommand command) { 1184 processLocation(location); 1185 } 1186 1187 @Override 1188 public void visit(KahaUpdateMessageCommand command) throws IOException { 1189 process(command, location); 1190 } 1191 1192 @Override 1193 public void visit(KahaRewrittenDataFileCommand command) throws IOException { 1194 process(command, location); 1195 } 1196 }); 1197 } 1198 1199 @SuppressWarnings("rawtypes") 1200 protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { 1201 if (command.hasTransactionInfo()) { 1202 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1203 inflightTx.add(new AddOperation(command, location, runWithIndexLock)); 1204 } else { 1205 this.indexLock.writeLock().lock(); 1206 try { 1207 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1208 @Override 1209 public void execute(Transaction tx) throws IOException { 1210 long assignedIndex = updateIndex(tx, command, location); 1211 if (runWithIndexLock != null) { 1212 runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex); 1213 } 1214 } 1215 }); 1216 1217 } finally { 1218 this.indexLock.writeLock().unlock(); 1219 } 1220 } 1221 } 1222 1223 protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException { 1224 this.indexLock.writeLock().lock(); 1225 try { 1226 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1227 @Override 1228 public void execute(Transaction tx) throws IOException { 1229 updateIndex(tx, command, location); 1230 } 1231 }); 1232 } finally { 1233 this.indexLock.writeLock().unlock(); 1234 } 1235 } 1236 1237 @SuppressWarnings("rawtypes") 1238 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { 1239 if (command.hasTransactionInfo()) { 1240 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1241 inflightTx.add(new RemoveOperation(command, location)); 1242 } else { 1243 this.indexLock.writeLock().lock(); 1244 try { 1245 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1246 @Override 1247 public void execute(Transaction tx) throws IOException { 1248 updateIndex(tx, command, location); 1249 } 1250 }); 1251 } finally { 1252 this.indexLock.writeLock().unlock(); 1253 } 1254 } 1255 } 1256 1257 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { 1258 this.indexLock.writeLock().lock(); 1259 try { 1260 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1261 @Override 1262 public void execute(Transaction tx) throws IOException { 1263 updateIndex(tx, command, location); 1264 } 1265 }); 1266 } finally { 1267 this.indexLock.writeLock().unlock(); 1268 } 1269 } 1270 1271 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { 1272 this.indexLock.writeLock().lock(); 1273 try { 1274 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1275 @Override 1276 public void execute(Transaction tx) throws IOException { 1277 updateIndex(tx, command, location); 1278 } 1279 }); 1280 } finally { 1281 this.indexLock.writeLock().unlock(); 1282 } 1283 } 1284 1285 protected void processLocation(final Location location) { 1286 this.indexLock.writeLock().lock(); 1287 try { 1288 metadata.lastUpdate = location; 1289 } finally { 1290 this.indexLock.writeLock().unlock(); 1291 } 1292 } 1293 1294 @SuppressWarnings("rawtypes") 1295 protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { 1296 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1297 List<Operation> inflightTx; 1298 synchronized (inflightTransactions) { 1299 inflightTx = inflightTransactions.remove(key); 1300 if (inflightTx == null) { 1301 inflightTx = preparedTransactions.remove(key); 1302 } 1303 } 1304 if (inflightTx == null) { 1305 // only non persistent messages in this tx 1306 if (before != null) { 1307 before.sequenceAssignedWithIndexLocked(-1); 1308 } 1309 return; 1310 } 1311 1312 final List<Operation> messagingTx = inflightTx; 1313 indexLock.writeLock().lock(); 1314 try { 1315 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1316 @Override 1317 public void execute(Transaction tx) throws IOException { 1318 for (Operation op : messagingTx) { 1319 op.execute(tx); 1320 } 1321 } 1322 }); 1323 metadata.lastUpdate = location; 1324 } finally { 1325 indexLock.writeLock().unlock(); 1326 } 1327 } 1328 1329 @SuppressWarnings("rawtypes") 1330 protected void process(KahaPrepareCommand command, Location location) { 1331 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1332 synchronized (inflightTransactions) { 1333 List<Operation> tx = inflightTransactions.remove(key); 1334 if (tx != null) { 1335 preparedTransactions.put(key, tx); 1336 } 1337 } 1338 } 1339 1340 @SuppressWarnings("rawtypes") 1341 protected void process(KahaRollbackCommand command, Location location) throws IOException { 1342 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1343 List<Operation> updates = null; 1344 synchronized (inflightTransactions) { 1345 updates = inflightTransactions.remove(key); 1346 if (updates == null) { 1347 updates = preparedTransactions.remove(key); 1348 } 1349 } 1350 } 1351 1352 protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException { 1353 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1354 if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) { 1355 // Mark the current journal file as a compacted file so that gc checks can skip 1356 // over logs that are smaller compaction type logs. 1357 DataFile current = journal.getDataFileById(location.getDataFileId()); 1358 current.setTypeCode(command.getRewriteType()); 1359 1360 // Move offset so that next location read jumps to next file. 1361 location.setOffset(journalMaxFileLength); 1362 } 1363 } 1364 1365 // ///////////////////////////////////////////////////////////////// 1366 // These methods do the actual index updates. 1367 // ///////////////////////////////////////////////////////////////// 1368 1369 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 1370 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); 1371 1372 long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { 1373 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1374 1375 // Skip adding the message to the index if this is a topic and there are 1376 // no subscriptions. 1377 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { 1378 return -1; 1379 } 1380 1381 // Add the message. 1382 int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; 1383 long id = sd.orderIndex.getNextMessageId(); 1384 Long previous = sd.locationIndex.put(tx, location, id); 1385 if (previous == null) { 1386 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 1387 if (previous == null) { 1388 incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); 1389 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); 1390 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { 1391 addAckLocationForNewMessage(tx, sd, id); 1392 } 1393 metadata.lastUpdate = location; 1394 } else { 1395 1396 MessageKeys messageKeys = sd.orderIndex.get(tx, previous); 1397 if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { 1398 // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt 1399 LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); 1400 } 1401 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 1402 sd.locationIndex.remove(tx, location); 1403 id = -1; 1404 } 1405 } else { 1406 // restore the previous value.. Looks like this was a redo of a previously 1407 // added message. We don't want to assign it a new id as the other indexes would 1408 // be wrong.. 1409 sd.locationIndex.put(tx, location, previous); 1410 // ensure sequence is not broken 1411 sd.orderIndex.revertNextMessageId(); 1412 metadata.lastUpdate = location; 1413 } 1414 // record this id in any event, initial send or recovery 1415 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1416 1417 return id; 1418 } 1419 1420 void trackPendingAdd(KahaDestination destination, Long seq) { 1421 StoredDestination sd = storedDestinations.get(key(destination)); 1422 if (sd != null) { 1423 sd.trackPendingAdd(seq); 1424 } 1425 } 1426 1427 void trackPendingAddComplete(KahaDestination destination, Long seq) { 1428 StoredDestination sd = storedDestinations.get(key(destination)); 1429 if (sd != null) { 1430 sd.trackPendingAddComplete(seq); 1431 } 1432 } 1433 1434 void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException { 1435 KahaAddMessageCommand command = updateMessageCommand.getMessage(); 1436 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1437 1438 Long id = sd.messageIdIndex.get(tx, command.getMessageId()); 1439 if (id != null) { 1440 MessageKeys previousKeys = sd.orderIndex.put( 1441 tx, 1442 command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY, 1443 id, 1444 new MessageKeys(command.getMessageId(), location) 1445 ); 1446 sd.locationIndex.put(tx, location, id); 1447 incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); 1448 // on first update previous is original location, on recovery/replay it may be the updated location 1449 if(previousKeys != null && !previousKeys.location.equals(location)) { 1450 sd.locationIndex.remove(tx, previousKeys.location); 1451 decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize()); 1452 } 1453 metadata.lastUpdate = location; 1454 } else { 1455 //Add the message if it can't be found 1456 this.updateIndex(tx, command, location); 1457 } 1458 } 1459 1460 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { 1461 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1462 if (!command.hasSubscriptionKey()) { 1463 1464 // In the queue case we just remove the message from the index.. 1465 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 1466 if (sequenceId != null) { 1467 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 1468 if (keys != null) { 1469 sd.locationIndex.remove(tx, keys.location); 1470 decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize()); 1471 recordAckMessageReferenceLocation(ackLocation, keys.location); 1472 metadata.lastUpdate = ackLocation; 1473 } else if (LOG.isDebugEnabled()) { 1474 LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId()); 1475 } 1476 } else if (LOG.isDebugEnabled()) { 1477 LOG.debug("message not found in sequence id index: " + command.getMessageId()); 1478 } 1479 } else { 1480 // In the topic case we need remove the message once it's been acked 1481 // by all the subs 1482 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 1483 1484 // Make sure it's a valid message id... 1485 if (sequence != null) { 1486 String subscriptionKey = command.getSubscriptionKey(); 1487 if (command.getAck() != UNMATCHED) { 1488 sd.orderIndex.get(tx, sequence); 1489 byte priority = sd.orderIndex.lastGetPriority(); 1490 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); 1491 } 1492 1493 MessageKeys keys = sd.orderIndex.get(tx, sequence); 1494 if (keys != null) { 1495 recordAckMessageReferenceLocation(ackLocation, keys.location); 1496 } 1497 // The following method handles deleting un-referenced messages. 1498 removeAckLocation(command, tx, sd, subscriptionKey, sequence); 1499 metadata.lastUpdate = ackLocation; 1500 } else if (LOG.isDebugEnabled()) { 1501 LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); 1502 } 1503 1504 } 1505 } 1506 1507 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { 1508 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); 1509 if (referenceFileIds == null) { 1510 referenceFileIds = new HashSet<Integer>(); 1511 referenceFileIds.add(messageLocation.getDataFileId()); 1512 metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); 1513 } else { 1514 Integer id = Integer.valueOf(messageLocation.getDataFileId()); 1515 if (!referenceFileIds.contains(id)) { 1516 referenceFileIds.add(id); 1517 } 1518 } 1519 } 1520 1521 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 1522 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1523 sd.orderIndex.remove(tx); 1524 1525 sd.locationIndex.clear(tx); 1526 sd.locationIndex.unload(tx); 1527 tx.free(sd.locationIndex.getPageId()); 1528 1529 sd.messageIdIndex.clear(tx); 1530 sd.messageIdIndex.unload(tx); 1531 tx.free(sd.messageIdIndex.getPageId()); 1532 1533 if (sd.subscriptions != null) { 1534 sd.subscriptions.clear(tx); 1535 sd.subscriptions.unload(tx); 1536 tx.free(sd.subscriptions.getPageId()); 1537 1538 sd.subscriptionAcks.clear(tx); 1539 sd.subscriptionAcks.unload(tx); 1540 tx.free(sd.subscriptionAcks.getPageId()); 1541 1542 sd.ackPositions.clear(tx); 1543 sd.ackPositions.unload(tx); 1544 tx.free(sd.ackPositions.getHeadPageId()); 1545 1546 sd.subLocations.clear(tx); 1547 sd.subLocations.unload(tx); 1548 tx.free(sd.subLocations.getHeadPageId()); 1549 } 1550 1551 String key = key(command.getDestination()); 1552 storedDestinations.remove(key); 1553 metadata.destinations.remove(tx, key); 1554 clearStoreStats(command.getDestination()); 1555 storeCache.remove(key(command.getDestination())); 1556 } 1557 1558 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { 1559 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1560 final String subscriptionKey = command.getSubscriptionKey(); 1561 1562 // If set then we are creating it.. otherwise we are destroying the sub 1563 if (command.hasSubscriptionInfo()) { 1564 Location existing = sd.subLocations.get(tx, subscriptionKey); 1565 if (existing != null && existing.compareTo(location) == 0) { 1566 // replay on recovery, ignore 1567 LOG.trace("ignoring journal replay of replay of sub from: " + location); 1568 return; 1569 } 1570 1571 sd.subscriptions.put(tx, subscriptionKey, command); 1572 sd.subLocations.put(tx, subscriptionKey, location); 1573 long ackLocation=NOT_ACKED; 1574 if (!command.getRetroactive()) { 1575 ackLocation = sd.orderIndex.nextMessageId-1; 1576 } else { 1577 addAckLocationForRetroactiveSub(tx, sd, subscriptionKey); 1578 } 1579 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); 1580 sd.subscriptionCache.add(subscriptionKey); 1581 } else { 1582 // delete the sub... 1583 sd.subscriptions.remove(tx, subscriptionKey); 1584 sd.subLocations.remove(tx, subscriptionKey); 1585 sd.subscriptionAcks.remove(tx, subscriptionKey); 1586 sd.subscriptionCache.remove(subscriptionKey); 1587 removeAckLocationsForSub(command, tx, sd, subscriptionKey); 1588 1589 if (sd.subscriptions.isEmpty(tx)) { 1590 // remove the stored destination 1591 KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand(); 1592 removeDestinationCommand.setDestination(command.getDestination()); 1593 updateIndex(tx, removeDestinationCommand, null); 1594 clearStoreStats(command.getDestination()); 1595 } 1596 } 1597 } 1598 1599 private void checkpointUpdate(final boolean cleanup) throws IOException { 1600 checkpointLock.writeLock().lock(); 1601 try { 1602 this.indexLock.writeLock().lock(); 1603 try { 1604 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1605 @Override 1606 public void execute(Transaction tx) throws IOException { 1607 checkpointUpdate(tx, cleanup); 1608 } 1609 }); 1610 } finally { 1611 this.indexLock.writeLock().unlock(); 1612 } 1613 1614 } finally { 1615 checkpointLock.writeLock().unlock(); 1616 } 1617 } 1618 1619 /** 1620 * @param tx 1621 * @throws IOException 1622 */ 1623 void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { 1624 LOG.debug("Checkpoint started."); 1625 1626 // reflect last update exclusive of current checkpoint 1627 Location lastUpdate = metadata.lastUpdate; 1628 1629 metadata.state = OPEN_STATE; 1630 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); 1631 metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap(); 1632 Location[] inProgressTxRange = getInProgressTxLocationRange(); 1633 metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; 1634 tx.store(metadata.page, metadataMarshaller, true); 1635 pageFile.flush(); 1636 1637 if (cleanup) { 1638 1639 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1640 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet); 1641 1642 if (LOG.isTraceEnabled()) { 1643 LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); 1644 } 1645 1646 if (lastUpdate != null) { 1647 gcCandidateSet.remove(lastUpdate.getDataFileId()); 1648 } 1649 1650 // Don't GC files under replication 1651 if( journalFilesBeingReplicated!=null ) { 1652 gcCandidateSet.removeAll(journalFilesBeingReplicated); 1653 } 1654 1655 if (metadata.producerSequenceIdTrackerLocation != null) { 1656 int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId(); 1657 if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) { 1658 // rewrite so we don't prevent gc 1659 metadata.producerSequenceIdTracker.setModified(true); 1660 if (LOG.isTraceEnabled()) { 1661 LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation); 1662 } 1663 } 1664 gcCandidateSet.remove(dataFileId); 1665 if (LOG.isTraceEnabled()) { 1666 LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet); 1667 } 1668 } 1669 1670 if (metadata.ackMessageFileMapLocation != null) { 1671 int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); 1672 gcCandidateSet.remove(dataFileId); 1673 if (LOG.isTraceEnabled()) { 1674 LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet); 1675 } 1676 } 1677 1678 // Don't GC files referenced by in-progress tx 1679 if (inProgressTxRange[0] != null) { 1680 for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { 1681 gcCandidateSet.remove(pendingTx); 1682 } 1683 } 1684 if (LOG.isTraceEnabled()) { 1685 LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); 1686 } 1687 1688 // Go through all the destinations to see if any of them can remove GC candidates. 1689 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) { 1690 if( gcCandidateSet.isEmpty() ) { 1691 break; 1692 } 1693 1694 // Use a visitor to cut down the number of pages that we load 1695 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 1696 int last=-1; 1697 @Override 1698 public boolean isInterestedInKeysBetween(Location first, Location second) { 1699 if( first==null ) { 1700 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1); 1701 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1702 subset.remove(second.getDataFileId()); 1703 } 1704 return !subset.isEmpty(); 1705 } else if( second==null ) { 1706 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId()); 1707 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1708 subset.remove(first.getDataFileId()); 1709 } 1710 return !subset.isEmpty(); 1711 } else { 1712 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); 1713 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1714 subset.remove(first.getDataFileId()); 1715 } 1716 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1717 subset.remove(second.getDataFileId()); 1718 } 1719 return !subset.isEmpty(); 1720 } 1721 } 1722 1723 @Override 1724 public void visit(List<Location> keys, List<Long> values) { 1725 for (Location l : keys) { 1726 int fileId = l.getDataFileId(); 1727 if( last != fileId ) { 1728 gcCandidateSet.remove(fileId); 1729 last = fileId; 1730 } 1731 } 1732 } 1733 }); 1734 1735 // Durable Subscription 1736 if (entry.getValue().subLocations != null) { 1737 Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx); 1738 while (iter.hasNext()) { 1739 Entry<String, Location> subscription = iter.next(); 1740 int dataFileId = subscription.getValue().getDataFileId(); 1741 1742 // Move subscription along if it has no outstanding messages that need ack'd 1743 // and its in the last log file in the journal. 1744 if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) { 1745 final StoredDestination destination = entry.getValue(); 1746 final String subscriptionKey = subscription.getKey(); 1747 SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey); 1748 1749 // When pending is size one that is the next message Id meaning there 1750 // are no pending messages currently. 1751 if (pendingAcks == null || pendingAcks.isEmpty() || 1752 (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) { 1753 1754 if (LOG.isTraceEnabled()) { 1755 LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId); 1756 } 1757 1758 final KahaSubscriptionCommand kahaSub = 1759 destination.subscriptions.get(tx, subscriptionKey); 1760 destination.subLocations.put( 1761 tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub)); 1762 1763 // Skips the remove from candidates if we rewrote the subscription 1764 // in order to prevent duplicate subscription commands on recover. 1765 // If another subscription is on the same file and isn't rewritten 1766 // than it will remove the file from the set. 1767 continue; 1768 } 1769 } 1770 1771 gcCandidateSet.remove(dataFileId); 1772 } 1773 } 1774 1775 if (LOG.isTraceEnabled()) { 1776 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); 1777 } 1778 } 1779 1780 // check we are not deleting file with ack for in-use journal files 1781 if (LOG.isTraceEnabled()) { 1782 LOG.trace("gc candidates: " + gcCandidateSet); 1783 LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap); 1784 } 1785 1786 boolean ackMessageFileMapMod = false; 1787 Iterator<Integer> candidates = gcCandidateSet.iterator(); 1788 while (candidates.hasNext()) { 1789 Integer candidate = candidates.next(); 1790 Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate); 1791 if (referencedFileIds != null) { 1792 for (Integer referencedFileId : referencedFileIds) { 1793 if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) { 1794 // active file that is not targeted for deletion is referenced so don't delete 1795 candidates.remove(); 1796 break; 1797 } 1798 } 1799 if (gcCandidateSet.contains(candidate)) { 1800 ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null); 1801 } else { 1802 if (LOG.isTraceEnabled()) { 1803 LOG.trace("not removing data file: " + candidate 1804 + " as contained ack(s) refer to referenced file: " + referencedFileIds); 1805 } 1806 } 1807 } 1808 } 1809 1810 if (!gcCandidateSet.isEmpty()) { 1811 LOG.debug("Cleanup removing the data files: {}", gcCandidateSet); 1812 journal.removeDataFiles(gcCandidateSet); 1813 for (Integer candidate : gcCandidateSet) { 1814 for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) { 1815 ackMessageFileMapMod |= ackFiles.remove(candidate); 1816 } 1817 } 1818 if (ackMessageFileMapMod) { 1819 checkpointUpdate(tx, false); 1820 } 1821 } else if (isEnableAckCompaction()) { 1822 if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) { 1823 // First check length of journal to make sure it makes sense to even try. 1824 // 1825 // If there is only one journal file with Acks in it we don't need to move 1826 // it since it won't be chained to any later logs. 1827 // 1828 // If the logs haven't grown since the last time then we need to compact 1829 // otherwise there seems to still be room for growth and we don't need to incur 1830 // the overhead. Depending on configuration this check can be avoided and 1831 // Ack compaction will run any time the store has not GC'd a journal file in 1832 // the configured amount of cycles. 1833 if (metadata.ackMessageFileMap.size() > 1 && 1834 (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) { 1835 1836 LOG.trace("No files GC'd checking if threshold to ACK compaction has been met."); 1837 try { 1838 scheduler.execute(new AckCompactionRunner()); 1839 } catch (Exception ex) { 1840 LOG.warn("Error on queueing the Ack Compactor", ex); 1841 } 1842 } else { 1843 LOG.trace("Journal activity detected, no Ack compaction scheduled."); 1844 } 1845 1846 checkPointCyclesWithNoGC = 0; 1847 } else { 1848 LOG.trace("Not yet time to check for compaction: {} of {} cycles", 1849 checkPointCyclesWithNoGC, getCompactAcksAfterNoGC()); 1850 } 1851 1852 journalLogOnLastCompactionCheck = journal.getCurrentDataFileId(); 1853 } 1854 } 1855 1856 LOG.debug("Checkpoint done."); 1857 } 1858 1859 private final class AckCompactionRunner implements Runnable { 1860 1861 @Override 1862 public void run() { 1863 // Lock index to capture the ackMessageFileMap data 1864 indexLock.writeLock().lock(); 1865 1866 // Map keys might not be sorted, find the earliest log file to forward acks 1867 // from and move only those, future cycles can chip away at more as needed. 1868 // We won't move files that are themselves rewritten on a previous compaction. 1869 List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet()); 1870 Collections.sort(journalFileIds); 1871 int journalToAdvance = -1; 1872 for (Integer journalFileId : journalFileIds) { 1873 DataFile current = journal.getDataFileById(journalFileId); 1874 if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { 1875 journalToAdvance = journalFileId; 1876 break; 1877 } 1878 } 1879 1880 // Check if we found one, or if we only found the current file being written to. 1881 if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) { 1882 return; 1883 } 1884 1885 Set<Integer> journalLogsReferenced = 1886 new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance)); 1887 1888 indexLock.writeLock().unlock(); 1889 1890 try { 1891 // Background rewrite of the old acks 1892 forwardAllAcks(journalToAdvance, journalLogsReferenced); 1893 1894 // Checkpoint with changes from the ackMessageFileMap 1895 checkpointUpdate(false); 1896 } catch (IOException ioe) { 1897 LOG.error("Checkpoint failed", ioe); 1898 brokerService.handleIOException(ioe); 1899 } catch (Throwable e) { 1900 LOG.error("Checkpoint failed", e); 1901 brokerService.handleIOException(IOExceptionSupport.create(e)); 1902 } 1903 } 1904 } 1905 1906 private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException { 1907 LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead); 1908 1909 DataFile forwardsFile = journal.reserveDataFile(); 1910 LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile); 1911 1912 Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>(); 1913 1914 try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { 1915 KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); 1916 compactionMarker.setSourceDataFileId(journalToRead); 1917 compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE); 1918 1919 ByteSequence payload = toByteSequence(compactionMarker); 1920 appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs()); 1921 LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead); 1922 1923 Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0)); 1924 while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) { 1925 JournalCommand<?> command = null; 1926 try { 1927 command = load(nextLocation); 1928 } catch (IOException ex) { 1929 LOG.trace("Error loading command during ack forward: {}", nextLocation); 1930 } 1931 1932 if (command != null && command instanceof KahaRemoveMessageCommand) { 1933 payload = toByteSequence(command); 1934 Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs()); 1935 updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced); 1936 } 1937 1938 nextLocation = journal.getNextLocation(nextLocation); 1939 } 1940 } 1941 1942 LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations); 1943 1944 // Lock index while we update the ackMessageFileMap. 1945 indexLock.writeLock().lock(); 1946 1947 // Update the ack map with the new locations of the acks 1948 for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) { 1949 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); 1950 if (referenceFileIds == null) { 1951 referenceFileIds = new HashSet<Integer>(); 1952 referenceFileIds.addAll(entry.getValue()); 1953 metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); 1954 } else { 1955 referenceFileIds.addAll(entry.getValue()); 1956 } 1957 } 1958 1959 // remove the old location data from the ack map so that the old journal log file can 1960 // be removed on next GC. 1961 metadata.ackMessageFileMap.remove(journalToRead); 1962 1963 indexLock.writeLock().unlock(); 1964 1965 LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); 1966 } 1967 1968 final Runnable nullCompletionCallback = new Runnable() { 1969 @Override 1970 public void run() { 1971 } 1972 }; 1973 1974 private Location checkpointProducerAudit() throws IOException { 1975 if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) { 1976 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 1977 ObjectOutputStream oout = new ObjectOutputStream(baos); 1978 oout.writeObject(metadata.producerSequenceIdTracker); 1979 oout.flush(); 1980 oout.close(); 1981 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 1982 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); 1983 try { 1984 location.getLatch().await(); 1985 } catch (InterruptedException e) { 1986 throw new InterruptedIOException(e.toString()); 1987 } 1988 return location; 1989 } 1990 return metadata.producerSequenceIdTrackerLocation; 1991 } 1992 1993 private Location checkpointAckMessageFileMap() throws IOException { 1994 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 1995 ObjectOutputStream oout = new ObjectOutputStream(baos); 1996 oout.writeObject(metadata.ackMessageFileMap); 1997 oout.flush(); 1998 oout.close(); 1999 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 2000 Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback); 2001 try { 2002 location.getLatch().await(); 2003 } catch (InterruptedException e) { 2004 throw new InterruptedIOException(e.toString()); 2005 } 2006 return location; 2007 } 2008 2009 private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException { 2010 2011 ByteSequence sequence = toByteSequence(subscription); 2012 Location location = journal.write(sequence, nullCompletionCallback) ; 2013 2014 try { 2015 location.getLatch().await(); 2016 } catch (InterruptedException e) { 2017 throw new InterruptedIOException(e.toString()); 2018 } 2019 return location; 2020 } 2021 2022 public HashSet<Integer> getJournalFilesBeingReplicated() { 2023 return journalFilesBeingReplicated; 2024 } 2025 2026 // ///////////////////////////////////////////////////////////////// 2027 // StoredDestination related implementation methods. 2028 // ///////////////////////////////////////////////////////////////// 2029 2030 protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 2031 2032 static class MessageKeys { 2033 final String messageId; 2034 final Location location; 2035 2036 public MessageKeys(String messageId, Location location) { 2037 this.messageId=messageId; 2038 this.location=location; 2039 } 2040 2041 @Override 2042 public String toString() { 2043 return "["+messageId+","+location+"]"; 2044 } 2045 } 2046 2047 protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 2048 final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller(); 2049 2050 @Override 2051 public MessageKeys readPayload(DataInput dataIn) throws IOException { 2052 return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn)); 2053 } 2054 2055 @Override 2056 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 2057 dataOut.writeUTF(object.messageId); 2058 locationSizeMarshaller.writePayload(object.location, dataOut); 2059 } 2060 } 2061 2062 class LastAck { 2063 long lastAckedSequence; 2064 byte priority; 2065 2066 public LastAck(LastAck source) { 2067 this.lastAckedSequence = source.lastAckedSequence; 2068 this.priority = source.priority; 2069 } 2070 2071 public LastAck() { 2072 this.priority = MessageOrderIndex.HI; 2073 } 2074 2075 public LastAck(long ackLocation) { 2076 this.lastAckedSequence = ackLocation; 2077 this.priority = MessageOrderIndex.LO; 2078 } 2079 2080 public LastAck(long ackLocation, byte priority) { 2081 this.lastAckedSequence = ackLocation; 2082 this.priority = priority; 2083 } 2084 2085 @Override 2086 public String toString() { 2087 return "[" + lastAckedSequence + ":" + priority + "]"; 2088 } 2089 } 2090 2091 protected class LastAckMarshaller implements Marshaller<LastAck> { 2092 2093 @Override 2094 public void writePayload(LastAck object, DataOutput dataOut) throws IOException { 2095 dataOut.writeLong(object.lastAckedSequence); 2096 dataOut.writeByte(object.priority); 2097 } 2098 2099 @Override 2100 public LastAck readPayload(DataInput dataIn) throws IOException { 2101 LastAck lastAcked = new LastAck(); 2102 lastAcked.lastAckedSequence = dataIn.readLong(); 2103 if (metadata.version >= 3) { 2104 lastAcked.priority = dataIn.readByte(); 2105 } 2106 return lastAcked; 2107 } 2108 2109 @Override 2110 public int getFixedSize() { 2111 return 9; 2112 } 2113 2114 @Override 2115 public LastAck deepCopy(LastAck source) { 2116 return new LastAck(source); 2117 } 2118 2119 @Override 2120 public boolean isDeepCopySupported() { 2121 return true; 2122 } 2123 } 2124 2125 class StoredDestination { 2126 2127 MessageOrderIndex orderIndex = new MessageOrderIndex(); 2128 BTreeIndex<Location, Long> locationIndex; 2129 BTreeIndex<String, Long> messageIdIndex; 2130 2131 // These bits are only set for Topics 2132 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 2133 BTreeIndex<String, LastAck> subscriptionAcks; 2134 HashMap<String, MessageOrderCursor> subscriptionCursors; 2135 ListIndex<String, SequenceSet> ackPositions; 2136 ListIndex<String, Location> subLocations; 2137 2138 // Transient data used to track which Messages are no longer needed. 2139 final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>(); 2140 final HashSet<String> subscriptionCache = new LinkedHashSet<String>(); 2141 2142 public void trackPendingAdd(Long seq) { 2143 orderIndex.trackPendingAdd(seq); 2144 } 2145 2146 public void trackPendingAddComplete(Long seq) { 2147 orderIndex.trackPendingAddComplete(seq); 2148 } 2149 2150 @Override 2151 public String toString() { 2152 return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size(); 2153 } 2154 } 2155 2156 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 2157 2158 final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); 2159 2160 @Override 2161 public StoredDestination readPayload(final DataInput dataIn) throws IOException { 2162 final StoredDestination value = new StoredDestination(); 2163 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2164 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 2165 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 2166 2167 if (dataIn.readBoolean()) { 2168 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 2169 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); 2170 if (metadata.version >= 4) { 2171 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong()); 2172 } else { 2173 // upgrade 2174 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2175 @Override 2176 public void execute(Transaction tx) throws IOException { 2177 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>(); 2178 2179 if (metadata.version >= 3) { 2180 // migrate 2181 BTreeIndex<Long, HashSet<String>> oldAckPositions = 2182 new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong()); 2183 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 2184 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 2185 oldAckPositions.load(tx); 2186 2187 2188 // Do the initial build of the data in memory before writing into the store 2189 // based Ack Positions List to avoid a lot of disk thrashing. 2190 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx); 2191 while (iterator.hasNext()) { 2192 Entry<Long, HashSet<String>> entry = iterator.next(); 2193 2194 for(String subKey : entry.getValue()) { 2195 SequenceSet pendingAcks = temp.get(subKey); 2196 if (pendingAcks == null) { 2197 pendingAcks = new SequenceSet(); 2198 temp.put(subKey, pendingAcks); 2199 } 2200 2201 pendingAcks.add(entry.getKey()); 2202 } 2203 } 2204 } 2205 // Now move the pending messages to ack data into the store backed 2206 // structure. 2207 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2208 value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2209 value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2210 value.ackPositions.load(tx); 2211 for(String subscriptionKey : temp.keySet()) { 2212 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); 2213 } 2214 2215 } 2216 }); 2217 } 2218 2219 if (metadata.version >= 5) { 2220 value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong()); 2221 } else { 2222 // upgrade 2223 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2224 @Override 2225 public void execute(Transaction tx) throws IOException { 2226 value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2227 value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2228 value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2229 value.subLocations.load(tx); 2230 } 2231 }); 2232 } 2233 } 2234 if (metadata.version >= 2) { 2235 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2236 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2237 } else { 2238 // upgrade 2239 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2240 @Override 2241 public void execute(Transaction tx) throws IOException { 2242 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2243 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2244 value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); 2245 value.orderIndex.lowPriorityIndex.load(tx); 2246 2247 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2248 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2249 value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller); 2250 value.orderIndex.highPriorityIndex.load(tx); 2251 } 2252 }); 2253 } 2254 2255 return value; 2256 } 2257 2258 @Override 2259 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 2260 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId()); 2261 dataOut.writeLong(value.locationIndex.getPageId()); 2262 dataOut.writeLong(value.messageIdIndex.getPageId()); 2263 if (value.subscriptions != null) { 2264 dataOut.writeBoolean(true); 2265 dataOut.writeLong(value.subscriptions.getPageId()); 2266 dataOut.writeLong(value.subscriptionAcks.getPageId()); 2267 dataOut.writeLong(value.ackPositions.getHeadPageId()); 2268 dataOut.writeLong(value.subLocations.getHeadPageId()); 2269 } else { 2270 dataOut.writeBoolean(false); 2271 } 2272 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); 2273 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); 2274 } 2275 } 2276 2277 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 2278 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 2279 2280 @Override 2281 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 2282 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 2283 rc.mergeFramed((InputStream)dataIn); 2284 return rc; 2285 } 2286 2287 @Override 2288 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 2289 object.writeFramed((OutputStream)dataOut); 2290 } 2291 } 2292 2293 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2294 String key = key(destination); 2295 StoredDestination rc = storedDestinations.get(key); 2296 if (rc == null) { 2297 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 2298 rc = loadStoredDestination(tx, key, topic); 2299 // Cache it. We may want to remove/unload destinations from the 2300 // cache that are not used for a while 2301 // to reduce memory usage. 2302 storedDestinations.put(key, rc); 2303 } 2304 return rc; 2305 } 2306 2307 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2308 String key = key(destination); 2309 StoredDestination rc = storedDestinations.get(key); 2310 if (rc == null && metadata.destinations.containsKey(tx, key)) { 2311 rc = getStoredDestination(destination, tx); 2312 } 2313 return rc; 2314 } 2315 2316 /** 2317 * @param tx 2318 * @param key 2319 * @param topic 2320 * @return 2321 * @throws IOException 2322 */ 2323 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 2324 // Try to load the existing indexes.. 2325 StoredDestination rc = metadata.destinations.get(tx, key); 2326 if (rc == null) { 2327 // Brand new destination.. allocate indexes for it. 2328 rc = new StoredDestination(); 2329 rc.orderIndex.allocate(tx); 2330 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate()); 2331 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 2332 2333 if (topic) { 2334 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 2335 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate()); 2336 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2337 rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2338 } 2339 metadata.destinations.put(tx, key, rc); 2340 } 2341 2342 // Configure the marshalers and load. 2343 rc.orderIndex.load(tx); 2344 2345 // Figure out the next key using the last entry in the destination. 2346 rc.orderIndex.configureLast(tx); 2347 2348 rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller()); 2349 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2350 rc.locationIndex.load(tx); 2351 2352 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 2353 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2354 rc.messageIdIndex.load(tx); 2355 2356 //go through an upgrade old index if older than version 6 2357 if (metadata.version < 6) { 2358 for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) { 2359 Entry<Location, Long> entry = iterator.next(); 2360 // modify so it is upgraded 2361 rc.locationIndex.put(tx, entry.getKey(), entry.getValue()); 2362 } 2363 //upgrade the order index 2364 for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) { 2365 Entry<Long, MessageKeys> entry = iterator.next(); 2366 //call get so that the last priority is updated 2367 rc.orderIndex.get(tx, entry.getKey()); 2368 rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue()); 2369 } 2370 } 2371 2372 // If it was a topic... 2373 if (topic) { 2374 2375 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 2376 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 2377 rc.subscriptions.load(tx); 2378 2379 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 2380 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); 2381 rc.subscriptionAcks.load(tx); 2382 2383 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2384 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2385 rc.ackPositions.load(tx); 2386 2387 rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2388 rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2389 rc.subLocations.load(tx); 2390 2391 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>(); 2392 2393 if (metadata.version < 3) { 2394 2395 // on upgrade need to fill ackLocation with available messages past last ack 2396 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2397 Entry<String, LastAck> entry = iterator.next(); 2398 for (Iterator<Entry<Long, MessageKeys>> orderIterator = 2399 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { 2400 Long sequence = orderIterator.next().getKey(); 2401 addAckLocation(tx, rc, sequence, entry.getKey()); 2402 } 2403 // modify so it is upgraded 2404 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue()); 2405 } 2406 } 2407 2408 // Configure the message references index 2409 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx); 2410 while (subscriptions.hasNext()) { 2411 Entry<String, SequenceSet> subscription = subscriptions.next(); 2412 SequenceSet pendingAcks = subscription.getValue(); 2413 if (pendingAcks != null && !pendingAcks.isEmpty()) { 2414 Long lastPendingAck = pendingAcks.getTail().getLast(); 2415 for (Long sequenceId : pendingAcks) { 2416 Long current = rc.messageReferences.get(sequenceId); 2417 if (current == null) { 2418 current = new Long(0); 2419 } 2420 2421 // We always add a trailing empty entry for the next position to start from 2422 // so we need to ensure we don't count that as a message reference on reload. 2423 if (!sequenceId.equals(lastPendingAck)) { 2424 current = current.longValue() + 1; 2425 } else { 2426 current = Long.valueOf(0L); 2427 } 2428 2429 rc.messageReferences.put(sequenceId, current); 2430 } 2431 } 2432 } 2433 2434 // Configure the subscription cache 2435 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2436 Entry<String, LastAck> entry = iterator.next(); 2437 rc.subscriptionCache.add(entry.getKey()); 2438 } 2439 2440 if (rc.orderIndex.nextMessageId == 0) { 2441 // check for existing durable sub all acked out - pull next seq from acks as messages are gone 2442 if (!rc.subscriptionAcks.isEmpty(tx)) { 2443 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 2444 Entry<String, LastAck> entry = iterator.next(); 2445 rc.orderIndex.nextMessageId = 2446 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); 2447 } 2448 } 2449 } else { 2450 // update based on ackPositions for unmatched, last entry is always the next 2451 if (!rc.messageReferences.isEmpty()) { 2452 Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1]; 2453 rc.orderIndex.nextMessageId = 2454 Math.max(rc.orderIndex.nextMessageId, nextMessageId); 2455 } 2456 } 2457 } 2458 2459 if (metadata.version < VERSION) { 2460 // store again after upgrade 2461 metadata.destinations.put(tx, key, rc); 2462 } 2463 return rc; 2464 } 2465 2466 /** 2467 * Clear the counter for the destination, if one exists. 2468 * 2469 * @param kahaDestination 2470 */ 2471 protected void clearStoreStats(KahaDestination kahaDestination) { 2472 MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination)); 2473 if (storeStats != null) { 2474 storeStats.reset(); 2475 } 2476 } 2477 2478 /** 2479 * Update MessageStoreStatistics 2480 * 2481 * @param kahaDestination 2482 * @param size 2483 */ 2484 protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) { 2485 incrementAndAddSizeToStoreStat(key(kahaDestination), size); 2486 } 2487 2488 protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) { 2489 MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); 2490 if (storeStats != null) { 2491 storeStats.getMessageCount().increment(); 2492 if (size > 0) { 2493 storeStats.getMessageSize().addSize(size); 2494 } 2495 } 2496 } 2497 2498 protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) { 2499 decrementAndSubSizeToStoreStat(key(kahaDestination), size); 2500 } 2501 2502 protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) { 2503 MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); 2504 if (storeStats != null) { 2505 storeStats.getMessageCount().decrement(); 2506 if (size > 0) { 2507 storeStats.getMessageSize().addSize(-size); 2508 } 2509 } 2510 } 2511 2512 /** 2513 * This is a map to cache DestinationStatistics for a specific 2514 * KahaDestination key 2515 */ 2516 protected final ConcurrentMap<String, MessageStore> storeCache = 2517 new ConcurrentHashMap<String, MessageStore>(); 2518 2519 /** 2520 * Locate the storeMessageSize counter for this KahaDestination 2521 * @param kahaDestination 2522 * @return 2523 */ 2524 protected MessageStoreStatistics getStoreStats(String kahaDestKey) { 2525 MessageStoreStatistics storeStats = null; 2526 try { 2527 MessageStore messageStore = storeCache.get(kahaDestKey); 2528 if (messageStore != null) { 2529 storeStats = messageStore.getMessageStoreStatistics(); 2530 } 2531 } catch (Exception e1) { 2532 LOG.error("Getting size counter of destination failed", e1); 2533 } 2534 2535 return storeStats; 2536 } 2537 2538 /** 2539 * Determine whether this Destination matches the DestinationType 2540 * 2541 * @param destination 2542 * @param type 2543 * @return 2544 */ 2545 protected boolean matchType(Destination destination, 2546 KahaDestination.DestinationType type) { 2547 if (destination instanceof Topic 2548 && type.equals(KahaDestination.DestinationType.TOPIC)) { 2549 return true; 2550 } else if (destination instanceof Queue 2551 && type.equals(KahaDestination.DestinationType.QUEUE)) { 2552 return true; 2553 } 2554 return false; 2555 } 2556 2557 class LocationSizeMarshaller implements Marshaller<Location> { 2558 2559 public LocationSizeMarshaller() { 2560 2561 } 2562 2563 @Override 2564 public Location readPayload(DataInput dataIn) throws IOException { 2565 Location rc = new Location(); 2566 rc.setDataFileId(dataIn.readInt()); 2567 rc.setOffset(dataIn.readInt()); 2568 if (metadata.version >= 6) { 2569 rc.setSize(dataIn.readInt()); 2570 } 2571 return rc; 2572 } 2573 2574 @Override 2575 public void writePayload(Location object, DataOutput dataOut) 2576 throws IOException { 2577 dataOut.writeInt(object.getDataFileId()); 2578 dataOut.writeInt(object.getOffset()); 2579 dataOut.writeInt(object.getSize()); 2580 } 2581 2582 @Override 2583 public int getFixedSize() { 2584 return 12; 2585 } 2586 2587 @Override 2588 public Location deepCopy(Location source) { 2589 return new Location(source); 2590 } 2591 2592 @Override 2593 public boolean isDeepCopySupported() { 2594 return true; 2595 } 2596 } 2597 2598 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 2599 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2600 if (sequences == null) { 2601 sequences = new SequenceSet(); 2602 sequences.add(messageSequence); 2603 sd.ackPositions.add(tx, subscriptionKey, sequences); 2604 } else { 2605 sequences.add(messageSequence); 2606 sd.ackPositions.put(tx, subscriptionKey, sequences); 2607 } 2608 2609 Long count = sd.messageReferences.get(messageSequence); 2610 if (count == null) { 2611 count = Long.valueOf(0L); 2612 } 2613 count = count.longValue() + 1; 2614 sd.messageReferences.put(messageSequence, count); 2615 } 2616 2617 // new sub is interested in potentially all existing messages 2618 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2619 SequenceSet allOutstanding = new SequenceSet(); 2620 Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx); 2621 while (iterator.hasNext()) { 2622 SequenceSet set = iterator.next().getValue(); 2623 for (Long entry : set) { 2624 allOutstanding.add(entry); 2625 } 2626 } 2627 sd.ackPositions.put(tx, subscriptionKey, allOutstanding); 2628 2629 for (Long ackPosition : allOutstanding) { 2630 Long count = sd.messageReferences.get(ackPosition); 2631 2632 // There might not be a reference if the ackLocation was the last 2633 // one which is a placeholder for the next incoming message and 2634 // no value was added to the message references table. 2635 if (count != null) { 2636 count = count.longValue() + 1; 2637 sd.messageReferences.put(ackPosition, count); 2638 } 2639 } 2640 } 2641 2642 // on a new message add, all existing subs are interested in this message 2643 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { 2644 for(String subscriptionKey : sd.subscriptionCache) { 2645 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2646 if (sequences == null) { 2647 sequences = new SequenceSet(); 2648 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2649 sd.ackPositions.add(tx, subscriptionKey, sequences); 2650 } else { 2651 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2652 sd.ackPositions.put(tx, subscriptionKey, sequences); 2653 } 2654 2655 Long count = sd.messageReferences.get(messageSequence); 2656 if (count == null) { 2657 count = Long.valueOf(0L); 2658 } 2659 count = count.longValue() + 1; 2660 sd.messageReferences.put(messageSequence, count); 2661 sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L)); 2662 } 2663 } 2664 2665 private void removeAckLocationsForSub(KahaSubscriptionCommand command, 2666 Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2667 if (!sd.ackPositions.isEmpty(tx)) { 2668 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey); 2669 if (sequences == null || sequences.isEmpty()) { 2670 return; 2671 } 2672 2673 ArrayList<Long> unreferenced = new ArrayList<Long>(); 2674 2675 for(Long sequenceId : sequences) { 2676 Long references = sd.messageReferences.get(sequenceId); 2677 if (references != null) { 2678 references = references.longValue() - 1; 2679 2680 if (references.longValue() > 0) { 2681 sd.messageReferences.put(sequenceId, references); 2682 } else { 2683 sd.messageReferences.remove(sequenceId); 2684 unreferenced.add(sequenceId); 2685 } 2686 } 2687 } 2688 2689 for(Long sequenceId : unreferenced) { 2690 // Find all the entries that need to get deleted. 2691 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2692 sd.orderIndex.getDeleteList(tx, deletes, sequenceId); 2693 2694 // Do the actual deletes. 2695 for (Entry<Long, MessageKeys> entry : deletes) { 2696 sd.locationIndex.remove(tx, entry.getValue().location); 2697 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2698 sd.orderIndex.remove(tx, entry.getKey()); 2699 decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize()); 2700 } 2701 } 2702 } 2703 } 2704 2705 /** 2706 * @param tx 2707 * @param sd 2708 * @param subscriptionKey 2709 * @param messageSequence 2710 * @throws IOException 2711 */ 2712 private void removeAckLocation(KahaRemoveMessageCommand command, 2713 Transaction tx, StoredDestination sd, String subscriptionKey, 2714 Long messageSequence) throws IOException { 2715 // Remove the sub from the previous location set.. 2716 if (messageSequence != null) { 2717 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); 2718 if (range != null && !range.isEmpty()) { 2719 range.remove(messageSequence); 2720 if (!range.isEmpty()) { 2721 sd.ackPositions.put(tx, subscriptionKey, range); 2722 } else { 2723 sd.ackPositions.remove(tx, subscriptionKey); 2724 } 2725 2726 // Check if the message is reference by any other subscription. 2727 Long count = sd.messageReferences.get(messageSequence); 2728 if (count != null) { 2729 long references = count.longValue() - 1; 2730 if (references > 0) { 2731 sd.messageReferences.put(messageSequence, Long.valueOf(references)); 2732 return; 2733 } else { 2734 sd.messageReferences.remove(messageSequence); 2735 } 2736 } 2737 2738 // Find all the entries that need to get deleted. 2739 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2740 sd.orderIndex.getDeleteList(tx, deletes, messageSequence); 2741 2742 // Do the actual deletes. 2743 for (Entry<Long, MessageKeys> entry : deletes) { 2744 sd.locationIndex.remove(tx, entry.getValue().location); 2745 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2746 sd.orderIndex.remove(tx, entry.getKey()); 2747 decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize()); 2748 } 2749 } 2750 } 2751 } 2752 2753 public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2754 return sd.subscriptionAcks.get(tx, subscriptionKey); 2755 } 2756 2757 public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2758 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2759 if (messageSequences != null) { 2760 long result = messageSequences.rangeSize(); 2761 // if there's anything in the range the last value is always the nextMessage marker, so remove 1. 2762 return result > 0 ? result - 1 : 0; 2763 } 2764 2765 return 0; 2766 } 2767 2768 public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2769 //grab the messages attached to this subscription 2770 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2771 2772 long locationSize = 0; 2773 if (messageSequences != null) { 2774 Sequence head = messageSequences.getHead(); 2775 if (head != null) { 2776 //get an iterator over the order index starting at the first unacked message 2777 //and go over each message to add up the size 2778 Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, 2779 new MessageOrderCursor(head.getFirst())); 2780 2781 while (iterator.hasNext()) { 2782 Entry<Long, MessageKeys> entry = iterator.next(); 2783 locationSize += entry.getValue().location.getSize(); 2784 } 2785 } 2786 } 2787 2788 return locationSize; 2789 } 2790 2791 protected String key(KahaDestination destination) { 2792 return destination.getType().getNumber() + ":" + destination.getName(); 2793 } 2794 2795 // ///////////////////////////////////////////////////////////////// 2796 // Transaction related implementation methods. 2797 // ///////////////////////////////////////////////////////////////// 2798 @SuppressWarnings("rawtypes") 2799 private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2800 @SuppressWarnings("rawtypes") 2801 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2802 protected final Set<String> ackedAndPrepared = new HashSet<String>(); 2803 protected final Set<String> rolledBackAcks = new HashSet<String>(); 2804 2805 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, 2806 // till then they are skipped by the store. 2807 // 'at most once' XA guarantee 2808 public void trackRecoveredAcks(ArrayList<MessageAck> acks) { 2809 this.indexLock.writeLock().lock(); 2810 try { 2811 for (MessageAck ack : acks) { 2812 ackedAndPrepared.add(ack.getLastMessageId().toProducerKey()); 2813 } 2814 } finally { 2815 this.indexLock.writeLock().unlock(); 2816 } 2817 } 2818 2819 public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException { 2820 if (acks != null) { 2821 this.indexLock.writeLock().lock(); 2822 try { 2823 for (MessageAck ack : acks) { 2824 final String id = ack.getLastMessageId().toProducerKey(); 2825 ackedAndPrepared.remove(id); 2826 if (rollback) { 2827 rolledBackAcks.add(id); 2828 } 2829 } 2830 } finally { 2831 this.indexLock.writeLock().unlock(); 2832 } 2833 } 2834 } 2835 2836 @SuppressWarnings("rawtypes") 2837 private List<Operation> getInflightTx(KahaTransactionInfo info) { 2838 TransactionId key = TransactionIdConversion.convert(info); 2839 List<Operation> tx; 2840 synchronized (inflightTransactions) { 2841 tx = inflightTransactions.get(key); 2842 if (tx == null) { 2843 tx = Collections.synchronizedList(new ArrayList<Operation>()); 2844 inflightTransactions.put(key, tx); 2845 } 2846 } 2847 return tx; 2848 } 2849 2850 @SuppressWarnings("unused") 2851 private TransactionId key(KahaTransactionInfo transactionInfo) { 2852 return TransactionIdConversion.convert(transactionInfo); 2853 } 2854 2855 abstract class Operation <T extends JournalCommand<T>> { 2856 final T command; 2857 final Location location; 2858 2859 public Operation(T command, Location location) { 2860 this.command = command; 2861 this.location = location; 2862 } 2863 2864 public Location getLocation() { 2865 return location; 2866 } 2867 2868 public T getCommand() { 2869 return command; 2870 } 2871 2872 abstract public void execute(Transaction tx) throws IOException; 2873 } 2874 2875 class AddOperation extends Operation<KahaAddMessageCommand> { 2876 final IndexAware runWithIndexLock; 2877 public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) { 2878 super(command, location); 2879 this.runWithIndexLock = runWithIndexLock; 2880 } 2881 2882 @Override 2883 public void execute(Transaction tx) throws IOException { 2884 long seq = updateIndex(tx, command, location); 2885 if (runWithIndexLock != null) { 2886 runWithIndexLock.sequenceAssignedWithIndexLocked(seq); 2887 } 2888 } 2889 } 2890 2891 class RemoveOperation extends Operation<KahaRemoveMessageCommand> { 2892 2893 public RemoveOperation(KahaRemoveMessageCommand command, Location location) { 2894 super(command, location); 2895 } 2896 2897 @Override 2898 public void execute(Transaction tx) throws IOException { 2899 updateIndex(tx, command, location); 2900 } 2901 } 2902 2903 // ///////////////////////////////////////////////////////////////// 2904 // Initialization related implementation methods. 2905 // ///////////////////////////////////////////////////////////////// 2906 2907 private PageFile createPageFile() throws IOException { 2908 if (indexDirectory == null) { 2909 indexDirectory = directory; 2910 } 2911 IOHelper.mkdirs(indexDirectory); 2912 PageFile index = new PageFile(indexDirectory, "db"); 2913 index.setEnableWriteThread(isEnableIndexWriteAsync()); 2914 index.setWriteBatchSize(getIndexWriteBatchSize()); 2915 index.setPageCacheSize(indexCacheSize); 2916 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 2917 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 2918 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 2919 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 2920 index.setEnablePageCaching(isEnableIndexPageCaching()); 2921 return index; 2922 } 2923 2924 private Journal createJournal() throws IOException { 2925 Journal manager = new Journal(); 2926 manager.setDirectory(directory); 2927 manager.setMaxFileLength(getJournalMaxFileLength()); 2928 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 2929 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 2930 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 2931 manager.setArchiveDataLogs(isArchiveDataLogs()); 2932 manager.setSizeAccumulator(journalSize); 2933 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 2934 manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase())); 2935 manager.setPreallocationStrategy( 2936 Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase())); 2937 if (getDirectoryArchive() != null) { 2938 IOHelper.mkdirs(getDirectoryArchive()); 2939 manager.setDirectoryArchive(getDirectoryArchive()); 2940 } 2941 return manager; 2942 } 2943 2944 private Metadata createMetadata() { 2945 Metadata md = new Metadata(); 2946 md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth()); 2947 md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack()); 2948 return md; 2949 } 2950 2951 protected abstract void configureMetadata(); 2952 2953 public int getJournalMaxWriteBatchSize() { 2954 return journalMaxWriteBatchSize; 2955 } 2956 2957 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 2958 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 2959 } 2960 2961 public File getDirectory() { 2962 return directory; 2963 } 2964 2965 public void setDirectory(File directory) { 2966 this.directory = directory; 2967 } 2968 2969 public boolean isDeleteAllMessages() { 2970 return deleteAllMessages; 2971 } 2972 2973 public void setDeleteAllMessages(boolean deleteAllMessages) { 2974 this.deleteAllMessages = deleteAllMessages; 2975 } 2976 2977 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 2978 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 2979 } 2980 2981 public int getIndexWriteBatchSize() { 2982 return setIndexWriteBatchSize; 2983 } 2984 2985 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 2986 this.enableIndexWriteAsync = enableIndexWriteAsync; 2987 } 2988 2989 boolean isEnableIndexWriteAsync() { 2990 return enableIndexWriteAsync; 2991 } 2992 2993 public boolean isEnableJournalDiskSyncs() { 2994 return enableJournalDiskSyncs; 2995 } 2996 2997 public void setEnableJournalDiskSyncs(boolean syncWrites) { 2998 this.enableJournalDiskSyncs = syncWrites; 2999 } 3000 3001 public long getCheckpointInterval() { 3002 return checkpointInterval; 3003 } 3004 3005 public void setCheckpointInterval(long checkpointInterval) { 3006 this.checkpointInterval = checkpointInterval; 3007 } 3008 3009 public long getCleanupInterval() { 3010 return cleanupInterval; 3011 } 3012 3013 public void setCleanupInterval(long cleanupInterval) { 3014 this.cleanupInterval = cleanupInterval; 3015 } 3016 3017 public void setJournalMaxFileLength(int journalMaxFileLength) { 3018 this.journalMaxFileLength = journalMaxFileLength; 3019 } 3020 3021 public int getJournalMaxFileLength() { 3022 return journalMaxFileLength; 3023 } 3024 3025 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 3026 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); 3027 } 3028 3029 public int getMaxFailoverProducersToTrack() { 3030 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); 3031 } 3032 3033 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 3034 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); 3035 } 3036 3037 public int getFailoverProducersAuditDepth() { 3038 return this.metadata.producerSequenceIdTracker.getAuditDepth(); 3039 } 3040 3041 public PageFile getPageFile() throws IOException { 3042 if (pageFile == null) { 3043 pageFile = createPageFile(); 3044 } 3045 return pageFile; 3046 } 3047 3048 public Journal getJournal() throws IOException { 3049 if (journal == null) { 3050 journal = createJournal(); 3051 } 3052 return journal; 3053 } 3054 3055 public boolean isFailIfDatabaseIsLocked() { 3056 return failIfDatabaseIsLocked; 3057 } 3058 3059 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 3060 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 3061 } 3062 3063 public boolean isIgnoreMissingJournalfiles() { 3064 return ignoreMissingJournalfiles; 3065 } 3066 3067 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 3068 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 3069 } 3070 3071 public int getIndexCacheSize() { 3072 return indexCacheSize; 3073 } 3074 3075 public void setIndexCacheSize(int indexCacheSize) { 3076 this.indexCacheSize = indexCacheSize; 3077 } 3078 3079 public boolean isCheckForCorruptJournalFiles() { 3080 return checkForCorruptJournalFiles; 3081 } 3082 3083 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 3084 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 3085 } 3086 3087 public boolean isChecksumJournalFiles() { 3088 return checksumJournalFiles; 3089 } 3090 3091 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 3092 this.checksumJournalFiles = checksumJournalFiles; 3093 } 3094 3095 @Override 3096 public void setBrokerService(BrokerService brokerService) { 3097 this.brokerService = brokerService; 3098 } 3099 3100 /** 3101 * @return the archiveDataLogs 3102 */ 3103 public boolean isArchiveDataLogs() { 3104 return this.archiveDataLogs; 3105 } 3106 3107 /** 3108 * @param archiveDataLogs the archiveDataLogs to set 3109 */ 3110 public void setArchiveDataLogs(boolean archiveDataLogs) { 3111 this.archiveDataLogs = archiveDataLogs; 3112 } 3113 3114 /** 3115 * @return the directoryArchive 3116 */ 3117 public File getDirectoryArchive() { 3118 return this.directoryArchive; 3119 } 3120 3121 /** 3122 * @param directoryArchive the directoryArchive to set 3123 */ 3124 public void setDirectoryArchive(File directoryArchive) { 3125 this.directoryArchive = directoryArchive; 3126 } 3127 3128 public boolean isArchiveCorruptedIndex() { 3129 return archiveCorruptedIndex; 3130 } 3131 3132 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 3133 this.archiveCorruptedIndex = archiveCorruptedIndex; 3134 } 3135 3136 public float getIndexLFUEvictionFactor() { 3137 return indexLFUEvictionFactor; 3138 } 3139 3140 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 3141 this.indexLFUEvictionFactor = indexLFUEvictionFactor; 3142 } 3143 3144 public boolean isUseIndexLFRUEviction() { 3145 return useIndexLFRUEviction; 3146 } 3147 3148 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 3149 this.useIndexLFRUEviction = useIndexLFRUEviction; 3150 } 3151 3152 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { 3153 this.enableIndexDiskSyncs = enableIndexDiskSyncs; 3154 } 3155 3156 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { 3157 this.enableIndexRecoveryFile = enableIndexRecoveryFile; 3158 } 3159 3160 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { 3161 this.enableIndexPageCaching = enableIndexPageCaching; 3162 } 3163 3164 public boolean isEnableIndexDiskSyncs() { 3165 return enableIndexDiskSyncs; 3166 } 3167 3168 public boolean isEnableIndexRecoveryFile() { 3169 return enableIndexRecoveryFile; 3170 } 3171 3172 public boolean isEnableIndexPageCaching() { 3173 return enableIndexPageCaching; 3174 } 3175 3176 // ///////////////////////////////////////////////////////////////// 3177 // Internal conversion methods. 3178 // ///////////////////////////////////////////////////////////////// 3179 3180 class MessageOrderCursor{ 3181 long defaultCursorPosition; 3182 long lowPriorityCursorPosition; 3183 long highPriorityCursorPosition; 3184 MessageOrderCursor(){ 3185 } 3186 3187 MessageOrderCursor(long position){ 3188 this.defaultCursorPosition=position; 3189 this.lowPriorityCursorPosition=position; 3190 this.highPriorityCursorPosition=position; 3191 } 3192 3193 MessageOrderCursor(MessageOrderCursor other){ 3194 this.defaultCursorPosition=other.defaultCursorPosition; 3195 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 3196 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 3197 } 3198 3199 MessageOrderCursor copy() { 3200 return new MessageOrderCursor(this); 3201 } 3202 3203 void reset() { 3204 this.defaultCursorPosition=0; 3205 this.highPriorityCursorPosition=0; 3206 this.lowPriorityCursorPosition=0; 3207 } 3208 3209 void increment() { 3210 if (defaultCursorPosition!=0) { 3211 defaultCursorPosition++; 3212 } 3213 if (highPriorityCursorPosition!=0) { 3214 highPriorityCursorPosition++; 3215 } 3216 if (lowPriorityCursorPosition!=0) { 3217 lowPriorityCursorPosition++; 3218 } 3219 } 3220 3221 @Override 3222 public String toString() { 3223 return "MessageOrderCursor:[def:" + defaultCursorPosition 3224 + ", low:" + lowPriorityCursorPosition 3225 + ", high:" + highPriorityCursorPosition + "]"; 3226 } 3227 3228 public void sync(MessageOrderCursor other) { 3229 this.defaultCursorPosition=other.defaultCursorPosition; 3230 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 3231 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 3232 } 3233 } 3234 3235 class MessageOrderIndex { 3236 static final byte HI = 9; 3237 static final byte LO = 0; 3238 static final byte DEF = 4; 3239 3240 long nextMessageId; 3241 BTreeIndex<Long, MessageKeys> defaultPriorityIndex; 3242 BTreeIndex<Long, MessageKeys> lowPriorityIndex; 3243 BTreeIndex<Long, MessageKeys> highPriorityIndex; 3244 final MessageOrderCursor cursor = new MessageOrderCursor(); 3245 Long lastDefaultKey; 3246 Long lastHighKey; 3247 Long lastLowKey; 3248 byte lastGetPriority; 3249 final List<Long> pendingAdditions = new LinkedList<Long>(); 3250 final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); 3251 3252 MessageKeys remove(Transaction tx, Long key) throws IOException { 3253 MessageKeys result = defaultPriorityIndex.remove(tx, key); 3254 if (result == null && highPriorityIndex!=null) { 3255 result = highPriorityIndex.remove(tx, key); 3256 if (result ==null && lowPriorityIndex!=null) { 3257 result = lowPriorityIndex.remove(tx, key); 3258 } 3259 } 3260 return result; 3261 } 3262 3263 void load(Transaction tx) throws IOException { 3264 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3265 defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller); 3266 defaultPriorityIndex.load(tx); 3267 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3268 lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); 3269 lowPriorityIndex.load(tx); 3270 highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3271 highPriorityIndex.setValueMarshaller(messageKeysMarshaller); 3272 highPriorityIndex.load(tx); 3273 } 3274 3275 void allocate(Transaction tx) throws IOException { 3276 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3277 if (metadata.version >= 2) { 3278 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3279 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3280 } 3281 } 3282 3283 void configureLast(Transaction tx) throws IOException { 3284 // Figure out the next key using the last entry in the destination. 3285 TreeSet<Long> orderedSet = new TreeSet<Long>(); 3286 3287 addLast(orderedSet, highPriorityIndex, tx); 3288 addLast(orderedSet, defaultPriorityIndex, tx); 3289 addLast(orderedSet, lowPriorityIndex, tx); 3290 3291 if (!orderedSet.isEmpty()) { 3292 nextMessageId = orderedSet.last() + 1; 3293 } 3294 } 3295 3296 private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException { 3297 if (index != null) { 3298 Entry<Long, MessageKeys> lastEntry = index.getLast(tx); 3299 if (lastEntry != null) { 3300 orderedSet.add(lastEntry.getKey()); 3301 } 3302 } 3303 } 3304 3305 void clear(Transaction tx) throws IOException { 3306 this.remove(tx); 3307 this.resetCursorPosition(); 3308 this.allocate(tx); 3309 this.load(tx); 3310 this.configureLast(tx); 3311 } 3312 3313 void remove(Transaction tx) throws IOException { 3314 defaultPriorityIndex.clear(tx); 3315 defaultPriorityIndex.unload(tx); 3316 tx.free(defaultPriorityIndex.getPageId()); 3317 if (lowPriorityIndex != null) { 3318 lowPriorityIndex.clear(tx); 3319 lowPriorityIndex.unload(tx); 3320 3321 tx.free(lowPriorityIndex.getPageId()); 3322 } 3323 if (highPriorityIndex != null) { 3324 highPriorityIndex.clear(tx); 3325 highPriorityIndex.unload(tx); 3326 tx.free(highPriorityIndex.getPageId()); 3327 } 3328 } 3329 3330 void resetCursorPosition() { 3331 this.cursor.reset(); 3332 lastDefaultKey = null; 3333 lastHighKey = null; 3334 lastLowKey = null; 3335 } 3336 3337 void setBatch(Transaction tx, Long sequence) throws IOException { 3338 if (sequence != null) { 3339 Long nextPosition = new Long(sequence.longValue() + 1); 3340 lastDefaultKey = sequence; 3341 cursor.defaultCursorPosition = nextPosition.longValue(); 3342 lastHighKey = sequence; 3343 cursor.highPriorityCursorPosition = nextPosition.longValue(); 3344 lastLowKey = sequence; 3345 cursor.lowPriorityCursorPosition = nextPosition.longValue(); 3346 } 3347 } 3348 3349 void setBatch(Transaction tx, LastAck last) throws IOException { 3350 setBatch(tx, last.lastAckedSequence); 3351 if (cursor.defaultCursorPosition == 0 3352 && cursor.highPriorityCursorPosition == 0 3353 && cursor.lowPriorityCursorPosition == 0) { 3354 long next = last.lastAckedSequence + 1; 3355 switch (last.priority) { 3356 case DEF: 3357 cursor.defaultCursorPosition = next; 3358 cursor.highPriorityCursorPosition = next; 3359 break; 3360 case HI: 3361 cursor.highPriorityCursorPosition = next; 3362 break; 3363 case LO: 3364 cursor.lowPriorityCursorPosition = next; 3365 cursor.defaultCursorPosition = next; 3366 cursor.highPriorityCursorPosition = next; 3367 break; 3368 } 3369 } 3370 } 3371 3372 void stoppedIterating() { 3373 if (lastDefaultKey!=null) { 3374 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1; 3375 } 3376 if (lastHighKey!=null) { 3377 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1; 3378 } 3379 if (lastLowKey!=null) { 3380 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1; 3381 } 3382 lastDefaultKey = null; 3383 lastHighKey = null; 3384 lastLowKey = null; 3385 } 3386 3387 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId) 3388 throws IOException { 3389 if (defaultPriorityIndex.containsKey(tx, sequenceId)) { 3390 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId); 3391 } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) { 3392 getDeleteList(tx, deletes, highPriorityIndex, sequenceId); 3393 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) { 3394 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId); 3395 } 3396 } 3397 3398 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, 3399 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException { 3400 3401 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null); 3402 deletes.add(iterator.next()); 3403 } 3404 3405 long getNextMessageId() { 3406 return nextMessageId++; 3407 } 3408 3409 void revertNextMessageId() { 3410 nextMessageId--; 3411 } 3412 3413 MessageKeys get(Transaction tx, Long key) throws IOException { 3414 MessageKeys result = defaultPriorityIndex.get(tx, key); 3415 if (result == null) { 3416 result = highPriorityIndex.get(tx, key); 3417 if (result == null) { 3418 result = lowPriorityIndex.get(tx, key); 3419 lastGetPriority = LO; 3420 } else { 3421 lastGetPriority = HI; 3422 } 3423 } else { 3424 lastGetPriority = DEF; 3425 } 3426 return result; 3427 } 3428 3429 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException { 3430 if (priority == javax.jms.Message.DEFAULT_PRIORITY) { 3431 return defaultPriorityIndex.put(tx, key, value); 3432 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) { 3433 return highPriorityIndex.put(tx, key, value); 3434 } else { 3435 return lowPriorityIndex.put(tx, key, value); 3436 } 3437 } 3438 3439 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{ 3440 return new MessageOrderIterator(tx,cursor,this); 3441 } 3442 3443 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ 3444 return new MessageOrderIterator(tx,m,this); 3445 } 3446 3447 public byte lastGetPriority() { 3448 return lastGetPriority; 3449 } 3450 3451 public boolean alreadyDispatched(Long sequence) { 3452 return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) || 3453 (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) || 3454 (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence); 3455 } 3456 3457 public void trackPendingAdd(Long seq) { 3458 synchronized (pendingAdditions) { 3459 pendingAdditions.add(seq); 3460 } 3461 } 3462 3463 public void trackPendingAddComplete(Long seq) { 3464 synchronized (pendingAdditions) { 3465 pendingAdditions.remove(seq); 3466 } 3467 } 3468 3469 public Long minPendingAdd() { 3470 synchronized (pendingAdditions) { 3471 if (!pendingAdditions.isEmpty()) { 3472 return pendingAdditions.get(0); 3473 } else { 3474 return null; 3475 } 3476 } 3477 } 3478 3479 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ 3480 Iterator<Entry<Long, MessageKeys>>currentIterator; 3481 final Iterator<Entry<Long, MessageKeys>>highIterator; 3482 final Iterator<Entry<Long, MessageKeys>>defaultIterator; 3483 final Iterator<Entry<Long, MessageKeys>>lowIterator; 3484 3485 MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException { 3486 Long pendingAddLimiter = messageOrderIndex.minPendingAdd(); 3487 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter); 3488 if (highPriorityIndex != null) { 3489 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter); 3490 } else { 3491 this.highIterator = null; 3492 } 3493 if (lowPriorityIndex != null) { 3494 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter); 3495 } else { 3496 this.lowIterator = null; 3497 } 3498 } 3499 3500 @Override 3501 public boolean hasNext() { 3502 if (currentIterator == null) { 3503 if (highIterator != null) { 3504 if (highIterator.hasNext()) { 3505 currentIterator = highIterator; 3506 return currentIterator.hasNext(); 3507 } 3508 if (defaultIterator.hasNext()) { 3509 currentIterator = defaultIterator; 3510 return currentIterator.hasNext(); 3511 } 3512 if (lowIterator.hasNext()) { 3513 currentIterator = lowIterator; 3514 return currentIterator.hasNext(); 3515 } 3516 return false; 3517 } else { 3518 currentIterator = defaultIterator; 3519 return currentIterator.hasNext(); 3520 } 3521 } 3522 if (highIterator != null) { 3523 if (currentIterator.hasNext()) { 3524 return true; 3525 } 3526 if (currentIterator == highIterator) { 3527 if (defaultIterator.hasNext()) { 3528 currentIterator = defaultIterator; 3529 return currentIterator.hasNext(); 3530 } 3531 if (lowIterator.hasNext()) { 3532 currentIterator = lowIterator; 3533 return currentIterator.hasNext(); 3534 } 3535 return false; 3536 } 3537 3538 if (currentIterator == defaultIterator) { 3539 if (lowIterator.hasNext()) { 3540 currentIterator = lowIterator; 3541 return currentIterator.hasNext(); 3542 } 3543 return false; 3544 } 3545 } 3546 return currentIterator.hasNext(); 3547 } 3548 3549 @Override 3550 public Entry<Long, MessageKeys> next() { 3551 Entry<Long, MessageKeys> result = currentIterator.next(); 3552 if (result != null) { 3553 Long key = result.getKey(); 3554 if (highIterator != null) { 3555 if (currentIterator == defaultIterator) { 3556 lastDefaultKey = key; 3557 } else if (currentIterator == highIterator) { 3558 lastHighKey = key; 3559 } else { 3560 lastLowKey = key; 3561 } 3562 } else { 3563 lastDefaultKey = key; 3564 } 3565 } 3566 return result; 3567 } 3568 3569 @Override 3570 public void remove() { 3571 throw new UnsupportedOperationException(); 3572 } 3573 } 3574 } 3575 3576 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> { 3577 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); 3578 3579 @Override 3580 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException { 3581 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 3582 ObjectOutputStream oout = new ObjectOutputStream(baos); 3583 oout.writeObject(object); 3584 oout.flush(); 3585 oout.close(); 3586 byte[] data = baos.toByteArray(); 3587 dataOut.writeInt(data.length); 3588 dataOut.write(data); 3589 } 3590 3591 @Override 3592 @SuppressWarnings("unchecked") 3593 public HashSet<String> readPayload(DataInput dataIn) throws IOException { 3594 int dataLen = dataIn.readInt(); 3595 byte[] data = new byte[dataLen]; 3596 dataIn.readFully(data); 3597 ByteArrayInputStream bais = new ByteArrayInputStream(data); 3598 ObjectInputStream oin = new ObjectInputStream(bais); 3599 try { 3600 return (HashSet<String>) oin.readObject(); 3601 } catch (ClassNotFoundException cfe) { 3602 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe); 3603 ioe.initCause(cfe); 3604 throw ioe; 3605 } 3606 } 3607 } 3608 3609 public File getIndexDirectory() { 3610 return indexDirectory; 3611 } 3612 3613 public void setIndexDirectory(File indexDirectory) { 3614 this.indexDirectory = indexDirectory; 3615 } 3616 3617 interface IndexAware { 3618 public void sequenceAssignedWithIndexLocked(long index); 3619 } 3620 3621 public String getPreallocationScope() { 3622 return preallocationScope; 3623 } 3624 3625 public void setPreallocationScope(String preallocationScope) { 3626 this.preallocationScope = preallocationScope; 3627 } 3628 3629 public String getPreallocationStrategy() { 3630 return preallocationStrategy; 3631 } 3632 3633 public void setPreallocationStrategy(String preallocationStrategy) { 3634 this.preallocationStrategy = preallocationStrategy; 3635 } 3636 3637 public int getCompactAcksAfterNoGC() { 3638 return compactAcksAfterNoGC; 3639 } 3640 3641 /** 3642 * Sets the number of GC cycles where no journal logs were removed before an attempt to 3643 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 3644 * <p> 3645 * A value of -1 will disable this feature. 3646 * 3647 * @param compactAcksAfterNoGC 3648 * Number of empty GC cycles before we rewrite old ACKS. 3649 */ 3650 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 3651 this.compactAcksAfterNoGC = compactAcksAfterNoGC; 3652 } 3653 3654 /** 3655 * Returns whether Ack compaction will ignore that the store is still growing 3656 * and run more often. 3657 * 3658 * @return the compactAcksIgnoresStoreGrowth current value. 3659 */ 3660 public boolean isCompactAcksIgnoresStoreGrowth() { 3661 return compactAcksIgnoresStoreGrowth; 3662 } 3663 3664 /** 3665 * Configure if Ack compaction will occur regardless of continued growth of the 3666 * journal logs meaning that the store has not run out of space yet. Because the 3667 * compaction operation can be costly this value is defaulted to off and the Ack 3668 * compaction is only done when it seems that the store cannot grow and larger. 3669 * 3670 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 3671 */ 3672 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 3673 this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth; 3674 } 3675 3676 /** 3677 * Returns whether Ack compaction is enabled 3678 * 3679 * @return enableAckCompaction 3680 */ 3681 public boolean isEnableAckCompaction() { 3682 return enableAckCompaction; 3683 } 3684 3685 /** 3686 * Configure if the Ack compaction task should be enabled to run 3687 * 3688 * @param enableAckCompaction 3689 */ 3690 public void setEnableAckCompaction(boolean enableAckCompaction) { 3691 this.enableAckCompaction = enableAckCompaction; 3692 } 3693}