|
@@ -67,6 +67,8 @@ public class DtaListener {
|
|
|
}
|
|
|
|
|
|
public static MessageConsumer getMessageConsumer(TdaConfig tdaConfig) {
|
|
|
+
|
|
|
+ MessageConsumer consumer = null;
|
|
|
try {
|
|
|
AmqpClientOptions options = AmqpClientOptions.builder()
|
|
|
.host(tdaConfig.getEndPoint())
|
|
@@ -77,11 +79,18 @@ public class DtaListener {
|
|
|
.build();
|
|
|
AmqpClient amqpClient = new AmqpClient(options);
|
|
|
amqpClient.initialize();
|
|
|
- MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
|
|
|
+ consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
|
|
|
// 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。
|
|
|
consumer.setMessageListener(DtaListener::handleData);
|
|
|
return consumer;
|
|
|
} catch (Exception e) {
|
|
|
+ try {
|
|
|
+ if (consumer != null) {
|
|
|
+ consumer.close();
|
|
|
+ }
|
|
|
+ } catch (JMSException ex) {
|
|
|
+ log.error("关闭失败", ex);
|
|
|
+ }
|
|
|
log.error("链接失败,tdaConfig:{}", tdaConfig.getId(), e);
|
|
|
throw new ServiceException("连接iot失败");
|
|
|
}
|