|
@@ -1,92 +0,0 @@
|
|
|
-package com.fjhx.service.amqp.impl;
|
|
|
-
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
-import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
-import com.fjhx.constants.IotProductionRedisPrefixConstant;
|
|
|
-import com.fjhx.entity.amqp.AmqpData;
|
|
|
-import com.fjhx.mapper.amqp.AmqpDataMapper;
|
|
|
-import com.fjhx.service.amqp.AmqpDataService;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.springblade.core.redis.cache.BladeRedis;
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
-
|
|
|
-import javax.annotation.Resource;
|
|
|
-import javax.jms.JMSException;
|
|
|
-import javax.jms.Message;
|
|
|
-import java.util.Collection;
|
|
|
-import java.util.Date;
|
|
|
-import java.util.List;
|
|
|
-import java.util.stream.Collectors;
|
|
|
-
|
|
|
-
|
|
|
- * <p>
|
|
|
- * 服务实现类
|
|
|
- * </p>
|
|
|
- *
|
|
|
- * @author ${author}
|
|
|
- * @since 2022-07-27
|
|
|
- */
|
|
|
-@Service
|
|
|
-@Slf4j
|
|
|
-public class AmqpDataServiceImpl extends ServiceImpl<AmqpDataMapper, AmqpData> implements AmqpDataService {
|
|
|
-
|
|
|
- @Resource
|
|
|
- private BladeRedis redisCache;
|
|
|
-
|
|
|
- private void saveData(Message message) {
|
|
|
- try {
|
|
|
-
|
|
|
- JSONObject jsonObject = JSONObject.parseObject(message.getBody(String.class));
|
|
|
-
|
|
|
- JSONObject bodyObj = jsonObject
|
|
|
- .getJSONObject("notify_data")
|
|
|
- .getJSONObject("body");
|
|
|
-
|
|
|
-
|
|
|
- String status = bodyObj.getString("status");
|
|
|
- if (status != null) {
|
|
|
- System.out.println(jsonObject);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- String dataStr = bodyObj.getJSONArray("services")
|
|
|
- .getJSONObject(0)
|
|
|
- .getJSONObject("properties")
|
|
|
- .getString("DeviceData");
|
|
|
-
|
|
|
- List<AmqpData> amqpDataList = JSONObject.parseArray(dataStr, AmqpData.class);
|
|
|
- Date date = new Date();
|
|
|
- amqpDataList.forEach(item -> item.setCreateTime(date));
|
|
|
-
|
|
|
- saveInstantaneousState(amqpDataList);
|
|
|
- saveBatch(amqpDataList);
|
|
|
-
|
|
|
- message.acknowledge();
|
|
|
-
|
|
|
- } catch (JMSException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- * 保存设备最新状态
|
|
|
- */
|
|
|
- private void saveInstantaneousState(List<AmqpData> amqpDataList) {
|
|
|
- Collection<AmqpData> amqpDataCollection = amqpDataList.stream().collect(Collectors.toMap(
|
|
|
- AmqpData::getEquipmentNo,
|
|
|
- item -> item,
|
|
|
- (v1, v2) -> (v1.getObtainTime().compareTo(v2.getObtainTime()) > 0) ? v1 : v2
|
|
|
- )).values();
|
|
|
-
|
|
|
- for (AmqpData item : amqpDataCollection) {
|
|
|
- String equipmentNo = item.getEquipmentNo();
|
|
|
-
|
|
|
- String redisKey = IotProductionRedisPrefixConstant.INSTANTANEOUS_STATE + equipmentNo;
|
|
|
- AmqpData amqpData = redisCache.get(redisKey);
|
|
|
- if (amqpData == null || item.getObtainTime().compareTo(amqpData.getObtainTime()) > 0) {
|
|
|
- redisCache.setEx(redisKey, item, 40L);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-}
|