home 2 rokov pred
rodič
commit
95c7141656

+ 5 - 0
hx-common/common-websocket/src/main/java/com/fjhx/entity/MessageEntity.java

@@ -24,6 +24,11 @@ public class MessageEntity {
     private Date createTime;
 
     /**
+     * 类型
+     */
+    private Integer type;
+
+    /**
      * 消息主体
      */
     private JSONObject data;

+ 43 - 6
hx-common/common-websocket/src/main/java/com/fjhx/service/WebSocketServer.java

@@ -68,7 +68,17 @@ public class WebSocketServer {
      */
     @OnClose
     public void onClose() {
-        webSocketMap.remove(userId);
+        synchronized (this) {
+            ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
+            if (sessionIdWebSocketServerMap != null) {
+                sessionIdWebSocketServerMap.remove(sessionId);
+
+                if (sessionIdWebSocketServerMap.size() == 0) {
+                    webSocketMap.remove(userId);
+                }
+
+            }
+        }
 
         // 发布建立关闭连接事件
         applicationContext.publishEvent(new WebSocketOnCloseEvent(this));
@@ -99,11 +109,12 @@ public class WebSocketServer {
      *
      * @param message 消息内容
      */
-    public void sendMessage(JSONObject message) {
+    public void sendMessage(Integer type, JSONObject message) {
         MessageEntity sendEntity = new MessageEntity();
         sendEntity.setUserId(userId);
         sendEntity.setSessionId(sessionId);
         sendEntity.setCreateTime(new Date());
+        sendEntity.setType(type);
         sendEntity.setData(message);
 
         try {
@@ -118,9 +129,10 @@ public class WebSocketServer {
      * 发送自定义消息
      *
      * @param userId  发送用户
+     * @param type    消息类型
      * @param message 消息内容
      */
-    public static void sendInfo(String userId, JSONObject message) {
+    public static void sendInfo(String userId, Integer type, JSONObject message) {
         ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
 
         if (sessionIdWebSocketServerMap == null) {
@@ -129,20 +141,45 @@ public class WebSocketServer {
         }
 
         for (WebSocketServer webSocketServer : sessionIdWebSocketServerMap.values()) {
-            webSocketServer.sendMessage(message);
+            webSocketServer.sendMessage(type, message);
+        }
+
+    }
+
+    /**
+     * 发送自定义消息
+     *
+     * @param userId  发送用户
+     * @param type    消息类型
+     * @param message 消息内容
+     */
+    public static void sendInfo(String userId, String sessionId, Integer type, JSONObject message) {
+        ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
+
+        if (sessionIdWebSocketServerMap == null) {
+            applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
+            return;
+        }
+
+        WebSocketServer webSocketServer = sessionIdWebSocketServerMap.get(sessionId);
+        if (webSocketServer != null) {
+            webSocketServer.sendMessage(type, message);
+        } else {
+            applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
         }
 
     }
 
+
     /**
      * 发送自定义消息--群发
      *
      * @param message 消息内容
      */
-    public static void sendInfoGroup(JSONObject message) {
+    public static void sendInfoGroup(Integer type, JSONObject message) {
         if (webSocketMap.size() > 0) {
             for (String userId : WebSocketServer.webSocketMap.keySet()) {
-                sendInfo(userId, message);
+                sendInfo(userId, type, message);
             }
         }
     }

+ 0 - 20
hx-service-api/storage-restructure-api/src/main/java/com/fjhx/constants/InstructTypeConstant.java

@@ -1,20 +0,0 @@
-package com.fjhx.constants;
-
-public interface InstructTypeConstant {
-
-    /**
-     * 开始并返回所有数据
-     */
-    Integer INSTRUCT_TYPE_1 = 1;
-
-    /**
-     * 开始并返回去重数据
-     */
-    Integer INSTRUCT_TYPE_2 = 2;
-
-    /**
-     * 关闭
-     */
-    Integer INSTRUCT_TYPE_3 = 3;
-
-}

+ 35 - 0
hx-service-api/storage-restructure-api/src/main/java/com/fjhx/constants/WebSocketConstant.java

@@ -0,0 +1,35 @@
+package com.fjhx.constants;
+
+public interface WebSocketConstant {
+
+    /**
+     * 上位机userId
+     */
+    String WEB_STOCK_PROGRAM = "webStockProgram";
+
+    /**
+     * 转发userId key
+     */
+    String FORWARD_USER_ID = "forwardUserId";
+
+    /**
+     * 转发session key
+     */
+    String FORWARD_SESSION_ID = "forwardSessionId";
+
+    /**
+     * 初始化返回
+     */
+    int ON_OPEN_TYPE = 1;
+
+    /**
+     * 操作上位机
+     */
+    int OPERATING_UPPER_COMPUTER = 2;
+
+    /**
+     * 推送rfid
+     */
+    int PUSH_RFID = 3;
+
+}

+ 0 - 2
hx-service/storage-restructure/src/main/java/com/fjhx/StorageRestructureApplication.java

@@ -1,9 +1,7 @@
 package com.fjhx;
 
 import org.springblade.core.cloud.client.BladeCloudApplication;
-import org.springblade.core.cloud.feign.EnableBladeFeign;
 import org.springblade.core.launch.BladeApplication;
-import org.springframework.cloud.client.SpringCloudApplication;
 
 /**
  * 杰生重构模块启动器

+ 58 - 58
hx-service/storage-restructure/src/main/java/com/fjhx/listener/RfidDataListener.java

@@ -1,58 +1,58 @@
-package com.fjhx.listener;
-
-import com.alibaba.fastjson.JSONObject;
-import com.fjhx.service.WebSocketServer;
-import com.rabbitmq.client.Channel;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.rabbit.annotation.RabbitHandler;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
-import org.springframework.retry.annotation.Backoff;
-import org.springframework.retry.annotation.EnableRetry;
-import org.springframework.retry.annotation.Retryable;
-import org.springframework.stereotype.Component;
-
-import java.nio.charset.StandardCharsets;
-
-/**
- * rabbitmq队列监听工具
- */
-
-@Slf4j
-@EnableRetry
-@Component
-public class RfidDataListener implements ChannelAwareMessageListener {
-
-    /**
-     * 监听rfid数据队列
-     */
-    @RabbitHandler
-    @RabbitListener(
-            queues = {
-                    "rfid_data_queue"
-            }
-    )
-    @Retryable(value = {Exception.class}, backoff = @Backoff(delay = 3000, multiplier = 1))
-    @Override
-    public void onMessage(Message message, Channel channel) throws Exception {
-        try {
-            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
-            JSONObject jsonObject = JSONObject.parseObject(msg);
-
-            log.info(msg);
-
-            String userId = jsonObject.getString("userId");
-            WebSocketServer.sendInfo(userId, jsonObject.toJSONString());
-
-            // 手动签收消息,通知mq服务器端删除该消息
-            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-        } catch (Exception e) {
-            e.printStackTrace();
-            // 丢弃该消息
-            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
-            throw e;
-        }
-    }
-
-}
+//package com.fjhx.listener;
+//
+//import com.alibaba.fastjson.JSONObject;
+//import com.fjhx.service.WebSocketServer;
+//import com.rabbitmq.client.Channel;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.amqp.core.Message;
+//import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+//import org.springframework.amqp.rabbit.annotation.RabbitListener;
+//import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
+//import org.springframework.retry.annotation.Backoff;
+//import org.springframework.retry.annotation.EnableRetry;
+//import org.springframework.retry.annotation.Retryable;
+//import org.springframework.stereotype.Component;
+//
+//import java.nio.charset.StandardCharsets;
+//
+///**
+// * rabbitmq队列监听工具
+// */
+//
+//@Slf4j
+//@EnableRetry
+//@Component
+//public class RfidDataListener implements ChannelAwareMessageListener {
+//
+//    /**
+//     * 监听rfid数据队列
+//     */
+//    @RabbitHandler
+//    @RabbitListener(
+//            queues = {
+//                    "rfid_data_queue"
+//            }
+//    )
+//    @Retryable(value = {Exception.class}, backoff = @Backoff(delay = 3000, multiplier = 1))
+//    @Override
+//    public void onMessage(Message message, Channel channel) throws Exception {
+//        try {
+//            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
+//            JSONObject jsonObject = JSONObject.parseObject(msg);
+//
+//            log.info(msg);
+//
+//            String userId = jsonObject.getString("userId");
+//            WebSocketServer.sendInfo(userId, jsonObject.toJSONString());
+//
+//            // 手动签收消息,通知mq服务器端删除该消息
+//            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//            // 丢弃该消息
+//            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
+//            throw e;
+//        }
+//    }
+//
+//}

+ 111 - 101
hx-service/storage-restructure/src/main/java/com/fjhx/listener/WebSocketEventListener.java

@@ -1,133 +1,143 @@
 package com.fjhx.listener;
 
 import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.core.toolkit.IdWorker;
-import com.fjhx.constants.InstructTypeConstant;
-import com.fjhx.constants.RabbitConstant;
-import com.fjhx.enums.WebSocketOnMessageTypeEnum;
+import com.fjhx.constants.WebSocketConstant;
+import com.fjhx.entity.MessageEntity;
 import com.fjhx.event.WebSocketOnMessageEvent;
+import com.fjhx.event.WebSocketOnOpenEvent;
+import com.fjhx.service.WebSocketServer;
 import lombok.extern.slf4j.Slf4j;
-import org.springblade.core.tool.utils.ThreadUtil;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.core.MessageBuilder;
-import org.springframework.amqp.core.MessageProperties;
-import org.springframework.amqp.rabbit.connection.CorrelationData;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.List;
-
 @Slf4j
 @Component
 public class WebSocketEventListener {
 
-    @Resource
-    private RabbitTemplate rabbitTemplate;
+//    @Resource
+//    private RabbitTemplate rabbitTemplate;
 
-    private RabbitTemplate.ConfirmCallback confirmCallback;
+//    private RabbitTemplate.ConfirmCallback confirmCallback;
 
 
     @EventListener
-    public void onMessageListener(WebSocketOnMessageEvent event) {
+    public void onMessageListener(WebSocketOnOpenEvent event) {
+        WebSocketServer source = event.getSource();
+        JSONObject jsonObject = new JSONObject();
+        source.sendMessage(WebSocketConstant.ON_OPEN_TYPE, jsonObject);
+    }
 
+
+    @EventListener
+    public void onMessageListener(WebSocketOnMessageEvent event) {
+        WebSocketServer webSocketServer = event.getSource();
         String message = event.getMessage();
 
-        JSONObject data = JSONObject.parseObject(message);
+        System.err.println(message);
 
-        switch (WebSocketOnMessageTypeEnum.get(event.getType())) {
-            case HANDHELD_MACHINE_RFID_SCANNING:
-                handheldMachineRfidScanning(data, event.getUserId(), InstructTypeConstant.INSTRUCT_TYPE_2);
-                break;
-            case HANDHELD_MACHINE_RFID_CLOSE_SCANNING:
-                handheldMachineRfidScanning(data, event.getUserId(), InstructTypeConstant.INSTRUCT_TYPE_3);
-                break;
-        }
-    }
+        MessageEntity sendEntity = JSONObject.parseObject(message, MessageEntity.class);
 
-    /**
-     * 一体机rfid扫描
-     */
-    private void handheldMachineRfidScanning(JSONObject data, String userId, Integer instructType) {
-        Integer plcCode = data.getInteger("plcCode");
-        Integer door = data.getInteger("door");
-        Integer inOut = data.getInteger("inOut");
-
-        JSONObject sendData = new JSONObject();
-        List<String> ips = new ArrayList<>();
-
-        // 赋值ip地址
-        if (plcCode == 1 && door == 1 && inOut == 1) {
-            ips.add("192.168.5.11");
-        } else if (plcCode == 1 && door == 1 && inOut == 2) {
-            ips.add("192.168.5.16");
-        } else if (plcCode == 1 && door == 2 && inOut == 1) {
-            ips.add("192.168.5.15");
-        } else if (plcCode == 2 && door == 1 && inOut == 1) {
-            ips.add("192.168.5.41");
-        } else if (plcCode == 2 && door == 2 && inOut == 2) {
-            ips.add("192.168.5.42");
-        } else {
-            return;
-        }
+        Integer type = sendEntity.getType();
+        JSONObject data = sendEntity.getData();
 
-        sendData.put("ips", ips);
-        sendData.put("userId", userId);
-        sendData.put("busUuid", userId);
-        sendData.put("instructType", instructType);
+        switch (type) {
+            case WebSocketConstant.OPERATING_UPPER_COMPUTER:
 
-        // 封装消息
-        Message message = MessageBuilder.withBody(sendData.toJSONString().getBytes())
-                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
-                .setContentEncoding("UTF-8")
-                .build();
+                data.put(WebSocketConstant.FORWARD_USER_ID, webSocketServer.getUserId());
+                data.put(WebSocketConstant.FORWARD_SESSION_ID, webSocketServer.getSessionId());
 
-        sendRabbit(message, 0);
-    }
+                WebSocketServer.sendInfo(WebSocketConstant.WEB_STOCK_PROGRAM, type, data);
+                break;
 
+            case WebSocketConstant.PUSH_RFID:
+                WebSocketServer.sendInfo(sendEntity.getUserId(), sendEntity.getSessionId(), type, data);
+                break;
+        }
 
-    /**
-     * 发送消息到rabbitMq
-     *
-     * @param message 消息体
-     * @param i       重试次数
-     */
-    private void sendRabbit(Message message, int i) {
-        int j = ++i;
-
-        // 发送消息
-        this.rabbitTemplate.setMandatory(true);
-        this.rabbitTemplate.setConfirmCallback(getConfirmCallback(j, message));
-        this.rabbitTemplate.convertAndSend(RabbitConstant.RFID_EXCHANGE, RabbitConstant.INSTRUCTIONS_ROUTING_KEY,
-                message, new CorrelationData(IdWorker.getIdStr()));
     }
 
-    /**
-     * 获取回调方法
-     */
-    private synchronized RabbitTemplate.ConfirmCallback getConfirmCallback(int j, Message message) {
-
-        if (confirmCallback == null) {
-            confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
-                // 重试机制
-                if (!ack) {
-                    if (j < 3) {
-                        ThreadUtil.sleep(j * 300L);
-                        sendRabbit(message, j);
-                        log.error("rabbitMqCallback:一体机开启出入库rfid扫描消息推送重试,次数:" + j);
-                    } else {
-                        log.error("rabbitMqCallback:一体机开启出入库rfid扫描消息推送失败");
-                    }
-                } else {
-                    log.info("rabbitMqCallback:success");
-                }
-            };
-        }
 
-        return confirmCallback;
-    }
+//    /**
+//     * 一体机rfid扫描
+//     */
+//    private void handheldMachineRfidScanning(JSONObject data, String userId, Integer instructType) {
+//        Integer plcCode = data.getInteger("plcCode");
+//        Integer door = data.getInteger("door");
+//        Integer inOut = data.getInteger("inOut");
+//
+//        JSONObject sendData = new JSONObject();
+//        List<String> ips = new ArrayList<>();
+//
+//        // 赋值ip地址
+//        if (plcCode == 1 && door == 1 && inOut == 1) {
+//            ips.add("192.168.5.11");
+//        } else if (plcCode == 1 && door == 1 && inOut == 2) {
+//            ips.add("192.168.5.16");
+//        } else if (plcCode == 1 && door == 2 && inOut == 1) {
+//            ips.add("192.168.5.15");
+//        } else if (plcCode == 2 && door == 1 && inOut == 1) {
+//            ips.add("192.168.5.41");
+//        } else if (plcCode == 2 && door == 2 && inOut == 2) {
+//            ips.add("192.168.5.42");
+//        } else {
+//            return;
+//        }
+//
+//        sendData.put("ips", ips);
+//        sendData.put("userId", userId);
+//        sendData.put("busUuid", userId);
+//        sendData.put("instructType", instructType);
+//
+//        // 封装消息
+//        Message message = MessageBuilder.withBody(sendData.toJSONString().getBytes())
+//                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
+//                .setContentEncoding("UTF-8")
+//                .build();
+//
+//        sendRabbit(message, 0);
+//    }
+
+
+//    /**
+//     * 发送消息到rabbitMq
+//     *
+//     * @param message 消息体
+//     * @param i       重试次数
+//     */
+//    private void sendRabbit(Message message, int i) {
+//        int j = ++i;
+//
+//        // 发送消息
+//        this.rabbitTemplate.setMandatory(true);
+//        this.rabbitTemplate.setConfirmCallback(getConfirmCallback(j, message));
+//        this.rabbitTemplate.convertAndSend(RabbitConstant.RFID_EXCHANGE, RabbitConstant.INSTRUCTIONS_ROUTING_KEY,
+//                message, new CorrelationData(IdWorker.getIdStr()));
+//    }
+//
+//    /**
+//     * 获取回调方法
+//     */
+//    private synchronized RabbitTemplate.ConfirmCallback getConfirmCallback(int j, Message message) {
+//
+//        if (confirmCallback == null) {
+//            confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
+//                // 重试机制
+//                if (!ack) {
+//                    if (j < 3) {
+//                        ThreadUtil.sleep(j * 300L);
+//                        sendRabbit(message, j);
+//                        log.error("rabbitMqCallback:一体机开启出入库rfid扫描消息推送重试,次数:" + j);
+//                    } else {
+//                        log.error("rabbitMqCallback:一体机开启出入库rfid扫描消息推送失败");
+//                    }
+//                } else {
+//                    log.info("rabbitMqCallback:success");
+//                }
+//            };
+//        }
+//
+//        return confirmCallback;
+//    }
 
 
 }