Kaynağa Gözat

ioDta数据保存

home 2 yıl önce
ebeveyn
işleme
222dad3970

+ 69 - 0
hx-service-api/iot-production-api/src/main/java/com/fjhx/entity/amqp/AmqpData.java

@@ -0,0 +1,69 @@
+package com.fjhx.entity.amqp;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * <p>
+ *
+ * </p>
+ *
+ * @author ${author}
+ * @since 2022-07-27
+ */
+@Data
+public class AmqpData implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 主键id
+     */
+    @TableId(value = "id", type = IdType.ASSIGN_ID)
+    private Long id;
+
+    /**
+     * 设备编号
+     */
+    private String equipmentNo;
+
+    /**
+     * 设备IP地址
+     */
+    private String ipAddress;
+
+    /**
+     * 设备端口号
+     */
+    private Integer port;
+
+    /**
+     * 设备类型
+     */
+    private Integer type;
+
+    /**
+     * 数据获取时间
+     */
+    private Date obtainTime;
+
+    /**
+     * 数据
+     */
+    private String dataInfo;
+
+    /**
+     * 获取数据状态 1成功 0失败
+     */
+    private Integer status;
+
+    /**
+     * 创建时间
+     */
+    private Date createTime;
+
+}

+ 17 - 0
hx-service-api/iot-production-api/src/main/java/com/fjhx/params/amqp/AmqpDataEx.java

@@ -0,0 +1,17 @@
+package com.fjhx.params.amqp;
+
+import com.fjhx.entity.amqp.AmqpData;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * 
+ *
+ * @author ${author}
+ * @since 2022-07-27
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class AmqpDataEx extends AmqpData {
+
+}

+ 17 - 0
hx-service-api/iot-production-api/src/main/java/com/fjhx/params/amqp/AmqpDataVo.java

@@ -0,0 +1,17 @@
+package com.fjhx.params.amqp;
+
+import com.fjhx.entity.amqp.AmqpData;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+/**
+ * 
+ *
+ * @author ${author}
+ * @since 2022-07-27
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class AmqpDataVo extends AmqpData {
+
+}

+ 6 - 0
hx-service/iot-production/pom.xml

@@ -51,6 +51,12 @@
             <version>3.0.92</version>
         </dependency>
 
+        <!-- amqp -->
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-jms-client</artifactId>
+            <version>0.61.0</version>
+        </dependency>
 
     </dependencies>
 

+ 25 - 1
hx-service/iot-production/src/main/java/com/fjhx/config/IoTDAConfig.java

@@ -1,5 +1,8 @@
 package com.fjhx.config;
 
+import com.fjhx.config.amqp.AmqpClient;
+import com.fjhx.config.amqp.AmqpClientOptions;
+import com.fjhx.config.amqp.AmqpConstants;
 import com.huaweicloud.sdk.core.auth.ICredential;
 import com.huaweicloud.sdk.iotda.v5.IoTDAClient;
 import com.huaweicloud.sdk.iotda.v5.auth.IoTDACredentials;
@@ -19,8 +22,11 @@ public class IoTDAConfig {
     private static final String sk = "4mvyTAn75I0hvyAm8rKxuXi6TSyGDG5yblW43ci1";
     private static final String projectId = "0f130b202d00f4e52fc0c01deb21f36a";
 
+    /**
+     * IoTDA 操作
+     */
     @Bean
