001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network.jms; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.ExceptionListener; 022import javax.jms.JMSException; 023import javax.jms.Queue; 024import javax.jms.QueueConnection; 025import javax.jms.QueueConnectionFactory; 026import javax.jms.QueueSession; 027import javax.jms.Session; 028import javax.naming.NamingException; 029 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 */ 035public class SimpleJmsQueueConnector extends JmsConnector { 036 private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsQueueConnector.class); 037 private String outboundQueueConnectionFactoryName; 038 private String localConnectionFactoryName; 039 private QueueConnectionFactory outboundQueueConnectionFactory; 040 private QueueConnectionFactory localQueueConnectionFactory; 041 private InboundQueueBridge[] inboundQueueBridges; 042 private OutboundQueueBridge[] outboundQueueBridges; 043 044 /** 045 * @return Returns the inboundQueueBridges. 046 */ 047 public InboundQueueBridge[] getInboundQueueBridges() { 048 return inboundQueueBridges; 049 } 050 051 /** 052 * @param inboundQueueBridges The inboundQueueBridges to set. 053 */ 054 public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges) { 055 this.inboundQueueBridges = inboundQueueBridges; 056 } 057 058 /** 059 * @return Returns the outboundQueueBridges. 060 */ 061 public OutboundQueueBridge[] getOutboundQueueBridges() { 062 return outboundQueueBridges; 063 } 064 065 /** 066 * @param outboundQueueBridges The outboundQueueBridges to set. 067 */ 068 public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges) { 069 this.outboundQueueBridges = outboundQueueBridges; 070 } 071 072 /** 073 * @return Returns the localQueueConnectionFactory. 074 */ 075 public QueueConnectionFactory getLocalQueueConnectionFactory() { 076 return localQueueConnectionFactory; 077 } 078 079 /** 080 * @param localConnectionFactory The localQueueConnectionFactory to 081 * set. 082 */ 083 public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory) { 084 this.localQueueConnectionFactory = localConnectionFactory; 085 } 086 087 /** 088 * @return Returns the outboundQueueConnectionFactory. 089 */ 090 public QueueConnectionFactory getOutboundQueueConnectionFactory() { 091 return outboundQueueConnectionFactory; 092 } 093 094 /** 095 * @return Returns the outboundQueueConnectionFactoryName. 096 */ 097 public String getOutboundQueueConnectionFactoryName() { 098 return outboundQueueConnectionFactoryName; 099 } 100 101 /** 102 * @param foreignQueueConnectionFactoryName The 103 * foreignQueueConnectionFactoryName to set. 104 */ 105 public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName) { 106 this.outboundQueueConnectionFactoryName = foreignQueueConnectionFactoryName; 107 } 108 109 /** 110 * @return Returns the localConnectionFactoryName. 111 */ 112 public String getLocalConnectionFactoryName() { 113 return localConnectionFactoryName; 114 } 115 116 /** 117 * @param localConnectionFactoryName The localConnectionFactoryName to set. 118 */ 119 public void setLocalConnectionFactoryName(String localConnectionFactoryName) { 120 this.localConnectionFactoryName = localConnectionFactoryName; 121 } 122 123 /** 124 * @return Returns the localQueueConnection. 125 */ 126 public QueueConnection getLocalQueueConnection() { 127 return (QueueConnection) localConnection.get(); 128 } 129 130 /** 131 * @param localQueueConnection The localQueueConnection to set. 132 */ 133 public void setLocalQueueConnection(QueueConnection localQueueConnection) { 134 this.localConnection.set(localQueueConnection); 135 } 136 137 /** 138 * @return Returns the outboundQueueConnection. 139 */ 140 public QueueConnection getOutboundQueueConnection() { 141 return (QueueConnection) foreignConnection.get(); 142 } 143 144 /** 145 * @param foreignQueueConnection The foreignQueueConnection to set. 146 */ 147 public void setOutboundQueueConnection(QueueConnection foreignQueueConnection) { 148 this.foreignConnection.set(foreignQueueConnection); 149 } 150 151 /** 152 * @param foreignQueueConnectionFactory The foreignQueueConnectionFactory to set. 153 */ 154 public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory) { 155 this.outboundQueueConnectionFactory = foreignQueueConnectionFactory; 156 } 157 158 @Override 159 protected void initializeForeignConnection() throws NamingException, JMSException { 160 161 final QueueConnection newConnection; 162 163 if (foreignConnection.get() == null) { 164 // get the connection factories 165 if (outboundQueueConnectionFactory == null) { 166 // look it up from JNDI 167 if (outboundQueueConnectionFactoryName != null) { 168 outboundQueueConnectionFactory = jndiOutboundTemplate 169 .lookup(outboundQueueConnectionFactoryName, QueueConnectionFactory.class); 170 if (outboundUsername != null) { 171 newConnection = outboundQueueConnectionFactory 172 .createQueueConnection(outboundUsername, outboundPassword); 173 } else { 174 newConnection = outboundQueueConnectionFactory.createQueueConnection(); 175 } 176 } else { 177 throw new JMSException("Cannot create foreignConnection - no information"); 178 } 179 } else { 180 if (outboundUsername != null) { 181 newConnection = outboundQueueConnectionFactory 182 .createQueueConnection(outboundUsername, outboundPassword); 183 } else { 184 newConnection = outboundQueueConnectionFactory.createQueueConnection(); 185 } 186 } 187 } else { 188 // Clear if for now in case something goes wrong during the init. 189 newConnection = (QueueConnection) foreignConnection.getAndSet(null); 190 } 191 192 if (outboundClientId != null && outboundClientId.length() > 0) { 193 newConnection.setClientID(getOutboundClientId()); 194 } 195 newConnection.start(); 196 197 outboundMessageConvertor.setConnection(newConnection); 198 199 // Configure the bridges with the new Outbound connection. 200 initializeInboundDestinationBridgesOutboundSide(newConnection); 201 initializeOutboundDestinationBridgesOutboundSide(newConnection); 202 203 // Register for any async error notifications now so we can reset in the 204 // case where there's not a lot of activity and a connection drops. 205 newConnection.setExceptionListener(new ExceptionListener() { 206 @Override 207 public void onException(JMSException exception) { 208 handleConnectionFailure(newConnection); 209 } 210 }); 211 212 // At this point all looks good, so this our current connection now. 213 foreignConnection.set(newConnection); 214 } 215 216 @Override 217 protected void initializeLocalConnection() throws NamingException, JMSException { 218 219 final QueueConnection newConnection; 220 221 if (localConnection.get() == null) { 222 // get the connection factories 223 if (localQueueConnectionFactory == null) { 224 if (embeddedConnectionFactory == null) { 225 // look it up from JNDI 226 if (localConnectionFactoryName != null) { 227 localQueueConnectionFactory = jndiLocalTemplate 228 .lookup(localConnectionFactoryName, QueueConnectionFactory.class); 229 if (localUsername != null) { 230 newConnection = localQueueConnectionFactory 231 .createQueueConnection(localUsername, localPassword); 232 } else { 233 newConnection = localQueueConnectionFactory.createQueueConnection(); 234 } 235 } else { 236 throw new JMSException("Cannot create localConnection - no information"); 237 } 238 } else { 239 newConnection = embeddedConnectionFactory.createQueueConnection(); 240 } 241 } else { 242 if (localUsername != null) { 243 newConnection = localQueueConnectionFactory. 244 createQueueConnection(localUsername, localPassword); 245 } else { 246 newConnection = localQueueConnectionFactory.createQueueConnection(); 247 } 248 } 249 250 } else { 251 // Clear if for now in case something goes wrong during the init. 252 newConnection = (QueueConnection) localConnection.getAndSet(null); 253 } 254 255 if (localClientId != null && localClientId.length() > 0) { 256 newConnection.setClientID(getLocalClientId()); 257 } 258 newConnection.start(); 259 260 inboundMessageConvertor.setConnection(newConnection); 261 262 // Configure the bridges with the new Local connection. 263 initializeInboundDestinationBridgesLocalSide(newConnection); 264 initializeOutboundDestinationBridgesLocalSide(newConnection); 265 266 // Register for any async error notifications now so we can reset in the 267 // case where there's not a lot of activity and a connection drops. 268 newConnection.setExceptionListener(new ExceptionListener() { 269 @Override 270 public void onException(JMSException exception) { 271 handleConnectionFailure(newConnection); 272 } 273 }); 274 275 // At this point all looks good, so this our current connection now. 276 localConnection.set(newConnection); 277 } 278 279 protected void initializeInboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { 280 if (inboundQueueBridges != null) { 281 QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 282 283 for (InboundQueueBridge bridge : inboundQueueBridges) { 284 String queueName = bridge.getInboundQueueName(); 285 Queue foreignQueue = createForeignQueue(outboundSession, queueName); 286 bridge.setConsumer(null); 287 bridge.setConsumerQueue(foreignQueue); 288 bridge.setConsumerConnection(connection); 289 bridge.setJmsConnector(this); 290 addInboundBridge(bridge); 291 } 292 outboundSession.close(); 293 } 294 } 295 296 protected void initializeInboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException { 297 if (inboundQueueBridges != null) { 298 QueueSession localSession = connection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); 299 300 for (InboundQueueBridge bridge : inboundQueueBridges) { 301 String localQueueName = bridge.getLocalQueueName(); 302 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); 303 bridge.setProducerQueue(activemqQueue); 304 bridge.setProducerConnection(connection); 305 if (bridge.getJmsMessageConvertor() == null) { 306 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 307 } 308 bridge.setJmsConnector(this); 309 addInboundBridge(bridge); 310 } 311 localSession.close(); 312 } 313 } 314 315 protected void initializeOutboundDestinationBridgesOutboundSide(QueueConnection connection) throws JMSException { 316 if (outboundQueueBridges != null) { 317 QueueSession outboundSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 318 319 for (OutboundQueueBridge bridge : outboundQueueBridges) { 320 String queueName = bridge.getOutboundQueueName(); 321 Queue foreignQueue = createForeignQueue(outboundSession, queueName); 322 bridge.setProducerQueue(foreignQueue); 323 bridge.setProducerConnection(connection); 324 if (bridge.getJmsMessageConvertor() == null) { 325 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 326 } 327 bridge.setJmsConnector(this); 328 addOutboundBridge(bridge); 329 } 330 outboundSession.close(); 331 } 332 } 333 334 protected void initializeOutboundDestinationBridgesLocalSide(QueueConnection connection) throws JMSException { 335 if (outboundQueueBridges != null) { 336 QueueSession localSession = 337 connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 338 339 for (OutboundQueueBridge bridge : outboundQueueBridges) { 340 String localQueueName = bridge.getLocalQueueName(); 341 Queue activemqQueue = createActiveMQQueue(localSession, localQueueName); 342 bridge.setConsumer(null); 343 bridge.setConsumerQueue(activemqQueue); 344 bridge.setConsumerConnection(connection); 345 bridge.setJmsConnector(this); 346 addOutboundBridge(bridge); 347 } 348 localSession.close(); 349 } 350 } 351 352 @Override 353 protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection, 354 Connection replyToConsumerConnection) { 355 Queue replyToProducerQueue = (Queue)destination; 356 boolean isInbound = replyToProducerConnection.equals(localConnection.get()); 357 358 if (isInbound) { 359 InboundQueueBridge bridge = (InboundQueueBridge)replyToBridges.get(replyToProducerQueue); 360 if (bridge == null) { 361 bridge = new InboundQueueBridge() { 362 @Override 363 protected Destination processReplyToDestination(Destination destination) { 364 return null; 365 } 366 }; 367 try { 368 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) 369 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 370 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 371 replyToConsumerSession.close(); 372 bridge.setConsumerQueue(replyToConsumerQueue); 373 bridge.setProducerQueue(replyToProducerQueue); 374 bridge.setProducerConnection((QueueConnection)replyToProducerConnection); 375 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); 376 bridge.setDoHandleReplyTo(false); 377 if (bridge.getJmsMessageConvertor() == null) { 378 bridge.setJmsMessageConvertor(getInboundMessageConvertor()); 379 } 380 bridge.setJmsConnector(this); 381 bridge.start(); 382 LOG.info("Created replyTo bridge for {}", replyToProducerQueue); 383 } catch (Exception e) { 384 LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e); 385 return null; 386 } 387 replyToBridges.put(replyToProducerQueue, bridge); 388 } 389 return bridge.getConsumerQueue(); 390 } else { 391 OutboundQueueBridge bridge = (OutboundQueueBridge)replyToBridges.get(replyToProducerQueue); 392 if (bridge == null) { 393 bridge = new OutboundQueueBridge() { 394 @Override 395 protected Destination processReplyToDestination(Destination destination) { 396 return null; 397 } 398 }; 399 try { 400 QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection) 401 .createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 402 Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue(); 403 replyToConsumerSession.close(); 404 bridge.setConsumerQueue(replyToConsumerQueue); 405 bridge.setProducerQueue(replyToProducerQueue); 406 bridge.setProducerConnection((QueueConnection)replyToProducerConnection); 407 bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection); 408 bridge.setDoHandleReplyTo(false); 409 if (bridge.getJmsMessageConvertor() == null) { 410 bridge.setJmsMessageConvertor(getOutboundMessageConvertor()); 411 } 412 bridge.setJmsConnector(this); 413 bridge.start(); 414 LOG.info("Created replyTo bridge for {}", replyToProducerQueue); 415 } catch (Exception e) { 416 LOG.error("Failed to create replyTo bridge for queue: {}", replyToProducerQueue, e); 417 return null; 418 } 419 replyToBridges.put(replyToProducerQueue, bridge); 420 } 421 return bridge.getConsumerQueue(); 422 } 423 } 424 425 protected Queue createActiveMQQueue(QueueSession session, String queueName) throws JMSException { 426 return session.createQueue(queueName); 427 } 428 429 protected Queue createForeignQueue(QueueSession session, String queueName) throws JMSException { 430 Queue result = null; 431 432 if (preferJndiDestinationLookup) { 433 try { 434 // look-up the Queue 435 result = jndiOutboundTemplate.lookup(queueName, Queue.class); 436 } catch (NamingException e) { 437 try { 438 result = session.createQueue(queueName); 439 } catch (JMSException e1) { 440 String errStr = "Failed to look-up or create Queue for name: " + queueName; 441 LOG.error(errStr, e); 442 JMSException jmsEx = new JMSException(errStr); 443 jmsEx.setLinkedException(e1); 444 throw jmsEx; 445 } 446 } 447 } else { 448 try { 449 result = session.createQueue(queueName); 450 } catch (JMSException e) { 451 // look-up the Queue 452 try { 453 result = jndiOutboundTemplate.lookup(queueName, Queue.class); 454 } catch (NamingException e1) { 455 String errStr = "Failed to look-up Queue for name: " + queueName; 456 LOG.error(errStr, e); 457 JMSException jmsEx = new JMSException(errStr); 458 jmsEx.setLinkedException(e1); 459 throw jmsEx; 460 } 461 } 462 } 463 464 return result; 465 } 466}