001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.HashMap; 022import java.util.List; 023 024import javax.jms.JMSException; 025import javax.jms.TransactionInProgressException; 026import javax.jms.TransactionRolledBackException; 027import javax.transaction.xa.XAException; 028import javax.transaction.xa.XAResource; 029import javax.transaction.xa.Xid; 030 031import org.apache.activemq.command.ConnectionId; 032import org.apache.activemq.command.DataArrayResponse; 033import org.apache.activemq.command.DataStructure; 034import org.apache.activemq.command.IntegerResponse; 035import org.apache.activemq.command.LocalTransactionId; 036import org.apache.activemq.command.TransactionId; 037import org.apache.activemq.command.TransactionInfo; 038import org.apache.activemq.command.XATransactionId; 039import org.apache.activemq.transaction.Synchronization; 040import org.apache.activemq.util.JMSExceptionSupport; 041import org.apache.activemq.util.LongSequenceGenerator; 042import org.apache.activemq.util.XASupport; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * A TransactionContext provides the means to control a JMS transaction. It 048 * provides a local transaction interface and also an XAResource interface. <p/> 049 * An application server controls the transactional assignment of an XASession 050 * by obtaining its XAResource. It uses the XAResource to assign the session to 051 * a transaction, prepare and commit work on the transaction, and so on. <p/> An 052 * XAResource provides some fairly sophisticated facilities for interleaving 053 * work on multiple transactions, recovering a list of transactions in progress, 054 * and so on. A JTA aware JMS provider must fully implement this functionality. 055 * This could be done by using the services of a database that supports XA, or a 056 * JMS provider may choose to implement this functionality from scratch. <p/> 057 * 058 * 059 * @see javax.jms.Session 060 * @see javax.jms.QueueSession 061 * @see javax.jms.TopicSession 062 * @see javax.jms.XASession 063 */ 064public class TransactionContext implements XAResource { 065 066 public static final String xaErrorCodeMarker = "xaErrorCode:"; 067 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); 068 069 // XATransactionId -> ArrayList of TransactionContext objects 070 private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = 071 new HashMap<TransactionId, List<TransactionContext>>(); 072 073 private ActiveMQConnection connection; 074 private final LongSequenceGenerator localTransactionIdGenerator; 075 private List<Synchronization> synchronizations; 076 077 // To track XA transactions. 078 private Xid associatedXid; 079 private TransactionId transactionId; 080 private LocalTransactionEventListener localTransactionEventListener; 081 private int beforeEndIndex; 082 private volatile boolean rollbackOnly; 083 084 // for RAR recovery 085 public TransactionContext() { 086 localTransactionIdGenerator = null; 087 } 088 089 public TransactionContext(ActiveMQConnection connection) { 090 this.connection = connection; 091 this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator(); 092 } 093 094 public boolean isInXATransaction() { 095 if (transactionId != null && transactionId.isXATransaction()) { 096 return true; 097 } else { 098 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 099 for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) { 100 if (transactions.contains(this)) { 101 return true; 102 } 103 } 104 } 105 } 106 107 return false; 108 } 109 110 public void setRollbackOnly(boolean val) { 111 rollbackOnly = val; 112 } 113 114 public boolean isInLocalTransaction() { 115 return transactionId != null && transactionId.isLocalTransaction(); 116 } 117 118 public boolean isInTransaction() { 119 return transactionId != null; 120 } 121 122 /** 123 * @return Returns the localTransactionEventListener. 124 */ 125 public LocalTransactionEventListener getLocalTransactionEventListener() { 126 return localTransactionEventListener; 127 } 128 129 /** 130 * Used by the resource adapter to listen to transaction events. 131 * 132 * @param localTransactionEventListener The localTransactionEventListener to 133 * set. 134 */ 135 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { 136 this.localTransactionEventListener = localTransactionEventListener; 137 } 138 139 // /////////////////////////////////////////////////////////// 140 // 141 // Methods that work with the Synchronization objects registered with 142 // the transaction. 143 // 144 // /////////////////////////////////////////////////////////// 145 146 public void addSynchronization(Synchronization s) { 147 if (synchronizations == null) { 148 synchronizations = new ArrayList<Synchronization>(10); 149 } 150 synchronizations.add(s); 151 } 152 153 private void afterRollback() throws JMSException { 154 if (synchronizations == null) { 155 return; 156 } 157 158 Throwable firstException = null; 159 int size = synchronizations.size(); 160 for (int i = 0; i < size; i++) { 161 try { 162 synchronizations.get(i).afterRollback(); 163 } catch (Throwable t) { 164 LOG.debug("Exception from afterRollback on {}", synchronizations.get(i), t); 165 if (firstException == null) { 166 firstException = t; 167 } 168 } 169 } 170 synchronizations = null; 171 if (firstException != null) { 172 throw JMSExceptionSupport.create(firstException); 173 } 174 } 175 176 private void afterCommit() throws JMSException { 177 if (synchronizations == null) { 178 return; 179 } 180 181 Throwable firstException = null; 182 int size = synchronizations.size(); 183 for (int i = 0; i < size; i++) { 184 try { 185 synchronizations.get(i).afterCommit(); 186 } catch (Throwable t) { 187 LOG.debug("Exception from afterCommit on {}", synchronizations.get(i), t); 188 if (firstException == null) { 189 firstException = t; 190 } 191 } 192 } 193 synchronizations = null; 194 if (firstException != null) { 195 throw JMSExceptionSupport.create(firstException); 196 } 197 } 198 199 private void beforeEnd() throws JMSException { 200 if (synchronizations == null) { 201 return; 202 } 203 204 int size = synchronizations.size(); 205 try { 206 for (;beforeEndIndex < size;) { 207 synchronizations.get(beforeEndIndex++).beforeEnd(); 208 } 209 } catch (JMSException e) { 210 throw e; 211 } catch (Throwable e) { 212 throw JMSExceptionSupport.create(e); 213 } 214 } 215 216 public TransactionId getTransactionId() { 217 return transactionId; 218 } 219 220 // /////////////////////////////////////////////////////////// 221 // 222 // Local transaction interface. 223 // 224 // /////////////////////////////////////////////////////////// 225 226 /** 227 * Start a local transaction. 228 * @throws javax.jms.JMSException on internal error 229 */ 230 public void begin() throws JMSException { 231 232 if (isInXATransaction()) { 233 throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress."); 234 } 235 236 if (transactionId == null) { 237 synchronizations = null; 238 beforeEndIndex = 0; 239 setRollbackOnly(false); 240 this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId()); 241 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 242 this.connection.ensureConnectionInfoSent(); 243 this.connection.asyncSendPacket(info); 244 245 // Notify the listener that the tx was started. 246 if (localTransactionEventListener != null) { 247 localTransactionEventListener.beginEvent(); 248 } 249 250 LOG.debug("Begin:{}", transactionId); 251 } 252 } 253 254 /** 255 * Rolls back any work done in this transaction and releases any locks 256 * currently held. 257 * 258 * @throws JMSException if the JMS provider fails to roll back the 259 * transaction due to some internal error. 260 * @throws javax.jms.IllegalStateException if the method is not called by a 261 * transacted session. 262 */ 263 public void rollback() throws JMSException { 264 if (isInXATransaction()) { 265 throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); 266 } 267 268 try { 269 beforeEnd(); 270 } catch (TransactionRolledBackException canOcurrOnFailover) { 271 LOG.warn("rollback processing error", canOcurrOnFailover); 272 } 273 if (transactionId != null) { 274 LOG.debug("Rollback: {} syncCount: {}", 275 transactionId, (synchronizations != null ? synchronizations.size() : 0)); 276 277 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); 278 this.transactionId = null; 279 //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 280 this.connection.syncSendPacket(info); 281 // Notify the listener that the tx was rolled back 282 if (localTransactionEventListener != null) { 283 localTransactionEventListener.rollbackEvent(); 284 } 285 } 286 287 afterRollback(); 288 } 289 290 /** 291 * Commits all work done in this transaction and releases any locks 292 * currently held. 293 * 294 * @throws JMSException if the JMS provider fails to commit the transaction 295 * due to some internal error. 296 * @throws javax.jms.IllegalStateException if the method is not called by a 297 * transacted session. 298 */ 299 public void commit() throws JMSException { 300 if (isInXATransaction()) { 301 throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); 302 } 303 304 try { 305 beforeEnd(); 306 } catch (JMSException e) { 307 rollback(); 308 throw e; 309 } 310 311 if (transactionId != null && rollbackOnly) { 312 final String message = "Commit of " + transactionId + " failed due to rollback only request; typically due to failover with pending acks"; 313 try { 314 rollback(); 315 } finally { 316 LOG.warn(message); 317 throw new TransactionRolledBackException(message); 318 } 319 } 320 321 // Only send commit if the transaction was started. 322 if (transactionId != null) { 323 LOG.debug("Commit: {} syncCount: {}", 324 transactionId, (synchronizations != null ? synchronizations.size() : 0)); 325 326 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); 327 this.transactionId = null; 328 // Notify the listener that the tx was committed back 329 try { 330 this.connection.syncSendPacket(info); 331 if (localTransactionEventListener != null) { 332 localTransactionEventListener.commitEvent(); 333 } 334 afterCommit(); 335 } catch (JMSException cause) { 336 LOG.info("commit failed for transaction {}", info.getTransactionId(), cause); 337 if (localTransactionEventListener != null) { 338 localTransactionEventListener.rollbackEvent(); 339 } 340 afterRollback(); 341 throw cause; 342 } 343 344 } 345 } 346 347 // /////////////////////////////////////////////////////////// 348 // 349 // XAResource Implementation 350 // 351 // /////////////////////////////////////////////////////////// 352 /** 353 * Associates a transaction with the resource. 354 */ 355 @Override 356 public void start(Xid xid, int flags) throws XAException { 357 358 LOG.debug("Start: {}, flags: {}", xid, XASupport.toString(flags)); 359 360 if (isInLocalTransaction()) { 361 throw new XAException(XAException.XAER_PROTO); 362 } 363 // Are we already associated? 364 if (associatedXid != null) { 365 throw new XAException(XAException.XAER_PROTO); 366 } 367 368 // if ((flags & TMJOIN) == TMJOIN) { 369 // TODO: verify that the server has seen the xid 370 // // } 371 // if ((flags & TMRESUME) == TMRESUME) { 372 // // TODO: verify that the xid was suspended. 373 // } 374 375 // associate 376 synchronizations = null; 377 beforeEndIndex = 0; 378 setRollbackOnly(false); 379 setXid(xid); 380 } 381 382 /** 383 * @return connectionId for connection 384 */ 385 private ConnectionId getConnectionId() { 386 return connection.getConnectionInfo().getConnectionId(); 387 } 388 389 @Override 390 public void end(Xid xid, int flags) throws XAException { 391 392 LOG.debug("End: {}, flags: {}", xid, XASupport.toString(flags)); 393 394 if (isInLocalTransaction()) { 395 throw new XAException(XAException.XAER_PROTO); 396 } 397 398 if ((flags & (TMSUSPEND | TMFAIL)) != 0) { 399 // You can only suspend the associated xid. 400 if (!equals(associatedXid, xid)) { 401 throw new XAException(XAException.XAER_PROTO); 402 } 403 invokeBeforeEnd(); 404 } else if ((flags & TMSUCCESS) == TMSUCCESS) { 405 // set to null if this is the current xid. 406 // otherwise this could be an asynchronous success call 407 if (equals(associatedXid, xid)) { 408 invokeBeforeEnd(); 409 } 410 } else { 411 throw new XAException(XAException.XAER_INVAL); 412 } 413 } 414 415 private void invokeBeforeEnd() throws XAException { 416 boolean throwingException = false; 417 try { 418 beforeEnd(); 419 } catch (JMSException e) { 420 throwingException = true; 421 throw toXAException(e); 422 } finally { 423 try { 424 setXid(null); 425 } catch (XAException ignoreIfWillMask){ 426 if (!throwingException) { 427 throw ignoreIfWillMask; 428 } 429 } 430 } 431 } 432 433 private boolean equals(Xid xid1, Xid xid2) { 434 if (xid1 == xid2) { 435 return true; 436 } 437 if (xid1 == null ^ xid2 == null) { 438 return false; 439 } 440 return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) 441 && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); 442 } 443 444 @Override 445 public int prepare(Xid xid) throws XAException { 446 LOG.debug("Prepare: {}", xid); 447 448 // We allow interleaving multiple transactions, so 449 // we don't limit prepare to the associated xid. 450 XATransactionId x; 451 // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been 452 // called first 453 if (xid == null || (equals(associatedXid, xid))) { 454 throw new XAException(XAException.XAER_PROTO); 455 } else { 456 // TODO: cache the known xids so we don't keep recreating this one?? 457 x = new XATransactionId(xid); 458 } 459 460 if (rollbackOnly) { 461 LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks"); 462 throw new XAException(XAException.XA_RBINTEGRITY); 463 } 464 465 try { 466 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); 467 468 // Find out if the server wants to commit or rollback. 469 IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info); 470 if (XAResource.XA_RDONLY == response.getResult()) { 471 // transaction stops now, may be syncs that need a callback 472 List<TransactionContext> l; 473 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 474 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 475 } 476 // After commit may be expensive and can deadlock, do it outside global synch block 477 // No risk for concurrent updates as we own the list now 478 if (l != null) { 479 if(! l.isEmpty()) { 480 LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid); 481 for (TransactionContext ctx : l) { 482 ctx.afterCommit(); 483 } 484 } 485 } 486 } 487 return response.getResult(); 488 489 } catch (JMSException e) { 490 LOG.warn("prepare of: " + x + " failed with: " + e, e); 491 List<TransactionContext> l; 492 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 493 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 494 } 495 // After rollback may be expensive and can deadlock, do it outside global synch block 496 // No risk for concurrent updates as we own the list now 497 if (l != null) { 498 for (TransactionContext ctx : l) { 499 try { 500 ctx.afterRollback(); 501 } catch (Throwable ignored) { 502 LOG.debug("failed to firing afterRollback callbacks on prepare " + 503 "failure, txid: {}, context: {}", x, ctx, ignored); 504 } 505 } 506 } 507 throw toXAException(e); 508 } 509 } 510 511 @Override 512 public void rollback(Xid xid) throws XAException { 513 514 if (LOG.isDebugEnabled()) { 515 LOG.debug("Rollback: " + xid); 516 } 517 518 // We allow interleaving multiple transactions, so 519 // we don't limit rollback to the associated xid. 520 XATransactionId x; 521 if (xid == null) { 522 throw new XAException(XAException.XAER_PROTO); 523 } 524 if (equals(associatedXid, xid)) { 525 // I think this can happen even without an end(xid) call. Need to 526 // check spec. 527 x = (XATransactionId)transactionId; 528 } else { 529 x = new XATransactionId(xid); 530 } 531 532 try { 533 this.connection.checkClosedOrFailed(); 534 this.connection.ensureConnectionInfoSent(); 535 536 // Let the server know that the tx is rollback. 537 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); 538 this.connection.syncSendPacket(info); 539 540 List<TransactionContext> l; 541 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 542 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 543 } 544 // After rollback may be expensive and can deadlock, do it outside global synch block 545 // No risk for concurrent updates as we own the list now 546 if (l != null) { 547 for (TransactionContext ctx : l) { 548 ctx.afterRollback(); 549 } 550 } 551 } catch (JMSException e) { 552 throw toXAException(e); 553 } 554 } 555 556 // XAResource interface 557 @Override 558 public void commit(Xid xid, boolean onePhase) throws XAException { 559 560 LOG.debug("Commit: {}, onePhase={}", xid, onePhase); 561 562 // We allow interleaving multiple transactions, so 563 // we don't limit commit to the associated xid. 564 XATransactionId x; 565 if (xid == null || (equals(associatedXid, xid))) { 566 // should never happen, end(xid,TMSUCCESS) must have been previously 567 // called 568 throw new XAException(XAException.XAER_PROTO); 569 } else { 570 x = new XATransactionId(xid); 571 } 572 573 if (rollbackOnly) { 574 LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks"); 575 throw new XAException(XAException.XA_RBINTEGRITY); 576 } 577 578 try { 579 this.connection.checkClosedOrFailed(); 580 this.connection.ensureConnectionInfoSent(); 581 582 // Notify the server that the tx was committed back 583 TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); 584 585 this.connection.syncSendPacket(info); 586 587 List<TransactionContext> l; 588 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 589 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 590 } 591 // After commit may be expensive and can deadlock, do it outside global synch block 592 // No risk for concurrent updates as we own the list now 593 if (l != null) { 594 for (TransactionContext ctx : l) { 595 try { 596 ctx.afterCommit(); 597 } catch (Exception ignored) { 598 LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored); 599 } 600 } 601 } 602 603 } catch (JMSException e) { 604 LOG.warn("commit of: " + x + " failed with: " + e, e); 605 if (onePhase) { 606 List<TransactionContext> l; 607 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 608 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 609 } 610 // After rollback may be expensive and can deadlock, do it outside global synch block 611 // No risk for concurrent updates as we own the list now 612 if (l != null) { 613 for (TransactionContext ctx : l) { 614 try { 615 ctx.afterRollback(); 616 } catch (Throwable ignored) { 617 LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored); 618 } 619 } 620 } 621 } 622 throw toXAException(e); 623 } 624 } 625 626 @Override 627 public void forget(Xid xid) throws XAException { 628 LOG.debug("Forget: {}", xid); 629 630 // We allow interleaving multiple transactions, so 631 // we don't limit forget to the associated xid. 632 XATransactionId x; 633 if (xid == null) { 634 throw new XAException(XAException.XAER_PROTO); 635 } 636 if (equals(associatedXid, xid)) { 637 // TODO determine if this can happen... I think not. 638 x = (XATransactionId)transactionId; 639 } else { 640 x = new XATransactionId(xid); 641 } 642 643 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET); 644 645 try { 646 // Tell the server to forget the transaction. 647 this.connection.syncSendPacket(info); 648 } catch (JMSException e) { 649 throw toXAException(e); 650 } 651 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 652 ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 653 } 654 } 655 656 @Override 657 public boolean isSameRM(XAResource xaResource) throws XAException { 658 if (xaResource == null) { 659 return false; 660 } 661 if (!(xaResource instanceof TransactionContext)) { 662 return false; 663 } 664 TransactionContext xar = (TransactionContext)xaResource; 665 try { 666 return getResourceManagerId().equals(xar.getResourceManagerId()); 667 } catch (Throwable e) { 668 throw (XAException)new XAException("Could not get resource manager id.").initCause(e); 669 } 670 } 671 672 @Override 673 public Xid[] recover(int flag) throws XAException { 674 LOG.debug("recover({})", flag); 675 676 TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); 677 try { 678 this.connection.checkClosedOrFailed(); 679 this.connection.ensureConnectionInfoSent(); 680 681 DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info); 682 DataStructure[] data = receipt.getData(); 683 XATransactionId[] answer; 684 if (data instanceof XATransactionId[]) { 685 answer = (XATransactionId[])data; 686 } else { 687 answer = new XATransactionId[data.length]; 688 System.arraycopy(data, 0, answer, 0, data.length); 689 } 690 LOG.debug("recover({})={}", flag, answer); 691 return answer; 692 } catch (JMSException e) { 693 throw toXAException(e); 694 } 695 } 696 697 @Override 698 public int getTransactionTimeout() throws XAException { 699 return 0; 700 } 701 702 @Override 703 public boolean setTransactionTimeout(int seconds) throws XAException { 704 return false; 705 } 706 707 // /////////////////////////////////////////////////////////// 708 // 709 // Helper methods. 710 // 711 // /////////////////////////////////////////////////////////// 712 protected String getResourceManagerId() throws JMSException { 713 return this.connection.getResourceManagerId(); 714 } 715 716 private void setXid(Xid xid) throws XAException { 717 718 try { 719 this.connection.checkClosedOrFailed(); 720 this.connection.ensureConnectionInfoSent(); 721 } catch (JMSException e) { 722 disassociate(); 723 throw toXAException(e); 724 } 725 726 if (xid != null) { 727 // associate 728 associatedXid = xid; 729 transactionId = new XATransactionId(xid); 730 731 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 732 try { 733 this.connection.asyncSendPacket(info); 734 LOG.debug("{} started XA transaction {}", this, transactionId); 735 } catch (JMSException e) { 736 disassociate(); 737 throw toXAException(e); 738 } 739 740 } else { 741 742 if (transactionId != null) { 743 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END); 744 try { 745 this.connection.syncSendPacket(info); 746 LOG.debug("{} ended XA transaction {}", this, transactionId); 747 } catch (JMSException e) { 748 disassociate(); 749 throw toXAException(e); 750 } 751 752 // Add our self to the list of contexts that are interested in 753 // post commit/rollback events. 754 List<TransactionContext> l; 755 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 756 l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); 757 if (l == null) { 758 l = new ArrayList<TransactionContext>(3); 759 ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); 760 } 761 if (!l.contains(this)) { 762 l.add(this); 763 } 764 } 765 } 766 767 disassociate(); 768 } 769 } 770 771 private void disassociate() { 772 // dis-associate 773 associatedXid = null; 774 transactionId = null; 775 } 776 777 /** 778 * Converts a JMSException from the server to an XAException. if the 779 * JMSException contained a linked XAException that is returned instead. 780 * 781 * @param e JMSException to convert 782 * @return XAException wrapping original exception or its message 783 */ 784 private XAException toXAException(JMSException e) { 785 if (e.getCause() != null && e.getCause() instanceof XAException) { 786 XAException original = (XAException)e.getCause(); 787 XAException xae = new XAException(original.getMessage()); 788 xae.errorCode = original.errorCode; 789 if (xae.errorCode == XA_OK) { 790 // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable 791 xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR); 792 } 793 xae.initCause(original); 794 return xae; 795 } 796 797 XAException xae = new XAException(e.getMessage()); 798 xae.errorCode = XAException.XAER_RMFAIL; 799 xae.initCause(e); 800 return xae; 801 } 802 803 private int parseFromMessageOr(String message, int fallbackCode) { 804 final String marker = "xaErrorCode:"; 805 final int index = message.lastIndexOf(marker); 806 if (index > -1) { 807 try { 808 return Integer.parseInt(message.substring(index + marker.length())); 809 } catch (Exception ignored) {} 810 } 811 return fallbackCode; 812 } 813 814 public ActiveMQConnection getConnection() { 815 return connection; 816 } 817 818 // for RAR xa recovery where xaresource connection is per request 819 public ActiveMQConnection setConnection(ActiveMQConnection connection) { 820 ActiveMQConnection existing = this.connection; 821 this.connection = connection; 822 return existing; 823 } 824 825 public void cleanup() { 826 associatedXid = null; 827 transactionId = null; 828 } 829 830 @Override 831 public String toString() { 832 return "TransactionContext{" + 833 "transactionId=" + transactionId + 834 ",connection=" + connection + 835 '}'; 836 } 837}