001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.EOFException; 020import java.io.IOException; 021import java.net.SocketException; 022import java.net.URI; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Properties; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicReference; 037import java.util.concurrent.locks.ReentrantReadWriteLock; 038 039import javax.transaction.xa.XAResource; 040 041import org.apache.activemq.advisory.AdvisorySupport; 042import org.apache.activemq.broker.region.ConnectionStatistics; 043import org.apache.activemq.broker.region.RegionBroker; 044import org.apache.activemq.command.ActiveMQDestination; 045import org.apache.activemq.command.BrokerInfo; 046import org.apache.activemq.command.Command; 047import org.apache.activemq.command.CommandTypes; 048import org.apache.activemq.command.ConnectionControl; 049import org.apache.activemq.command.ConnectionError; 050import org.apache.activemq.command.ConnectionId; 051import org.apache.activemq.command.ConnectionInfo; 052import org.apache.activemq.command.ConsumerControl; 053import org.apache.activemq.command.ConsumerId; 054import org.apache.activemq.command.ConsumerInfo; 055import org.apache.activemq.command.ControlCommand; 056import org.apache.activemq.command.DataArrayResponse; 057import org.apache.activemq.command.DestinationInfo; 058import org.apache.activemq.command.ExceptionResponse; 059import org.apache.activemq.command.FlushCommand; 060import org.apache.activemq.command.IntegerResponse; 061import org.apache.activemq.command.KeepAliveInfo; 062import org.apache.activemq.command.Message; 063import org.apache.activemq.command.MessageAck; 064import org.apache.activemq.command.MessageDispatch; 065import org.apache.activemq.command.MessageDispatchNotification; 066import org.apache.activemq.command.MessagePull; 067import org.apache.activemq.command.ProducerAck; 068import org.apache.activemq.command.ProducerId; 069import org.apache.activemq.command.ProducerInfo; 070import org.apache.activemq.command.RemoveInfo; 071import org.apache.activemq.command.RemoveSubscriptionInfo; 072import org.apache.activemq.command.Response; 073import org.apache.activemq.command.SessionId; 074import org.apache.activemq.command.SessionInfo; 075import org.apache.activemq.command.ShutdownInfo; 076import org.apache.activemq.command.TransactionId; 077import org.apache.activemq.command.TransactionInfo; 078import org.apache.activemq.command.WireFormatInfo; 079import org.apache.activemq.network.DemandForwardingBridge; 080import org.apache.activemq.network.MBeanNetworkListener; 081import org.apache.activemq.network.NetworkBridgeConfiguration; 082import org.apache.activemq.network.NetworkBridgeFactory; 083import org.apache.activemq.security.MessageAuthorizationPolicy; 084import org.apache.activemq.state.CommandVisitor; 085import org.apache.activemq.state.ConnectionState; 086import org.apache.activemq.state.ConsumerState; 087import org.apache.activemq.state.ProducerState; 088import org.apache.activemq.state.SessionState; 089import org.apache.activemq.state.TransactionState; 090import org.apache.activemq.thread.Task; 091import org.apache.activemq.thread.TaskRunner; 092import org.apache.activemq.thread.TaskRunnerFactory; 093import org.apache.activemq.transaction.Transaction; 094import org.apache.activemq.transport.DefaultTransportListener; 095import org.apache.activemq.transport.ResponseCorrelator; 096import org.apache.activemq.transport.TransmitCallback; 097import org.apache.activemq.transport.Transport; 098import org.apache.activemq.transport.TransportDisposedIOException; 099import org.apache.activemq.util.IntrospectionSupport; 100import org.apache.activemq.util.MarshallingSupport; 101import org.slf4j.Logger; 102import org.slf4j.LoggerFactory; 103import org.slf4j.MDC; 104 105public class TransportConnection implements Connection, Task, CommandVisitor { 106 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); 107 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); 108 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service"); 109 // Keeps track of the broker and connector that created this connection. 110 protected final Broker broker; 111 protected final TransportConnector connector; 112 // Keeps track of the state of the connections. 113 // protected final ConcurrentHashMap localConnectionStates=new 114 // ConcurrentHashMap(); 115 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 116 // The broker and wireformat info that was exchanged. 117 protected BrokerInfo brokerInfo; 118 protected final List<Command> dispatchQueue = new LinkedList<Command>(); 119 protected TaskRunner taskRunner; 120 protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>(); 121 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); 122 private final Transport transport; 123 private MessageAuthorizationPolicy messageAuthorizationPolicy; 124 private WireFormatInfo wireFormatInfo; 125 // Used to do async dispatch.. this should perhaps be pushed down into the 126 // transport layer.. 127 private boolean inServiceException; 128 private final ConnectionStatistics statistics = new ConnectionStatistics(); 129 private boolean manageable; 130 private boolean slow; 131 private boolean markedCandidate; 132 private boolean blockedCandidate; 133 private boolean blocked; 134 private boolean connected; 135 private boolean active; 136 private boolean starting; 137 private boolean pendingStop; 138 private long timeStamp; 139 private final AtomicBoolean stopping = new AtomicBoolean(false); 140 private final CountDownLatch stopped = new CountDownLatch(1); 141 private final AtomicBoolean asyncException = new AtomicBoolean(false); 142 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>(); 143 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>(); 144 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); 145 private ConnectionContext context; 146 private boolean networkConnection; 147 private boolean faultTolerantConnection; 148 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 149 private DemandForwardingBridge duplexBridge; 150 private final TaskRunnerFactory taskRunnerFactory; 151 private final TaskRunnerFactory stopTaskRunnerFactory; 152 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); 153 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); 154 private String duplexNetworkConnectorId; 155 156 /** 157 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport 158 * else commands are sent async. 159 * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection. 160 */ 161 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, 162 TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) { 163 this.connector = connector; 164 this.broker = broker; 165 RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); 166 brokerConnectionStates = rb.getConnectionStates(); 167 if (connector != null) { 168 this.statistics.setParent(connector.getStatistics()); 169 this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); 170 } 171 this.taskRunnerFactory = taskRunnerFactory; 172 this.stopTaskRunnerFactory = stopTaskRunnerFactory; 173 this.transport = transport; 174 final BrokerService brokerService = this.broker.getBrokerService(); 175 if( this.transport instanceof BrokerServiceAware ) { 176 ((BrokerServiceAware)this.transport).setBrokerService(brokerService); 177 } 178 this.transport.setTransportListener(new DefaultTransportListener() { 179 @Override 180 public void onCommand(Object o) { 181 serviceLock.readLock().lock(); 182 try { 183 if (!(o instanceof Command)) { 184 throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); 185 } 186 Command command = (Command) o; 187 if (!brokerService.isStopping()) { 188 Response response = service(command); 189 if (response != null && !brokerService.isStopping()) { 190 dispatchSync(response); 191 } 192 } else { 193 throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); 194 } 195 } finally { 196 serviceLock.readLock().unlock(); 197 } 198 } 199 200 @Override 201 public void onException(IOException exception) { 202 serviceLock.readLock().lock(); 203 try { 204 serviceTransportException(exception); 205 } finally { 206 serviceLock.readLock().unlock(); 207 } 208 } 209 }); 210 connected = true; 211 } 212 213 /** 214 * Returns the number of messages to be dispatched to this connection 215 * 216 * @return size of dispatch queue 217 */ 218 @Override 219 public int getDispatchQueueSize() { 220 synchronized (dispatchQueue) { 221 return dispatchQueue.size(); 222 } 223 } 224 225 public void serviceTransportException(IOException e) { 226 BrokerService bService = connector.getBrokerService(); 227 if (bService.isShutdownOnSlaveFailure()) { 228 if (brokerInfo != null) { 229 if (brokerInfo.isSlaveBroker()) { 230 LOG.error("Slave has exception: {} shutting down master now.", e.getMessage(), e); 231 try { 232 doStop(); 233 bService.stop(); 234 } catch (Exception ex) { 235 LOG.warn("Failed to stop the master", ex); 236 } 237 } 238 } 239 } 240 if (!stopping.get() && !pendingStop) { 241 transportException.set(e); 242 if (TRANSPORTLOG.isDebugEnabled()) { 243 TRANSPORTLOG.debug(this + " failed: " + e, e); 244 } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) { 245 TRANSPORTLOG.warn(this + " failed: " + e); 246 } 247 stopAsync(e); 248 } 249 } 250 251 private boolean expected(IOException e) { 252 return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException); 253 } 254 255 private boolean isStomp() { 256 URI uri = connector.getUri(); 257 return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1; 258 } 259 260 /** 261 * Calls the serviceException method in an async thread. Since handling a 262 * service exception closes a socket, we should not tie up broker threads 263 * since client sockets may hang or cause deadlocks. 264 */ 265 @Override 266 public void serviceExceptionAsync(final IOException e) { 267 if (asyncException.compareAndSet(false, true)) { 268 new Thread("Async Exception Handler") { 269 @Override 270 public void run() { 271 serviceException(e); 272 } 273 }.start(); 274 } 275 } 276 277 /** 278 * Closes a clients connection due to a detected error. Errors are ignored 279 * if: the client is closing or broker is closing. Otherwise, the connection 280 * error transmitted to the client before stopping it's transport. 281 */ 282 @Override 283 public void serviceException(Throwable e) { 284 // are we a transport exception such as not being able to dispatch 285 // synchronously to a transport 286 if (e instanceof IOException) { 287 serviceTransportException((IOException) e); 288 } else if (e.getClass() == BrokerStoppedException.class) { 289 // Handle the case where the broker is stopped 290 // But the client is still connected. 291 if (!stopping.get()) { 292 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection."); 293 ConnectionError ce = new ConnectionError(); 294 ce.setException(e); 295 dispatchSync(ce); 296 // Record the error that caused the transport to stop 297 transportException.set(e); 298 // Wait a little bit to try to get the output buffer to flush 299 // the exception notification to the client. 300 try { 301 Thread.sleep(500); 302 } catch (InterruptedException ie) { 303 Thread.currentThread().interrupt(); 304 } 305 // Worst case is we just kill the connection before the 306 // notification gets to him. 307 stopAsync(); 308 } 309 } else if (!stopping.get() && !inServiceException) { 310 inServiceException = true; 311 try { 312 if (SERVICELOG.isDebugEnabled()) { 313 SERVICELOG.debug("Async error occurred: " + e, e); 314 } else { 315 SERVICELOG.warn("Async error occurred: " + e); 316 } 317 ConnectionError ce = new ConnectionError(); 318 ce.setException(e); 319 if (pendingStop) { 320 dispatchSync(ce); 321 } else { 322 dispatchAsync(ce); 323 } 324 } finally { 325 inServiceException = false; 326 } 327 } 328 } 329 330 @Override 331 public Response service(Command command) { 332 MDC.put("activemq.connector", connector.getUri().toString()); 333 Response response = null; 334 boolean responseRequired = command.isResponseRequired(); 335 int commandId = command.getCommandId(); 336 try { 337 if (!pendingStop) { 338 response = command.visit(this); 339 } else { 340 response = new ExceptionResponse(transportException.get()); 341 } 342 } catch (Throwable e) { 343 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { 344 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async") 345 + " command: " + command + ", exception: " + e, e); 346 } 347 348 if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) { 349 LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause()); 350 responseRequired = false; 351 } 352 353 if (responseRequired) { 354 if (e instanceof SecurityException || e.getCause() instanceof SecurityException) { 355 SERVICELOG.warn("Security Error occurred on connection to: {}, {}", 356 transport.getRemoteAddress(), e.getMessage()); 357 } 358 response = new ExceptionResponse(e); 359 } else { 360 serviceException(e); 361 } 362 } 363 if (responseRequired) { 364 if (response == null) { 365 response = new Response(); 366 } 367 response.setCorrelationId(commandId); 368 } 369 // The context may have been flagged so that the response is not 370 // sent. 371 if (context != null) { 372 if (context.isDontSendReponse()) { 373 context.setDontSendReponse(false); 374 response = null; 375 } 376 context = null; 377 } 378 MDC.remove("activemq.connector"); 379 return response; 380 } 381 382 @Override 383 public Response processKeepAlive(KeepAliveInfo info) throws Exception { 384 return null; 385 } 386 387 @Override 388 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { 389 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); 390 return null; 391 } 392 393 @Override 394 public Response processWireFormat(WireFormatInfo info) throws Exception { 395 wireFormatInfo = info; 396 protocolVersion.set(info.getVersion()); 397 return null; 398 } 399 400 @Override 401 public Response processShutdown(ShutdownInfo info) throws Exception { 402 stopAsync(); 403 return null; 404 } 405 406 @Override 407 public Response processFlush(FlushCommand command) throws Exception { 408 return null; 409 } 410 411 @Override 412 public Response processBeginTransaction(TransactionInfo info) throws Exception { 413 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 414 context = null; 415 if (cs != null) { 416 context = cs.getContext(); 417 } 418 if (cs == null) { 419 throw new NullPointerException("Context is null"); 420 } 421 // Avoid replaying dup commands 422 if (cs.getTransactionState(info.getTransactionId()) == null) { 423 cs.addTransactionState(info.getTransactionId()); 424 broker.beginTransaction(context, info.getTransactionId()); 425 } 426 return null; 427 } 428 429 @Override 430 public int getActiveTransactionCount() { 431 int rc = 0; 432 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 433 Collection<TransactionState> transactions = cs.getTransactionStates(); 434 for (TransactionState transaction : transactions) { 435 rc++; 436 } 437 } 438 return rc; 439 } 440 441 @Override 442 public Long getOldestActiveTransactionDuration() { 443 TransactionState oldestTX = null; 444 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 445 Collection<TransactionState> transactions = cs.getTransactionStates(); 446 for (TransactionState transaction : transactions) { 447 if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) { 448 oldestTX = transaction; 449 } 450 } 451 } 452 if( oldestTX == null ) { 453 return null; 454 } 455 return System.currentTimeMillis() - oldestTX.getCreatedAt(); 456 } 457 458 @Override 459 public Response processEndTransaction(TransactionInfo info) throws Exception { 460 // No need to do anything. This packet is just sent by the client 461 // make sure he is synced with the server as commit command could 462 // come from a different connection. 463 return null; 464 } 465 466 @Override 467 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 468 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 469 context = null; 470 if (cs != null) { 471 context = cs.getContext(); 472 } 473 if (cs == null) { 474 throw new NullPointerException("Context is null"); 475 } 476 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 477 if (transactionState == null) { 478 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " 479 + info.getTransactionId()); 480 } 481 // Avoid dups. 482 if (!transactionState.isPrepared()) { 483 transactionState.setPrepared(true); 484 int result = broker.prepareTransaction(context, info.getTransactionId()); 485 transactionState.setPreparedResult(result); 486 if (result == XAResource.XA_RDONLY) { 487 // we are done, no further rollback or commit from TM 488 cs.removeTransactionState(info.getTransactionId()); 489 } 490 IntegerResponse response = new IntegerResponse(result); 491 return response; 492 } else { 493 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); 494 return response; 495 } 496 } 497 498 @Override 499 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 500 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 501 context = cs.getContext(); 502 cs.removeTransactionState(info.getTransactionId()); 503 broker.commitTransaction(context, info.getTransactionId(), true); 504 return null; 505 } 506 507 @Override 508 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 509 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 510 context = cs.getContext(); 511 cs.removeTransactionState(info.getTransactionId()); 512 broker.commitTransaction(context, info.getTransactionId(), false); 513 return null; 514 } 515 516 @Override 517 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 518 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 519 context = cs.getContext(); 520 cs.removeTransactionState(info.getTransactionId()); 521 broker.rollbackTransaction(context, info.getTransactionId()); 522 return null; 523 } 524 525 @Override 526 public Response processForgetTransaction(TransactionInfo info) throws Exception { 527 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 528 context = cs.getContext(); 529 broker.forgetTransaction(context, info.getTransactionId()); 530 return null; 531 } 532 533 @Override 534 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 535 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 536 context = cs.getContext(); 537 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 538 return new DataArrayResponse(preparedTransactions); 539 } 540 541 @Override 542 public Response processMessage(Message messageSend) throws Exception { 543 ProducerId producerId = messageSend.getProducerId(); 544 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 545 if (producerExchange.canDispatch(messageSend)) { 546 broker.send(producerExchange, messageSend); 547 } 548 return null; 549 } 550 551 @Override 552 public Response processMessageAck(MessageAck ack) throws Exception { 553 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 554 if (consumerExchange != null) { 555 broker.acknowledge(consumerExchange, ack); 556 } else if (ack.isInTransaction()) { 557 LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack); 558 } 559 return null; 560 } 561 562 @Override 563 public Response processMessagePull(MessagePull pull) throws Exception { 564 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 565 } 566 567 @Override 568 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 569 broker.processDispatchNotification(notification); 570 return null; 571 } 572 573 @Override 574 public Response processAddDestination(DestinationInfo info) throws Exception { 575 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 576 broker.addDestinationInfo(cs.getContext(), info); 577 if (info.getDestination().isTemporary()) { 578 cs.addTempDestination(info); 579 } 580 return null; 581 } 582 583 @Override 584 public Response processRemoveDestination(DestinationInfo info) throws Exception { 585 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 586 broker.removeDestinationInfo(cs.getContext(), info); 587 if (info.getDestination().isTemporary()) { 588 cs.removeTempDestination(info.getDestination()); 589 } 590 return null; 591 } 592 593 @Override 594 public Response processAddProducer(ProducerInfo info) throws Exception { 595 SessionId sessionId = info.getProducerId().getParentId(); 596 ConnectionId connectionId = sessionId.getParentId(); 597 TransportConnectionState cs = lookupConnectionState(connectionId); 598 if (cs == null) { 599 throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " 600 + connectionId); 601 } 602 SessionState ss = cs.getSessionState(sessionId); 603 if (ss == null) { 604 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 605 + sessionId); 606 } 607 // Avoid replaying dup commands 608 if (!ss.getProducerIds().contains(info.getProducerId())) { 609 ActiveMQDestination destination = info.getDestination(); 610 // Do not check for null here as it would cause the count of max producers to exclude 611 // anonymous producers. The isAdvisoryTopic method checks for null so it is safe to 612 // call it from here with a null Destination value. 613 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 614 if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){ 615 throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection()); 616 } 617 } 618 broker.addProducer(cs.getContext(), info); 619 try { 620 ss.addProducer(info); 621 } catch (IllegalStateException e) { 622 broker.removeProducer(cs.getContext(), info); 623 } 624 625 } 626 return null; 627 } 628 629 @Override 630 public Response processRemoveProducer(ProducerId id) throws Exception { 631 SessionId sessionId = id.getParentId(); 632 ConnectionId connectionId = sessionId.getParentId(); 633 TransportConnectionState cs = lookupConnectionState(connectionId); 634 SessionState ss = cs.getSessionState(sessionId); 635 if (ss == null) { 636 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 637 + sessionId); 638 } 639 ProducerState ps = ss.removeProducer(id); 640 if (ps == null) { 641 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 642 } 643 removeProducerBrokerExchange(id); 644 broker.removeProducer(cs.getContext(), ps.getInfo()); 645 return null; 646 } 647 648 @Override 649 public Response processAddConsumer(ConsumerInfo info) throws Exception { 650 SessionId sessionId = info.getConsumerId().getParentId(); 651 ConnectionId connectionId = sessionId.getParentId(); 652 TransportConnectionState cs = lookupConnectionState(connectionId); 653 if (cs == null) { 654 throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " 655 + connectionId); 656 } 657 SessionState ss = cs.getSessionState(sessionId); 658 if (ss == null) { 659 throw new IllegalStateException(broker.getBrokerName() 660 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 661 } 662 // Avoid replaying dup commands 663 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 664 ActiveMQDestination destination = info.getDestination(); 665 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { 666 if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){ 667 throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection()); 668 } 669 } 670 671 broker.addConsumer(cs.getContext(), info); 672 try { 673 ss.addConsumer(info); 674 addConsumerBrokerExchange(info.getConsumerId()); 675 } catch (IllegalStateException e) { 676 broker.removeConsumer(cs.getContext(), info); 677 } 678 679 } 680 return null; 681 } 682 683 @Override 684 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 685 SessionId sessionId = id.getParentId(); 686 ConnectionId connectionId = sessionId.getParentId(); 687 TransportConnectionState cs = lookupConnectionState(connectionId); 688 if (cs == null) { 689 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 690 + connectionId); 691 } 692 SessionState ss = cs.getSessionState(sessionId); 693 if (ss == null) { 694 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 695 + sessionId); 696 } 697 ConsumerState consumerState = ss.removeConsumer(id); 698 if (consumerState == null) { 699 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 700 } 701 ConsumerInfo info = consumerState.getInfo(); 702 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 703 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 704 removeConsumerBrokerExchange(id); 705 return null; 706 } 707 708 @Override 709 public Response processAddSession(SessionInfo info) throws Exception { 710 ConnectionId connectionId = info.getSessionId().getParentId(); 711 TransportConnectionState cs = lookupConnectionState(connectionId); 712 // Avoid replaying dup commands 713 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) { 714 broker.addSession(cs.getContext(), info); 715 try { 716 cs.addSession(info); 717 } catch (IllegalStateException e) { 718 LOG.warn("Failed to add session: {}", info.getSessionId(), e); 719 broker.removeSession(cs.getContext(), info); 720 } 721 } 722 return null; 723 } 724 725 @Override 726 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { 727 ConnectionId connectionId = id.getParentId(); 728 TransportConnectionState cs = lookupConnectionState(connectionId); 729 if (cs == null) { 730 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); 731 } 732 SessionState session = cs.getSessionState(id); 733 if (session == null) { 734 throw new IllegalStateException("Cannot remove session that had not been registered: " + id); 735 } 736 // Don't let new consumers or producers get added while we are closing 737 // this down. 738 session.shutdown(); 739 // Cascade the connection stop to the consumers and producers. 740 for (ConsumerId consumerId : session.getConsumerIds()) { 741 try { 742 processRemoveConsumer(consumerId, lastDeliveredSequenceId); 743 } catch (Throwable e) { 744 LOG.warn("Failed to remove consumer: {}", consumerId, e); 745 } 746 } 747 for (ProducerId producerId : session.getProducerIds()) { 748 try { 749 processRemoveProducer(producerId); 750 } catch (Throwable e) { 751 LOG.warn("Failed to remove producer: {}", producerId, e); 752 } 753 } 754 cs.removeSession(id); 755 broker.removeSession(cs.getContext(), session.getInfo()); 756 return null; 757 } 758 759 @Override 760 public Response processAddConnection(ConnectionInfo info) throws Exception { 761 // Older clients should have been defaulting this field to true.. but 762 // they were not. 763 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { 764 info.setClientMaster(true); 765 } 766 TransportConnectionState state; 767 // Make sure 2 concurrent connections by the same ID only generate 1 768 // TransportConnectionState object. 769 synchronized (brokerConnectionStates) { 770 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId()); 771 if (state == null) { 772 state = new TransportConnectionState(info, this); 773 brokerConnectionStates.put(info.getConnectionId(), state); 774 } 775 state.incrementReference(); 776 } 777 // If there are 2 concurrent connections for the same connection id, 778 // then last one in wins, we need to sync here 779 // to figure out the winner. 780 synchronized (state.getConnectionMutex()) { 781 if (state.getConnection() != this) { 782 LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress()); 783 state.getConnection().stop(); 784 LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress()); 785 state.setConnection(this); 786 state.reset(info); 787 } 788 } 789 registerConnectionState(info.getConnectionId(), state); 790 LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info }); 791 this.faultTolerantConnection = info.isFaultTolerant(); 792 // Setup the context. 793 String clientId = info.getClientId(); 794 context = new ConnectionContext(); 795 context.setBroker(broker); 796 context.setClientId(clientId); 797 context.setClientMaster(info.isClientMaster()); 798 context.setConnection(this); 799 context.setConnectionId(info.getConnectionId()); 800 context.setConnector(connector); 801 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 802 context.setNetworkConnection(networkConnection); 803 context.setFaultTolerant(faultTolerantConnection); 804 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 805 context.setUserName(info.getUserName()); 806 context.setWireFormatInfo(wireFormatInfo); 807 context.setReconnect(info.isFailoverReconnect()); 808 this.manageable = info.isManageable(); 809 context.setConnectionState(state); 810 state.setContext(context); 811 state.setConnection(this); 812 if (info.getClientIp() == null) { 813 info.setClientIp(getRemoteAddress()); 814 } 815 816 try { 817 broker.addConnection(context, info); 818 } catch (Exception e) { 819 synchronized (brokerConnectionStates) { 820 brokerConnectionStates.remove(info.getConnectionId()); 821 } 822 unregisterConnectionState(info.getConnectionId()); 823 LOG.warn("Failed to add Connection {} due to {}", info.getConnectionId(), e); 824 if (e instanceof SecurityException) { 825 // close this down - in case the peer of this transport doesn't play nice 826 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e); 827 } 828 throw e; 829 } 830 if (info.isManageable()) { 831 // send ConnectionCommand 832 ConnectionControl command = this.connector.getConnectionControl(); 833 command.setFaultTolerant(broker.isFaultTolerantConfiguration()); 834 if (info.isFailoverReconnect()) { 835 command.setRebalanceConnection(false); 836 } 837 dispatchAsync(command); 838 } 839 return null; 840 } 841 842 @Override 843 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) 844 throws InterruptedException { 845 LOG.debug("remove connection id: {}", id); 846 TransportConnectionState cs = lookupConnectionState(id); 847 if (cs != null) { 848 // Don't allow things to be added to the connection state while we 849 // are shutting down. 850 cs.shutdown(); 851 // Cascade the connection stop to the sessions. 852 for (SessionId sessionId : cs.getSessionIds()) { 853 try { 854 processRemoveSession(sessionId, lastDeliveredSequenceId); 855 } catch (Throwable e) { 856 SERVICELOG.warn("Failed to remove session {}", sessionId, e); 857 } 858 } 859 // Cascade the connection stop to temp destinations. 860 for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { 861 DestinationInfo di = iter.next(); 862 try { 863 broker.removeDestination(cs.getContext(), di.getDestination(), 0); 864 } catch (Throwable e) { 865 SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e); 866 } 867 iter.remove(); 868 } 869 try { 870 broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get()); 871 } catch (Throwable e) { 872 SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e); 873 } 874 TransportConnectionState state = unregisterConnectionState(id); 875 if (state != null) { 876 synchronized (brokerConnectionStates) { 877 // If we are the last reference, we should remove the state 878 // from the broker. 879 if (state.decrementReference() == 0) { 880 brokerConnectionStates.remove(id); 881 } 882 } 883 } 884 } 885 return null; 886 } 887 888 @Override 889 public Response processProducerAck(ProducerAck ack) throws Exception { 890 // A broker should not get ProducerAck messages. 891 return null; 892 } 893 894 @Override 895 public Connector getConnector() { 896 return connector; 897 } 898 899 @Override 900 public void dispatchSync(Command message) { 901 try { 902 processDispatch(message); 903 } catch (IOException e) { 904 serviceExceptionAsync(e); 905 } 906 } 907 908 @Override 909 public void dispatchAsync(Command message) { 910 if (!stopping.get()) { 911 if (taskRunner == null) { 912 dispatchSync(message); 913 } else { 914 synchronized (dispatchQueue) { 915 dispatchQueue.add(message); 916 } 917 try { 918 taskRunner.wakeup(); 919 } catch (InterruptedException e) { 920 Thread.currentThread().interrupt(); 921 } 922 } 923 } else { 924 if (message.isMessageDispatch()) { 925 MessageDispatch md = (MessageDispatch) message; 926 TransmitCallback sub = md.getTransmitCallback(); 927 broker.postProcessDispatch(md); 928 if (sub != null) { 929 sub.onFailure(); 930 } 931 } 932 } 933 } 934 935 protected void processDispatch(Command command) throws IOException { 936 MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null); 937 try { 938 if (!stopping.get()) { 939 if (messageDispatch != null) { 940 try { 941 broker.preProcessDispatch(messageDispatch); 942 } catch (RuntimeException convertToIO) { 943 throw new IOException(convertToIO); 944 } 945 } 946 dispatch(command); 947 } 948 } catch (IOException e) { 949 if (messageDispatch != null) { 950 TransmitCallback sub = messageDispatch.getTransmitCallback(); 951 broker.postProcessDispatch(messageDispatch); 952 if (sub != null) { 953 sub.onFailure(); 954 } 955 messageDispatch = null; 956 throw e; 957 } 958 } finally { 959 if (messageDispatch != null) { 960 TransmitCallback sub = messageDispatch.getTransmitCallback(); 961 broker.postProcessDispatch(messageDispatch); 962 if (sub != null) { 963 sub.onSuccess(); 964 } 965 } 966 } 967 } 968 969 @Override 970 public boolean iterate() { 971 try { 972 if (pendingStop || stopping.get()) { 973 if (dispatchStopped.compareAndSet(false, true)) { 974 if (transportException.get() == null) { 975 try { 976 dispatch(new ShutdownInfo()); 977 } catch (Throwable ignore) { 978 } 979 } 980 dispatchStoppedLatch.countDown(); 981 } 982 return false; 983 } 984 if (!dispatchStopped.get()) { 985 Command command = null; 986 synchronized (dispatchQueue) { 987 if (dispatchQueue.isEmpty()) { 988 return false; 989 } 990 command = dispatchQueue.remove(0); 991 } 992 processDispatch(command); 993 return true; 994 } 995 return false; 996 } catch (IOException e) { 997 if (dispatchStopped.compareAndSet(false, true)) { 998 dispatchStoppedLatch.countDown(); 999 } 1000 serviceExceptionAsync(e); 1001 return false; 1002 } 1003 } 1004 1005 /** 1006 * Returns the statistics for this connection 1007 */ 1008 @Override 1009 public ConnectionStatistics getStatistics() { 1010 return statistics; 1011 } 1012 1013 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1014 return messageAuthorizationPolicy; 1015 } 1016 1017 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1018 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1019 } 1020 1021 @Override 1022 public boolean isManageable() { 1023 return manageable; 1024 } 1025 1026 @Override 1027 public void start() throws Exception { 1028 try { 1029 synchronized (this) { 1030 starting = true; 1031 if (taskRunnerFactory != null) { 1032 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 1033 + getRemoteAddress()); 1034 } else { 1035 taskRunner = null; 1036 } 1037 transport.start(); 1038 active = true; 1039 BrokerInfo info = connector.getBrokerInfo().copy(); 1040 if (connector.isUpdateClusterClients()) { 1041 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); 1042 } else { 1043 info.setPeerBrokerInfos(null); 1044 } 1045 dispatchAsync(info); 1046 1047 connector.onStarted(this); 1048 } 1049 } catch (Exception e) { 1050 // Force clean up on an error starting up. 1051 pendingStop = true; 1052 throw e; 1053 } finally { 1054 // stop() can be called from within the above block, 1055 // but we want to be sure start() completes before 1056 // stop() runs, so queue the stop until right now: 1057 setStarting(false); 1058 if (isPendingStop()) { 1059 LOG.debug("Calling the delayed stop() after start() {}", this); 1060 stop(); 1061 } 1062 } 1063 } 1064 1065 @Override 1066 public void stop() throws Exception { 1067 // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory) 1068 // as their lifecycle is handled elsewhere 1069 1070 stopAsync(); 1071 while (!stopped.await(5, TimeUnit.SECONDS)) { 1072 LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress()); 1073 } 1074 } 1075 1076 public void delayedStop(final int waitTime, final String reason, Throwable cause) { 1077 if (waitTime > 0) { 1078 synchronized (this) { 1079 pendingStop = true; 1080 transportException.set(cause); 1081 } 1082 try { 1083 stopTaskRunnerFactory.execute(new Runnable() { 1084 @Override 1085 public void run() { 1086 try { 1087 Thread.sleep(waitTime); 1088 stopAsync(); 1089 LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason); 1090 } catch (InterruptedException e) { 1091 } 1092 } 1093 }); 1094 } catch (Throwable t) { 1095 LOG.warn("Cannot create stopAsync. This exception will be ignored.", t); 1096 } 1097 } 1098 } 1099 1100 public void stopAsync(Throwable cause) { 1101 transportException.set(cause); 1102 stopAsync(); 1103 } 1104 1105 public void stopAsync() { 1106 // If we're in the middle of starting then go no further... for now. 1107 synchronized (this) { 1108 pendingStop = true; 1109 if (starting) { 1110 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes.."); 1111 return; 1112 } 1113 } 1114 if (stopping.compareAndSet(false, true)) { 1115 // Let all the connection contexts know we are shutting down 1116 // so that in progress operations can notice and unblock. 1117 List<TransportConnectionState> connectionStates = listConnectionStates(); 1118 for (TransportConnectionState cs : connectionStates) { 1119 ConnectionContext connectionContext = cs.getContext(); 1120 if (connectionContext != null) { 1121 connectionContext.getStopping().set(true); 1122 } 1123 } 1124 try { 1125 stopTaskRunnerFactory.execute(new Runnable() { 1126 @Override 1127 public void run() { 1128 serviceLock.writeLock().lock(); 1129 try { 1130 doStop(); 1131 } catch (Throwable e) { 1132 LOG.debug("Error occurred while shutting down a connection {}", this, e); 1133 } finally { 1134 stopped.countDown(); 1135 serviceLock.writeLock().unlock(); 1136 } 1137 } 1138 }); 1139 } catch (Throwable t) { 1140 LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t); 1141 stopped.countDown(); 1142 } 1143 } 1144 } 1145 1146 @Override 1147 public String toString() { 1148 return "Transport Connection to: " + transport.getRemoteAddress(); 1149 } 1150 1151 protected void doStop() throws Exception { 1152 LOG.debug("Stopping connection: {}", transport.getRemoteAddress()); 1153 connector.onStopped(this); 1154 try { 1155 synchronized (this) { 1156 if (duplexBridge != null) { 1157 duplexBridge.stop(); 1158 } 1159 } 1160 } catch (Exception ignore) { 1161 LOG.trace("Exception caught stopping. This exception is ignored.", ignore); 1162 } 1163 try { 1164 transport.stop(); 1165 LOG.debug("Stopped transport: {}", transport.getRemoteAddress()); 1166 } catch (Exception e) { 1167 LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e); 1168 } 1169 if (taskRunner != null) { 1170 taskRunner.shutdown(1); 1171 taskRunner = null; 1172 } 1173 active = false; 1174 // Run the MessageDispatch callbacks so that message references get 1175 // cleaned up. 1176 synchronized (dispatchQueue) { 1177 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) { 1178 Command command = iter.next(); 1179 if (command.isMessageDispatch()) { 1180 MessageDispatch md = (MessageDispatch) command; 1181 TransmitCallback sub = md.getTransmitCallback(); 1182 broker.postProcessDispatch(md); 1183 if (sub != null) { 1184 sub.onFailure(); 1185 } 1186 } 1187 } 1188 dispatchQueue.clear(); 1189 } 1190 // 1191 // Remove all logical connection associated with this connection 1192 // from the broker. 1193 if (!broker.isStopped()) { 1194 List<TransportConnectionState> connectionStates = listConnectionStates(); 1195 connectionStates = listConnectionStates(); 1196 for (TransportConnectionState cs : connectionStates) { 1197 cs.getContext().getStopping().set(true); 1198 try { 1199 LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); 1200 processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); 1201 } catch (Throwable ignore) { 1202 LOG.debug("Exception caught removing connection {}. This exception is ignored.", cs.getInfo().getConnectionId(), ignore); 1203 } 1204 } 1205 } 1206 LOG.debug("Connection Stopped: {}", getRemoteAddress()); 1207 } 1208 1209 /** 1210 * @return Returns the blockedCandidate. 1211 */ 1212 public boolean isBlockedCandidate() { 1213 return blockedCandidate; 1214 } 1215 1216 /** 1217 * @param blockedCandidate The blockedCandidate to set. 1218 */ 1219 public void setBlockedCandidate(boolean blockedCandidate) { 1220 this.blockedCandidate = blockedCandidate; 1221 } 1222 1223 /** 1224 * @return Returns the markedCandidate. 1225 */ 1226 public boolean isMarkedCandidate() { 1227 return markedCandidate; 1228 } 1229 1230 /** 1231 * @param markedCandidate The markedCandidate to set. 1232 */ 1233 public void setMarkedCandidate(boolean markedCandidate) { 1234 this.markedCandidate = markedCandidate; 1235 if (!markedCandidate) { 1236 timeStamp = 0; 1237 blockedCandidate = false; 1238 } 1239 } 1240 1241 /** 1242 * @param slow The slow to set. 1243 */ 1244 public void setSlow(boolean slow) { 1245 this.slow = slow; 1246 } 1247 1248 /** 1249 * @return true if the Connection is slow 1250 */ 1251 @Override 1252 public boolean isSlow() { 1253 return slow; 1254 } 1255 1256 /** 1257 * @return true if the Connection is potentially blocked 1258 */ 1259 public boolean isMarkedBlockedCandidate() { 1260 return markedCandidate; 1261 } 1262 1263 /** 1264 * Mark the Connection, so we can deem if it's collectable on the next sweep 1265 */ 1266 public void doMark() { 1267 if (timeStamp == 0) { 1268 timeStamp = System.currentTimeMillis(); 1269 } 1270 } 1271 1272 /** 1273 * @return if after being marked, the Connection is still writing 1274 */ 1275 @Override 1276 public boolean isBlocked() { 1277 return blocked; 1278 } 1279 1280 /** 1281 * @return true if the Connection is connected 1282 */ 1283 @Override 1284 public boolean isConnected() { 1285 return connected; 1286 } 1287 1288 /** 1289 * @param blocked The blocked to set. 1290 */ 1291 public void setBlocked(boolean blocked) { 1292 this.blocked = blocked; 1293 } 1294 1295 /** 1296 * @param connected The connected to set. 1297 */ 1298 public void setConnected(boolean connected) { 1299 this.connected = connected; 1300 } 1301 1302 /** 1303 * @return true if the Connection is active 1304 */ 1305 @Override 1306 public boolean isActive() { 1307 return active; 1308 } 1309 1310 /** 1311 * @param active The active to set. 1312 */ 1313 public void setActive(boolean active) { 1314 this.active = active; 1315 } 1316 1317 /** 1318 * @return true if the Connection is starting 1319 */ 1320 public synchronized boolean isStarting() { 1321 return starting; 1322 } 1323 1324 @Override 1325 public synchronized boolean isNetworkConnection() { 1326 return networkConnection; 1327 } 1328 1329 @Override 1330 public boolean isFaultTolerantConnection() { 1331 return this.faultTolerantConnection; 1332 } 1333 1334 protected synchronized void setStarting(boolean starting) { 1335 this.starting = starting; 1336 } 1337 1338 /** 1339 * @return true if the Connection needs to stop 1340 */ 1341 public synchronized boolean isPendingStop() { 1342 return pendingStop; 1343 } 1344 1345 protected synchronized void setPendingStop(boolean pendingStop) { 1346 this.pendingStop = pendingStop; 1347 } 1348 1349 @Override 1350 public Response processBrokerInfo(BrokerInfo info) { 1351 if (info.isSlaveBroker()) { 1352 LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); 1353 } else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1354 // so this TransportConnection is the rear end of a network bridge 1355 // We have been requested to create a two way pipe ... 1356 try { 1357 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1358 Map<String, String> props = createMap(properties); 1359 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1360 IntrospectionSupport.setProperties(config, props, ""); 1361 config.setBrokerName(broker.getBrokerName()); 1362 1363 // check for existing duplex connection hanging about 1364 1365 // We first look if existing network connection already exists for the same broker Id and network connector name 1366 // It's possible in case of brief network fault to have this transport connector side of the connection always active 1367 // and the duplex network connector side wanting to open a new one 1368 // In this case, the old connection must be broken 1369 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 1370 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections(); 1371 synchronized (connections) { 1372 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) { 1373 TransportConnection c = iter.next(); 1374 if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { 1375 LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId); 1376 c.stopAsync(); 1377 // better to wait for a bit rather than get connection id already in use and failure to start new bridge 1378 c.getStopped().await(1, TimeUnit.SECONDS); 1379 } 1380 } 1381 setDuplexNetworkConnectorId(duplexNetworkConnectorId); 1382 } 1383 Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker); 1384 Transport remoteBridgeTransport = transport; 1385 if (! (remoteBridgeTransport instanceof ResponseCorrelator)) { 1386 // the vm transport case is already wrapped 1387 remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport); 1388 } 1389 String duplexName = localTransport.toString(); 1390 if (duplexName.contains("#")) { 1391 duplexName = duplexName.substring(duplexName.lastIndexOf("#")); 1392 } 1393 MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), config, broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName)); 1394 listener.setCreatedByDuplex(true); 1395 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); 1396 duplexBridge.setBrokerService(broker.getBrokerService()); 1397 // now turn duplex off this side 1398 info.setDuplexConnection(false); 1399 duplexBridge.setCreatedByDuplex(true); 1400 duplexBridge.duplexStart(this, brokerInfo, info); 1401 LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId); 1402 return null; 1403 } catch (TransportDisposedIOException e) { 1404 LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId); 1405 return null; 1406 } catch (Exception e) { 1407 LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e); 1408 return null; 1409 } 1410 } 1411 // We only expect to get one broker info command per connection 1412 if (this.brokerInfo != null) { 1413 LOG.warn("Unexpected extra broker info command received: {}", info); 1414 } 1415 this.brokerInfo = info; 1416 networkConnection = true; 1417 List<TransportConnectionState> connectionStates = listConnectionStates(); 1418 for (TransportConnectionState cs : connectionStates) { 1419 cs.getContext().setNetworkConnection(true); 1420 } 1421 return null; 1422 } 1423 1424 @SuppressWarnings({"unchecked", "rawtypes"}) 1425 private HashMap<String, String> createMap(Properties properties) { 1426 return new HashMap(properties); 1427 } 1428 1429 protected void dispatch(Command command) throws IOException { 1430 try { 1431 setMarkedCandidate(true); 1432 transport.oneway(command); 1433 } finally { 1434 setMarkedCandidate(false); 1435 } 1436 } 1437 1438 @Override 1439 public String getRemoteAddress() { 1440 return transport.getRemoteAddress(); 1441 } 1442 1443 public Transport getTransport() { 1444 return transport; 1445 } 1446 1447 @Override 1448 public String getConnectionId() { 1449 List<TransportConnectionState> connectionStates = listConnectionStates(); 1450 for (TransportConnectionState cs : connectionStates) { 1451 if (cs.getInfo().getClientId() != null) { 1452 return cs.getInfo().getClientId(); 1453 } 1454 return cs.getInfo().getConnectionId().toString(); 1455 } 1456 return null; 1457 } 1458 1459 @Override 1460 public void updateClient(ConnectionControl control) { 1461 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null 1462 && this.wireFormatInfo.getVersion() >= 6) { 1463 dispatchAsync(control); 1464 } 1465 } 1466 1467 public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){ 1468 ProducerBrokerExchange result = null; 1469 if (producerInfo != null && producerInfo.getProducerId() != null){ 1470 synchronized (producerExchanges){ 1471 result = producerExchanges.get(producerInfo.getProducerId()); 1472 } 1473 } 1474 return result; 1475 } 1476 1477 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { 1478 ProducerBrokerExchange result = producerExchanges.get(id); 1479 if (result == null) { 1480 synchronized (producerExchanges) { 1481 result = new ProducerBrokerExchange(); 1482 TransportConnectionState state = lookupConnectionState(id); 1483 context = state.getContext(); 1484 result.setConnectionContext(context); 1485 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) { 1486 result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id)); 1487 } 1488 SessionState ss = state.getSessionState(id.getParentId()); 1489 if (ss != null) { 1490 result.setProducerState(ss.getProducerState(id)); 1491 ProducerState producerState = ss.getProducerState(id); 1492 if (producerState != null && producerState.getInfo() != null) { 1493 ProducerInfo info = producerState.getInfo(); 1494 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1495 } 1496 } 1497 producerExchanges.put(id, result); 1498 } 1499 } else { 1500 context = result.getConnectionContext(); 1501 } 1502 return result; 1503 } 1504 1505 private void removeProducerBrokerExchange(ProducerId id) { 1506 synchronized (producerExchanges) { 1507 producerExchanges.remove(id); 1508 } 1509 } 1510 1511 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1512 ConsumerBrokerExchange result = consumerExchanges.get(id); 1513 return result; 1514 } 1515 1516 private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) { 1517 ConsumerBrokerExchange result = consumerExchanges.get(id); 1518 if (result == null) { 1519 synchronized (consumerExchanges) { 1520 result = new ConsumerBrokerExchange(); 1521 TransportConnectionState state = lookupConnectionState(id); 1522 context = state.getContext(); 1523 result.setConnectionContext(context); 1524 SessionState ss = state.getSessionState(id.getParentId()); 1525 if (ss != null) { 1526 ConsumerState cs = ss.getConsumerState(id); 1527 if (cs != null) { 1528 ConsumerInfo info = cs.getInfo(); 1529 if (info != null) { 1530 if (info.getDestination() != null && info.getDestination().isPattern()) { 1531 result.setWildcard(true); 1532 } 1533 } 1534 } 1535 } 1536 consumerExchanges.put(id, result); 1537 } 1538 } 1539 return result; 1540 } 1541 1542 private void removeConsumerBrokerExchange(ConsumerId id) { 1543 synchronized (consumerExchanges) { 1544 consumerExchanges.remove(id); 1545 } 1546 } 1547 1548 public int getProtocolVersion() { 1549 return protocolVersion.get(); 1550 } 1551 1552 @Override 1553 public Response processControlCommand(ControlCommand command) throws Exception { 1554 return null; 1555 } 1556 1557 @Override 1558 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1559 return null; 1560 } 1561 1562 @Override 1563 public Response processConnectionControl(ConnectionControl control) throws Exception { 1564 if (control != null) { 1565 faultTolerantConnection = control.isFaultTolerant(); 1566 } 1567 return null; 1568 } 1569 1570 @Override 1571 public Response processConnectionError(ConnectionError error) throws Exception { 1572 return null; 1573 } 1574 1575 @Override 1576 public Response processConsumerControl(ConsumerControl control) throws Exception { 1577 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); 1578 broker.processConsumerControl(consumerExchange, control); 1579 return null; 1580 } 1581 1582 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1583 TransportConnectionState state) { 1584 TransportConnectionState cs = null; 1585 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1586 // swap implementations 1587 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1588 newRegister.intialize(connectionStateRegister); 1589 connectionStateRegister = newRegister; 1590 } 1591 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1592 return cs; 1593 } 1594 1595 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { 1596 return connectionStateRegister.unregisterConnectionState(connectionId); 1597 } 1598 1599 protected synchronized List<TransportConnectionState> listConnectionStates() { 1600 return connectionStateRegister.listConnectionStates(); 1601 } 1602 1603 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1604 return connectionStateRegister.lookupConnectionState(connectionId); 1605 } 1606 1607 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1608 return connectionStateRegister.lookupConnectionState(id); 1609 } 1610 1611 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1612 return connectionStateRegister.lookupConnectionState(id); 1613 } 1614 1615 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1616 return connectionStateRegister.lookupConnectionState(id); 1617 } 1618 1619 // public only for testing 1620 public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1621 return connectionStateRegister.lookupConnectionState(connectionId); 1622 } 1623 1624 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { 1625 this.duplexNetworkConnectorId = duplexNetworkConnectorId; 1626 } 1627 1628 protected synchronized String getDuplexNetworkConnectorId() { 1629 return this.duplexNetworkConnectorId; 1630 } 1631 1632 public boolean isStopping() { 1633 return stopping.get(); 1634 } 1635 1636 protected CountDownLatch getStopped() { 1637 return stopped; 1638 } 1639 1640 private int getProducerCount(ConnectionId connectionId) { 1641 int result = 0; 1642 TransportConnectionState cs = lookupConnectionState(connectionId); 1643 if (cs != null) { 1644 for (SessionId sessionId : cs.getSessionIds()) { 1645 SessionState sessionState = cs.getSessionState(sessionId); 1646 if (sessionState != null) { 1647 result += sessionState.getProducerIds().size(); 1648 } 1649 } 1650 } 1651 return result; 1652 } 1653 1654 private int getConsumerCount(ConnectionId connectionId) { 1655 int result = 0; 1656 TransportConnectionState cs = lookupConnectionState(connectionId); 1657 if (cs != null) { 1658 for (SessionId sessionId : cs.getSessionIds()) { 1659 SessionState sessionState = cs.getSessionState(sessionId); 1660 if (sessionState != null) { 1661 result += sessionState.getConsumerIds().size(); 1662 } 1663 } 1664 } 1665 return result; 1666 } 1667 1668 public WireFormatInfo getRemoteWireFormatInfo() { 1669 return wireFormatInfo; 1670 } 1671}