home 2 vuotta sitten
vanhempi
commit
76acd8815c

+ 50 - 0
hx-service-api/syringe-production-api/src/main/java/com/fjhx/enums/ProductEnum.java

@@ -0,0 +1,50 @@
+package com.fjhx.enums;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 设备id枚举
+ *
+ * @author zlj
+ */
+@Getter
+public enum ProductEnum {
+    // 经编机
+    JBJ("6350ae233ec34a6d03c89020", "经编机"),
+    // 验布机
+    YBJ("635897db3ec34a6d03c9616a", "验布机"),
+    ;
+
+    /**
+     * 设备id
+     */
+    private final String productId;
+    /**
+     * 设备名称
+     */
+    private final String productName;
+
+    ProductEnum(String productId, String productName) {
+        this.productId = productId;
+        this.productName = productName;
+    }
+
+    private static final Map<String, ProductEnum> CLASS_MAP = new HashMap<>();
+
+    static {
+        for (ProductEnum value : ProductEnum.values()) {
+            CLASS_MAP.put(value.getProductId(), value);
+        }
+    }
+
+    /**
+     * 根据设备id获取枚举
+     */
+    public static ProductEnum getEnumByProductId(String productId) {
+        return CLASS_MAP.get(productId);
+    }
+
+}

+ 50 - 0
hx-service-api/syringe-production-api/src/main/java/com/fjhx/params/IotData/MessageBody.java

@@ -0,0 +1,50 @@
+package com.fjhx.params.IotData;
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.Data;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 华为云推送数据
+ *
+ * @author zlj
+ */
+@Data
+public class MessageBody {
+
+    private String resource;
+    private String event;
+    private String requestId;
+    private String eventTime;
+    private NotifyData notifyData;
+
+    @Data
+    public static class NotifyData {
+        private Header header;
+        private Body body;
+    }
+
+    @Data
+    public static class Header {
+        private String deviceId;
+        private String productId;
+        private String appId;
+        private String gatewayId;
+        private String nodeId;
+    }
+
+    @Data
+    public static class Body {
+        private List<Services> services;
+    }
+
+    @Data
+    public static class Services {
+        private String serviceId;
+        private JSONObject properties;
+        private Date eventTime;
+    }
+
+}

+ 2 - 2
hx-service/syringe-production/src/main/java/com/fjhx/config/MybatisConfig.java

@@ -21,7 +21,7 @@ public class MybatisConfig {
     /**
      * 排除表集合
      */
-    private static final List<String> excludeTableName = Arrays.asList(
+    private static final List<String> EXCLUDE_TABLE_NAME = Arrays.asList(
             "amqp_data",
             "data_autoclaves"
     );
@@ -42,7 +42,7 @@ public class MybatisConfig {
 
             @Override
             public boolean ignoreTable(String tableName) {
-                return excludeTableName.contains(tableName);
+                return EXCLUDE_TABLE_NAME.contains(tableName);
             }
 
         }));

+ 94 - 0
hx-service/syringe-production/src/main/java/com/fjhx/listener/IotDataListener.java

