001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName; 020 021import java.io.File; 022import java.io.IOException; 023import java.util.Set; 024import java.util.concurrent.Callable; 025 026import javax.management.ObjectName; 027 028import org.apache.activemq.broker.BrokerService; 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.broker.LockableServiceSupport; 031import org.apache.activemq.broker.Locker; 032import org.apache.activemq.broker.jmx.AnnotatedMBean; 033import org.apache.activemq.broker.jmx.PersistenceAdapterView; 034import org.apache.activemq.broker.scheduler.JobSchedulerStore; 035import org.apache.activemq.command.ActiveMQDestination; 036import org.apache.activemq.command.ActiveMQQueue; 037import org.apache.activemq.command.ActiveMQTopic; 038import org.apache.activemq.command.LocalTransactionId; 039import org.apache.activemq.command.ProducerId; 040import org.apache.activemq.command.TransactionId; 041import org.apache.activemq.command.XATransactionId; 042import org.apache.activemq.protobuf.Buffer; 043import org.apache.activemq.store.JournaledStore; 044import org.apache.activemq.store.MessageStore; 045import org.apache.activemq.store.PersistenceAdapter; 046import org.apache.activemq.store.SharedFileLocker; 047import org.apache.activemq.store.TopicMessageStore; 048import org.apache.activemq.store.TransactionIdTransformer; 049import org.apache.activemq.store.TransactionIdTransformerAware; 050import org.apache.activemq.store.TransactionStore; 051import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; 052import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 053import org.apache.activemq.store.kahadb.data.KahaXATransactionId; 054import org.apache.activemq.usage.SystemUsage; 055import org.apache.activemq.util.ServiceStopper; 056 057/** 058 * An implementation of {@link PersistenceAdapter} designed for use with 059 * KahaDB - Embedded Lightweight Non-Relational Database 060 * 061 * @org.apache.xbean.XBean element="kahaDB" 062 * 063 */ 064public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore, TransactionIdTransformerAware { 065 private final KahaDBStore letter = new KahaDBStore(); 066 067 /** 068 * @param context 069 * @throws IOException 070 * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext) 071 */ 072 @Override 073 public void beginTransaction(ConnectionContext context) throws IOException { 074 this.letter.beginTransaction(context); 075 } 076 077 /** 078 * @param sync 079 * @throws IOException 080 * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean) 081 */ 082 @Override 083 public void checkpoint(boolean sync) throws IOException { 084 this.letter.checkpoint(sync); 085 } 086 087 /** 088 * @param context 089 * @throws IOException 090 * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext) 091 */ 092 @Override 093 public void commitTransaction(ConnectionContext context) throws IOException { 094 this.letter.commitTransaction(context); 095 } 096 097 /** 098 * @param destination 099 * @return MessageStore 100 * @throws IOException 101 * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 102 */ 103 @Override 104 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 105 return this.letter.createQueueMessageStore(destination); 106 } 107 108 /** 109 * @param destination 110 * @return TopicMessageStore 111 * @throws IOException 112 * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 113 */ 114 @Override 115 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 116 return this.letter.createTopicMessageStore(destination); 117 } 118 119 /** 120 * @return TransactionStore 121 * @throws IOException 122 * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore() 123 */ 124 @Override 125 public TransactionStore createTransactionStore() throws IOException { 126 return this.letter.createTransactionStore(); 127 } 128 129 /** 130 * @throws IOException 131 * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages() 132 */ 133 @Override 134 public void deleteAllMessages() throws IOException { 135 this.letter.deleteAllMessages(); 136 } 137 138 /** 139 * @return destinations 140 * @see org.apache.activemq.store.PersistenceAdapter#getDestinations() 141 */ 142 @Override 143 public Set<ActiveMQDestination> getDestinations() { 144 return this.letter.getDestinations(); 145 } 146 147 /** 148 * @return lastMessageBrokerSequenceId 149 * @throws IOException 150 * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId() 151 */ 152 @Override 153 public long getLastMessageBrokerSequenceId() throws IOException { 154 return this.letter.getLastMessageBrokerSequenceId(); 155 } 156 157 @Override 158 public long getLastProducerSequenceId(ProducerId id) throws IOException { 159 return this.letter.getLastProducerSequenceId(id); 160 } 161 162 /** 163 * @param destination 164 * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 165 */ 166 @Override 167 public void removeQueueMessageStore(ActiveMQQueue destination) { 168 this.letter.removeQueueMessageStore(destination); 169 } 170 171 /** 172 * @param destination 173 * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 174 */ 175 @Override 176 public void removeTopicMessageStore(ActiveMQTopic destination) { 177 this.letter.removeTopicMessageStore(destination); 178 } 179 180 /** 181 * @param context 182 * @throws IOException 183 * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext) 184 */ 185 @Override 186 public void rollbackTransaction(ConnectionContext context) throws IOException { 187 this.letter.rollbackTransaction(context); 188 } 189 190 /** 191 * @param brokerName 192 * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String) 193 */ 194 @Override 195 public void setBrokerName(String brokerName) { 196 this.letter.setBrokerName(brokerName); 197 } 198 199 /** 200 * @param usageManager 201 * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage) 202 */ 203 @Override 204 public void setUsageManager(SystemUsage usageManager) { 205 this.letter.setUsageManager(usageManager); 206 } 207 208 /** 209 * @return the size of the store 210 * @see org.apache.activemq.store.PersistenceAdapter#size() 211 */ 212 @Override 213 public long size() { 214 return this.letter.size(); 215 } 216 217 /** 218 * @throws Exception 219 * @see org.apache.activemq.Service#start() 220 */ 221 @Override 222 public void doStart() throws Exception { 223 this.letter.start(); 224 225 if (brokerService != null && brokerService.isUseJmx()) { 226 PersistenceAdapterView view = new PersistenceAdapterView(this); 227 view.setInflightTransactionViewCallable(new Callable<String>() { 228 @Override 229 public String call() throws Exception { 230 return letter.getTransactions(); 231 } 232 }); 233 view.setDataViewCallable(new Callable<String>() { 234 @Override 235 public String call() throws Exception { 236 return letter.getJournal().getFileMap().keySet().toString(); 237 } 238 }); 239 AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, 240 createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString())); 241 } 242 } 243 244 /** 245 * @throws Exception 246 * @see org.apache.activemq.Service#stop() 247 */ 248 @Override 249 public void doStop(ServiceStopper stopper) throws Exception { 250 this.letter.stop(); 251 252 if (brokerService != null && brokerService.isUseJmx()) { 253 ObjectName brokerObjectName = brokerService.getBrokerObjectName(); 254 brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString())); 255 } 256 } 257 258 /** 259 * Get the journalMaxFileLength 260 * 261 * @return the journalMaxFileLength 262 */ 263 @Override 264 public int getJournalMaxFileLength() { 265 return this.letter.getJournalMaxFileLength(); 266 } 267 268 /** 269 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can 270 * be used 271 * 272 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 273 */ 274 public void setJournalMaxFileLength(int journalMaxFileLength) { 275 this.letter.setJournalMaxFileLength(journalMaxFileLength); 276 } 277 278 /** 279 * Set the max number of producers (LRU cache) to track for duplicate sends 280 */ 281 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 282 this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack); 283 } 284 285 public int getMaxFailoverProducersToTrack() { 286 return this.letter.getMaxFailoverProducersToTrack(); 287 } 288 289 /** 290 * set the audit window depth for duplicate suppression (should exceed the max transaction 291 * batch) 292 */ 293 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 294 this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth); 295 } 296 297 public int getFailoverProducersAuditDepth() { 298 return this.letter.getFailoverProducersAuditDepth(); 299 } 300 301 /** 302 * Get the checkpointInterval 303 * 304 * @return the checkpointInterval 305 */ 306 public long getCheckpointInterval() { 307 return this.letter.getCheckpointInterval(); 308 } 309 310 /** 311 * Set the checkpointInterval 312 * 313 * @param checkpointInterval 314 * the checkpointInterval to set 315 */ 316 public void setCheckpointInterval(long checkpointInterval) { 317 this.letter.setCheckpointInterval(checkpointInterval); 318 } 319 320 /** 321 * Get the cleanupInterval 322 * 323 * @return the cleanupInterval 324 */ 325 public long getCleanupInterval() { 326 return this.letter.getCleanupInterval(); 327 } 328 329 /** 330 * Set the cleanupInterval 331 * 332 * @param cleanupInterval 333 * the cleanupInterval to set 334 */ 335 public void setCleanupInterval(long cleanupInterval) { 336 this.letter.setCleanupInterval(cleanupInterval); 337 } 338 339 /** 340 * Get the indexWriteBatchSize 341 * 342 * @return the indexWriteBatchSize 343 */ 344 public int getIndexWriteBatchSize() { 345 return this.letter.getIndexWriteBatchSize(); 346 } 347 348 /** 349 * Set the indexWriteBatchSize 350 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 351 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 352 * @param indexWriteBatchSize 353 * the indexWriteBatchSize to set 354 */ 355 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 356 this.letter.setIndexWriteBatchSize(indexWriteBatchSize); 357 } 358 359 /** 360 * Get the journalMaxWriteBatchSize 361 * 362 * @return the journalMaxWriteBatchSize 363 */ 364 public int getJournalMaxWriteBatchSize() { 365 return this.letter.getJournalMaxWriteBatchSize(); 366 } 367 368 /** 369 * Set the journalMaxWriteBatchSize 370 * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 371 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 372 * @param journalMaxWriteBatchSize 373 * the journalMaxWriteBatchSize to set 374 */ 375 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 376 this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize); 377 } 378 379 /** 380 * Get the enableIndexWriteAsync 381 * 382 * @return the enableIndexWriteAsync 383 */ 384 public boolean isEnableIndexWriteAsync() { 385 return this.letter.isEnableIndexWriteAsync(); 386 } 387 388 /** 389 * Set the enableIndexWriteAsync 390 * 391 * @param enableIndexWriteAsync 392 * the enableIndexWriteAsync to set 393 */ 394 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 395 this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync); 396 } 397 398 /** 399 * Get the directory 400 * 401 * @return the directory 402 */ 403 @Override 404 public File getDirectory() { 405 return this.letter.getDirectory(); 406 } 407 408 /** 409 * @param dir 410 * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File) 411 */ 412 @Override 413 public void setDirectory(File dir) { 414 this.letter.setDirectory(dir); 415 } 416 417 /** 418 * @return the currently configured location of the KahaDB index files. 419 */ 420 public File getIndexDirectory() { 421 return this.letter.getIndexDirectory(); 422 } 423 424 /** 425 * Sets the directory where KahaDB index files should be written. 426 * 427 * @param indexDirectory 428 * the directory where the KahaDB store index files should be written. 429 */ 430 public void setIndexDirectory(File indexDirectory) { 431 this.letter.setIndexDirectory(indexDirectory); 432 } 433 434 /** 435 * Get the enableJournalDiskSyncs 436 * 437 * @return the enableJournalDiskSyncs 438 */ 439 public boolean isEnableJournalDiskSyncs() { 440 return this.letter.isEnableJournalDiskSyncs(); 441 } 442 443 /** 444 * Set the enableJournalDiskSyncs 445 * 446 * @param enableJournalDiskSyncs 447 * the enableJournalDiskSyncs to set 448 */ 449 public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) { 450 this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs); 451 } 452 453 /** 454 * Get the indexCacheSize 455 * 456 * @return the indexCacheSize 457 */ 458 public int getIndexCacheSize() { 459 return this.letter.getIndexCacheSize(); 460 } 461 462 /** 463 * Set the indexCacheSize 464 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 465 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 466 * @param indexCacheSize 467 * the indexCacheSize to set 468 */ 469 public void setIndexCacheSize(int indexCacheSize) { 470 this.letter.setIndexCacheSize(indexCacheSize); 471 } 472 473 /** 474 * Get the ignoreMissingJournalfiles 475 * 476 * @return the ignoreMissingJournalfiles 477 */ 478 public boolean isIgnoreMissingJournalfiles() { 479 return this.letter.isIgnoreMissingJournalfiles(); 480 } 481 482 /** 483 * Set the ignoreMissingJournalfiles 484 * 485 * @param ignoreMissingJournalfiles 486 * the ignoreMissingJournalfiles to set 487 */ 488 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 489 this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); 490 } 491 492 public boolean isChecksumJournalFiles() { 493 return letter.isChecksumJournalFiles(); 494 } 495 496 public boolean isCheckForCorruptJournalFiles() { 497 return letter.isCheckForCorruptJournalFiles(); 498 } 499 500 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 501 letter.setChecksumJournalFiles(checksumJournalFiles); 502 } 503 504 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 505 letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); 506 } 507 508 @Override 509 public void setBrokerService(BrokerService brokerService) { 510 super.setBrokerService(brokerService); 511 letter.setBrokerService(brokerService); 512 } 513 514 public String getPreallocationScope() { 515 return letter.getPreallocationScope(); 516 } 517 518 public void setPreallocationScope(String preallocationScope) { 519 this.letter.setPreallocationScope(preallocationScope); 520 } 521 522 public String getPreallocationStrategy() { 523 return letter.getPreallocationStrategy(); 524 } 525 526 public void setPreallocationStrategy(String preallocationStrategy) { 527 this.letter.setPreallocationStrategy(preallocationStrategy); 528 } 529 530 public boolean isArchiveDataLogs() { 531 return letter.isArchiveDataLogs(); 532 } 533 534 public void setArchiveDataLogs(boolean archiveDataLogs) { 535 letter.setArchiveDataLogs(archiveDataLogs); 536 } 537 538 public File getDirectoryArchive() { 539 return letter.getDirectoryArchive(); 540 } 541 542 public void setDirectoryArchive(File directoryArchive) { 543 letter.setDirectoryArchive(directoryArchive); 544 } 545 546 public boolean isConcurrentStoreAndDispatchQueues() { 547 return letter.isConcurrentStoreAndDispatchQueues(); 548 } 549 550 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 551 letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch); 552 } 553 554 public boolean isConcurrentStoreAndDispatchTopics() { 555 return letter.isConcurrentStoreAndDispatchTopics(); 556 } 557 558 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 559 letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch); 560 } 561 562 public int getMaxAsyncJobs() { 563 return letter.getMaxAsyncJobs(); 564 } 565 /** 566 * @param maxAsyncJobs 567 * the maxAsyncJobs to set 568 */ 569 public void setMaxAsyncJobs(int maxAsyncJobs) { 570 letter.setMaxAsyncJobs(maxAsyncJobs); 571 } 572 573 /** 574 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead 575 * 576 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set 577 */ 578 @Deprecated 579 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException { 580 getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay); 581 } 582 583 public boolean getForceRecoverIndex() { 584 return letter.getForceRecoverIndex(); 585 } 586 587 public void setForceRecoverIndex(boolean forceRecoverIndex) { 588 letter.setForceRecoverIndex(forceRecoverIndex); 589 } 590 591 public boolean isArchiveCorruptedIndex() { 592 return letter.isArchiveCorruptedIndex(); 593 } 594 595 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 596 letter.setArchiveCorruptedIndex(archiveCorruptedIndex); 597 } 598 599 public float getIndexLFUEvictionFactor() { 600 return letter.getIndexLFUEvictionFactor(); 601 } 602 603 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 604 letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor); 605 } 606 607 public boolean isUseIndexLFRUEviction() { 608 return letter.isUseIndexLFRUEviction(); 609 } 610 611 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 612 letter.setUseIndexLFRUEviction(useIndexLFRUEviction); 613 } 614 615 public void setEnableIndexDiskSyncs(boolean diskSyncs) { 616 letter.setEnableIndexDiskSyncs(diskSyncs); 617 } 618 619 public boolean isEnableIndexDiskSyncs() { 620 return letter.isEnableIndexDiskSyncs(); 621 } 622 623 public void setEnableIndexRecoveryFile(boolean enable) { 624 letter.setEnableIndexRecoveryFile(enable); 625 } 626 627 public boolean isEnableIndexRecoveryFile() { 628 return letter.isEnableIndexRecoveryFile(); 629 } 630 631 public void setEnableIndexPageCaching(boolean enable) { 632 letter.setEnableIndexPageCaching(enable); 633 } 634 635 public boolean isEnableIndexPageCaching() { 636 return letter.isEnableIndexPageCaching(); 637 } 638 639 public int getCompactAcksAfterNoGC() { 640 return letter.getCompactAcksAfterNoGC(); 641 } 642 643 /** 644 * Sets the number of GC cycles where no journal logs were removed before an attempt to 645 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 646 * <p> 647 * A value of -1 will disable this feature. 648 * 649 * @param compactAcksAfterNoGC 650 * Number of empty GC cycles before we rewrite old ACKS. 651 */ 652 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 653 this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC); 654 } 655 656 public boolean isCompactAcksIgnoresStoreGrowth() { 657 return this.letter.isCompactAcksIgnoresStoreGrowth(); 658 } 659 660 /** 661 * Configure if Ack compaction will occur regardless of continued growth of the 662 * journal logs meaning that the store has not run out of space yet. Because the 663 * compaction operation can be costly this value is defaulted to off and the Ack 664 * compaction is only done when it seems that the store cannot grow and larger. 665 * 666 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 667 */ 668 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 669 this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth); 670 } 671 672 /** 673 * Returns whether Ack compaction is enabled 674 * 675 * @return enableAckCompaction 676 */ 677 public boolean isEnableAckCompaction() { 678 return letter.isEnableAckCompaction(); 679 } 680 681 /** 682 * Configure if the Ack compaction task should be enabled to run 683 * 684 * @param enableAckCompaction 685 */ 686 public void setEnableAckCompaction(boolean enableAckCompaction) { 687 letter.setEnableAckCompaction(enableAckCompaction); 688 } 689 690 public KahaDBStore getStore() { 691 return letter; 692 } 693 694 public KahaTransactionInfo createTransactionInfo(TransactionId txid) { 695 if (txid == null) { 696 return null; 697 } 698 KahaTransactionInfo rc = new KahaTransactionInfo(); 699 700 if (txid.isLocalTransaction()) { 701 LocalTransactionId t = (LocalTransactionId) txid; 702 KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId(); 703 kahaTxId.setConnectionId(t.getConnectionId().getValue()); 704 kahaTxId.setTransactionId(t.getValue()); 705 rc.setLocalTransactionId(kahaTxId); 706 } else { 707 XATransactionId t = (XATransactionId) txid; 708 KahaXATransactionId kahaTxId = new KahaXATransactionId(); 709 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier())); 710 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId())); 711 kahaTxId.setFormatId(t.getFormatId()); 712 rc.setXaTransactionId(kahaTxId); 713 } 714 return rc; 715 } 716 717 @Override 718 public Locker createDefaultLocker() throws IOException { 719 SharedFileLocker locker = new SharedFileLocker(); 720 locker.configure(this); 721 return locker; 722 } 723 724 @Override 725 public void init() throws Exception {} 726 727 @Override 728 public String toString() { 729 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; 730 return "KahaDBPersistenceAdapter[" + path + "]"; 731 } 732 733 @Override 734 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 735 getStore().setTransactionIdTransformer(transactionIdTransformer); 736 } 737 738 @Override 739 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 740 return this.letter.createJobSchedulerStore(); 741 } 742}