Browse Source

WebSocket

home 2 years ago
parent
commit
94a35cf2eb

+ 1 - 1
bladex/blade-gateway/src/main/resources/bootstrap.yml

@@ -1,5 +1,5 @@
 server:
-  port: 80
+  port: 8080
 
 spring:
   cloud:

+ 50 - 0
hx-common/common-websocket/pom.xml

@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hx-common</artifactId>
+        <groupId>com.fjhx</groupId>
+        <version>3.2.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>common-websocket</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+
+        <!-- 工具模块 -->
+        <dependency>
+            <groupId>com.fjhx</groupId>
+            <artifactId>common-tool</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+            <version>2.7.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-tomcat</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-undertow</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>

+ 17 - 0
hx-common/common-websocket/src/main/java/com/fjhx/config/WebSocketConfig.java

@@ -0,0 +1,17 @@
+package com.fjhx.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig {
+
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+
+}

+ 25 - 0
hx-common/common-websocket/src/main/java/com/fjhx/entity/SendEntity.java

@@ -0,0 +1,25 @@
+package com.fjhx.entity;
+
+import lombok.Data;
+
+import java.util.Date;
+
+@Data
+public class SendEntity {
+
+    /**
+     * 消息创建时间
+     */
+    private Date createTime;
+
+    /**
+     * 消息发送人
+     */
+    private String userId;
+
+    /**
+     * 消息主体
+     */
+    private String message;
+
+}

+ 25 - 0
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketOnCloseEvent.java

@@ -0,0 +1,25 @@
+package com.fjhx.event;
+
+import com.fjhx.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock关闭连接事件
+ */
+@Getter
+public class WebSocketOnCloseEvent extends ApplicationEvent {
+
+    private final String userId;
+
+    public WebSocketOnCloseEvent(WebSocketServer source, String userId) {
+        super(source);
+        this.userId = userId;
+    }
+
+    @Override
+    public WebSocketServer getSource() {
+        return (WebSocketServer)super.getSource();
+    }
+
+}

+ 22 - 0
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketOnErrorEvent.java

@@ -0,0 +1,22 @@
+package com.fjhx.event;
+
+import com.fjhx.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock推送的用户离线事件
+ */
+@Getter
+public class WebSocketOnErrorEvent extends ApplicationEvent {
+
+    private final String userId;
+    private final Throwable throwable;
+
+    public WebSocketOnErrorEvent(WebSocketServer source, String userId, Throwable throwable) {
+        super(source);
+        this.userId = userId;
+        this.throwable = throwable;
+    }
+
+}

+ 38 - 0
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketOnMessageEvent.java

@@ -0,0 +1,38 @@
+package com.fjhx.event;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fjhx.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock接收消息事件
+ */
+@Getter
+public class WebSocketOnMessageEvent extends ApplicationEvent {
+
+    private final String userId;
+    private final String message;
+    private final JSONObject messageJSONObject;
+
+    public WebSocketOnMessageEvent(WebSocketServer source, String userId, String message) {
+        super(source);
+        this.userId = userId;
+        this.message = message;
+        this.messageJSONObject = JSONObject.parseObject(message);
+    }
+
+    @Override
+    public WebSocketServer getSource() {
+        return (WebSocketServer) super.getSource();
+    }
+
+    /**
+     * 消息类型
+     */
+    public String getType() {
+        return messageJSONObject.getString("type");
+    }
+
+
+}

+ 25 - 0
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketOnOpenEvent.java

@@ -0,0 +1,25 @@
+package com.fjhx.event;
+
+import com.fjhx.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock建立连接事件
+ */
+@Getter
+public class WebSocketOnOpenEvent extends ApplicationEvent {
+
+    private final String userId;
+
+    public WebSocketOnOpenEvent(WebSocketServer source, String userId) {
+        super(source);
+        this.userId = userId;
+    }
+
+    @Override
+    public WebSocketServer getSource() {
+        return (WebSocketServer)super.getSource();
+    }
+
+}

+ 26 - 0
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketSendFailEvent.java

@@ -0,0 +1,26 @@
+package com.fjhx.event;
+
+import com.fjhx.entity.SendEntity;
+import com.fjhx.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock推送消息失败事件
+ */
+@Getter
+public class WebSocketSendFailEvent extends ApplicationEvent {
+
+    private final SendEntity sendEntity;
+
+    public WebSocketSendFailEvent(WebSocketServer source, SendEntity sendEntity) {
+        super(source);
+        this.sendEntity = sendEntity;
+    }
+
+    @Override
+    public WebSocketServer getSource() {
+        return (WebSocketServer)super.getSource();
+    }
+
+}

+ 26 - 0
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketSendSuccessEvent.java

