Forráskód Böngészése

ioDta保存设备最新状态到redis

home 2 éve
szülő
commit
8023990274

+ 14 - 0
hx-service-api/iot-production-api/src/main/java/com/fjhx/constant/IotProductionRedisPrefixConstant.java

@@ -0,0 +1,14 @@
+package com.fjhx.constant;
+
+/**
+ * 锁常量
+ */
+public interface IotProductionRedisPrefixConstant {
+
+    /**
+     * 添加前缀,防止其他模块出现共用一把锁的情况
+     */
+    String INSTANTANEOUS_STATE = "IotProduction-instantaneousState:";
+
+
+}

+ 36 - 4
hx-service/iot-production/src/main/java/com/fjhx/service/amqp/impl/AmqpDataServiceImpl.java

@@ -4,17 +4,21 @@ import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.fjhx.config.amqp.AmqpClient;
 import com.fjhx.config.amqp.AmqpClient;
 import com.fjhx.config.amqp.AmqpConstants;
 import com.fjhx.config.amqp.AmqpConstants;
+import com.fjhx.constant.IotProductionRedisPrefixConstant;
 import com.fjhx.entity.amqp.AmqpData;
 import com.fjhx.entity.amqp.AmqpData;
 import com.fjhx.mapper.amqp.AmqpDataMapper;
 import com.fjhx.mapper.amqp.AmqpDataMapper;
 import com.fjhx.service.amqp.AmqpDataService;
 import com.fjhx.service.amqp.AmqpDataService;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
+import org.springblade.core.redis.cache.BladeRedis;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 
 
 import javax.jms.MessageConsumer;
 import javax.jms.MessageConsumer;
+import java.util.Collection;
 import java.util.Date;
 import java.util.Date;
 import java.util.List;
 import java.util.List;
+import java.util.stream.Collectors;
 
 
 /**
 /**
  * <p>
  * <p>
@@ -31,6 +35,9 @@ public class AmqpDataServiceImpl extends ServiceImpl<AmqpDataMapper, AmqpData> i
     @Autowired
     @Autowired
     private AmqpClient amqpClient;
     private AmqpClient amqpClient;
 
 
+    @Autowired
+    private BladeRedis redisCache;
+
     @Override
     @Override
     public void run(String... args) throws Exception {
     public void run(String... args) throws Exception {
         MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
         MessageConsumer consumer = amqpClient.newConsumer(AmqpConstants.DEFAULT_QUEUE);
@@ -56,6 +63,7 @@ public class AmqpDataServiceImpl extends ServiceImpl<AmqpDataMapper, AmqpData> i
         String status = bodyObj.getString("status");
         String status = bodyObj.getString("status");
         if (status != null) {
         if (status != null) {
             System.out.println(jsonObject);
             System.out.println(jsonObject);
+
             return;
             return;
         }
         }
 
 
@@ -64,11 +72,35 @@ public class AmqpDataServiceImpl extends ServiceImpl<AmqpDataMapper, AmqpData> i
                 .getJSONObject("properties")
                 .getJSONObject("properties")
                 .getString("DeviceData");
                 .getString("DeviceData");
 
 
-        List<AmqpData> amqpData = JSONObject.parseArray(dataStr, AmqpData.class);
-
+        List<AmqpData> amqpDataList = JSONObject.parseArray(dataStr, AmqpData.class);
         Date date = new Date();
         Date date = new Date();
-        amqpData.forEach(item -> item.setCreateTime(date));
-        saveBatch(amqpData);
+        amqpDataList.forEach(item -> item.setCreateTime(date));
+
+        saveInstantaneousState(amqpDataList);
+        saveBatch(amqpDataList);
     }
     }
 
 
+    /**
+     * 保存设备最新状态
+     */
+    private void saveInstantaneousState(List<AmqpData> amqpDataList) {
+        Collection<AmqpData> amqpDataCollection = amqpDataList.stream().collect(Collectors.toMap(
+                AmqpData::getEquipmentNo,
+                item -> item,
+                (v1, v2) -> (v1.getObtainTime().compareTo(v2.getObtainTime()) > 0) ? v1 : v2
+        )).values();
+
+        for (AmqpData item : amqpDataCollection) {
+            String equipmentNo = item.getEquipmentNo();
+
+            String redisKey = IotProductionRedisPrefixConstant.INSTANTANEOUS_STATE + equipmentNo;
+            AmqpData amqpData = redisCache.get(redisKey);
+            if (amqpData == null || item.getObtainTime().compareTo(amqpData.getObtainTime()) > 0) {
+                redisCache.setEx(redisKey, item, 40L);
+            }
+        }
+
+    }
+
+
 }
 }