001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.IOException; 020import java.util.Map; 021import java.util.concurrent.ConcurrentHashMap; 022import java.util.concurrent.ConcurrentMap; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.zip.DataFormatException; 025import java.util.zip.Inflater; 026 027import javax.jms.Destination; 028import javax.jms.InvalidClientIDException; 029import javax.jms.JMSException; 030import javax.jms.Message; 031import javax.security.auth.login.CredentialException; 032 033import org.apache.activemq.broker.BrokerService; 034import org.apache.activemq.broker.BrokerServiceAware; 035import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 036import org.apache.activemq.command.ActiveMQBytesMessage; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ActiveMQMapMessage; 039import org.apache.activemq.command.ActiveMQMessage; 040import org.apache.activemq.command.ActiveMQTextMessage; 041import org.apache.activemq.command.Command; 042import org.apache.activemq.command.ConnectionError; 043import org.apache.activemq.command.ConnectionId; 044import org.apache.activemq.command.ConnectionInfo; 045import org.apache.activemq.command.ExceptionResponse; 046import org.apache.activemq.command.MessageAck; 047import org.apache.activemq.command.MessageDispatch; 048import org.apache.activemq.command.MessageId; 049import org.apache.activemq.command.ProducerId; 050import org.apache.activemq.command.ProducerInfo; 051import org.apache.activemq.command.Response; 052import org.apache.activemq.command.SessionId; 053import org.apache.activemq.command.SessionInfo; 054import org.apache.activemq.command.ShutdownInfo; 055import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy; 056import org.apache.activemq.util.ByteArrayOutputStream; 057import org.apache.activemq.util.ByteSequence; 058import org.apache.activemq.util.FactoryFinder; 059import org.apache.activemq.util.IOExceptionSupport; 060import org.apache.activemq.util.IdGenerator; 061import org.apache.activemq.util.JMSExceptionSupport; 062import org.apache.activemq.util.LRUCache; 063import org.apache.activemq.util.LongSequenceGenerator; 064import org.fusesource.hawtbuf.Buffer; 065import org.fusesource.hawtbuf.UTF8Buffer; 066import org.fusesource.mqtt.client.QoS; 067import org.fusesource.mqtt.client.Topic; 068import org.fusesource.mqtt.codec.CONNACK; 069import org.fusesource.mqtt.codec.CONNECT; 070import org.fusesource.mqtt.codec.DISCONNECT; 071import org.fusesource.mqtt.codec.MQTTFrame; 072import org.fusesource.mqtt.codec.PINGREQ; 073import org.fusesource.mqtt.codec.PINGRESP; 074import org.fusesource.mqtt.codec.PUBACK; 075import org.fusesource.mqtt.codec.PUBCOMP; 076import org.fusesource.mqtt.codec.PUBLISH; 077import org.fusesource.mqtt.codec.PUBREC; 078import org.fusesource.mqtt.codec.PUBREL; 079import org.fusesource.mqtt.codec.SUBACK; 080import org.fusesource.mqtt.codec.SUBSCRIBE; 081import org.fusesource.mqtt.codec.UNSUBACK; 082import org.fusesource.mqtt.codec.UNSUBSCRIBE; 083import org.slf4j.Logger; 084import org.slf4j.LoggerFactory; 085 086public class MQTTProtocolConverter { 087 088 private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class); 089 090 public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; 091 public static final int V3_1 = 3; 092 public static final int V3_1_1 = 4; 093 094 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 095 private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); 096 private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5; 097 static final int DEFAULT_CACHE_SIZE = 5000; 098 099 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 100 private final SessionId sessionId = new SessionId(connectionId, -1); 101 private final ProducerId producerId = new ProducerId(sessionId, 1); 102 private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator(); 103 104 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 105 private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE); 106 private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE); 107 108 private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); 109 private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); 110 111 private final MQTTTransport mqttTransport; 112 private final BrokerService brokerService; 113 114 private final Object commnadIdMutex = new Object(); 115 private int lastCommandId; 116 private final AtomicBoolean connected = new AtomicBoolean(false); 117 private final ConnectionInfo connectionInfo = new ConnectionInfo(); 118 private CONNECT connect; 119 private String clientId; 120 private long defaultKeepAlive; 121 private int activeMQSubscriptionPrefetch = -1; 122 private final MQTTPacketIdGenerator packetIdGenerator; 123 private boolean publishDollarTopics; 124 125 public int version; 126 127 private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/"); 128 129 /* 130 * Subscription strategy configuration element. 131 * > mqtt-default-subscriptions 132 * > mqtt-virtual-topic-subscriptions 133 */ 134 private String subscriptionStrategyName = "mqtt-default-subscriptions"; 135 private MQTTSubscriptionStrategy subsciptionStrategy; 136 137 public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) { 138 this.mqttTransport = mqttTransport; 139 this.brokerService = brokerService; 140 this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService); 141 this.defaultKeepAlive = 0; 142 } 143 144 int generateCommandId() { 145 synchronized (commnadIdMutex) { 146 return lastCommandId++; 147 } 148 } 149 150 public void sendToActiveMQ(Command command, ResponseHandler handler) { 151 152 // Lets intercept message send requests.. 153 if (command instanceof ActiveMQMessage) { 154 ActiveMQMessage msg = (ActiveMQMessage) command; 155 try { 156 if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) { 157 // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 158 // specification requirements for system assigned destinations. 159 if (handler != null) { 160 try { 161 handler.onResponse(this, new Response()); 162 } catch (IOException e) { 163 LOG.warn("Failed to send command " + command, e); 164 } 165 } 166 return; 167 } 168 } catch (IOException e) { 169 LOG.warn("Failed to send command " + command, e); 170 } 171 } 172 173 command.setCommandId(generateCommandId()); 174 if (handler != null) { 175 command.setResponseRequired(true); 176 resposeHandlers.put(command.getCommandId(), handler); 177 } 178 getMQTTTransport().sendToActiveMQ(command); 179 } 180 181 void sendToMQTT(MQTTFrame frame) { 182 try { 183 mqttTransport.sendToMQTT(frame); 184 } catch (IOException e) { 185 LOG.warn("Failed to send frame " + frame, e); 186 } 187 } 188 189 /** 190 * Convert a MQTT command 191 */ 192 public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException { 193 switch (frame.messageType()) { 194 case PINGREQ.TYPE: 195 LOG.debug("Received a ping from client: " + getClientId()); 196 sendToMQTT(PING_RESP_FRAME); 197 LOG.debug("Sent Ping Response to " + getClientId()); 198 break; 199 case CONNECT.TYPE: 200 CONNECT connect = new CONNECT().decode(frame); 201 onMQTTConnect(connect); 202 LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version()); 203 break; 204 case DISCONNECT.TYPE: 205 LOG.debug("MQTT Client {} disconnecting", getClientId()); 206 onMQTTDisconnect(); 207 break; 208 case SUBSCRIBE.TYPE: 209 onSubscribe(new SUBSCRIBE().decode(frame)); 210 break; 211 case UNSUBSCRIBE.TYPE: 212 onUnSubscribe(new UNSUBSCRIBE().decode(frame)); 213 break; 214 case PUBLISH.TYPE: 215 onMQTTPublish(new PUBLISH().decode(frame)); 216 break; 217 case PUBACK.TYPE: 218 onMQTTPubAck(new PUBACK().decode(frame)); 219 break; 220 case PUBREC.TYPE: 221 onMQTTPubRec(new PUBREC().decode(frame)); 222 break; 223 case PUBREL.TYPE: 224 onMQTTPubRel(new PUBREL().decode(frame)); 225 break; 226 case PUBCOMP.TYPE: 227 onMQTTPubComp(new PUBCOMP().decode(frame)); 228 break; 229 default: 230 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); 231 } 232 } 233 234 void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException { 235 if (connected.get()) { 236 throw new MQTTProtocolException("Already connected."); 237 } 238 this.connect = connect; 239 240 String clientId = ""; 241 if (connect.clientId() != null) { 242 clientId = connect.clientId().toString(); 243 } 244 245 String userName = null; 246 if (connect.userName() != null) { 247 userName = connect.userName().toString(); 248 } 249 String passswd = null; 250 if (connect.password() != null) { 251 passswd = connect.password().toString(); 252 } 253 254 version = connect.version(); 255 256 configureInactivityMonitor(connect.keepAlive()); 257 258 connectionInfo.setConnectionId(connectionId); 259 if (clientId != null && !clientId.isEmpty()) { 260 connectionInfo.setClientId(clientId); 261 } else { 262 // Clean Session MUST be set for 0 length Client Id 263 if (!connect.cleanSession()) { 264 CONNACK ack = new CONNACK(); 265 ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); 266 try { 267 getMQTTTransport().sendToMQTT(ack.encode()); 268 getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null)); 269 } catch (IOException e) { 270 getMQTTTransport().onException(IOExceptionSupport.create(e)); 271 } 272 return; 273 } 274 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 275 } 276 277 connectionInfo.setResponseRequired(true); 278 connectionInfo.setUserName(userName); 279 connectionInfo.setPassword(passswd); 280 connectionInfo.setTransportContext(mqttTransport.getPeerCertificates()); 281 282 sendToActiveMQ(connectionInfo, new ResponseHandler() { 283 @Override 284 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 285 286 if (response.isException()) { 287 // If the connection attempt fails we close the socket. 288 Throwable exception = ((ExceptionResponse) response).getException(); 289 //let the client know 290 CONNACK ack = new CONNACK(); 291 if (exception instanceof InvalidClientIDException) { 292 ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); 293 } else if (exception instanceof SecurityException) { 294 ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED); 295 } else if (exception instanceof CredentialException) { 296 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); 297 } else { 298 ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE); 299 } 300 getMQTTTransport().sendToMQTT(ack.encode()); 301 getMQTTTransport().onException(IOExceptionSupport.create(exception)); 302 return; 303 } 304 305 final SessionInfo sessionInfo = new SessionInfo(sessionId); 306 sendToActiveMQ(sessionInfo, null); 307 308 final ProducerInfo producerInfo = new ProducerInfo(producerId); 309 sendToActiveMQ(producerInfo, new ResponseHandler() { 310 @Override 311 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 312 313 if (response.isException()) { 314 // If the connection attempt fails we close the socket. 315 Throwable exception = ((ExceptionResponse) response).getException(); 316 CONNACK ack = new CONNACK(); 317 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); 318 getMQTTTransport().sendToMQTT(ack.encode()); 319 getMQTTTransport().onException(IOExceptionSupport.create(exception)); 320 return; 321 } 322 323 CONNACK ack = new CONNACK(); 324 ack.code(CONNACK.Code.CONNECTION_ACCEPTED); 325 connected.set(true); 326 getMQTTTransport().sendToMQTT(ack.encode()); 327 328 if (connect.cleanSession()) { 329 packetIdGenerator.stopClientSession(getClientId()); 330 } else { 331 packetIdGenerator.startClientSession(getClientId()); 332 } 333 334 findSubscriptionStrategy().onConnect(connect); 335 } 336 }); 337 } 338 }); 339 } 340 341 void onMQTTDisconnect() throws MQTTProtocolException { 342 if (connected.get()) { 343 connected.set(false); 344 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 345 sendToActiveMQ(new ShutdownInfo(), null); 346 } 347 stopTransport(); 348 } 349 350 void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException { 351 checkConnected(); 352 LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}", 353 command.messageId(), clientId, connectionInfo.getConnectionId()); 354 Topic[] topics = command.topics(); 355 if (topics != null) { 356 byte[] qos = new byte[topics.length]; 357 for (int i = 0; i < topics.length; i++) { 358 try { 359 qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]); 360 } catch (IOException e) { 361 throw new MQTTProtocolException("Failed to process subscription request", true, e); 362 } 363 } 364 SUBACK ack = new SUBACK(); 365 ack.messageId(command.messageId()); 366 ack.grantedQos(qos); 367 try { 368 getMQTTTransport().sendToMQTT(ack.encode()); 369 } catch (IOException e) { 370 LOG.warn("Couldn't send SUBACK for " + command, e); 371 } 372 } else { 373 LOG.warn("No topics defined for Subscription " + command); 374 } 375 } 376 377 public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException { 378 checkConnected(); 379 if (command.qos() != QoS.AT_LEAST_ONCE && (version != V3_1 || publishDollarTopics != true)) { 380 throw new MQTTProtocolException("Failed to process unsubscribe request", true, new Exception("UNSUBSCRIBE frame not properly formatted, QoS")); 381 } 382 UTF8Buffer[] topics = command.topics(); 383 if (topics != null) { 384 for (UTF8Buffer topic : topics) { 385 try { 386 findSubscriptionStrategy().onUnSubscribe(topic.toString()); 387 } catch (IOException e) { 388 throw new MQTTProtocolException("Failed to process unsubscribe request", true, e); 389 } 390 } 391 } 392 UNSUBACK ack = new UNSUBACK(); 393 ack.messageId(command.messageId()); 394 sendToMQTT(ack.encode()); 395 } 396 397 /** 398 * Dispatch an ActiveMQ command 399 */ 400 public void onActiveMQCommand(Command command) throws Exception { 401 if (command.isResponse()) { 402 Response response = (Response) command; 403 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 404 if (rh != null) { 405 rh.onResponse(this, response); 406 } else { 407 // Pass down any unexpected errors. Should this close the connection? 408 if (response.isException()) { 409 Throwable exception = ((ExceptionResponse) response).getException(); 410 handleException(exception, null); 411 } 412 } 413 } else if (command.isMessageDispatch()) { 414 MessageDispatch md = (MessageDispatch) command; 415 MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId()); 416 if (sub != null) { 417 MessageAck ack = sub.createMessageAck(md); 418 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage()); 419 switch (publish.qos()) { 420 case AT_LEAST_ONCE: 421 case EXACTLY_ONCE: 422 publish.dup(publish.dup() ? true : md.getMessage().isRedelivered()); 423 case AT_MOST_ONCE: 424 } 425 if (ack != null && sub.expectAck(publish)) { 426 synchronized (consumerAcks) { 427 consumerAcks.put(publish.messageId(), ack); 428 } 429 } 430 LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}", 431 publish.messageId(), clientId, connectionInfo.getConnectionId()); 432 getMQTTTransport().sendToMQTT(publish.encode()); 433 if (ack != null && !sub.expectAck(publish)) { 434 getMQTTTransport().sendToActiveMQ(ack); 435 } 436 } 437 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 438 // Pass down any unexpected async errors. Should this close the connection? 439 Throwable exception = ((ConnectionError) command).getException(); 440 handleException(exception, null); 441 } else if (command.isBrokerInfo()) { 442 //ignore 443 } else { 444 LOG.debug("Do not know how to process ActiveMQ Command {}", command); 445 } 446 } 447 448 void onMQTTPublish(PUBLISH command) throws IOException, JMSException { 449 checkConnected(); 450 LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}", 451 command.messageId(), clientId, connectionInfo.getConnectionId()); 452 ActiveMQMessage message = convertMessage(command); 453 message.setProducerId(producerId); 454 message.onSend(); 455 sendToActiveMQ(message, createResponseHandler(command)); 456 } 457 458 void onMQTTPubAck(PUBACK command) { 459 short messageId = command.messageId(); 460 LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}", 461 messageId, clientId, connectionInfo.getConnectionId()); 462 packetIdGenerator.ackPacketId(getClientId(), messageId); 463 MessageAck ack; 464 synchronized (consumerAcks) { 465 ack = consumerAcks.remove(messageId); 466 } 467 if (ack != null) { 468 getMQTTTransport().sendToActiveMQ(ack); 469 } 470 } 471 472 void onMQTTPubRec(PUBREC commnand) { 473 //from a subscriber - send a PUBREL in response 474 PUBREL pubrel = new PUBREL(); 475 pubrel.messageId(commnand.messageId()); 476 sendToMQTT(pubrel.encode()); 477 } 478 479 void onMQTTPubRel(PUBREL command) { 480 PUBREC ack; 481 synchronized (publisherRecs) { 482 ack = publisherRecs.remove(command.messageId()); 483 } 484 if (ack == null) { 485 LOG.warn("Unknown PUBREL: {} received", command.messageId()); 486 } 487 PUBCOMP pubcomp = new PUBCOMP(); 488 pubcomp.messageId(command.messageId()); 489 sendToMQTT(pubcomp.encode()); 490 } 491 492 void onMQTTPubComp(PUBCOMP command) { 493 short messageId = command.messageId(); 494 packetIdGenerator.ackPacketId(getClientId(), messageId); 495 MessageAck ack; 496 synchronized (consumerAcks) { 497 ack = consumerAcks.remove(messageId); 498 } 499 if (ack != null) { 500 getMQTTTransport().sendToActiveMQ(ack); 501 } 502 } 503 504 ActiveMQMessage convertMessage(PUBLISH command) throws JMSException { 505 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 506 507 msg.setProducerId(producerId); 508 MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId()); 509 msg.setMessageId(id); 510 LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", 511 command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId()); 512 msg.setTimestamp(System.currentTimeMillis()); 513 msg.setPriority((byte) Message.DEFAULT_PRIORITY); 514 msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain()); 515 msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); 516 if (command.retain()) { 517 msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true); 518 } 519 520 ActiveMQDestination destination; 521 synchronized (activeMQDestinationMap) { 522 destination = activeMQDestinationMap.get(command.topicName()); 523 if (destination == null) { 524 String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString()); 525 try { 526 destination = findSubscriptionStrategy().onSend(topicName); 527 } catch (IOException e) { 528 throw JMSExceptionSupport.create(e); 529 } 530 531 activeMQDestinationMap.put(command.topicName().toString(), destination); 532 } 533 } 534 535 msg.setJMSDestination(destination); 536 msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length); 537 return msg; 538 } 539 540 public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException { 541 PUBLISH result = new PUBLISH(); 542 // packet id is set in MQTTSubscription 543 QoS qoS; 544 if (message.propertyExists(QOS_PROPERTY_NAME)) { 545 int ordinal = message.getIntProperty(QOS_PROPERTY_NAME); 546 qoS = QoS.values()[ordinal]; 547 548 } else { 549 qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE; 550 } 551 result.qos(qoS); 552 if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) { 553 result.retain(true); 554 } 555 556 String topicName; 557 synchronized (mqttTopicMap) { 558 topicName = mqttTopicMap.get(message.getJMSDestination()); 559 if (topicName == null) { 560 String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination()); 561 topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName); 562 mqttTopicMap.put(message.getJMSDestination(), topicName); 563 } 564 } 565 result.topicName(new UTF8Buffer(topicName)); 566 567 if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { 568 ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); 569 msg.setReadOnlyBody(true); 570 String messageText = msg.getText(); 571 if (messageText != null) { 572 result.payload(new Buffer(messageText.getBytes("UTF-8"))); 573 } 574 } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { 575 ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); 576 msg.setReadOnlyBody(true); 577 byte[] data = new byte[(int) msg.getBodyLength()]; 578 msg.readBytes(data); 579 result.payload(new Buffer(data)); 580 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 581 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); 582 msg.setReadOnlyBody(true); 583 Map<String, Object> map = msg.getContentMap(); 584 if (map != null) { 585 result.payload(new Buffer(map.toString().getBytes("UTF-8"))); 586 } 587 } else { 588 ByteSequence byteSequence = message.getContent(); 589 if (byteSequence != null && byteSequence.getLength() > 0) { 590 if (message.isCompressed()) { 591 Inflater inflater = new Inflater(); 592 inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length); 593 byte[] data = new byte[4096]; 594 int read; 595 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 596 while ((read = inflater.inflate(data)) != 0) { 597 bytesOut.write(data, 0, read); 598 } 599 byteSequence = bytesOut.toByteSequence(); 600 bytesOut.close(); 601 } 602 result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length)); 603 } 604 } 605 LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", 606 result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId()); 607 return result; 608 } 609 610 public MQTTTransport getMQTTTransport() { 611 return mqttTransport; 612 } 613 614 boolean willSent = false; 615 public void onTransportError() { 616 if (connect != null) { 617 if (connected.get()) { 618 if (connect.willTopic() != null && connect.willMessage() != null && !willSent) { 619 willSent = true; 620 try { 621 PUBLISH publish = new PUBLISH(); 622 publish.topicName(connect.willTopic()); 623 publish.qos(connect.willQos()); 624 publish.messageId(packetIdGenerator.getNextSequenceId(getClientId())); 625 publish.payload(connect.willMessage()); 626 publish.retain(connect.willRetain()); 627 ActiveMQMessage message = convertMessage(publish); 628 message.setProducerId(producerId); 629 message.onSend(); 630 631 sendToActiveMQ(message, null); 632 } catch (Exception e) { 633 LOG.warn("Failed to publish Will Message " + connect.willMessage()); 634 } 635 } 636 // remove connection info 637 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 638 } 639 } 640 } 641 642 void configureInactivityMonitor(short keepAliveSeconds) { 643 MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); 644 645 // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false, 646 // then ignore configuring it because it won't exist 647 if (monitor == null) { 648 return; 649 } 650 651 // Client has sent a valid CONNECT frame, we can stop the connect checker. 652 monitor.stopConnectChecker(); 653 654 long keepAliveMS = keepAliveSeconds * 1000; 655 656 LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS); 657 658 try { 659 // if we have a default keep-alive value, and the client is trying to turn off keep-alive, 660 661 // we'll observe the server-side configured default value (note, no grace period) 662 if (keepAliveMS == 0 && defaultKeepAlive > 0) { 663 keepAliveMS = defaultKeepAlive; 664 } 665 666 long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD); 667 668 monitor.setProtocolConverter(this); 669 monitor.setReadKeepAliveTime(keepAliveMS); 670 monitor.setReadGraceTime(readGracePeriod); 671 monitor.startReadChecker(); 672 673 LOG.debug("MQTT Client {} established heart beat of {} ms ({} ms + {} ms grace period)", 674 new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod }); 675 } catch (Exception ex) { 676 LOG.warn("Failed to start MQTT InactivityMonitor ", ex); 677 } 678 } 679 680 void handleException(Throwable exception, MQTTFrame command) { 681 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); 682 LOG.debug("Exception detail", exception); 683 684 if (connected.get() && connectionInfo != null) { 685 connected.set(false); 686 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 687 } 688 stopTransport(); 689 } 690 691 void checkConnected() throws MQTTProtocolException { 692 if (!connected.get()) { 693 throw new MQTTProtocolException("Not connected."); 694 } 695 } 696 697 private void stopTransport() { 698 try { 699 getMQTTTransport().stop(); 700 } catch (Throwable e) { 701 LOG.debug("Failed to stop MQTT transport ", e); 702 } 703 } 704 705 ResponseHandler createResponseHandler(final PUBLISH command) { 706 if (command != null) { 707 return new ResponseHandler() { 708 @Override 709 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 710 if (response.isException()) { 711 Throwable error = ((ExceptionResponse) response).getException(); 712 LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage()); 713 LOG.trace("Error trace: {}", error); 714 } 715 716 switch (command.qos()) { 717 case AT_LEAST_ONCE: 718 PUBACK ack = new PUBACK(); 719 ack.messageId(command.messageId()); 720 LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", 721 command.messageId(), clientId, connectionInfo.getConnectionId()); 722 converter.getMQTTTransport().sendToMQTT(ack.encode()); 723 break; 724 case EXACTLY_ONCE: 725 PUBREC req = new PUBREC(); 726 req.messageId(command.messageId()); 727 synchronized (publisherRecs) { 728 publisherRecs.put(command.messageId(), req); 729 } 730 LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}", 731 command.messageId(), clientId, connectionInfo.getConnectionId()); 732 converter.getMQTTTransport().sendToMQTT(req.encode()); 733 break; 734 default: 735 break; 736 } 737 } 738 }; 739 } 740 return null; 741 } 742 743 public long getDefaultKeepAlive() { 744 return defaultKeepAlive; 745 } 746 747 /** 748 * Set the default keep alive time (in milliseconds) that would be used if configured on server side 749 * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame 750 * @param keepAlive the keepAlive in milliseconds 751 */ 752 public void setDefaultKeepAlive(long keepAlive) { 753 this.defaultKeepAlive = keepAlive; 754 } 755 756 public int getActiveMQSubscriptionPrefetch() { 757 return activeMQSubscriptionPrefetch; 758 } 759 760 /** 761 * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one 762 * The default = 1 763 * 764 * @param activeMQSubscriptionPrefetch 765 * set the prefetch for the corresponding ActiveMQ subscription 766 */ 767 public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { 768 this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch; 769 } 770 771 public MQTTPacketIdGenerator getPacketIdGenerator() { 772 return packetIdGenerator; 773 } 774 775 public void setPublishDollarTopics(boolean publishDollarTopics) { 776 this.publishDollarTopics = publishDollarTopics; 777 } 778 779 public boolean getPublishDollarTopics() { 780 return publishDollarTopics; 781 } 782 783 public ConnectionId getConnectionId() { 784 return connectionId; 785 } 786 787 public SessionId getSessionId() { 788 return sessionId; 789 } 790 791 public boolean isCleanSession() { 792 return this.connect.cleanSession(); 793 } 794 795 public String getSubscriptionStrategy() { 796 return subscriptionStrategyName; 797 } 798 799 public void setSubscriptionStrategy(String name) { 800 this.subscriptionStrategyName = name; 801 } 802 803 public String getClientId() { 804 if (clientId == null) { 805 if (connect != null && connect.clientId() != null) { 806 clientId = connect.clientId().toString(); 807 } else { 808 clientId = ""; 809 } 810 } 811 return clientId; 812 } 813 814 protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException { 815 if (subsciptionStrategy == null) { 816 synchronized (STRATAGY_FINDER) { 817 if (subsciptionStrategy != null) { 818 return subsciptionStrategy; 819 } 820 821 MQTTSubscriptionStrategy strategy = null; 822 if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) { 823 try { 824 strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName); 825 LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName); 826 if (strategy instanceof BrokerServiceAware) { 827 ((BrokerServiceAware)strategy).setBrokerService(brokerService); 828 } 829 strategy.initialize(this); 830 } catch (Exception e) { 831 throw IOExceptionSupport.create(e); 832 } 833 } else { 834 throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName); 835 } 836 837 this.subsciptionStrategy = strategy; 838 } 839 } 840 return subsciptionStrategy; 841 } 842}