@@ -0,0 +1,26 @@
+package com.fjhx.event;
+
+import com.fjhx.entity.SendEntity;
+import com.fjhx.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock推送消息成功事件
+ */
+@Getter
+public class WebSocketSendSuccessEvent extends ApplicationEvent {
+
+    private final SendEntity sendEntity;
+
+    public WebSocketSendSuccessEvent(WebSocketServer source, SendEntity sendEntity) {
+        super(source);
+        this.sendEntity = sendEntity;
+    }
+
+    @Override
+    public WebSocketServer getSource() {
+        return (WebSocketServer) super.getSource();
+    }
+
+}

+ 24 - 0
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketUserOfflineEvent.java

@@ -0,0 +1,24 @@
+package com.fjhx.event;
+
+import com.fjhx.entity.SendEntity;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+import java.util.Date;
+
+/**
+ * webStock推送的用户离线事件
+ */
+@Getter
+public class WebSocketUserOfflineEvent extends ApplicationEvent {
+
+    private final SendEntity sendEntity = new SendEntity();
+
+    public WebSocketUserOfflineEvent(String userId, String message) {
+        super(userId);
+        this.sendEntity.setUserId(userId);
+        this.sendEntity.setMessage(message);
+        this.sendEntity.setCreateTime(new Date());
+    }
+
+}

+ 140 - 0
hx-common/common-websocket/src/main/java/com/fjhx/service/WebSocketServer.java

