|
@@ -0,0 +1,252 @@
|
|
|
+package com.fjhx.iot.listener;
|
|
|
+
|
|
|
+import cn.hutool.core.convert.Convert;
|
|
|
+import cn.hutool.extra.spring.SpringUtil;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
|
|
|
+import com.fjhx.common.constant.SourceConstant;
|
|
|
+import com.fjhx.iot.constants.RedisConstant;
|
|
|
+import com.fjhx.iot.entity.tda.dto.MessageBody;
|
|
|
+import com.fjhx.iot.entity.tda.dto.OpcUaData;
|
|
|
+import com.fjhx.iot.entity.tda.dto.UpperComputerData;
|
|
|
+import com.fjhx.iot.entity.tda.dto.ZSJDto;
|
|
|
+import com.fjhx.iot.entity.tda.po.TdaConfig;
|
|
|
+import com.fjhx.iot.enums.ProductEnum;
|
|
|
+import com.fjhx.iot.enums.UpperComputerEnum;
|
|
|
+import com.fjhx.iot.service.tda.TdaConfigService;
|
|
|
+import com.fjhx.iot.utils.amqp.AmqpClient;
|
|
|
+import com.fjhx.iot.utils.amqp.AmqpClientOptions;
|
|
|
+import com.fjhx.iot.utils.amqp.AmqpConstants;
|
|
|
+import com.ruoyi.common.core.redis.RedisCache;
|
|
|
+import com.ruoyi.common.exception.ServiceException;
|
|
|
+import com.ruoyi.framework.mybatis.holder.TenantHolder;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.jms.JMSException;
|
|
|
+import javax.jms.Message;
|
|
|
+import javax.jms.MessageConsumer;
|
|
|
+import java.math.BigDecimal;
|
|
|
+import java.math.RoundingMode;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * iot数据推送监听器
|
|
|
+ *
|
|
|
+ * @author zlj
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class DtaListener {
|
|
|
+
|
|
|
+ private static final RedisCache redisCache = SpringUtil.getBean(RedisCache.class);
|
|
|
+
|
|
|
+ private static final TdaConfigService tdaConfigService = SpringUtil.getBean(TdaConfigService.class);
|
|
|
+
|
|
|
+ public static final Map<Long, MessageConsumer> listenerMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void listener() {
|
|
|
+
|
|
|
+ TenantHolder.setIgnore(true);
|
|
|
+ DynamicDataSourceContextHolder.push(SourceConstant.IOT);
|
|
|
+
|
|
|
+ List<TdaConfig> list = tdaConfigService.list();
|
|
|
+
|
|
|
+ for (TdaConfig tdaConfig : list) {
|
|
|
+ MessageConsumer consumer = getMessageConsumer(tdaConfig);
|
|
|
+ listenerMap.put(tdaConfig.getId(), consumer);
|
|
|
+ }
|
|
|
+
|
|
|
+ DynamicDataSourceContextHolder.clear();
|
|
|
+ TenantHolder.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ public static MessageConsumer getMessageConsumer(TdaConfig tdaConfig) {
|
|
|
+ try {
|
|
|
+ AmqpClientOptions options = AmqpClientOptions.builder()
|
|
|
+ .host(tdaConfig.getEndPoint())
|
|
|
+ .port(AmqpConstants.PORT)
|
|
|
+ .accessKey(tdaConfig.getAccessKey())
|
|
|
+ .accessCode(tdaConfig.getAccessCode())
|
|
|
+ .queuePrefetch(1000)
|
|
|
+ .build();
|
|
|
+ AmqpClient amqpClient = new AmqpClient(options);
|
|
|
+ amqpClient.initialize();
|
|
|
+ MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
|
|
|
+ // 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。
|
|
|
+ consumer.setMessageListener(DtaListener::handleData);
|
|
|
+ return consumer;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("链接失败,tdaConfig:{}", tdaConfig.getId(), e);
|
|
|
+ throw new ServiceException("连接iot失败");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理推送数据
|
|
|
+ * <p>
|
|
|
+ * remarks: 添加成功必须手动 message.acknowledge();
|
|
|
+ * </p>
|
|
|
+ *
|
|
|
+ * @param message 推送消息
|
|
|
+ */
|
|
|
+ public static void handleData(Message message) {
|
|
|
+ try {
|
|
|
+ MessageBody messageBody = JSONObject.parseObject(message.getBody(String.class), MessageBody.class);
|
|
|
+ if (messageBody == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ MessageBody.NotifyData notifyData = messageBody.getNotifyData();
|
|
|
+ if (notifyData == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ MessageBody.Header header = notifyData.getHeader();
|
|
|
+ if (header == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String productId = header.getProductId();
|
|
|
+ if (productId == null || productId.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ MessageBody.Body body = notifyData.getBody();
|
|
|
+ if (body == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ List<MessageBody.Service> services = body.getServices();
|
|
|
+ if (services == null || services.size() == 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ ProductEnum productEnum = ProductEnum.getEnumByProductId(productId);
|
|
|
+ if (productEnum != null) {
|
|
|
+ appointProductHandle(header.getDeviceId(), message, services);
|
|
|
+ } else {
|
|
|
+ upperComputerHandle(message, services);
|
|
|
+ }
|
|
|
+
|
|
|
+ log.error(JSONObject.toJSONString(messageBody));
|
|
|
+
|
|
|
+ } catch (JMSException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 指定设备处理方式
|
|
|
+ *
|
|
|
+ * @param deviceId 设备id
|
|
|
+ * @param message message
|
|
|
+ * @param services 设备消息列表
|
|
|
+ */
|
|
|
+ private static void appointProductHandle(String deviceId, Message message, List<MessageBody.Service> services) {
|
|
|
+ String redisKey = RedisConstant.DEVICE_PREFIX + deviceId;
|
|
|
+
|
|
|
+ MessageBody.Service oldService = redisCache.getCacheObject(redisKey);
|
|
|
+ MessageBody.Service newService = services.get(0);
|
|
|
+
|
|
|
+ if (oldService == null || newService.getEventTime().compareTo(oldService.getEventTime()) > 0) {
|
|
|
+ redisCache.setCacheObject(redisKey, newService);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ message.acknowledge();
|
|
|
+ } catch (JMSException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 上位机连接设备处理
|
|
|
+ */
|
|
|
+ private static void upperComputerHandle(Message message, List<MessageBody.Service> services) {
|
|
|
+ MessageBody.Service service = services.get(0);
|
|
|
+
|
|
|
+ UpperComputerData newData = service.getProperties().toJavaObject(UpperComputerData.class);
|
|
|
+
|
|
|
+ UpperComputerEnum upperComputerEnum = UpperComputerEnum.getUpperComputerEnum(newData.getType());
|
|
|
+
|
|
|
+ if (upperComputerEnum == UpperComputerEnum.ZSJ) {
|
|
|
+ handleZSJ(message, newData);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ String redisKey = RedisConstant.DEVICE_PREFIX + newData.getEquipmentNo();
|
|
|
+ UpperComputerData oldData = redisCache.getCacheObject(redisKey);
|
|
|
+
|
|
|
+ if (oldData == null || newData.getCreateTime().compareTo(oldData.getCreateTime()) > 0) {
|
|
|
+ redisCache.setCacheObject(redisKey, newData);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ message.acknowledge();
|
|
|
+ } catch (JMSException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 注塑机解析
|
|
|
+ */
|
|
|
+ private static void handleZSJ(Message message, UpperComputerData upperComputerData) {
|
|
|
+
|
|
|
+ log.error("注塑机推送数据:{}", JSONObject.toJSONString(upperComputerData));
|
|
|
+
|
|
|
+ String redisKey = RedisConstant.DEVICE_PREFIX + upperComputerData.getEquipmentNo();
|
|
|
+ ZSJDto oldZsjDto = redisCache.getCacheObject(redisKey);
|
|
|
+ if (oldZsjDto != null && upperComputerData.getCreateTime().compareTo(oldZsjDto.getDate()) > 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ ZSJDto zsjDto = createZSJDto(upperComputerData);
|
|
|
+ redisCache.setCacheObject(redisKey, zsjDto);
|
|
|
+
|
|
|
+ try {
|
|
|
+ message.acknowledge();
|
|
|
+ } catch (JMSException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static ZSJDto createZSJDto(UpperComputerData upperComputerData) {
|
|
|
+ List<OpcUaData> opcUaDataList = Convert.toList(OpcUaData.class, upperComputerData.getData());
|
|
|
+
|
|
|
+ Map<String, String> collect = opcUaDataList.stream().collect(Collectors.toMap(
|
|
|
+ OpcUaData::getId,
|
|
|
+ OpcUaData::getNs
|
|
|
+ ));
|
|
|
+
|
|
|
+ ZSJDto zsjDto = new ZSJDto();
|
|
|
+ zsjDto.setDate(upperComputerData.getCreateTime());
|
|
|
+ zsjDto.setTmOnlineState(Convert.toInt(collect.get("tmOnlineState")));
|
|
|
+ zsjDto.setTmOperateMode(Convert.toInt(collect.get("tmOperateMode")));
|
|
|
+ zsjDto.setTmMotorState(Convert.toInt(collect.get("tmMotorState")));
|
|
|
+ zsjDto.setTmHeatState(Convert.toInt(collect.get("tmHeatState")));
|
|
|
+ zsjDto.setTmTempOilCurrent(Convert.toBigDecimal(collect.get("tmTempOil_Current")));
|
|
|
+
|
|
|
+ BigDecimal tmTempCurrent = Convert.toBigDecimal(collect.get("tmTemp1_Current"))
|
|
|
+ .add(Convert.toBigDecimal(collect.get("tmTemp2_Current")))
|
|
|
+ .add(Convert.toBigDecimal(collect.get("tmTemp3_Current")))
|
|
|
+ .add(Convert.toBigDecimal(collect.get("tmTemp4_Current")))
|
|
|
+ .add(Convert.toBigDecimal(collect.get("tmTemp5_Current")))
|
|
|
+ .add(Convert.toBigDecimal(collect.get("tmTemp6_Current")))
|
|
|
+ .add(Convert.toBigDecimal(collect.get("tmTemp7_Current")))
|
|
|
+ .add(Convert.toBigDecimal(collect.get("tmTemp8_Current")))
|
|
|
+ .add(Convert.toBigDecimal(collect.get("tmTemp9_Current")))
|
|
|
+ .divide(new BigDecimal("9"), 1, RoundingMode.HALF_UP);
|
|
|
+ zsjDto.setTmTempCurrent(tmTempCurrent);
|
|
|
+ zsjDto.setTmChargePressB1(Convert.toBigDecimal(collect.get("tmChargePressB1")));
|
|
|
+ zsjDto.setTmChargePressB2(Convert.toBigDecimal(collect.get("tmChargePressB2")));
|
|
|
+ zsjDto.setTmChargePressB3(Convert.toBigDecimal(collect.get("tmChargePressB3")));
|
|
|
+ zsjDto.setTmChargePressB4(Convert.toBigDecimal(collect.get("tmChargePressB4")));
|
|
|
+ zsjDto.setTmChargeSpeedB1(Convert.toBigDecimal(collect.get("tmChargeSpeedB1")));
|
|
|
+ zsjDto.setTmChargeSpeedB2(Convert.toBigDecimal(collect.get("tmChargeSpeedB2")));
|
|
|
+ zsjDto.setTmChargeSpeedB3(Convert.toBigDecimal(collect.get("tmChargeSpeedB3")));
|
|
|
+ zsjDto.setTmChargeSpeedB4(Convert.toBigDecimal(collect.get("tmChargeSpeedB4")));
|
|
|
+ return zsjDto;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|