001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.failover; 018 019import java.io.BufferedReader; 020import java.io.FileReader; 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.InterruptedIOException; 024import java.net.InetAddress; 025import java.net.MalformedURLException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.URL; 029import java.util.ArrayList; 030import java.util.Collections; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.LinkedHashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.StringTokenizer; 037import java.util.concurrent.CopyOnWriteArrayList; 038import java.util.concurrent.atomic.AtomicReference; 039 040import org.apache.activemq.broker.SslContext; 041import org.apache.activemq.command.Command; 042import org.apache.activemq.command.ConnectionControl; 043import org.apache.activemq.command.ConnectionId; 044import org.apache.activemq.command.ConsumerControl; 045import org.apache.activemq.command.MessageDispatch; 046import org.apache.activemq.command.MessagePull; 047import org.apache.activemq.command.RemoveInfo; 048import org.apache.activemq.command.Response; 049import org.apache.activemq.state.ConnectionStateTracker; 050import org.apache.activemq.state.Tracked; 051import org.apache.activemq.thread.Task; 052import org.apache.activemq.thread.TaskRunner; 053import org.apache.activemq.thread.TaskRunnerFactory; 054import org.apache.activemq.transport.CompositeTransport; 055import org.apache.activemq.transport.DefaultTransportListener; 056import org.apache.activemq.transport.FutureResponse; 057import org.apache.activemq.transport.ResponseCallback; 058import org.apache.activemq.transport.Transport; 059import org.apache.activemq.transport.TransportFactory; 060import org.apache.activemq.transport.TransportListener; 061import org.apache.activemq.util.IOExceptionSupport; 062import org.apache.activemq.util.ServiceSupport; 063import org.apache.activemq.util.URISupport; 064import org.slf4j.Logger; 065import org.slf4j.LoggerFactory; 066 067/** 068 * A Transport that is made reliable by being able to fail over to another 069 * transport when a transport failure is detected. 070 */ 071public class FailoverTransport implements CompositeTransport { 072 073 private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class); 074 private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10; 075 private static final int INFINITE = -1; 076 private TransportListener transportListener; 077 private boolean disposed; 078 private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>(); 079 private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>(); 080 081 private final Object reconnectMutex = new Object(); 082 private final Object backupMutex = new Object(); 083 private final Object sleepMutex = new Object(); 084 private final Object listenerMutex = new Object(); 085 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 086 private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>(); 087 088 private URI connectedTransportURI; 089 private URI failedConnectTransportURI; 090 private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>(); 091 private final TaskRunnerFactory reconnectTaskFactory; 092 private final TaskRunner reconnectTask; 093 private boolean started; 094 private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 095 private long maxReconnectDelay = 1000 * 30; 096 private double backOffMultiplier = 2d; 097 private long timeout = INFINITE; 098 private boolean useExponentialBackOff = true; 099 private boolean randomize = true; 100 private int maxReconnectAttempts = INFINITE; 101 private int startupMaxReconnectAttempts = INFINITE; 102 private int connectFailures; 103 private int warnAfterReconnectAttempts = 10; 104 private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 105 private Exception connectionFailure; 106 private boolean firstConnection = true; 107 // optionally always have a backup created 108 private boolean backup = false; 109 private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>(); 110 private int backupPoolSize = 1; 111 private boolean trackMessages = false; 112 private boolean trackTransactionProducers = true; 113 private int maxCacheSize = 128 * 1024; 114 private final TransportListener disposedListener = new DefaultTransportListener() {}; 115 private boolean updateURIsSupported = true; 116 private boolean reconnectSupported = true; 117 // remember for reconnect thread 118 private SslContext brokerSslContext; 119 private String updateURIsURL = null; 120 private boolean rebalanceUpdateURIs = true; 121 private boolean doRebalance = false; 122 private boolean connectedToPriority = false; 123 124 private boolean priorityBackup = false; 125 private final ArrayList<URI> priorityList = new ArrayList<URI>(); 126 private boolean priorityBackupAvailable = false; 127 private String nestedExtraQueryOptions; 128 private boolean shuttingDown = false; 129 130 public FailoverTransport() { 131 brokerSslContext = SslContext.getCurrentSslContext(); 132 stateTracker.setTrackTransactions(true); 133 // Setup a task that is used to reconnect the a connection async. 134 reconnectTaskFactory = new TaskRunnerFactory(); 135 reconnectTaskFactory.init(); 136 reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { 137 @Override 138 public boolean iterate() { 139 boolean result = false; 140 if (!started) { 141 return result; 142 } 143 boolean buildBackup = true; 144 synchronized (backupMutex) { 145 if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) { 146 result = doReconnect(); 147 buildBackup = false; 148 } 149 } 150 if (buildBackup) { 151 buildBackups(); 152 if (priorityBackup && !connectedToPriority) { 153 try { 154 doDelay(); 155 if (reconnectTask == null) { 156 return true; 157 } 158 reconnectTask.wakeup(); 159 } catch (InterruptedException e) { 160 LOG.debug("Reconnect task has been interrupted.", e); 161 } 162 } 163 } else { 164 // build backups on the next iteration 165 buildBackup = true; 166 try { 167 if (reconnectTask == null) { 168 return true; 169 } 170 reconnectTask.wakeup(); 171 } catch (InterruptedException e) { 172 LOG.debug("Reconnect task has been interrupted.", e); 173 } 174 } 175 return result; 176 } 177 178 }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); 179 } 180 181 private void processCommand(Object incoming) { 182 Command command = (Command) incoming; 183 if (command == null) { 184 return; 185 } 186 if (command.isResponse()) { 187 Object object = null; 188 synchronized (requestMap) { 189 object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); 190 } 191 if (object != null && object.getClass() == Tracked.class) { 192 ((Tracked) object).onResponses(command); 193 } 194 } 195 196 if (command.isConnectionControl()) { 197 handleConnectionControl((ConnectionControl) command); 198 } else if (command.isConsumerControl()) { 199 ConsumerControl consumerControl = (ConsumerControl)command; 200 if (consumerControl.isClose()) { 201 stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); 202 } 203 } 204 205 if (transportListener != null) { 206 transportListener.onCommand(command); 207 } 208 } 209 210 private TransportListener createTransportListener(final Transport owner) { 211 return new TransportListener() { 212 213 @Override 214 public void onCommand(Object o) { 215 processCommand(o); 216 } 217 218 @Override 219 public void onException(IOException error) { 220 try { 221 handleTransportFailure(owner, error); 222 } catch (InterruptedException e) { 223 Thread.currentThread().interrupt(); 224 if (transportListener != null) { 225 transportListener.onException(new InterruptedIOException()); 226 } 227 } 228 } 229 230 @Override 231 public void transportInterupted() { 232 } 233 234 @Override 235 public void transportResumed() { 236 } 237 }; 238 } 239 240 public final void disposeTransport(Transport transport) { 241 transport.setTransportListener(disposedListener); 242 ServiceSupport.dispose(transport); 243 } 244 245 public final void handleTransportFailure(IOException e) throws InterruptedException { 246 handleTransportFailure(getConnectedTransport(), e); 247 } 248 249 public final void handleTransportFailure(Transport failed, IOException e) throws InterruptedException { 250 if (shuttingDown) { 251 // shutdown info sent and remote socket closed and we see that before a local close 252 // let the close do the work 253 return; 254 } 255 256 if (LOG.isTraceEnabled()) { 257 LOG.trace(this + " handleTransportFailure: " + e, e); 258 } 259 260 // could be blocked in write with the reconnectMutex held, but still needs to be whacked 261 Transport transport = null; 262 263 if (connectedTransport.compareAndSet(failed, null)) { 264 transport = failed; 265 if (transport != null) { 266 disposeTransport(transport); 267 } 268 } 269 270 synchronized (reconnectMutex) { 271 if (transport != null && connectedTransport.get() == null) { 272 boolean reconnectOk = false; 273 274 if (canReconnect()) { 275 reconnectOk = true; 276 } 277 278 LOG.warn("Transport ({}) failed {} attempting to automatically reconnect: {}", 279 connectedTransportURI, (reconnectOk ? "," : ", not"), e); 280 281 failedConnectTransportURI = connectedTransportURI; 282 connectedTransportURI = null; 283 connectedToPriority = false; 284 285 if (reconnectOk) { 286 // notify before any reconnect attempt so ack state can be whacked 287 if (transportListener != null) { 288 transportListener.transportInterupted(); 289 } 290 291 updated.remove(failedConnectTransportURI); 292 reconnectTask.wakeup(); 293 } else if (!isDisposed()) { 294 propagateFailureToExceptionListener(e); 295 } 296 } 297 } 298 } 299 300 private boolean canReconnect() { 301 return started && 0 != calculateReconnectAttemptLimit(); 302 } 303 304 public final void handleConnectionControl(ConnectionControl control) { 305 String reconnectStr = control.getReconnectTo(); 306 if (LOG.isTraceEnabled()) { 307 LOG.trace("Received ConnectionControl: {}", control); 308 } 309 310 if (reconnectStr != null) { 311 reconnectStr = reconnectStr.trim(); 312 if (reconnectStr.length() > 0) { 313 try { 314 URI uri = new URI(reconnectStr); 315 if (isReconnectSupported()) { 316 reconnect(uri); 317 LOG.info("Reconnected to: " + uri); 318 } 319 } catch (Exception e) { 320 LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e); 321 } 322 } 323 } 324 processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers()); 325 } 326 327 private final void processNewTransports(boolean rebalance, String newTransports) { 328 if (newTransports != null) { 329 newTransports = newTransports.trim(); 330 if (newTransports.length() > 0 && isUpdateURIsSupported()) { 331 List<URI> list = new ArrayList<URI>(); 332 StringTokenizer tokenizer = new StringTokenizer(newTransports, ","); 333 while (tokenizer.hasMoreTokens()) { 334 String str = tokenizer.nextToken(); 335 try { 336 URI uri = new URI(str); 337 list.add(uri); 338 } catch (Exception e) { 339 LOG.error("Failed to parse broker address: " + str, e); 340 } 341 } 342 if (list.isEmpty() == false) { 343 try { 344 updateURIs(rebalance, list.toArray(new URI[list.size()])); 345 } catch (IOException e) { 346 LOG.error("Failed to update transport URI's from: " + newTransports, e); 347 } 348 } 349 } 350 } 351 } 352 353 @Override 354 public void start() throws Exception { 355 synchronized (reconnectMutex) { 356 LOG.debug("Started {}", this); 357 if (started) { 358 return; 359 } 360 started = true; 361 stateTracker.setMaxCacheSize(getMaxCacheSize()); 362 stateTracker.setTrackMessages(isTrackMessages()); 363 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers()); 364 if (connectedTransport.get() != null) { 365 stateTracker.restore(connectedTransport.get()); 366 } else { 367 reconnect(false); 368 } 369 } 370 } 371 372 @Override 373 public void stop() throws Exception { 374 Transport transportToStop = null; 375 List<Transport> backupsToStop = new ArrayList<Transport>(backups.size()); 376 377 try { 378 synchronized (reconnectMutex) { 379 if (LOG.isDebugEnabled()) { 380 LOG.debug("Stopped {}", this); 381 } 382 if (!started) { 383 return; 384 } 385 started = false; 386 disposed = true; 387 388 if (connectedTransport.get() != null) { 389 transportToStop = connectedTransport.getAndSet(null); 390 } 391 reconnectMutex.notifyAll(); 392 } 393 synchronized (sleepMutex) { 394 sleepMutex.notifyAll(); 395 } 396 } finally { 397 reconnectTask.shutdown(); 398 reconnectTaskFactory.shutdownNow(); 399 } 400 401 synchronized(backupMutex) { 402 for (BackupTransport backup : backups) { 403 backup.setDisposed(true); 404 Transport transport = backup.getTransport(); 405 if (transport != null) { 406 transport.setTransportListener(disposedListener); 407 backupsToStop.add(transport); 408 } 409 } 410 backups.clear(); 411 } 412 for (Transport transport : backupsToStop) { 413 try { 414 LOG.trace("Stopped backup: {}", transport); 415 disposeTransport(transport); 416 } catch (Exception e) { 417 } 418 } 419 if (transportToStop != null) { 420 transportToStop.stop(); 421 } 422 } 423 424 public long getInitialReconnectDelay() { 425 return initialReconnectDelay; 426 } 427 428 public void setInitialReconnectDelay(long initialReconnectDelay) { 429 this.initialReconnectDelay = initialReconnectDelay; 430 } 431 432 public long getMaxReconnectDelay() { 433 return maxReconnectDelay; 434 } 435 436 public void setMaxReconnectDelay(long maxReconnectDelay) { 437 this.maxReconnectDelay = maxReconnectDelay; 438 } 439 440 public long getReconnectDelay() { 441 return reconnectDelay; 442 } 443 444 public void setReconnectDelay(long reconnectDelay) { 445 this.reconnectDelay = reconnectDelay; 446 } 447 448 public double getReconnectDelayExponent() { 449 return backOffMultiplier; 450 } 451 452 public void setReconnectDelayExponent(double reconnectDelayExponent) { 453 this.backOffMultiplier = reconnectDelayExponent; 454 } 455 456 public Transport getConnectedTransport() { 457 return connectedTransport.get(); 458 } 459 460 public URI getConnectedTransportURI() { 461 return connectedTransportURI; 462 } 463 464 public int getMaxReconnectAttempts() { 465 return maxReconnectAttempts; 466 } 467 468 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 469 this.maxReconnectAttempts = maxReconnectAttempts; 470 } 471 472 public int getStartupMaxReconnectAttempts() { 473 return this.startupMaxReconnectAttempts; 474 } 475 476 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) { 477 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts; 478 } 479 480 public long getTimeout() { 481 return timeout; 482 } 483 484 public void setTimeout(long timeout) { 485 this.timeout = timeout; 486 } 487 488 /** 489 * @return Returns the randomize. 490 */ 491 public boolean isRandomize() { 492 return randomize; 493 } 494 495 /** 496 * @param randomize The randomize to set. 497 */ 498 public void setRandomize(boolean randomize) { 499 this.randomize = randomize; 500 } 501 502 public boolean isBackup() { 503 return backup; 504 } 505 506 public void setBackup(boolean backup) { 507 this.backup = backup; 508 } 509 510 public int getBackupPoolSize() { 511 return backupPoolSize; 512 } 513 514 public void setBackupPoolSize(int backupPoolSize) { 515 this.backupPoolSize = backupPoolSize; 516 } 517 518 public int getCurrentBackups() { 519 return this.backups.size(); 520 } 521 522 public boolean isTrackMessages() { 523 return trackMessages; 524 } 525 526 public void setTrackMessages(boolean trackMessages) { 527 this.trackMessages = trackMessages; 528 } 529 530 public boolean isTrackTransactionProducers() { 531 return this.trackTransactionProducers; 532 } 533 534 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 535 this.trackTransactionProducers = trackTransactionProducers; 536 } 537 538 public int getMaxCacheSize() { 539 return maxCacheSize; 540 } 541 542 public void setMaxCacheSize(int maxCacheSize) { 543 this.maxCacheSize = maxCacheSize; 544 } 545 546 public boolean isPriorityBackup() { 547 return priorityBackup; 548 } 549 550 public void setPriorityBackup(boolean priorityBackup) { 551 this.priorityBackup = priorityBackup; 552 } 553 554 public void setPriorityURIs(String priorityURIs) { 555 StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ","); 556 while (tokenizer.hasMoreTokens()) { 557 String str = tokenizer.nextToken(); 558 try { 559 URI uri = new URI(str); 560 priorityList.add(uri); 561 } catch (Exception e) { 562 LOG.error("Failed to parse broker address: " + str, e); 563 } 564 } 565 } 566 567 @Override 568 public void oneway(Object o) throws IOException { 569 570 Command command = (Command) o; 571 Exception error = null; 572 try { 573 574 synchronized (reconnectMutex) { 575 576 if (command != null && connectedTransport.get() == null) { 577 if (command.isShutdownInfo()) { 578 // Skipping send of ShutdownInfo command when not connected. 579 return; 580 } else if (command instanceof RemoveInfo || command.isMessageAck()) { 581 // Simulate response to RemoveInfo command or MessageAck (as it will be stale) 582 stateTracker.track(command); 583 if (command.isResponseRequired()) { 584 Response response = new Response(); 585 response.setCorrelationId(command.getCommandId()); 586 processCommand(response); 587 } 588 return; 589 } else if (command instanceof MessagePull) { 590 // Simulate response to MessagePull if timed as we can't honor that now. 591 MessagePull pullRequest = (MessagePull) command; 592 if (pullRequest.getTimeout() != 0) { 593 MessageDispatch dispatch = new MessageDispatch(); 594 dispatch.setConsumerId(pullRequest.getConsumerId()); 595 dispatch.setDestination(pullRequest.getDestination()); 596 processCommand(dispatch); 597 } 598 return; 599 } 600 } 601 602 // Keep trying until the message is sent. 603 for (int i = 0; !disposed; i++) { 604 try { 605 606 // Wait for transport to be connected. 607 Transport transport = connectedTransport.get(); 608 long start = System.currentTimeMillis(); 609 boolean timedout = false; 610 while (transport == null && !disposed && connectionFailure == null 611 && !Thread.currentThread().isInterrupted() && willReconnect()) { 612 613 LOG.trace("Waiting for transport to reconnect..: {}", command); 614 long end = System.currentTimeMillis(); 615 if (command.isMessage() && timeout > 0 && (end - start > timeout)) { 616 timedout = true; 617 LOG.info("Failover timed out after {} ms", (end - start)); 618 break; 619 } 620 try { 621 reconnectMutex.wait(100); 622 } catch (InterruptedException e) { 623 Thread.currentThread().interrupt(); 624 LOG.debug("Interupted:", e); 625 } 626 transport = connectedTransport.get(); 627 } 628 629 if (transport == null) { 630 // Previous loop may have exited due to use being 631 // disposed. 632 if (disposed) { 633 error = new IOException("Transport disposed."); 634 } else if (connectionFailure != null) { 635 error = connectionFailure; 636 } else if (timedout == true) { 637 error = new IOException("Failover timeout of " + timeout + " ms reached."); 638 } else if (!willReconnect()) { 639 error = new IOException("Reconnect attempts of " + maxReconnectAttempts + " exceeded"); 640 } else { 641 error = new IOException("Unexpected failure."); 642 } 643 break; 644 } 645 646 Tracked tracked = null; 647 try { 648 tracked = stateTracker.track(command); 649 } catch (IOException ioe) { 650 LOG.debug("Cannot track the command {} {}", command, ioe); 651 } 652 // If it was a request and it was not being tracked by 653 // the state tracker, 654 // then hold it in the requestMap so that we can replay 655 // it later. 656 synchronized (requestMap) { 657 if (tracked != null && tracked.isWaitingForResponse()) { 658 requestMap.put(Integer.valueOf(command.getCommandId()), tracked); 659 } else if (tracked == null && command.isResponseRequired()) { 660 requestMap.put(Integer.valueOf(command.getCommandId()), command); 661 } 662 } 663 664 // Send the message. 665 try { 666 transport.oneway(command); 667 stateTracker.trackBack(command); 668 if (command.isShutdownInfo()) { 669 shuttingDown = true; 670 } 671 } catch (IOException e) { 672 673 // If the command was not tracked.. we will retry in 674 // this method 675 if (tracked == null && canReconnect()) { 676 677 // since we will retry in this method.. take it 678 // out of the request 679 // map so that it is not sent 2 times on 680 // recovery 681 if (command.isResponseRequired()) { 682 requestMap.remove(Integer.valueOf(command.getCommandId())); 683 } 684 685 // Rethrow the exception so it will handled by 686 // the outer catch 687 throw e; 688 } else { 689 // Handle the error but allow the method to return since the 690 // tracked commands are replayed on reconnect. 691 LOG.debug("Send oneway attempt: {} failed for command: {}", i, command); 692 handleTransportFailure(e); 693 } 694 } 695 696 return; 697 } catch (IOException e) { 698 LOG.debug("Send oneway attempt: {} failed for command: {}", i, command); 699 handleTransportFailure(e); 700 } 701 } 702 } 703 } catch (InterruptedException e) { 704 // Some one may be trying to stop our thread. 705 Thread.currentThread().interrupt(); 706 throw new InterruptedIOException(); 707 } 708 709 if (!disposed) { 710 if (error != null) { 711 if (error instanceof IOException) { 712 throw (IOException) error; 713 } 714 throw IOExceptionSupport.create(error); 715 } 716 } 717 } 718 719 private boolean willReconnect() { 720 return firstConnection || 0 != calculateReconnectAttemptLimit(); 721 } 722 723 @Override 724 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 725 throw new AssertionError("Unsupported Method"); 726 } 727 728 @Override 729 public Object request(Object command) throws IOException { 730 throw new AssertionError("Unsupported Method"); 731 } 732 733 @Override 734 public Object request(Object command, int timeout) throws IOException { 735 throw new AssertionError("Unsupported Method"); 736 } 737 738 @Override 739 public void add(boolean rebalance, URI u[]) { 740 boolean newURI = false; 741 for (URI uri : u) { 742 if (!contains(uri)) { 743 uris.add(uri); 744 newURI = true; 745 } 746 } 747 if (newURI) { 748 reconnect(rebalance); 749 } 750 } 751 752 @Override 753 public void remove(boolean rebalance, URI u[]) { 754 for (URI uri : u) { 755 uris.remove(uri); 756 } 757 // rebalance is automatic if any connected to removed/stopped broker 758 } 759 760 public void add(boolean rebalance, String u) { 761 try { 762 URI newURI = new URI(u); 763 if (contains(newURI) == false) { 764 uris.add(newURI); 765 reconnect(rebalance); 766 } 767 768 } catch (Exception e) { 769 LOG.error("Failed to parse URI: {}", u); 770 } 771 } 772 773 public void reconnect(boolean rebalance) { 774 synchronized (reconnectMutex) { 775 if (started) { 776 if (rebalance) { 777 doRebalance = true; 778 } 779 LOG.debug("Waking up reconnect task"); 780 try { 781 reconnectTask.wakeup(); 782 } catch (InterruptedException e) { 783 Thread.currentThread().interrupt(); 784 } 785 } else { 786 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport."); 787 } 788 } 789 } 790 791 private List<URI> getConnectList() { 792 if (!updated.isEmpty()) { 793 return updated; 794 } 795 ArrayList<URI> l = new ArrayList<URI>(uris); 796 boolean removed = false; 797 if (failedConnectTransportURI != null) { 798 removed = l.remove(failedConnectTransportURI); 799 } 800 if (randomize) { 801 // Randomly, reorder the list by random swapping 802 for (int i = 0; i < l.size(); i++) { 803 // meed parenthesis due other JDKs (see AMQ-4826) 804 int p = ((int) (Math.random() * 100)) % l.size(); 805 URI t = l.get(p); 806 l.set(p, l.get(i)); 807 l.set(i, t); 808 } 809 } 810 if (removed) { 811 l.add(failedConnectTransportURI); 812 } 813 814 LOG.debug("urlList connectionList:{}, from: {}", l, uris); 815 816 return l; 817 } 818 819 @Override 820 public TransportListener getTransportListener() { 821 return transportListener; 822 } 823 824 @Override 825 public void setTransportListener(TransportListener commandListener) { 826 synchronized (listenerMutex) { 827 this.transportListener = commandListener; 828 listenerMutex.notifyAll(); 829 } 830 } 831 832 @Override 833 public <T> T narrow(Class<T> target) { 834 835 if (target.isAssignableFrom(getClass())) { 836 return target.cast(this); 837 } 838 Transport transport = connectedTransport.get(); 839 if (transport != null) { 840 return transport.narrow(target); 841 } 842 return null; 843 844 } 845 846 protected void restoreTransport(Transport t) throws Exception, IOException { 847 t.start(); 848 // send information to the broker - informing it we are an ft client 849 ConnectionControl cc = new ConnectionControl(); 850 cc.setFaultTolerant(true); 851 t.oneway(cc); 852 stateTracker.restore(t); 853 Map<Integer, Command> tmpMap = null; 854 synchronized (requestMap) { 855 tmpMap = new LinkedHashMap<Integer, Command>(requestMap); 856 } 857 for (Command command : tmpMap.values()) { 858 LOG.trace("restore requestMap, replay: {}", command); 859 t.oneway(command); 860 } 861 } 862 863 public boolean isUseExponentialBackOff() { 864 return useExponentialBackOff; 865 } 866 867 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 868 this.useExponentialBackOff = useExponentialBackOff; 869 } 870 871 @Override 872 public String toString() { 873 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString(); 874 } 875 876 @Override 877 public String getRemoteAddress() { 878 Transport transport = connectedTransport.get(); 879 if (transport != null) { 880 return transport.getRemoteAddress(); 881 } 882 return null; 883 } 884 885 @Override 886 public boolean isFaultTolerant() { 887 return true; 888 } 889 890 private void doUpdateURIsFromDisk() { 891 // If updateURIsURL is specified, read the file and add any new 892 // transport URI's to this FailOverTransport. 893 // Note: Could track file timestamp to avoid unnecessary reading. 894 String fileURL = getUpdateURIsURL(); 895 if (fileURL != null) { 896 BufferedReader in = null; 897 String newUris = null; 898 StringBuffer buffer = new StringBuffer(); 899 900 try { 901 in = new BufferedReader(getURLStream(fileURL)); 902 while (true) { 903 String line = in.readLine(); 904 if (line == null) { 905 break; 906 } 907 buffer.append(line); 908 } 909 newUris = buffer.toString(); 910 } catch (IOException ioe) { 911 LOG.error("Failed to read updateURIsURL: {} {}",fileURL, ioe); 912 } finally { 913 if (in != null) { 914 try { 915 in.close(); 916 } catch (IOException ioe) { 917 // ignore 918 } 919 } 920 } 921 922 processNewTransports(isRebalanceUpdateURIs(), newUris); 923 } 924 } 925 926 final boolean doReconnect() { 927 Exception failure = null; 928 synchronized (reconnectMutex) { 929 930 // First ensure we are up to date. 931 doUpdateURIsFromDisk(); 932 933 if (disposed || connectionFailure != null) { 934 reconnectMutex.notifyAll(); 935 } 936 if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) { 937 return false; 938 } else { 939 List<URI> connectList = getConnectList(); 940 if (connectList.isEmpty()) { 941 failure = new IOException("No uris available to connect to."); 942 } else { 943 if (doRebalance) { 944 if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) { 945 // already connected to first in the list, no need to rebalance 946 doRebalance = false; 947 return false; 948 } else { 949 LOG.debug("Doing rebalance from: {} to {}", connectedTransportURI, connectList); 950 951 try { 952 Transport transport = this.connectedTransport.getAndSet(null); 953 if (transport != null) { 954 disposeTransport(transport); 955 } 956 } catch (Exception e) { 957 LOG.debug("Caught an exception stopping existing transport for rebalance", e); 958 } 959 } 960 doRebalance = false; 961 } 962 963 resetReconnectDelay(); 964 965 Transport transport = null; 966 URI uri = null; 967 968 // If we have a backup already waiting lets try it. 969 synchronized (backupMutex) { 970 if ((priorityBackup || backup) && !backups.isEmpty()) { 971 ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups); 972 if (randomize) { 973 Collections.shuffle(l); 974 } 975 BackupTransport bt = l.remove(0); 976 backups.remove(bt); 977 transport = bt.getTransport(); 978 uri = bt.getUri(); 979 processCommand(bt.getBrokerInfo()); 980 if (priorityBackup && priorityBackupAvailable) { 981 Transport old = this.connectedTransport.getAndSet(null); 982 if (old != null) { 983 disposeTransport(old); 984 } 985 priorityBackupAvailable = false; 986 } 987 } 988 } 989 990 // When there was no backup and we are reconnecting for the first time 991 // we honor the initialReconnectDelay before trying a new connection, after 992 // this normal reconnect delay happens following a failed attempt. 993 if (transport == null && !firstConnection && connectFailures == 0 && initialReconnectDelay > 0 && !disposed) { 994 // reconnectDelay will be equal to initialReconnectDelay since we are on 995 // the first connect attempt after we had a working connection, doDelay 996 // will apply updates to move to the next reconnectDelay value based on 997 // configuration. 998 doDelay(); 999 } 1000 1001 Iterator<URI> iter = connectList.iterator(); 1002 while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) { 1003 1004 try { 1005 SslContext.setCurrentSslContext(brokerSslContext); 1006 1007 // We could be starting with a backup and if so we wait to grab a 1008 // URI from the pool until next time around. 1009 if (transport == null) { 1010 uri = addExtraQueryOptions(iter.next()); 1011 transport = TransportFactory.compositeConnect(uri); 1012 } 1013 1014 LOG.debug("Attempting {}th connect to: {}", connectFailures, uri); 1015 1016 transport.setTransportListener(createTransportListener(transport)); 1017 transport.start(); 1018 1019 if (started && !firstConnection) { 1020 restoreTransport(transport); 1021 } 1022 1023 LOG.debug("Connection established"); 1024 1025 reconnectDelay = initialReconnectDelay; 1026 connectedTransportURI = uri; 1027 connectedTransport.set(transport); 1028 connectedToPriority = isPriority(connectedTransportURI); 1029 reconnectMutex.notifyAll(); 1030 connectFailures = 0; 1031 1032 // Make sure on initial startup, that the transportListener 1033 // has been initialized for this instance. 1034 synchronized (listenerMutex) { 1035 if (transportListener == null) { 1036 try { 1037 // if it isn't set after 2secs - it probably never will be 1038 listenerMutex.wait(2000); 1039 } catch (InterruptedException ex) { 1040 } 1041 } 1042 } 1043 1044 if (transportListener != null) { 1045 transportListener.transportResumed(); 1046 } else { 1047 LOG.debug("transport resumed by transport listener not set"); 1048 } 1049 1050 if (firstConnection) { 1051 firstConnection = false; 1052 LOG.info("Successfully connected to {}", uri); 1053 } else { 1054 LOG.info("Successfully reconnected to {}", uri); 1055 } 1056 1057 return false; 1058 } catch (Exception e) { 1059 failure = e; 1060 LOG.debug("Connect fail to: {}, reason: {}", uri, e); 1061 if (transport != null) { 1062 try { 1063 transport.stop(); 1064 transport = null; 1065 } catch (Exception ee) { 1066 LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee); 1067 } 1068 } 1069 } finally { 1070 SslContext.setCurrentSslContext(null); 1071 } 1072 } 1073 } 1074 } 1075 1076 int reconnectLimit = calculateReconnectAttemptLimit(); 1077 1078 connectFailures++; 1079 if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) { 1080 LOG.error("Failed to connect to {} after: {} attempt(s)", uris, connectFailures); 1081 connectionFailure = failure; 1082 1083 // Make sure on initial startup, that the transportListener has been 1084 // initialized for this instance. 1085 synchronized (listenerMutex) { 1086 if (transportListener == null) { 1087 try { 1088 listenerMutex.wait(2000); 1089 } catch (InterruptedException ex) { 1090 } 1091 } 1092 } 1093 1094 propagateFailureToExceptionListener(connectionFailure); 1095 return false; 1096 } 1097 1098 int warnInterval = getWarnAfterReconnectAttempts(); 1099 if (warnInterval > 0 && (connectFailures % warnInterval) == 0) { 1100 LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.", 1101 uris, connectFailures); 1102 } 1103 } 1104 1105 if (!disposed) { 1106 doDelay(); 1107 } 1108 1109 return !disposed; 1110 } 1111 1112 private void doDelay() { 1113 if (reconnectDelay > 0) { 1114 synchronized (sleepMutex) { 1115 LOG.debug("Waiting {} ms before attempting connection", reconnectDelay); 1116 try { 1117 sleepMutex.wait(reconnectDelay); 1118 } catch (InterruptedException e) { 1119 Thread.currentThread().interrupt(); 1120 } 1121 } 1122 } 1123 1124 if (useExponentialBackOff) { 1125 // Exponential increment of reconnect delay. 1126 reconnectDelay *= backOffMultiplier; 1127 if (reconnectDelay > maxReconnectDelay) { 1128 reconnectDelay = maxReconnectDelay; 1129 } 1130 } 1131 } 1132 1133 private void resetReconnectDelay() { 1134 if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { 1135 reconnectDelay = initialReconnectDelay; 1136 } 1137 } 1138 1139 /* 1140 * called with reconnectMutex held 1141 */ 1142 private void propagateFailureToExceptionListener(Exception exception) { 1143 if (transportListener != null) { 1144 if (exception instanceof IOException) { 1145 transportListener.onException((IOException)exception); 1146 } else { 1147 transportListener.onException(IOExceptionSupport.create(exception)); 1148 } 1149 } 1150 reconnectMutex.notifyAll(); 1151 } 1152 1153 private int calculateReconnectAttemptLimit() { 1154 int maxReconnectValue = this.maxReconnectAttempts; 1155 if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) { 1156 maxReconnectValue = this.startupMaxReconnectAttempts; 1157 } 1158 return maxReconnectValue; 1159 } 1160 1161 private boolean shouldBuildBackups() { 1162 return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority)); 1163 } 1164 1165 final boolean buildBackups() { 1166 synchronized (backupMutex) { 1167 if (!disposed && shouldBuildBackups()) { 1168 ArrayList<URI> backupList = new ArrayList<URI>(priorityList); 1169 List<URI> connectList = getConnectList(); 1170 for (URI uri: connectList) { 1171 if (!backupList.contains(uri)) { 1172 backupList.add(uri); 1173 } 1174 } 1175 // removed disposed backups 1176 List<BackupTransport> disposedList = new ArrayList<BackupTransport>(); 1177 for (BackupTransport bt : backups) { 1178 if (bt.isDisposed()) { 1179 disposedList.add(bt); 1180 } 1181 } 1182 backups.removeAll(disposedList); 1183 disposedList.clear(); 1184 for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) { 1185 URI uri = addExtraQueryOptions(iter.next()); 1186 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { 1187 try { 1188 SslContext.setCurrentSslContext(brokerSslContext); 1189 BackupTransport bt = new BackupTransport(this); 1190 bt.setUri(uri); 1191 if (!backups.contains(bt)) { 1192 Transport t = TransportFactory.compositeConnect(uri); 1193 t.setTransportListener(bt); 1194 t.start(); 1195 bt.setTransport(t); 1196 if (priorityBackup && isPriority(uri)) { 1197 priorityBackupAvailable = true; 1198 backups.add(0, bt); 1199 // if this priority backup overflows the pool 1200 // remove the backup with the lowest priority 1201 if (backups.size() > backupPoolSize) { 1202 BackupTransport disposeTransport = backups.remove(backups.size() - 1); 1203 disposeTransport.setDisposed(true); 1204 Transport transport = disposeTransport.getTransport(); 1205 if (transport != null) { 1206 transport.setTransportListener(disposedListener); 1207 disposeTransport(transport); 1208 } 1209 } 1210 } else { 1211 backups.add(bt); 1212 } 1213 } 1214 } catch (Exception e) { 1215 LOG.debug("Failed to build backup ", e); 1216 } finally { 1217 SslContext.setCurrentSslContext(null); 1218 } 1219 } 1220 } 1221 } 1222 } 1223 return false; 1224 } 1225 1226 protected boolean isPriority(URI uri) { 1227 if (!priorityBackup) { 1228 return false; 1229 } 1230 1231 if (!priorityList.isEmpty()) { 1232 return priorityList.contains(uri); 1233 } 1234 return uris.indexOf(uri) == 0; 1235 } 1236 1237 @Override 1238 public boolean isDisposed() { 1239 return disposed; 1240 } 1241 1242 @Override 1243 public boolean isConnected() { 1244 return connectedTransport.get() != null; 1245 } 1246 1247 @Override 1248 public void reconnect(URI uri) throws IOException { 1249 add(true, new URI[]{uri}); 1250 } 1251 1252 @Override 1253 public boolean isReconnectSupported() { 1254 return this.reconnectSupported; 1255 } 1256 1257 public void setReconnectSupported(boolean value) { 1258 this.reconnectSupported = value; 1259 } 1260 1261 @Override 1262 public boolean isUpdateURIsSupported() { 1263 return this.updateURIsSupported; 1264 } 1265 1266 public void setUpdateURIsSupported(boolean value) { 1267 this.updateURIsSupported = value; 1268 } 1269 1270 @Override 1271 public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { 1272 if (isUpdateURIsSupported()) { 1273 HashSet<URI> copy = new HashSet<URI>(); 1274 synchronized (reconnectMutex) { 1275 copy.addAll(this.updated); 1276 updated.clear(); 1277 if (updatedURIs != null && updatedURIs.length > 0) { 1278 for (URI uri : updatedURIs) { 1279 if (uri != null && !updated.contains(uri)) { 1280 updated.add(uri); 1281 } 1282 } 1283 } 1284 } 1285 if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) { 1286 buildBackups(); 1287 reconnect(rebalance); 1288 } 1289 } 1290 } 1291 1292 /** 1293 * @return the updateURIsURL 1294 */ 1295 public String getUpdateURIsURL() { 1296 return this.updateURIsURL; 1297 } 1298 1299 /** 1300 * @param updateURIsURL the updateURIsURL to set 1301 */ 1302 public void setUpdateURIsURL(String updateURIsURL) { 1303 this.updateURIsURL = updateURIsURL; 1304 } 1305 1306 /** 1307 * @return the rebalanceUpdateURIs 1308 */ 1309 public boolean isRebalanceUpdateURIs() { 1310 return this.rebalanceUpdateURIs; 1311 } 1312 1313 /** 1314 * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set 1315 */ 1316 public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) { 1317 this.rebalanceUpdateURIs = rebalanceUpdateURIs; 1318 } 1319 1320 @Override 1321 public int getReceiveCounter() { 1322 Transport transport = connectedTransport.get(); 1323 if (transport == null) { 1324 return 0; 1325 } 1326 return transport.getReceiveCounter(); 1327 } 1328 1329 public int getConnectFailures() { 1330 return connectFailures; 1331 } 1332 1333 public void connectionInterruptProcessingComplete(ConnectionId connectionId) { 1334 synchronized (reconnectMutex) { 1335 stateTracker.connectionInterruptProcessingComplete(this, connectionId); 1336 } 1337 } 1338 1339 public ConnectionStateTracker getStateTracker() { 1340 return stateTracker; 1341 } 1342 1343 private boolean contains(URI newURI) { 1344 boolean result = false; 1345 for (URI uri : uris) { 1346 if (compareURIs(newURI, uri)) { 1347 result = true; 1348 break; 1349 } 1350 } 1351 1352 return result; 1353 } 1354 1355 private boolean compareURIs(final URI first, final URI second) { 1356 1357 boolean result = false; 1358 if (first == null || second == null) { 1359 return result; 1360 } 1361 1362 if (first.getPort() == second.getPort()) { 1363 InetAddress firstAddr = null; 1364 InetAddress secondAddr = null; 1365 try { 1366 firstAddr = InetAddress.getByName(first.getHost()); 1367 secondAddr = InetAddress.getByName(second.getHost()); 1368 1369 if (firstAddr.equals(secondAddr)) { 1370 result = true; 1371 } 1372 1373 } catch(IOException e) { 1374 1375 if (firstAddr == null) { 1376 LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", first, e); 1377 } else { 1378 LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", second, e); 1379 } 1380 1381 if (first.getHost().equalsIgnoreCase(second.getHost())) { 1382 result = true; 1383 } 1384 } 1385 } 1386 1387 return result; 1388 } 1389 1390 private InputStreamReader getURLStream(String path) throws IOException { 1391 InputStreamReader result = null; 1392 URL url = null; 1393 try { 1394 url = new URL(path); 1395 result = new InputStreamReader(url.openStream()); 1396 } catch (MalformedURLException e) { 1397 // ignore - it could be a path to a a local file 1398 } 1399 if (result == null) { 1400 result = new FileReader(path); 1401 } 1402 return result; 1403 } 1404 1405 private URI addExtraQueryOptions(URI uri) { 1406 try { 1407 if( nestedExtraQueryOptions!=null && !nestedExtraQueryOptions.isEmpty() ) { 1408 if( uri.getQuery() == null ) { 1409 uri = URISupport.createURIWithQuery(uri, nestedExtraQueryOptions); 1410 } else { 1411 uri = URISupport.createURIWithQuery(uri, uri.getQuery()+"&"+nestedExtraQueryOptions); 1412 } 1413 } 1414 } catch (URISyntaxException e) { 1415 throw new RuntimeException(e); 1416 } 1417 return uri; 1418 } 1419 1420 public void setNestedExtraQueryOptions(String nestedExtraQueryOptions) { 1421 this.nestedExtraQueryOptions = nestedExtraQueryOptions; 1422 } 1423 1424 public int getWarnAfterReconnectAttempts() { 1425 return warnAfterReconnectAttempts; 1426 } 1427 1428 /** 1429 * Sets the number of Connect / Reconnect attempts that must occur before a warn message 1430 * is logged indicating that the transport is not connected. This can be useful when the 1431 * client is running inside some container or service as it give an indication of some 1432 * problem with the client connection that might not otherwise be visible. To disable the 1433 * log messages this value should be set to a value @{code attempts <= 0} 1434 * 1435 * @param warnAfterReconnectAttempts 1436 * The number of failed connection attempts that must happen before a warning is logged. 1437 */ 1438 public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) { 1439 this.warnAfterReconnectAttempts = warnAfterReconnectAttempts; 1440 } 1441 1442}