001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.Date; 022import java.util.HashSet; 023import java.util.Set; 024import java.util.TreeSet; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.atomic.AtomicBoolean; 028 029import org.apache.activemq.broker.Broker; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.command.Message; 032import org.apache.activemq.command.MessageAck; 033import org.apache.activemq.command.MessageId; 034import org.apache.activemq.command.TransactionId; 035import org.apache.activemq.command.XATransactionId; 036import org.apache.activemq.store.AbstractMessageStore; 037import org.apache.activemq.store.ListenableFuture; 038import org.apache.activemq.store.MessageStore; 039import org.apache.activemq.store.PersistenceAdapter; 040import org.apache.activemq.store.ProxyMessageStore; 041import org.apache.activemq.store.ProxyTopicMessageStore; 042import org.apache.activemq.store.TopicMessageStore; 043import org.apache.activemq.store.TransactionRecoveryListener; 044import org.apache.activemq.store.TransactionStore; 045import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 046import org.apache.activemq.store.kahadb.data.KahaEntryType; 047import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 048import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 049import org.apache.activemq.store.kahadb.disk.journal.Journal; 050import org.apache.activemq.store.kahadb.disk.journal.Location; 051import org.apache.activemq.util.DataByteArrayInputStream; 052import org.apache.activemq.util.DataByteArrayOutputStream; 053import org.apache.activemq.util.IOHelper; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057public class MultiKahaDBTransactionStore implements TransactionStore { 058 static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class); 059 final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter; 060 final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>(); 061 final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>(); 062 private Journal journal; 063 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 064 private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 065 private final AtomicBoolean started = new AtomicBoolean(false); 066 067 public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) { 068 this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter; 069 } 070 071 public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) { 072 return new ProxyMessageStore(messageStore) { 073 @Override 074 public void addMessage(ConnectionContext context, final Message send) throws IOException { 075 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 076 } 077 078 @Override 079 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException { 080 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 081 } 082 083 @Override 084 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 085 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message); 086 } 087 088 @Override 089 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 090 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message); 091 } 092 093 @Override 094 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 095 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack); 096 } 097 098 @Override 099 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 100 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack); 101 } 102 }; 103 } 104 105 public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) { 106 return new ProxyTopicMessageStore(messageStore) { 107 @Override 108 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException { 109 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 110 } 111 112 @Override 113 public void addMessage(ConnectionContext context, final Message send) throws IOException { 114 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 115 } 116 117 @Override 118 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 119 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message); 120 } 121 122 @Override 123 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 124 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message); 125 } 126 127 @Override 128 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 129 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack); 130 } 131 132 @Override 133 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 134 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack); 135 } 136 137 @Override 138 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 139 MessageId messageId, MessageAck ack) throws IOException { 140 MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId, 141 subscriptionName, messageId, ack); 142 } 143 }; 144 } 145 146 public void deleteAllMessages() { 147 IOHelper.deleteChildren(getDirectory()); 148 } 149 150 public int getJournalMaxFileLength() { 151 return journalMaxFileLength; 152 } 153 154 public void setJournalMaxFileLength(int journalMaxFileLength) { 155 this.journalMaxFileLength = journalMaxFileLength; 156 } 157 158 public int getJournalMaxWriteBatchSize() { 159 return journalWriteBatchSize; 160 } 161 162 public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) { 163 this.journalWriteBatchSize = journalWriteBatchSize; 164 } 165 166 public class Tx { 167 private final Set<TransactionStore> stores = new HashSet<TransactionStore>(); 168 private int prepareLocationId = 0; 169 170 public void trackStore(TransactionStore store) { 171 stores.add(store); 172 } 173 174 public Set<TransactionStore> getStores() { 175 return stores; 176 } 177 178 public void trackPrepareLocation(Location location) { 179 this.prepareLocationId = location.getDataFileId(); 180 } 181 182 public int getPreparedLocationId() { 183 return prepareLocationId; 184 } 185 } 186 187 public Tx getTx(TransactionId txid) { 188 Tx tx = inflightTransactions.get(txid); 189 if (tx == null) { 190 tx = new Tx(); 191 inflightTransactions.put(txid, tx); 192 } 193 return tx; 194 } 195 196 public Tx removeTx(TransactionId txid) { 197 return inflightTransactions.remove(txid); 198 } 199 200 @Override 201 public void prepare(TransactionId txid) throws IOException { 202 Tx tx = getTx(txid); 203 for (TransactionStore store : tx.getStores()) { 204 store.prepare(txid); 205 } 206 } 207 208 @Override 209 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) 210 throws IOException { 211 212 if (preCommit != null) { 213 preCommit.run(); 214 } 215 216 Tx tx = getTx(txid); 217 if (wasPrepared) { 218 for (TransactionStore store : tx.getStores()) { 219 store.commit(txid, true, null, null); 220 } 221 } else { 222 // can only do 1pc on a single store 223 if (tx.getStores().size() == 1) { 224 for (TransactionStore store : tx.getStores()) { 225 store.commit(txid, false, null, null); 226 } 227 } else { 228 // need to do local 2pc 229 for (TransactionStore store : tx.getStores()) { 230 store.prepare(txid); 231 } 232 persistOutcome(tx, txid); 233 for (TransactionStore store : tx.getStores()) { 234 store.commit(txid, true, null, null); 235 } 236 persistCompletion(txid); 237 } 238 } 239 removeTx(txid); 240 if (postCommit != null) { 241 postCommit.run(); 242 } 243 } 244 245 public void persistOutcome(Tx tx, TransactionId txid) throws IOException { 246 tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))))); 247 } 248 249 public void persistCompletion(TransactionId txid) throws IOException { 250 store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))); 251 } 252 253 private Location store(JournalCommand<?> data) throws IOException { 254 int size = data.serializedSizeFramed(); 255 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 256 os.writeByte(data.type().getNumber()); 257 data.writeFramed(os); 258 Location location = journal.write(os.toByteSequence(), true); 259 journal.setLastAppendLocation(location); 260 return location; 261 } 262 263 @Override 264 public void rollback(TransactionId txid) throws IOException { 265 Tx tx = removeTx(txid); 266 if (tx != null) { 267 for (TransactionStore store : tx.getStores()) { 268 store.rollback(txid); 269 } 270 } 271 } 272 273 @Override 274 public void start() throws Exception { 275 if (started.compareAndSet(false, true)) { 276 journal = new Journal() { 277 @Override 278 protected void cleanup() { 279 super.cleanup(); 280 txStoreCleanup(); 281 } 282 }; 283 journal.setDirectory(getDirectory()); 284 journal.setMaxFileLength(journalMaxFileLength); 285 journal.setWriteBatchSize(journalWriteBatchSize); 286 IOHelper.mkdirs(journal.getDirectory()); 287 journal.start(); 288 recoverPendingLocalTransactions(); 289 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 290 } 291 } 292 293 private void txStoreCleanup() { 294 Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet()); 295 for (Tx tx : inflightTransactions.values()) { 296 knownDataFileIds.remove(tx.getPreparedLocationId()); 297 } 298 try { 299 journal.removeDataFiles(knownDataFileIds); 300 } catch (Exception e) { 301 LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds); 302 } 303 } 304 305 private File getDirectory() { 306 return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore"); 307 } 308 309 @Override 310 public void stop() throws Exception { 311 if (started.compareAndSet(true, false) && journal != null) { 312 journal.close(); 313 journal = null; 314 } 315 } 316 317 private void recoverPendingLocalTransactions() throws IOException { 318 Location location = journal.getNextLocation(null); 319 while (location != null) { 320 process(load(location)); 321 location = journal.getNextLocation(location); 322 } 323 recoveredPendingCommit.addAll(inflightTransactions.keySet()); 324 LOG.info("pending local transactions: " + recoveredPendingCommit); 325 } 326 327 public JournalCommand<?> load(Location location) throws IOException { 328 DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location)); 329 byte readByte = is.readByte(); 330 KahaEntryType type = KahaEntryType.valueOf(readByte); 331 if (type == null) { 332 throw new IOException("Could not load journal record. Invalid location: " + location); 333 } 334 JournalCommand<?> message = (JournalCommand<?>) type.createMessage(); 335 message.mergeFramed(is); 336 return message; 337 } 338 339 public void process(JournalCommand<?> command) throws IOException { 340 switch (command.type()) { 341 case KAHA_PREPARE_COMMAND: 342 KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command; 343 getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())); 344 break; 345 case KAHA_COMMIT_COMMAND: 346 KahaCommitCommand commitCommand = (KahaCommitCommand) command; 347 removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo())); 348 break; 349 case KAHA_TRACE_COMMAND: 350 break; 351 default: 352 throw new IOException("Unexpected command in transaction journal: " + command); 353 } 354 } 355 356 357 @Override 358 public synchronized void recover(final TransactionRecoveryListener listener) throws IOException { 359 360 for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) { 361 adapter.createTransactionStore().recover(new TransactionRecoveryListener() { 362 @Override 363 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) { 364 try { 365 getTx(xid).trackStore(adapter.createTransactionStore()); 366 } catch (IOException e) { 367 LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e); 368 } 369 listener.recover(xid, addedMessages, acks); 370 } 371 }); 372 } 373 374 try { 375 Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker(); 376 // force completion of local xa 377 for (TransactionId txid : broker.getPreparedTransactions(null)) { 378 if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) { 379 try { 380 if (recoveredPendingCommit.contains(txid)) { 381 LOG.info("delivering pending commit outcome for tid: " + txid); 382 broker.commitTransaction(null, txid, false); 383 384 } else { 385 LOG.info("delivering rollback outcome to store for tid: " + txid); 386 broker.forgetTransaction(null, txid); 387 } 388 persistCompletion(txid); 389 } catch (Exception ex) { 390 LOG.error("failed to deliver pending outcome for tid: " + txid, ex); 391 } 392 } 393 } 394 } catch (Exception e) { 395 LOG.error("failed to resolve pending local transactions", e); 396 } 397 } 398 399 void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 400 throws IOException { 401 if (message.getTransactionId() != null) { 402 getTx(message.getTransactionId()).trackStore(transactionStore); 403 } 404 destination.addMessage(context, message); 405 } 406 407 ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 408 throws IOException { 409 if (message.getTransactionId() != null) { 410 getTx(message.getTransactionId()).trackStore(transactionStore); 411 destination.addMessage(context, message); 412 return AbstractMessageStore.FUTURE; 413 } else { 414 return destination.asyncAddQueueMessage(context, message); 415 } 416 } 417 418 ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 419 throws IOException { 420 421 if (message.getTransactionId() != null) { 422 getTx(message.getTransactionId()).trackStore(transactionStore); 423 destination.addMessage(context, message); 424 return AbstractMessageStore.FUTURE; 425 } else { 426 return destination.asyncAddTopicMessage(context, message); 427 } 428 } 429 430 final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack) 431 throws IOException { 432 if (ack.getTransactionId() != null) { 433 getTx(ack.getTransactionId()).trackStore(transactionStore); 434 } 435 destination.removeMessage(context, ack); 436 } 437 438 final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack) 439 throws IOException { 440 if (ack.getTransactionId() != null) { 441 getTx(ack.getTransactionId()).trackStore(transactionStore); 442 } 443 destination.removeAsyncMessage(context, ack); 444 } 445 446 final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination, 447 final String clientId, final String subscriptionName, 448 final MessageId messageId, final MessageAck ack) throws IOException { 449 if (ack.getTransactionId() != null) { 450 getTx(ack.getTransactionId()).trackStore(transactionStore); 451 } 452 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 453 } 454 455}