001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.ExceptionListener;
022import javax.jms.JMSException;
023import javax.jms.Session;
024import javax.jms.Topic;
025import javax.jms.TopicConnection;
026import javax.jms.TopicConnectionFactory;
027import javax.jms.TopicSession;
028import javax.naming.NamingException;
029
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * A Bridge to other JMS Topic providers
035 */
036public class SimpleJmsTopicConnector extends JmsConnector {
037    private static final Logger LOG = LoggerFactory.getLogger(SimpleJmsTopicConnector.class);
038    private String outboundTopicConnectionFactoryName;
039    private String localConnectionFactoryName;
040    private TopicConnectionFactory outboundTopicConnectionFactory;
041    private TopicConnectionFactory localTopicConnectionFactory;
042    private InboundTopicBridge[] inboundTopicBridges;
043    private OutboundTopicBridge[] outboundTopicBridges;
044
045    /**
046     * @return Returns the inboundTopicBridges.
047     */
048    public InboundTopicBridge[] getInboundTopicBridges() {
049        return inboundTopicBridges;
050    }
051
052    /**
053     * @param inboundTopicBridges The inboundTopicBridges to set.
054     */
055    public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges) {
056        this.inboundTopicBridges = inboundTopicBridges;
057    }
058
059    /**
060     * @return Returns the outboundTopicBridges.
061     */
062    public OutboundTopicBridge[] getOutboundTopicBridges() {
063        return outboundTopicBridges;
064    }
065
066    /**
067     * @param outboundTopicBridges The outboundTopicBridges to set.
068     */
069    public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges) {
070        this.outboundTopicBridges = outboundTopicBridges;
071    }
072
073    /**
074     * @return Returns the localTopicConnectionFactory.
075     */
076    public TopicConnectionFactory getLocalTopicConnectionFactory() {
077        return localTopicConnectionFactory;
078    }
079
080    /**
081     * @param localTopicConnectionFactory The localTopicConnectionFactory to set.
082     */
083    public void setLocalTopicConnectionFactory(TopicConnectionFactory localTopicConnectionFactory) {
084        this.localTopicConnectionFactory = localTopicConnectionFactory;
085    }
086
087    /**
088     * @return Returns the outboundTopicConnectionFactory.
089     */
090    public TopicConnectionFactory getOutboundTopicConnectionFactory() {
091        return outboundTopicConnectionFactory;
092    }
093
094    /**
095     * @return Returns the outboundTopicConnectionFactoryName.
096     */
097    public String getOutboundTopicConnectionFactoryName() {
098        return outboundTopicConnectionFactoryName;
099    }
100
101    /**
102     * @param foreignTopicConnectionFactoryName The foreignTopicConnectionFactoryName to set.
103     */
104    public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName) {
105        this.outboundTopicConnectionFactoryName = foreignTopicConnectionFactoryName;
106    }
107
108    /**
109     * @return Returns the localConnectionFactoryName.
110     */
111    public String getLocalConnectionFactoryName() {
112        return localConnectionFactoryName;
113    }
114
115    /**
116     * @param localConnectionFactoryName The localConnectionFactoryName to set.
117     */
118    public void setLocalConnectionFactoryName(String localConnectionFactoryName) {
119        this.localConnectionFactoryName = localConnectionFactoryName;
120    }
121
122    /**
123     * @return Returns the localTopicConnection.
124     */
125    public TopicConnection getLocalTopicConnection() {
126        return (TopicConnection) localConnection.get();
127    }
128
129    /**
130     * @param localTopicConnection The localTopicConnection to set.
131     */
132    public void setLocalTopicConnection(TopicConnection localTopicConnection) {
133        this.localConnection.set(localTopicConnection);
134    }
135
136    /**
137     * @return Returns the outboundTopicConnection.
138     */
139    public TopicConnection getOutboundTopicConnection() {
140        return (TopicConnection) foreignConnection.get();
141    }
142
143    /**
144     * @param foreignTopicConnection The foreignTopicConnection to set.
145     */
146    public void setOutboundTopicConnection(TopicConnection foreignTopicConnection) {
147        this.foreignConnection.set(foreignTopicConnection);
148    }
149
150    /**
151     * @param foreignTopicConnectionFactory The foreignTopicConnectionFactory to set.
152     */
153    public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory) {
154        this.outboundTopicConnectionFactory = foreignTopicConnectionFactory;
155    }
156
157    @Override
158    protected void initializeForeignConnection() throws NamingException, JMSException {
159
160        final TopicConnection newConnection;
161
162        if (foreignConnection.get() == null) {
163            // get the connection factories
164            if (outboundTopicConnectionFactory == null) {
165                // look it up from JNDI
166                if (outboundTopicConnectionFactoryName != null) {
167                    outboundTopicConnectionFactory = jndiOutboundTemplate
168                        .lookup(outboundTopicConnectionFactoryName, TopicConnectionFactory.class);
169                    if (outboundUsername != null) {
170                        newConnection = outboundTopicConnectionFactory
171                            .createTopicConnection(outboundUsername, outboundPassword);
172                    } else {
173                        newConnection = outboundTopicConnectionFactory.createTopicConnection();
174                    }
175                } else {
176                    throw new JMSException("Cannot create foreignConnection - no information");
177                }
178            } else {
179                if (outboundUsername != null) {
180                    newConnection = outboundTopicConnectionFactory
181                        .createTopicConnection(outboundUsername, outboundPassword);
182                } else {
183                    newConnection = outboundTopicConnectionFactory.createTopicConnection();
184                }
185            }
186        } else {
187            // Clear if for now in case something goes wrong during the init.
188            newConnection = (TopicConnection) foreignConnection.getAndSet(null);
189        }
190
191        if (outboundClientId != null && outboundClientId.length() > 0) {
192            newConnection.setClientID(getOutboundClientId());
193        }
194        newConnection.start();
195
196        outboundMessageConvertor.setConnection(newConnection);
197
198        // Configure the bridges with the new Outbound connection.
199        initializeInboundDestinationBridgesOutboundSide(newConnection);
200        initializeOutboundDestinationBridgesOutboundSide(newConnection);
201
202        // Register for any async error notifications now so we can reset in the
203        // case where there's not a lot of activity and a connection drops.
204        newConnection.setExceptionListener(new ExceptionListener() {
205            @Override
206            public void onException(JMSException exception) {
207                handleConnectionFailure(newConnection);
208            }
209        });
210
211        // At this point all looks good, so this our current connection now.
212        foreignConnection.set(newConnection);
213    }
214
215    @Override
216    protected void initializeLocalConnection() throws NamingException, JMSException {
217
218        final TopicConnection newConnection;
219
220        if (localConnection.get() == null) {
221            // get the connection factories
222            if (localTopicConnectionFactory == null) {
223                if (embeddedConnectionFactory == null) {
224                    // look it up from JNDI
225                    if (localConnectionFactoryName != null) {
226                        localTopicConnectionFactory = jndiLocalTemplate
227                            .lookup(localConnectionFactoryName, TopicConnectionFactory.class);
228                        if (localUsername != null) {
229                            newConnection = localTopicConnectionFactory
230                                .createTopicConnection(localUsername, localPassword);
231                        } else {
232                            newConnection = localTopicConnectionFactory.createTopicConnection();
233                        }
234                    } else {
235                        throw new JMSException("Cannot create localConnection - no information");
236                    }
237                } else {
238                    newConnection = embeddedConnectionFactory.createTopicConnection();
239                }
240            } else {
241                if (localUsername != null) {
242                    newConnection = localTopicConnectionFactory.
243                            createTopicConnection(localUsername, localPassword);
244                } else {
245                    newConnection = localTopicConnectionFactory.createTopicConnection();
246                }
247            }
248
249        } else {
250            // Clear if for now in case something goes wrong during the init.
251            newConnection = (TopicConnection) localConnection.getAndSet(null);
252        }
253
254        if (localClientId != null && localClientId.length() > 0) {
255            newConnection.setClientID(getLocalClientId());
256        }
257        newConnection.start();
258
259        inboundMessageConvertor.setConnection(newConnection);
260
261        // Configure the bridges with the new Local connection.
262        initializeInboundDestinationBridgesLocalSide(newConnection);
263        initializeOutboundDestinationBridgesLocalSide(newConnection);
264
265        // Register for any async error notifications now so we can reset in the
266        // case where there's not a lot of activity and a connection drops.
267        newConnection.setExceptionListener(new ExceptionListener() {
268            @Override
269            public void onException(JMSException exception) {
270                handleConnectionFailure(newConnection);
271            }
272        });
273
274        // At this point all looks good, so this our current connection now.
275        localConnection.set(newConnection);
276    }
277
278    protected void initializeInboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
279        if (inboundTopicBridges != null) {
280            TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
281
282            for (InboundTopicBridge bridge : inboundTopicBridges) {
283                String TopicName = bridge.getInboundTopicName();
284                Topic foreignTopic = createForeignTopic(outboundSession, TopicName);
285                bridge.setConsumer(null);
286                bridge.setConsumerTopic(foreignTopic);
287                bridge.setConsumerConnection(connection);
288                bridge.setJmsConnector(this);
289                addInboundBridge(bridge);
290            }
291            outboundSession.close();
292        }
293    }
294
295    protected void initializeInboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
296        if (inboundTopicBridges != null) {
297            TopicSession localSession = connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
298
299            for (InboundTopicBridge bridge : inboundTopicBridges) {
300                String localTopicName = bridge.getLocalTopicName();
301                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
302                bridge.setProducerTopic(activemqTopic);
303                bridge.setProducerConnection(connection);
304                if (bridge.getJmsMessageConvertor() == null) {
305                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
306                }
307                bridge.setJmsConnector(this);
308                addInboundBridge(bridge);
309            }
310            localSession.close();
311        }
312    }
313
314    protected void initializeOutboundDestinationBridgesOutboundSide(TopicConnection connection) throws JMSException {
315        if (outboundTopicBridges != null) {
316            TopicSession outboundSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
317
318            for (OutboundTopicBridge bridge : outboundTopicBridges) {
319                String topicName = bridge.getOutboundTopicName();
320                Topic foreignTopic = createForeignTopic(outboundSession, topicName);
321                bridge.setProducerTopic(foreignTopic);
322                bridge.setProducerConnection(connection);
323                if (bridge.getJmsMessageConvertor() == null) {
324                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
325                }
326                bridge.setJmsConnector(this);
327                addOutboundBridge(bridge);
328            }
329            outboundSession.close();
330        }
331    }
332
333    protected void initializeOutboundDestinationBridgesLocalSide(TopicConnection connection) throws JMSException {
334        if (outboundTopicBridges != null) {
335            TopicSession localSession =
336                    connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
337
338            for (OutboundTopicBridge bridge : outboundTopicBridges) {
339                String localTopicName = bridge.getLocalTopicName();
340                Topic activemqTopic = createActiveMQTopic(localSession, localTopicName);
341                bridge.setConsumer(null);
342                bridge.setConsumerTopic(activemqTopic);
343                bridge.setConsumerConnection(connection);
344                bridge.setJmsConnector(this);
345                addOutboundBridge(bridge);
346            }
347            localSession.close();
348        }
349    }
350
351    @Override
352    protected Destination createReplyToBridge(Destination destination, Connection replyToProducerConnection,
353                                              Connection replyToConsumerConnection) {
354        Topic replyToProducerTopic = (Topic)destination;
355        boolean isInbound = replyToProducerConnection.equals(localConnection.get());
356
357        if (isInbound) {
358            InboundTopicBridge bridge = (InboundTopicBridge)replyToBridges.get(replyToProducerTopic);
359            if (bridge == null) {
360                bridge = new InboundTopicBridge() {
361                    @Override
362                    protected Destination processReplyToDestination(Destination destination) {
363                        return null;
364                    }
365                };
366                try {
367                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
368                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
369                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
370                    replyToConsumerSession.close();
371                    bridge.setConsumerTopic(replyToConsumerTopic);
372                    bridge.setProducerTopic(replyToProducerTopic);
373                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
374                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
375                    bridge.setDoHandleReplyTo(false);
376                    if (bridge.getJmsMessageConvertor() == null) {
377                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
378                    }
379                    bridge.setJmsConnector(this);
380                    bridge.start();
381                    LOG.info("Created replyTo bridge for {}", replyToProducerTopic);
382                } catch (Exception e) {
383                    LOG.error("Failed to create replyTo bridge for topic: {}", replyToProducerTopic, e);
384                    return null;
385                }
386                replyToBridges.put(replyToProducerTopic, bridge);
387            }
388            return bridge.getConsumerTopic();
389        } else {
390            OutboundTopicBridge bridge = (OutboundTopicBridge)replyToBridges.get(replyToProducerTopic);
391            if (bridge == null) {
392                bridge = new OutboundTopicBridge() {
393                    @Override
394                    protected Destination processReplyToDestination(Destination destination) {
395                        return null;
396                    }
397                };
398                try {
399                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
400                        .createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
401                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
402                    replyToConsumerSession.close();
403                    bridge.setConsumerTopic(replyToConsumerTopic);
404                    bridge.setProducerTopic(replyToProducerTopic);
405                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
406                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
407                    bridge.setDoHandleReplyTo(false);
408                    if (bridge.getJmsMessageConvertor() == null) {
409                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
410                    }
411                    bridge.setJmsConnector(this);
412                    bridge.start();
413                    LOG.info("Created replyTo bridge for {}", replyToProducerTopic);
414                } catch (Exception e) {
415                    LOG.error("Failed to create replyTo bridge for topic: {}", replyToProducerTopic, e);
416                    return null;
417                }
418                replyToBridges.put(replyToProducerTopic, bridge);
419            }
420            return bridge.getConsumerTopic();
421        }
422    }
423
424    protected Topic createActiveMQTopic(TopicSession session, String topicName) throws JMSException {
425        return session.createTopic(topicName);
426    }
427
428    protected Topic createForeignTopic(TopicSession session, String topicName) throws JMSException {
429        Topic result = null;
430
431        if (preferJndiDestinationLookup) {
432            try {
433                // look-up the Queue
434                result = jndiOutboundTemplate.lookup(topicName, Topic.class);
435            } catch (NamingException e) {
436                try {
437                    result = session.createTopic(topicName);
438                } catch (JMSException e1) {
439                    String errStr = "Failed to look-up or create Topic for name: " + topicName;
440                    LOG.error(errStr, e);
441                    JMSException jmsEx = new JMSException(errStr);
442                    jmsEx.setLinkedException(e1);
443                    throw jmsEx;
444                }
445            }
446        } else {
447            try {
448                result = session.createTopic(topicName);
449            } catch (JMSException e) {
450                // look-up the Topic
451                try {
452                    result = jndiOutboundTemplate.lookup(topicName, Topic.class);
453                } catch (NamingException e1) {
454                    String errStr = "Failed to look-up Topic for name: " + topicName;
455                    LOG.error(errStr, e);
456                    JMSException jmsEx = new JMSException(errStr);
457                    jmsEx.setLinkedException(e1);
458                    throw jmsEx;
459                }
460            }
461        }
462
463        return result;
464    }
465}