001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.net.URI; 020 021import org.apache.activemq.transport.Transport; 022import org.apache.activemq.transport.TransportFactory; 023import org.apache.activemq.util.ServiceStopper; 024 025/** 026 * A network connector which uses some kind of multicast-like transport that 027 * communicates with potentially many remote brokers over a single logical 028 * {@link Transport} instance such as when using multicast. 029 * 030 * This implementation does not depend on multicast at all; any other group 031 * based transport could be used. 032 * 033 * @org.apache.xbean.XBean 034 * 035 */ 036public class MulticastNetworkConnector extends NetworkConnector { 037 038 private Transport localTransport; 039 private Transport remoteTransport; 040 private URI remoteURI; 041 private DemandForwardingBridgeSupport bridge; 042 043 public MulticastNetworkConnector() { 044 } 045 046 public MulticastNetworkConnector(URI remoteURI) { 047 this.remoteURI = remoteURI; 048 } 049 050 // Properties 051 // ------------------------------------------------------------------------- 052 053 public DemandForwardingBridgeSupport getBridge() { 054 return bridge; 055 } 056 057 public void setBridge(DemandForwardingBridgeSupport bridge) { 058 this.bridge = bridge; 059 } 060 061 public Transport getLocalTransport() { 062 return localTransport; 063 } 064 065 public void setLocalTransport(Transport localTransport) { 066 this.localTransport = localTransport; 067 } 068 069 public Transport getRemoteTransport() { 070 return remoteTransport; 071 } 072 073 /** 074 * Sets the remote transport implementation 075 */ 076 public void setRemoteTransport(Transport remoteTransport) { 077 this.remoteTransport = remoteTransport; 078 } 079 080 public URI getRemoteURI() { 081 return remoteURI; 082 } 083 084 /** 085 * Sets the remote transport URI to some group transport like 086 * <code>multicast://address:port</code> 087 */ 088 public void setRemoteURI(URI remoteURI) { 089 this.remoteURI = remoteURI; 090 } 091 092 // Implementation methods 093 // ------------------------------------------------------------------------- 094 095 protected void handleStart() throws Exception { 096 if (remoteTransport == null) { 097 if (remoteURI == null) { 098 throw new IllegalArgumentException("You must specify the remoteURI property"); 099 } 100 remoteTransport = TransportFactory.connect(remoteURI); 101 } 102 103 if (localTransport == null) { 104 localTransport = createLocalTransport(); 105 } 106 107 bridge = createBridge(localTransport, remoteTransport); 108 configureBridge(bridge); 109 bridge.start(); 110 111 // we need to start the transports after we've created the bridge 112 remoteTransport.start(); 113 localTransport.start(); 114 115 super.handleStart(); 116 } 117 118 protected void handleStop(ServiceStopper stopper) throws Exception { 119 super.handleStop(stopper); 120 if (bridge != null) { 121 try { 122 bridge.stop(); 123 } catch (Exception e) { 124 stopper.onException(this, e); 125 } 126 } 127 if (remoteTransport != null) { 128 try { 129 remoteTransport.stop(); 130 } catch (Exception e) { 131 stopper.onException(this, e); 132 } 133 } 134 if (localTransport != null) { 135 try { 136 localTransport.stop(); 137 } catch (Exception e) { 138 stopper.onException(this, e); 139 } 140 } 141 } 142 143 @Override 144 public String toString() { 145 return getClass().getName() + ":" + getName() + "[" + remoteTransport.toString() + "]"; 146 } 147 148 protected DemandForwardingBridgeSupport createBridge(Transport local, Transport remote) { 149 CompositeDemandForwardingBridge bridge = new CompositeDemandForwardingBridge(this, local, remote); 150 bridge.setBrokerService(getBrokerService()); 151 return bridge; 152 } 153}