-    public IoTDAClient client() {
+    public IoTDAClient ioTDAClient() {
         // 创建认证
         ICredential auth = new IoTDACredentials()
                 .withAk(ak)
@@ -37,4 +43,22 @@ public class IoTDAConfig {
                 .build();
     }
 
+    /**
+     * IoTDA 监听
+     */
+    @Bean
+    public AmqpClient amqpClient() throws Exception {
+        AmqpClientOptions options = AmqpClientOptions.builder()
+                .host(AmqpConstants.HOST)
+                .port(AmqpConstants.PORT)
+                .accessKey(AmqpConstants.ACCESS_KEY)
+                .accessCode(AmqpConstants.ACCESS_CODE)
+                .queuePrefetch(1000) // sdk会在内存中分配该参数大小的队列,用来接收消息,客户端内存较小的情况可以调小该参数。
+                .build();
+        AmqpClient amqpClient = new AmqpClient(options);
+        amqpClient.initialize();
+        return amqpClient;
+    }
+
+
 }

+ 4 - 0
hx-service/iot-production/src/main/java/com/fjhx/config/MybatisConfig.java

@@ -30,6 +30,10 @@ public class MybatisConfig {
 
             @Override
             public boolean ignoreTable(String tableName) {
+                if ("amqp_data".equals(tableName)) {
+                    return true;
+                }
+
                 return false;
             }
 

+ 96 - 0
hx-service/iot-production/src/main/java/com/fjhx/config/amqp/AmqpClient.java

@@ -0,0 +1,96 @@
+package com.fjhx.config.amqp;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionExtensions;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsQueue;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.TransportSupport;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+@Slf4j
+public class AmqpClient {
+    private final AmqpClientOptions options;
+    private Connection connection;
+    private Session session;
+    private final Set<MessageConsumer> consumerSet = Collections.synchronizedSet(new HashSet<>());
+
+    public AmqpClient(AmqpClientOptions options) {
+        this.options = options;
+    }
+
+    public String getId() {
+        return options.getClientId();
+    }
+
+    public void initialize() throws Exception {
+        String connectionUrl = options.generateConnectUrl();
+        log.info("connectionUrl={}", connectionUrl);
+        JmsConnectionFactory cf = new JmsConnectionFactory(connectionUrl);
+        // 信任服务端
+        TransportOptions to = new TransportOptions();
+        to.setTrustAll(true);
+        cf.setSslContext(TransportSupport.createJdkSslContext(to));
+        String userName = "accessKey=" + options.getAccessKey();
+        cf.setExtension(JmsConnectionExtensions.USERNAME_OVERRIDE.toString(), (connection, uri) -> {
+            // IoTDA的userName组成格式如下:“accessKey=${accessKey}|timestamp=${timestamp}”
+            String newUserName = userName;
+            if (connection instanceof JmsConnection) {
+                newUserName = ((JmsConnection) connection).getUsername();
+            }
+            return newUserName + "|timestamp=" + System.currentTimeMillis();
+        });
+        // 创建连接
+        connection = cf.createConnection(userName, options.getAccessCode());
+        // 创建 Session, Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
+        session = connection.createSession(false, options.isAutoAcknowledge() ? Session.AUTO_ACKNOWLEDGE : Session.CLIENT_ACKNOWLEDGE);
+        connection.start();
+    }
+
+    public MessageConsumer newConsumer(String queueName) throws Exception {
+        if (connection == null || ((JmsConnection) connection).isClosed()) {
+            throw new Exception("create consumer failed,the connection is disconnected.");
+        }
+        MessageConsumer consumer;
+
+        consumer = session.createConsumer(new JmsQueue(queueName));
+        if (consumer != null) {
+            consumerSet.add(consumer);
+        }
+        return consumer;
+    }
+
+    public void close() {
+        consumerSet.forEach(consumer -> {
+            try {
+                consumer.close();
+            } catch (JMSException e) {
+                log.warn("consumer close error,exception is ", e);
+            }
+        });
+
+        if (session != null) {
+            try {
+                session.close();
+            } catch (JMSException e) {
+                log.warn("session close error,exception is ", e);
+            }
+        }
+
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (JMSException e) {
+                log.warn("connection close error,exception is", e);
+            }
+        }
+    }
+}

+ 118 - 0
hx-service/iot-production/src/main/java/com/fjhx/config/amqp/AmqpClientOptions.java

@@ -0,0 +1,118 @@
+package com.fjhx.config.amqp;
+
+
+import cn.hutool.core.util.ObjectUtil;
+import lombok.Builder;
+import lombok.Data;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Data
+@Builder
+public class AmqpClientOptions {
+    private String host;
+    @Builder.Default
+    private int port = 5671;
+    private String accessKey;
+    private String accessCode;
+    private String clientId;
+    /**
+     * 仅支持true
+     */
+    @Builder.Default
+    private boolean useSsl = true;
+
+    /**
+     * IoTDA仅支持default
+     */
+    @Builder.Default
+    private String vhost = "default";
+
+    /**
+     * IoTDA仅支持PLAIN
+     */
+    @Builder.Default
+    private String saslMechanisms = "PLAIN";
+
+    /**
+     * true: SDK自动ACK(默认)
+     * false:收到消息后,需要手动调用message.acknowledge()
+     */
+    @Builder.Default
+    private boolean isAutoAcknowledge = true;
+
+    /**
+     * 重连时延(ms)
+     */
+    @Builder.Default
+    private long reconnectDelay = 3000L;
+
+    /**
+     * 最大重连时延(ms),随着重连次数增加重连时延逐渐增加
+     */
+    @Builder.Default
+    private long maxReconnectDelay = 30 * 1000L;
+
+    /**
+     * 最大重连次数,默认值-1,代表没有限制
+     */
+    @Builder.Default
+    private long maxReconnectAttempts = -1;
+
+    /**
+     * 空闲超时,对端在这个时间段内没有发送AMQP帧则会导致连接断开。默认值为30000。单位:毫秒。
+     */
+    @Builder.Default
+    private long idleTimeout = 30 * 1000L;
+
+    /**
+     * The values below control how many messages the remote peer can send to the client and be held in a pre-fetch buffer for each consumer instance.
+     */
+    @Builder.Default
+    private int queuePrefetch = 1000;
+
+    /**
+     * 扩展参数
+     */
+    private Map<String, String> extendedOptions;
+
+    public String generateConnectUrl() {
+        String uri = MessageFormat.format("{0}://{1}:{2}", (useSsl ? "amqps" : "amqp"), host, String.valueOf(port));
+        Map<String, String> uriOptions = new HashMap<>();
+        uriOptions.put("amqp.vhost", vhost);
+        uriOptions.put("amqp.idleTimeout", String.valueOf(idleTimeout));
+        uriOptions.put("amqp.saslMechanisms", saslMechanisms);
+
+        Map<String, String> jmsOptions = new HashMap<>();
+        jmsOptions.put("jms.prefetchPolicy.queuePrefetch", String.valueOf(queuePrefetch));
+        if (ObjectUtil.isNotEmpty(clientId)) {
+            jmsOptions.put("jms.clientID", clientId);
+        } else {
+            jmsOptions.put("jms.clientID", UUID.randomUUID().toString());
+        }
+        jmsOptions.put("failover.reconnectDelay", String.valueOf(reconnectDelay));
+        jmsOptions.put("failover.maxReconnectDelay", String.valueOf(maxReconnectDelay));
+        if (maxReconnectAttempts > 0) {
+            jmsOptions.put("failover.maxReconnectAttempts", String.valueOf(maxReconnectAttempts));
+        }
+        if (extendedOptions != null) {
+            for (Map.Entry<String, String> option : extendedOptions.entrySet()) {
+                if (option.getKey().startsWith("amqp.") || option.getKey().startsWith("transport.")) {
+                    uriOptions.put(option.getKey(), option.getValue());
+                } else {
+                    jmsOptions.put(option.getKey(), option.getValue());
+                }
+            }
+        }
+        return uriOptions.entrySet().stream()
+                .map(option -> MessageFormat.format("{0}={1}", option.getKey(), option.getValue()))
+                .collect(Collectors.joining("&", "failover:(" + uri + "?", ")")) +
+                jmsOptions.entrySet().stream()
+                        .map(option -> MessageFormat.format("{0}={1}", option.getKey(), option.getValue()))
+                        .collect(Collectors.joining("&", "?", ""));
+    }
+}

+ 36 - 0
hx-service/iot-production/src/main/java/com/fjhx/config/amqp/AmqpConstants.java

@@ -0,0 +1,36 @@
+package com.fjhx.config.amqp;
+
+
+public interface AmqpConstants {
+    /**
+     * AMQP接入域名
+     * 参考:https://support.huaweicloud.com/usermanual-iothub/iot_01_00100_2.html#section2
+     * eg: "****.iot-amqps.cn-north-4.myhuaweicloud.com";
+     */
+    String HOST = "a1625d5cc8.iot-amqps.cn-north-4.myhuaweicloud.com";
+
+    /**
+     * AMQP接入端口
+     * 参考:https://support.huaweicloud.com/usermanual-iothub/iot_01_00100_2.html#section2
+     */
+    int PORT = 5671;
+
+    /**
+     * 接入凭证键值
+     * 参考:https://support.huaweicloud.com/usermanual-iothub/iot_01_00100_2.html#section3
+     * 不需要拼接时间戳timestamp
+     */
+    String ACCESS_KEY = "SwGUHYzA";
+
+    /**
+     * 接入凭证密钥
+     * 参考:https://support.huaweicloud.com/usermanual-iothub/iot_01_00100_2.html#section3
+     */
+    String ACCESS_CODE = "mGYgxmOtPhWDNHaAMU76aiSy0P3OtaWE";
+
+    /**
+     * 默认队列
+     */
+    String DEFAULT_QUEUE = "DefaultQueue";
+
+}

+ 16 - 0
hx-service/iot-production/src/main/java/com/fjhx/mapper/amqp/AmqpDataMapper.java

@@ -0,0 +1,16 @@
+package com.fjhx.mapper.amqp;
+
+import com.fjhx.entity.amqp.AmqpData;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ * <p>
+ *  Mapper 接口
+ * </p>
+ *
+ * @author ${author}
+ * @since 2022-07-27
+ */
+public interface AmqpDataMapper extends BaseMapper<AmqpData> {
+
+}

+ 5 - 0
hx-service/iot-production/src/main/java/com/fjhx/mapper/amqp/AmqpDataMapper.xml

@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.fjhx.mapper.amqp.AmqpDataMapper">
+
+</mapper>

+ 18 - 0
hx-service/iot-production/src/main/java/com/fjhx/service/amqp/AmqpDataService.java

@@ -0,0 +1,18 @@
+package com.fjhx.service.amqp;
+
+import com.fjhx.base.BaseService;
+import com.fjhx.entity.amqp.AmqpData;
+
+/**
+ * <p>
+ * 服务类
+ * </p>
+ *
+ * @author ${author}
+ * @since 2022-07-27
+ */
+public interface AmqpDataService extends BaseService<AmqpData> {
+
+
+
+}

+ 75 - 0
hx-service/iot-production/src/main/java/com/fjhx/service/amqp/impl/AmqpDataServiceImpl.java

@@ -0,0 +1,75 @@
+package com.fjhx.service.amqp.impl;
+
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fjhx.config.amqp.AmqpClient;
+import com.fjhx.config.amqp.AmqpConstants;
+import com.fjhx.entity.amqp.AmqpData;
+import com.fjhx.mapper.amqp.AmqpDataMapper;
+import com.fjhx.service.amqp.AmqpDataService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Service;
+
+import javax.jms.MessageConsumer;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * <p>
+ * 服务实现类
+ * </p>
+ *
+ * @author ${author}
+ * @since 2022-07-27
+ */
+@Service
+@Slf4j
+public class AmqpDataServiceImpl extends ServiceImpl<AmqpDataMapper, AmqpData> implements AmqpDataService, CommandLineRunner {
+
+    @Autowired
+    private AmqpClient amqpClient;
+
+    @Override
+    public void run(String... args) throws Exception {
+        MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
+        consumer.setMessageListener(message -> {
+            try {
+                // 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。
+                String body = message.getBody(String.class);
+                saveData(body);
+            } catch (Exception e) {
+                log.warn("message.getBody error,exception is ", e);
+            }
+        });
+    }
+
+    private void saveData(String body) {
+        JSONObject jsonObject = JSONObject.parseObject(body);
+
+
+        JSONObject bodyObj = jsonObject
+                .getJSONObject("notify_data")
+                .getJSONObject("body");
+
+        // 设备状态
+        String status = bodyObj.getString("status");
+        if (status != null) {
+            System.out.println(jsonObject);
+            return;
+        }
+
+        String dataStr = bodyObj.getJSONArray("services")
+                .getJSONObject(0)
+                .getJSONObject("properties")
+                .getString("DeviceData");
+
+        List<AmqpData> amqpData = JSONObject.parseArray(dataStr, AmqpData.class);
+
+        Date date = new Date();
+        amqpData.forEach(item -> item.setCreateTime(date));
+        saveBatch(amqpData);
+    }
+
+}

+ 0 - 3
hx-service/storage/src/main/java/com/fjhx/supplier/service/impl/SupplierServiceImpl.java

@@ -289,7 +289,4 @@ public class SupplierServiceImpl extends ServiceImpl<SupplierMapper, Supplier> i
         return baseMapper.selectList(condition);
     }
 
-    public static void main(String[] args) {
-        System.out.println(new BigDecimal("170,850.00"));
-    }
 }