001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import static org.apache.activemq.network.jms.ReconnectionPolicy.INFINITE; 020 021import java.util.Iterator; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.ThreadFactory; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicBoolean; 030import java.util.concurrent.atomic.AtomicReference; 031 032import javax.jms.Connection; 033import javax.jms.Destination; 034 035import org.apache.activemq.ActiveMQConnectionFactory; 036import org.apache.activemq.Service; 037import org.apache.activemq.broker.BrokerService; 038import org.apache.activemq.util.LRUCache; 039import org.apache.activemq.util.ThreadPoolUtils; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some 045 * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself 046 * aimed to be in compliance with the JMS 1.0.2 specification. 047 */ 048public abstract class JmsConnector implements Service { 049 050 private static int nextId; 051 private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class); 052 053 protected boolean preferJndiDestinationLookup = false; 054 protected JndiLookupFactory jndiLocalTemplate; 055 protected JndiLookupFactory jndiOutboundTemplate; 056 protected JmsMesageConvertor inboundMessageConvertor; 057 protected JmsMesageConvertor outboundMessageConvertor; 058 protected AtomicBoolean initialized = new AtomicBoolean(false); 059 protected AtomicBoolean localSideInitialized = new AtomicBoolean(false); 060 protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false); 061 protected AtomicBoolean started = new AtomicBoolean(false); 062 protected AtomicBoolean failed = new AtomicBoolean(); 063 protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>(); 064 protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>(); 065 protected ActiveMQConnectionFactory embeddedConnectionFactory; 066 protected int replyToDestinationCacheSize = 10000; 067 protected String outboundUsername; 068 protected String outboundPassword; 069 protected String localUsername; 070 protected String localPassword; 071 protected String outboundClientId; 072 protected String localClientId; 073 protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache(); 074 075 private ReconnectionPolicy policy = new ReconnectionPolicy(); 076 protected ThreadPoolExecutor connectionService; 077 private final List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); 078 private final List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>(); 079 private String name; 080 081 private static LRUCache<Destination, DestinationBridge> createLRUCache() { 082 return new LRUCache<Destination, DestinationBridge>() { 083 private static final long serialVersionUID = -7446792754185879286L; 084 085 @Override 086 protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) { 087 if (size() > maxCacheSize) { 088 Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator(); 089 Map.Entry<Destination, DestinationBridge> lru = iter.next(); 090 remove(lru.getKey()); 091 DestinationBridge bridge = lru.getValue(); 092 try { 093 bridge.stop(); 094 LOG.info("Expired bridge: {}", bridge); 095 } catch (Exception e) { 096 LOG.warn("Stopping expired bridge {} caused an exception", bridge, e); 097 } 098 } 099 return false; 100 } 101 }; 102 } 103 104 public boolean init() { 105 boolean result = initialized.compareAndSet(false, true); 106 if (result) { 107 if (jndiLocalTemplate == null) { 108 jndiLocalTemplate = new JndiLookupFactory(); 109 } 110 if (jndiOutboundTemplate == null) { 111 jndiOutboundTemplate = new JndiLookupFactory(); 112 } 113 if (inboundMessageConvertor == null) { 114 inboundMessageConvertor = new SimpleJmsMessageConvertor(); 115 } 116 if (outboundMessageConvertor == null) { 117 outboundMessageConvertor = new SimpleJmsMessageConvertor(); 118 } 119 replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize()); 120 121 connectionService = createExecutor(); 122 123 // Subclasses can override this to customize their own it. 124 result = doConnectorInit(); 125 } 126 return result; 127 } 128 129 protected boolean doConnectorInit() { 130 131 // We try to make a connection via a sync call first so that the 132 // JmsConnector is fully initialized before the start call returns 133 // in order to avoid missing any messages that are dispatched 134 // immediately after startup. If either side fails we queue an 135 // asynchronous task to manage the reconnect attempts. 136 137 try { 138 initializeLocalConnection(); 139 localSideInitialized.set(true); 140 } catch(Exception e) { 141 // Queue up the task to attempt the local connection. 142 scheduleAsyncLocalConnectionReconnect(); 143 } 144 145 try { 146 initializeForeignConnection(); 147 foreignSideInitialized.set(true); 148 } catch(Exception e) { 149 // Queue up the task for the foreign connection now. 150 scheduleAsyncForeignConnectionReconnect(); 151 } 152 153 return true; 154 } 155 156 @Override 157 public void start() throws Exception { 158 if (started.compareAndSet(false, true)) { 159 init(); 160 for (DestinationBridge bridge : inboundBridges) { 161 bridge.start(); 162 } 163 for (DestinationBridge bridge : outboundBridges) { 164 bridge.start(); 165 } 166 LOG.info("JMS Connector {} started", getName()); 167 } 168 } 169 170 @Override 171 public void stop() throws Exception { 172 if (started.compareAndSet(true, false)) { 173 174 ThreadPoolUtils.shutdown(connectionService); 175 connectionService = null; 176 177 if (foreignConnection.get() != null) { 178 try { 179 foreignConnection.get().close(); 180 } catch (Exception e) { 181 } 182 } 183 184 if (localConnection.get() != null) { 185 try { 186 localConnection.get().close(); 187 } catch (Exception e) { 188 } 189 } 190 191 for (DestinationBridge bridge : inboundBridges) { 192 bridge.stop(); 193 } 194 for (DestinationBridge bridge : outboundBridges) { 195 bridge.stop(); 196 } 197 LOG.info("JMS Connector {} stopped", getName()); 198 } 199 } 200 201 public void clearBridges() { 202 inboundBridges.clear(); 203 outboundBridges.clear(); 204 replyToBridges.clear(); 205 } 206 207 protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection); 208 209 /** 210 * One way to configure the local connection - this is called by The 211 * BrokerService when the Connector is embedded 212 * 213 * @param service 214 */ 215 public void setBrokerService(BrokerService service) { 216 embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI()); 217 } 218 219 public Connection getLocalConnection() { 220 return this.localConnection.get(); 221 } 222 223 public Connection getForeignConnection() { 224 return this.foreignConnection.get(); 225 } 226 227 /** 228 * @return Returns the jndiTemplate. 229 */ 230 public JndiLookupFactory getJndiLocalTemplate() { 231 return jndiLocalTemplate; 232 } 233 234 /** 235 * @param jndiTemplate The jndiTemplate to set. 236 */ 237 public void setJndiLocalTemplate(JndiLookupFactory jndiTemplate) { 238 this.jndiLocalTemplate = jndiTemplate; 239 } 240 241 /** 242 * @return Returns the jndiOutboundTemplate. 243 */ 244 public JndiLookupFactory getJndiOutboundTemplate() { 245 return jndiOutboundTemplate; 246 } 247 248 /** 249 * @param jndiOutboundTemplate The jndiOutboundTemplate to set. 250 */ 251 public void setJndiOutboundTemplate(JndiLookupFactory jndiOutboundTemplate) { 252 this.jndiOutboundTemplate = jndiOutboundTemplate; 253 } 254 255 /** 256 * @return Returns the inboundMessageConvertor. 257 */ 258 public JmsMesageConvertor getInboundMessageConvertor() { 259 return inboundMessageConvertor; 260 } 261 262 /** 263 * @param jmsMessageConvertor The jmsMessageConvertor to set. 264 */ 265 public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) { 266 this.inboundMessageConvertor = jmsMessageConvertor; 267 } 268 269 /** 270 * @return Returns the outboundMessageConvertor. 271 */ 272 public JmsMesageConvertor getOutboundMessageConvertor() { 273 return outboundMessageConvertor; 274 } 275 276 /** 277 * @param outboundMessageConvertor The outboundMessageConvertor to set. 278 */ 279 public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) { 280 this.outboundMessageConvertor = outboundMessageConvertor; 281 } 282 283 /** 284 * @return Returns the replyToDestinationCacheSize. 285 */ 286 public int getReplyToDestinationCacheSize() { 287 return replyToDestinationCacheSize; 288 } 289 290 /** 291 * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set. 292 */ 293 public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) { 294 this.replyToDestinationCacheSize = replyToDestinationCacheSize; 295 } 296 297 /** 298 * @return Returns the localPassword. 299 */ 300 public String getLocalPassword() { 301 return localPassword; 302 } 303 304 /** 305 * @param localPassword The localPassword to set. 306 */ 307 public void setLocalPassword(String localPassword) { 308 this.localPassword = localPassword; 309 } 310 311 /** 312 * @return Returns the localUsername. 313 */ 314 public String getLocalUsername() { 315 return localUsername; 316 } 317 318 /** 319 * @param localUsername The localUsername to set. 320 */ 321 public void setLocalUsername(String localUsername) { 322 this.localUsername = localUsername; 323 } 324 325 /** 326 * @return Returns the outboundPassword. 327 */ 328 public String getOutboundPassword() { 329 return outboundPassword; 330 } 331 332 /** 333 * @param outboundPassword The outboundPassword to set. 334 */ 335 public void setOutboundPassword(String outboundPassword) { 336 this.outboundPassword = outboundPassword; 337 } 338 339 /** 340 * @return Returns the outboundUsername. 341 */ 342 public String getOutboundUsername() { 343 return outboundUsername; 344 } 345 346 /** 347 * @param outboundUsername The outboundUsername to set. 348 */ 349 public void setOutboundUsername(String outboundUsername) { 350 this.outboundUsername = outboundUsername; 351 } 352 353 /** 354 * @return the outboundClientId 355 */ 356 public String getOutboundClientId() { 357 return outboundClientId; 358 } 359 360 /** 361 * @param outboundClientId the outboundClientId to set 362 */ 363 public void setOutboundClientId(String outboundClientId) { 364 this.outboundClientId = outboundClientId; 365 } 366 367 /** 368 * @return the localClientId 369 */ 370 public String getLocalClientId() { 371 return localClientId; 372 } 373 374 /** 375 * @param localClientId the localClientId to set 376 */ 377 public void setLocalClientId(String localClientId) { 378 this.localClientId = localClientId; 379 } 380 381 /** 382 * @return the currently configured reconnection policy. 383 */ 384 public ReconnectionPolicy getReconnectionPolicy() { 385 return this.policy; 386 } 387 388 /** 389 * @param policy The new reconnection policy this {@link JmsConnector} should use. 390 */ 391 public void setReconnectionPolicy(ReconnectionPolicy policy) { 392 this.policy = policy; 393 } 394 395 /** 396 * @return the preferJndiDestinationLookup 397 */ 398 public boolean isPreferJndiDestinationLookup() { 399 return preferJndiDestinationLookup; 400 } 401 402 /** 403 * Sets whether the connector should prefer to first try to find a destination in JNDI before 404 * using JMS semantics to create a Destination. By default the connector will first use JMS 405 * semantics and then fall-back to JNDI lookup, setting this value to true will reverse that 406 * ordering. 407 * 408 * @param preferJndiDestinationLookup the preferJndiDestinationLookup to set 409 */ 410 public void setPreferJndiDestinationLookup(boolean preferJndiDestinationLookup) { 411 this.preferJndiDestinationLookup = preferJndiDestinationLookup; 412 } 413 414 /** 415 * @return returns true if the {@link JmsConnector} is connected to both brokers. 416 */ 417 public boolean isConnected() { 418 return localConnection.get() != null && foreignConnection.get() != null; 419 } 420 421 protected void addInboundBridge(DestinationBridge bridge) { 422 if (!inboundBridges.contains(bridge)) { 423 inboundBridges.add(bridge); 424 } 425 } 426 427 protected void addOutboundBridge(DestinationBridge bridge) { 428 if (!outboundBridges.contains(bridge)) { 429 outboundBridges.add(bridge); 430 } 431 } 432 433 protected void removeInboundBridge(DestinationBridge bridge) { 434 inboundBridges.remove(bridge); 435 } 436 437 protected void removeOutboundBridge(DestinationBridge bridge) { 438 outboundBridges.remove(bridge); 439 } 440 441 public String getName() { 442 if (name == null) { 443 name = "Connector:" + getNextId(); 444 } 445 return name; 446 } 447 448 public void setName(String name) { 449 this.name = name; 450 } 451 452 private static synchronized int getNextId() { 453 return nextId++; 454 } 455 456 public boolean isFailed() { 457 return this.failed.get(); 458 } 459 460 /** 461 * Performs the work of connection to the local side of the Connection. 462 * <p> 463 * This creates the initial connection to the local end of the {@link JmsConnector} 464 * and then sets up all the destination bridges with the information needed to bridge 465 * on the local side of the connection. 466 * 467 * @throws Exception if the connection cannot be established for any reason. 468 */ 469 protected abstract void initializeLocalConnection() throws Exception; 470 471 /** 472 * Performs the work of connection to the foreign side of the Connection. 473 * <p> 474 * This creates the initial connection to the foreign end of the {@link JmsConnector} 475 * and then sets up all the destination bridges with the information needed to bridge 476 * on the foreign side of the connection. 477 * 478 * @throws Exception if the connection cannot be established for any reason. 479 */ 480 protected abstract void initializeForeignConnection() throws Exception; 481 482 /** 483 * Callback method that the Destination bridges can use to report an exception to occurs 484 * during normal bridging operations. 485 * 486 * @param connection 487 * The connection that was in use when the failure occured. 488 */ 489 void handleConnectionFailure(Connection connection) { 490 491 // Can happen if async exception listener kicks in at the same time. 492 if (connection == null || !this.started.get()) { 493 return; 494 } 495 496 LOG.info("JmsConnector handling loss of connection [{}]", connection.toString()); 497 498 // TODO - How do we handle the re-wiring of replyToBridges in this case. 499 replyToBridges.clear(); 500 501 if (this.foreignConnection.compareAndSet(connection, null)) { 502 503 // Stop the inbound bridges when the foreign connection is dropped since 504 // the bridge has no consumer and needs to be restarted once a new connection 505 // to the foreign side is made. 506 for (DestinationBridge bridge : inboundBridges) { 507 try { 508 bridge.stop(); 509 } catch(Exception e) { 510 } 511 } 512 513 // We got here first and cleared the connection, now we queue a reconnect. 514 this.connectionService.execute(new Runnable() { 515 516 @Override 517 public void run() { 518 try { 519 doInitializeConnection(false); 520 } catch (Exception e) { 521 LOG.error("Failed to initialize foreign connection for the JMSConnector", e); 522 } 523 } 524 }); 525 526 } else if (this.localConnection.compareAndSet(connection, null)) { 527 528 // Stop the outbound bridges when the local connection is dropped since 529 // the bridge has no consumer and needs to be restarted once a new connection 530 // to the local side is made. 531 for (DestinationBridge bridge : outboundBridges) { 532 try { 533 bridge.stop(); 534 } catch(Exception e) { 535 } 536 } 537 538 // We got here first and cleared the connection, now we queue a reconnect. 539 this.connectionService.execute(new Runnable() { 540 541 @Override 542 public void run() { 543 try { 544 doInitializeConnection(true); 545 } catch (Exception e) { 546 LOG.error("Failed to initialize local connection for the JMSConnector", e); 547 } 548 } 549 }); 550 } 551 } 552 553 private void scheduleAsyncLocalConnectionReconnect() { 554 this.connectionService.execute(new Runnable() { 555 @Override 556 public void run() { 557 try { 558 doInitializeConnection(true); 559 } catch (Exception e) { 560 LOG.error("Failed to initialize local connection for the JMSConnector", e); 561 } 562 } 563 }); 564 } 565 566 private void scheduleAsyncForeignConnectionReconnect() { 567 this.connectionService.execute(new Runnable() { 568 @Override 569 public void run() { 570 try { 571 doInitializeConnection(false); 572 } catch (Exception e) { 573 LOG.error("Failed to initialize foreign connection for the JMSConnector", e); 574 } 575 } 576 }); 577 } 578 579 private void doInitializeConnection(boolean local) throws Exception { 580 581 ThreadPoolExecutor connectionService = this.connectionService; 582 int attempt = 0; 583 584 final int maxRetries; 585 if (local) { 586 maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() : 587 policy.getMaxReconnectAttempts(); 588 } else { 589 maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() : 590 policy.getMaxReconnectAttempts(); 591 } 592 593 do { 594 if (attempt > 0) { 595 try { 596 Thread.sleep(policy.getNextDelay(attempt)); 597 } catch(InterruptedException e) { 598 } 599 } 600 601 if (connectionService.isTerminating()) { 602 return; 603 } 604 605 try { 606 607 if (local) { 608 initializeLocalConnection(); 609 localSideInitialized.set(true); 610 } else { 611 initializeForeignConnection(); 612 foreignSideInitialized.set(true); 613 } 614 615 // Once we are connected we ensure all the bridges are started. 616 if (localConnection.get() != null && foreignConnection.get() != null) { 617 for (DestinationBridge bridge : inboundBridges) { 618 bridge.start(); 619 } 620 for (DestinationBridge bridge : outboundBridges) { 621 bridge.start(); 622 } 623 } 624 625 return; 626 } catch(Exception e) { 627 LOG.debug("Failed to establish initial {} connection for JmsConnector [{}]", new Object[]{ (local ? "local" : "foreign"), attempt }, e); 628 } 629 } 630 while ((maxRetries == INFINITE || maxRetries > ++attempt) && !connectionService.isShutdown()); 631 632 this.failed.set(true); 633 } 634 635 private final ThreadFactory factory = new ThreadFactory() { 636 @Override 637 public Thread newThread(Runnable runnable) { 638 Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: "); 639 thread.setDaemon(true); 640 return thread; 641 } 642 }; 643 644 private ThreadPoolExecutor createExecutor() { 645 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory); 646 exec.allowCoreThreadTimeOut(true); 647 return exec; 648 } 649}