@@ -0,0 +1,140 @@
+package com.fjhx.service;
+
+import cn.hutool.extra.spring.SpringUtil;
+import com.alibaba.fastjson.JSON;
+import com.fjhx.entity.SendEntity;
+import com.fjhx.event.*;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.RequestBody;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+
+@ServerEndpoint("/webStock/{userId}")
+@Component
+@Slf4j
+public class WebSocketServer {
+
+    private static final ApplicationContext applicationContext = SpringUtil.getApplicationContext();
+
+    /**
+     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+     */
+    private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
+    /**
+     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
+     */
+    private Session session;
+    /**
+     * 接收userId
+     */
+    private String userId;
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(@RequestBody Session session, @PathParam("userId") String userId) {
+        this.session = session;
+        this.userId = userId;
+        webSocketMap.remove(userId);
+        webSocketMap.put(userId, this);
+
+        // 发布建立连接连接事件
+        applicationContext.publishEvent(new WebSocketOnOpenEvent(this, userId));
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        webSocketMap.remove(userId);
+
+        // 发布建立关闭连接事件
+        applicationContext.publishEvent(new WebSocketOnCloseEvent(this, userId));
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message 客户端发送过来的消息
+     */
+    @OnMessage
+    public void onMessage(String message) {
+        // 发布建立关闭连接事件
+        applicationContext.publishEvent(new WebSocketOnMessageEvent(this, userId, message));
+    }
+
+    /**
+     * @param error 异常
+     */
+    @OnError
+    public void onError(Throwable error) {
+        // 发布连接异常事件
+        applicationContext.publishEvent(new WebSocketOnErrorEvent(this, userId, error));
+    }
+
+    /**
+     * 实现服务器主动推送
+     *
+     * @param message 消息内容
+     */
+    public void sendMessage(String message) {
+        SendEntity sendEntity = new SendEntity();
+        sendEntity.setUserId(userId);
+        sendEntity.setCreateTime(new Date());
+        sendEntity.setMessage(message);
+
+        try {
+            this.session.getBasicRemote().sendText(JSON.toJSONString(sendEntity));
+            applicationContext.publishEvent(new WebSocketSendSuccessEvent(this, sendEntity));
+        } catch (IOException e) {
+            applicationContext.publishEvent(new WebSocketSendFailEvent(this, sendEntity));
+        }
+    }
+
+    /**
+     * 发送自定义消息
+     *
+     * @param userId  发送用户
+     * @param message 消息内容
+     */
+    public static void sendInfo(String userId, String message) {
+        WebSocketServer webSocketServer = webSocketMap.get(userId);
+
+        if (webSocketServer == null) {
+            applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
+            return;
+        }
+
+        webSocketServer.sendMessage(message);
+    }
+
+    /**
+     * 发送自定义消息--群发
+     *
+     * @param message 消息内容
+     */
+    public static void sendInfoGroup(String message) {
+        if (webSocketMap.size() > 0) {
+            for (String userId : WebSocketServer.webSocketMap.keySet()) {
+                sendInfo(userId, message);
+            }
+        }
+    }
+
+    /**
+     * 在线人数
+     */
+    public static synchronized int getOnlineCount() {
+        return webSocketMap.size();
+    }
+
+}

+ 1 - 0
hx-common/pom.xml

@@ -22,6 +22,7 @@
         <module>library-product</module>
         <module>library-storage</module>
         <module>library-supply</module>
+        <module>common-websocket</module>
     </modules>
 
     <properties>

+ 6 - 0
hx-service/pom.xml

@@ -107,6 +107,12 @@
                 <version>${hx.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.fjhx</groupId>
+                <artifactId>common-websocket</artifactId>
+                <version>${hx.version}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 

+ 5 - 0
hx-service/storage-restructure/pom.xml

@@ -43,6 +43,11 @@
             <artifactId>easyexcel</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.fjhx</groupId>
+            <artifactId>common-websocket</artifactId>
+        </dependency>
+
     </dependencies>
 
 </project>

+ 37 - 0
hx-service/storage-restructure/src/main/java/com/fjhx/listener/WebSocketEventListener.java

@@ -0,0 +1,37 @@
+package com.fjhx.listener;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fjhx.entity.SendEntity;
+import com.fjhx.event.WebSocketOnMessageEvent;
+import com.fjhx.event.WebSocketOnOpenEvent;
+import com.fjhx.event.WebSocketUserOfflineEvent;
+import com.fjhx.service.WebSocketServer;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+@Component
+public class WebSocketEventListener {
+
+    @EventListener
+    public void eventListener(WebSocketOnOpenEvent webSocketOnOpenEvent) {
+        WebSocketServer webSocketServer = webSocketOnOpenEvent.getSource();
+        webSocketServer.sendMessage("后端返回:连接成功");
+    }
+
+    @EventListener
+    public void eventListener(WebSocketOnMessageEvent webSocketOnMessageEvent) {
+        WebSocketServer webSocketServer = webSocketOnMessageEvent.getSource();
+        webSocketServer.sendMessage(webSocketOnMessageEvent.getMessage());
+
+        JSONObject jsonObject = webSocketOnMessageEvent.getMessageJSONObject();
+        WebSocketServer.sendInfo(jsonObject.getString("toUserId"), jsonObject.getString("contentText"));
+    }
+
+    @EventListener
+    public void eventListener(WebSocketUserOfflineEvent webSocketUserOfflineEvent) {
+        SendEntity sendEntity = webSocketUserOfflineEvent.getSendEntity();
+
+        System.out.println("用户:" + sendEntity.getUserId() + "不在线");
+    }
+
+}

+ 1 - 1
hx-service/syringe-production/src/main/java/com/fjhx/config/amqp/AmqpClientOptions.java

@@ -43,7 +43,7 @@ public class AmqpClientOptions {
      * false:收到消息后,需要手动调用message.acknowledge()
      */
     @Builder.Default
-    private boolean isAutoAcknowledge = true;
+    private boolean isAutoAcknowledge = false;
 
     /**
      * 重连时延(ms)

+ 32 - 22
hx-service/syringe-production/src/main/java/com/fjhx/service/amqp/impl/AmqpDataServiceImpl.java

@@ -15,6 +15,8 @@ import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import java.util.Collection;
 import java.util.Date;
@@ -45,40 +47,48 @@ public class AmqpDataServiceImpl extends ServiceImpl<AmqpDataMapper, AmqpData> i
         consumer.setMessageListener(message -> {
             try {
                 // 此处进行消息处理。如果处理比较耗时,最好进行开启新的线程处理,否则可能造成心跳超时链接断开。
-                String body = message.getBody(String.class);
-                saveData(body);
+                saveData(message);
             } catch (Exception e) {
                 log.warn("message.getBody error,exception is ", e);
             }
         });
     }
 
-    private void saveData(String body) {
-        JSONObject jsonObject = JSONObject.parseObject(body);
+    private void saveData(Message message) {
+        try {
 
-        JSONObject bodyObj = jsonObject
-                .getJSONObject("notify_data")
-                .getJSONObject("body");
+            JSONObject jsonObject = JSONObject.parseObject(message.getBody(String.class));
 
-        // 设备状态
-        String status = bodyObj.getString("status");
-        if (status != null) {
-            System.out.println(jsonObject);
+            JSONObject bodyObj = jsonObject
+                    .getJSONObject("notify_data")
+                    .getJSONObject("body");
 
-            return;
-        }
+            // 设备状态
+            String status = bodyObj.getString("status");
+            if (status != null) {
+                System.out.println(jsonObject);
+
+                return;
+            }
 
-        String dataStr = bodyObj.getJSONArray("services")
-                .getJSONObject(0)
-                .getJSONObject("properties")
-                .getString("DeviceData");
+            String dataStr = bodyObj.getJSONArray("services")
+                    .getJSONObject(0)
+                    .getJSONObject("properties")
+                    .getString("DeviceData");
 
-        List<AmqpData> amqpDataList = JSONObject.parseArray(dataStr, AmqpData.class);
-        Date date = new Date();
-        amqpDataList.forEach(item -> item.setCreateTime(date));
+            List<AmqpData> amqpDataList = JSONObject.parseArray(dataStr, AmqpData.class);
+            Date date = new Date();
+            amqpDataList.forEach(item -> item.setCreateTime(date));
 
-        saveInstantaneousState(amqpDataList);
-        saveBatch(amqpDataList);
+            saveInstantaneousState(amqpDataList);
+            saveBatch(amqpDataList);
+
+            message.acknowledge();
+
+        } catch (JMSException e) {
+            // TODO 消息保存失败处理
+            e.printStackTrace();
+        }
     }
 
     /**