|
@@ -1,25 +1,33 @@
|
|
package com.fjhx.iot.listener;
|
|
package com.fjhx.iot.listener;
|
|
|
|
|
|
-import cn.hutool.extra.spring.SpringUtil;
|
|
|
|
|
|
+import cn.hutool.core.io.IoUtil;
|
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
|
|
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
|
|
import com.fjhx.common.constant.SourceConstant;
|
|
import com.fjhx.common.constant.SourceConstant;
|
|
|
|
+import com.fjhx.iot.constants.RedisConstant;
|
|
|
|
+import com.fjhx.iot.entity.tda.dto.EbcDto;
|
|
import com.fjhx.iot.entity.tda.dto.MessageBody;
|
|
import com.fjhx.iot.entity.tda.dto.MessageBody;
|
|
import com.fjhx.iot.entity.tda.po.TdaConfig;
|
|
import com.fjhx.iot.entity.tda.po.TdaConfig;
|
|
|
|
+import com.fjhx.iot.entity.tda.po.TdaProduct;
|
|
import com.fjhx.iot.service.tda.TdaConfigService;
|
|
import com.fjhx.iot.service.tda.TdaConfigService;
|
|
|
|
+import com.fjhx.iot.service.tda.TdaProductService;
|
|
import com.fjhx.iot.utils.amqp.AmqpClient;
|
|
import com.fjhx.iot.utils.amqp.AmqpClient;
|
|
import com.fjhx.iot.utils.amqp.AmqpClientOptions;
|
|
import com.fjhx.iot.utils.amqp.AmqpClientOptions;
|
|
-import com.fjhx.iot.utils.amqp.AmqpConstants;
|
|
|
|
|
|
+import com.fjhx.iot.utils.amqp.AmqpConstant;
|
|
import com.ruoyi.common.core.redis.RedisCache;
|
|
import com.ruoyi.common.core.redis.RedisCache;
|
|
import com.ruoyi.common.exception.ServiceException;
|
|
import com.ruoyi.common.exception.ServiceException;
|
|
import com.ruoyi.framework.mybatis.holder.TenantHolder;
|
|
import com.ruoyi.framework.mybatis.holder.TenantHolder;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
import javax.annotation.PostConstruct;
|
|
import javax.jms.JMSException;
|
|
import javax.jms.JMSException;
|
|
import javax.jms.Message;
|
|
import javax.jms.Message;
|
|
import javax.jms.MessageConsumer;
|
|
import javax.jms.MessageConsumer;
|
|
|
|
+import java.util.Date;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
@@ -33,9 +41,14 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
@Component
|
|
@Component
|
|
public class DtaListener {
|
|
public class DtaListener {
|
|
|
|
|
|
- private static final RedisCache redisCache = SpringUtil.getBean(RedisCache.class);
|
|
|
|
|
|
+ @Autowired
|
|
|
|
+ private RedisCache redisCache;
|
|
|
|
|
|
- private static final TdaConfigService tdaConfigService = SpringUtil.getBean(TdaConfigService.class);
|
|
|
|
|
|
+ @Autowired
|
|
|
|
+ private TdaConfigService tdaConfigService;
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ private TdaProductService tdaProductService;
|
|
|
|
|
|
public static final Map<Long, MessageConsumer> listenerMap = new ConcurrentHashMap<>();
|
|
public static final Map<Long, MessageConsumer> listenerMap = new ConcurrentHashMap<>();
|
|
|
|
|
|
@@ -57,32 +70,20 @@ public class DtaListener {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- public static MessageConsumer getMessageConsumer(TdaConfig tdaConfig) {
|
|
|
|
|
|
+ public MessageConsumer getMessageConsumer(TdaConfig tdaConfig) {
|
|
|
|
|
|
MessageConsumer consumer = null;
|
|
MessageConsumer consumer = null;
|
|
try {
|
|
try {
|
|
- AmqpClientOptions options = AmqpClientOptions.builder()
|
|
|
|
- .host(tdaConfig.getEndPoint())
|
|
|
|
- .port(AmqpConstants.PORT)
|
|
|
|
- .accessKey(tdaConfig.getAccessKey())
|
|
|
|
- .accessCode(tdaConfig.getAccessCode())
|
|
|
|
- .queuePrefetch(1000)
|
|
|
|
- .build();
|
|
|
|
|
|
+ AmqpClientOptions options = AmqpClientOptions.builder().host(tdaConfig.getEndPoint()).port(AmqpConstant.PORT).accessKey(tdaConfig.getAccessKey()).accessCode(tdaConfig.getAccessCode()).queuePrefetch(1000).build();
|
|
AmqpClient amqpClient = new AmqpClient(options);
|
|
AmqpClient amqpClient = new AmqpClient(options);
|
|
amqpClient.initialize();
|
|
amqpClient.initialize();
|
|
- consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
|
|
|
|
|
|
+ consumer = amqpClient.newConsumer(AmqpConstant.DEFAULT_QUEUE);
|
|
// 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。
|
|
// 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。
|
|
- consumer.setMessageListener(DtaListener::handleData);
|
|
|
|
|
|
+ consumer.setMessageListener(this::handleData);
|
|
return consumer;
|
|
return consumer;
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- try {
|
|
|
|
- if (consumer != null) {
|
|
|
|
- consumer.close();
|
|
|
|
- }
|
|
|
|
- } catch (JMSException ex) {
|
|
|
|
- log.error("关闭失败", ex);
|
|
|
|
- }
|
|
|
|
- log.error("链接失败,tdaConfig:{}", tdaConfig.getId(), e);
|
|
|
|
|
|
+ IoUtil.close(consumer);
|
|
|
|
+ log.error("链接失败,tdaConfigId:{}", tdaConfig.getId(), e);
|
|
throw new ServiceException("连接iot失败");
|
|
throw new ServiceException("连接iot失败");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -95,12 +96,14 @@ public class DtaListener {
|
|
*
|
|
*
|
|
* @param message 推送消息
|
|
* @param message 推送消息
|
|
*/
|
|
*/
|
|
- public static void handleData(Message message) {
|
|
|
|
|
|
+ public void handleData(Message message) {
|
|
try {
|
|
try {
|
|
|
|
|
|
String messageBodyStr = message.getBody(String.class);
|
|
String messageBodyStr = message.getBody(String.class);
|
|
MessageBody messageBody = JSONObject.parseObject(messageBodyStr, MessageBody.class);
|
|
MessageBody messageBody = JSONObject.parseObject(messageBodyStr, MessageBody.class);
|
|
|
|
+
|
|
if (messageBody == null) {
|
|
if (messageBody == null) {
|
|
|
|
+ log.error(" messageBody == null ");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -112,14 +115,14 @@ public class DtaListener {
|
|
reportHandel(messageBody);
|
|
reportHandel(messageBody);
|
|
break;
|
|
break;
|
|
default:
|
|
default:
|
|
- log.error("未知事件类型:{}\r\nmessageBody:{}", event, messageBodyStr);
|
|
|
|
|
|
+ log.error("未知事件类型:{}\r\n messageBody:{}", event, messageBodyStr);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
message.acknowledge();
|
|
message.acknowledge();
|
|
|
|
|
|
} catch (JMSException e) {
|
|
} catch (JMSException e) {
|
|
- e.printStackTrace();
|
|
|
|
|
|
+ log.error("iot监听发生未知异常", e);
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
@@ -127,44 +130,100 @@ public class DtaListener {
|
|
/**
|
|
/**
|
|
* 处理报告数据
|
|
* 处理报告数据
|
|
*/
|
|
*/
|
|
- private static void reportHandel(MessageBody messageBody) {
|
|
|
|
|
|
+ private void reportHandel(MessageBody messageBody) {
|
|
|
|
|
|
MessageBody.NotifyData notifyData = messageBody.getNotifyData();
|
|
MessageBody.NotifyData notifyData = messageBody.getNotifyData();
|
|
|
|
|
|
if (notifyData == null) {
|
|
if (notifyData == null) {
|
|
- log.error(" notifyData == null , messageBody = {}", messageBody);
|
|
|
|
|
|
+ log.error(" notifyData == null , messageBody = {} ", JSON.toJSONString(messageBody));
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
MessageBody.Header header = notifyData.getHeader();
|
|
MessageBody.Header header = notifyData.getHeader();
|
|
if (header == null) {
|
|
if (header == null) {
|
|
- log.error(" header == null , messageBody = {}", messageBody);
|
|
|
|
|
|
+ log.error(" header == null , messageBody = {} ", JSON.toJSONString(messageBody));
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
MessageBody.Body body = notifyData.getBody();
|
|
MessageBody.Body body = notifyData.getBody();
|
|
if (body == null) {
|
|
if (body == null) {
|
|
- log.error(" body == null , messageBody = {}", messageBody);
|
|
|
|
|
|
+ log.error(" body == null , messageBody = {} ", JSON.toJSONString(messageBody));
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
List<MessageBody.Service> services = body.getServices();
|
|
List<MessageBody.Service> services = body.getServices();
|
|
if (services == null || services.size() == 0) {
|
|
if (services == null || services.size() == 0) {
|
|
- log.error(" services == null || services.size() == 0 , messageBody = {}", messageBody);
|
|
|
|
|
|
+ log.error(" services == null || services.size() == 0 , messageBody = {} ", JSON.toJSONString(messageBody));
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // 数据列表
|
|
|
|
+ MessageBody.Service service = services.get(0);
|
|
|
|
|
|
|
|
+ // 产品id
|
|
String productId = header.getProductId();
|
|
String productId = header.getProductId();
|
|
|
|
+ if (StrUtil.isBlank(productId)) {
|
|
|
|
+ log.error(" StrUtil.isBlank(productId) , messageBody = {} ", JSON.toJSONString(messageBody));
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ // 设备id
|
|
String deviceId = header.getDeviceId();
|
|
String deviceId = header.getDeviceId();
|
|
|
|
|
|
- MessageBody.Service service = services.get(0);
|
|
|
|
|
|
+ // 数据推送时间
|
|
|
|
+ Date eventTime = messageBody.getEventTime();
|
|
|
|
|
|
|
|
+ // 实际生产数据
|
|
|
|
+ JSONObject properties = service.getProperties();
|
|
|
|
|
|
- System.out.println(service);
|
|
|
|
|
|
+ TenantHolder.setIgnore(true);
|
|
|
|
+ DynamicDataSourceContextHolder.push(SourceConstant.IOT);
|
|
|
|
+
|
|
|
|
+ TdaProduct tdaProduct = tdaProductService.getOne(q -> q.eq(TdaProduct::getProductId, productId));
|
|
|
|
+ if (tdaProduct == null) {
|
|
|
|
+ log.error(" tdaProduct == null , messageBody = {} ", JSON.toJSONString(messageBody));
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String deviceType = tdaProduct.getDeviceType();
|
|
|
|
+ if (StrUtil.isBlank(deviceType)) {
|
|
|
|
+ log.error(" StrUtil.isBlank(deviceType) , messageBody = {} ", JSON.toJSONString(messageBody));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ switch (deviceType) {
|
|
|
|
+ case "ebc":
|
|
|
|
+ doEbc(productId, deviceId, eventTime, properties.toJavaObject(EbcDto.class));
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ log.error("未知 deviceType:{}", deviceType);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ DynamicDataSourceContextHolder.clear();
|
|
|
|
+ TenantHolder.clear();
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 经编机
|
|
|
|
+ *
|
|
|
|
+ * @param productId 产品id
|
|
|
|
+ * @param deviceId 设备id
|
|
|
|
+ * @param eventTime 推送时间
|
|
|
|
+ * @param ebcDto 推送数据
|
|
|
|
+ */
|
|
|
|
+ private void doEbc(String productId, String deviceId, Date eventTime, EbcDto ebcDto) {
|
|
|
|
+
|
|
|
|
+ EbcDto oldEbcDto = redisCache.getCacheObject(RedisConstant.DEVICE_PREFIX + deviceId);
|
|
|
|
+
|
|
|
|
+ if (oldEbcDto != null && eventTime.before(oldEbcDto.getEventTime())) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ ebcDto.setDeviceId(deviceId);
|
|
|
|
+ ebcDto.setProductId(productId);
|
|
|
|
+ ebcDto.setEventTime(eventTime);
|
|
|
|
+ redisCache.setCacheObject(RedisConstant.DEVICE_PREFIX + deviceId, ebcDto);
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
|
|
}
|
|
}
|