@@ -0,0 +1,94 @@
+package com.fjhx.listener;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fjhx.enums.ProductEnum;
+import com.fjhx.params.IotData.MessageBody;
+import com.fjhx.utils.amqp.AmqpClient;
+import com.fjhx.utils.amqp.AmqpClientOptions;
+import com.fjhx.utils.amqp.AmqpConstants;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import java.util.List;
+
+/**
+ * iot数据推送监听器
+ *
+ * @author zlj
+ */
+@Component
+public class IotDataListener {
+
+    @PostConstruct
+    public void listener() throws Exception {
+        AmqpClientOptions options = AmqpClientOptions.builder()
+                .host(AmqpConstants.HOST)
+                .port(AmqpConstants.PORT)
+                .accessKey(AmqpConstants.ACCESS_KEY)
+                .accessCode(AmqpConstants.ACCESS_CODE)
+                .queuePrefetch(1000)
+                .build();
+        AmqpClient amqpClient = new AmqpClient(options);
+        amqpClient.initialize();
+        MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
+        // 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。
+        consumer.setMessageListener(this::handleData);
+    }
+
+    /**
+     * 处理推送数据
+     * <p>
+     * remarks: 添加成功必须手动 message.acknowledge();
+     * </p>
+     *
+     * @param message 推送消息
+     */
+    public void handleData(Message message) {
+        try {
+            MessageBody messageBody = JSONObject.parseObject(message.getBody(String.class), MessageBody.class);
+            if (messageBody == null) {
+                return;
+            }
+            MessageBody.NotifyData notifyData = messageBody.getNotifyData();
+            if (notifyData == null) {
+                return;
+            }
+            MessageBody.Header header = notifyData.getHeader();
+            if (header == null) {
+                return;
+            }
+            String productId = header.getProductId();
+            if (productId == null || productId.isEmpty()) {
+                return;
+            }
+            MessageBody.Body body = notifyData.getBody();
+            if (body == null) {
+                return;
+            }
+            List<MessageBody.Services> services = body.getServices();
+            if (services == null || services.size() == 0) {
+                return;
+            }
+
+            switch (ProductEnum.getEnumByProductId(productId)) {
+                case JBJ:
+
+
+                    break;
+                case YBJ:
+
+                    break;
+                default:
+                    return;
+            }
+            System.err.println(JSONObject.toJSONString(messageBody));
+            message.acknowledge();
+        } catch (JMSException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 0 - 2
hx-service/syringe-production/src/main/java/com/fjhx/service/amqp/AmqpDataService.java

@@ -13,6 +13,4 @@ import com.fjhx.entity.amqp.AmqpData;
  */
 public interface AmqpDataService extends BaseService<AmqpData> {
 
-
-
 }

+ 0 - 51
hx-service/syringe-production/src/main/java/com/fjhx/service/amqp/impl/AmqpDataServiceImpl.java

@@ -2,9 +2,6 @@ package com.fjhx.service.amqp.impl;
 
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import com.fjhx.utils.amqp.AmqpClient;
-import com.fjhx.utils.amqp.AmqpClientOptions;
-import com.fjhx.utils.amqp.AmqpConstants;
 import com.fjhx.constants.IotProductionRedisPrefixConstant;
 import com.fjhx.entity.amqp.AmqpData;
 import com.fjhx.mapper.amqp.AmqpDataMapper;
@@ -13,11 +10,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.springblade.core.redis.cache.BladeRedis;
 import org.springframework.stereotype.Service;
 
-import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageConsumer;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
@@ -38,50 +33,6 @@ public class AmqpDataServiceImpl extends ServiceImpl<AmqpDataMapper, AmqpData> i
     @Resource
     private BladeRedis redisCache;
 
-    @PostConstruct
-    public void listener() throws Exception {
-        AmqpClientOptions options = AmqpClientOptions.builder()
-                .host(AmqpConstants.HOST)
-                .port(AmqpConstants.PORT)
-                .accessKey(AmqpConstants.ACCESS_KEY)
-                .accessCode(AmqpConstants.ACCESS_CODE)
-                .queuePrefetch(1000) // sdk会在内存中分配该参数大小的队列,用来接收消息,客户端内存较小的情况可以调小该参数。
-                .build();
-        AmqpClient amqpClient = new AmqpClient(options);
-        amqpClient.initialize();
-        MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
-        consumer.setMessageListener(message -> {
-            try {
-                // 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。
-                saveData(message);
-            } catch (Exception e) {
-                log.warn("message.getBody error,exception is ", e);
-            }
-        });
-    }
-
-//    @PostConstruct
-//    public void listener2() throws Exception {
-//        AmqpClientOptions options = AmqpClientOptions.builder()
-//                .host(AmqpConstants.HOST)
-//                .port(AmqpConstants.PORT)
-//                .accessKey("oZvmUesgndzlw75ShbswJOe36algwcvaGBmOAsxc")
-//                .accessCode("Y6JLYH6QV0LX5AJDXREM")
-//                .queuePrefetch(1000) // sdk会在内存中分配该参数大小的队列,用来接收消息,客户端内存较小的情况可以调小该参数。
-//                .build();
-//        AmqpClient amqpClient = new AmqpClient(options);
-//        amqpClient.initialize();
-//        MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
-//        consumer.setMessageListener(message -> {
-//            try {
-//                System.err.println(message);
-//            } catch (Exception e) {
-//                log.warn("message.getBody error,exception is ", e);
-//            }
-//        });
-//
-//    }
-
     private void saveData(Message message) {
         try {
 
@@ -95,7 +46,6 @@ public class AmqpDataServiceImpl extends ServiceImpl<AmqpDataMapper, AmqpData> i
             String status = bodyObj.getString("status");
             if (status != null) {
                 System.out.println(jsonObject);
-
                 return;
             }
 
@@ -114,7 +64,6 @@ public class AmqpDataServiceImpl extends ServiceImpl<AmqpDataMapper, AmqpData> i
             message.acknowledge();
 
         } catch (JMSException e) {
-            // TODO 消息保存失败处理
             e.printStackTrace();
         }
     }