Quellcode durchsuchen

添加 Websocket消息通知及测试控制器

yzc vor 2 Jahren
Ursprung
Commit
b663ea4101

+ 6 - 0
hx-service/victoriatourist/pom.xml

@@ -43,6 +43,12 @@
             <artifactId>sdk</artifactId>
             <version>1.0.11</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-websocket</artifactId>
+            <version>5.2.15.RELEASE</version>
+            <scope>compile</scope>
+        </dependency>
 
     </dependencies>
 

+ 2 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/VictoriatouristApplication.java

@@ -2,11 +2,13 @@ package com.fjhx;
 
 import org.springblade.core.cloud.client.BladeCloudApplication;
 import org.springblade.core.launch.BladeApplication;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
 
 /**
  * 维多利亚
  */
 @BladeCloudApplication
+@EnableWebSocket
 public class VictoriatouristApplication {
 
     private static final String APP_NAME = "victoriatourist";

+ 27 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/controller/TestMsgController.java

@@ -0,0 +1,27 @@
+package com.fjhx.controller;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fjhx.websocket.service.WebSocketServer;
+import org.springblade.core.tool.api.R;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Map;
+
+@RestController
+@RequestMapping("/test")
+public class TestMsgController {
+
+    @Autowired
+    WebSocketServer webSocketServer;
+
+    @PostMapping("/sendMsg")
+    public void sendMsg(@RequestBody JSONObject msgs) {
+        WebSocketServer.sendInfo(msgs.getString("userId"),1,msgs);
+        R.success();
+    }
+}

+ 17 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/websocket/config/WebSocketConfig.java

@@ -0,0 +1,17 @@
+package com.fjhx.websocket.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig {
+
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+
+}

+ 36 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/websocket/entity/MessageEntity.java

@@ -0,0 +1,36 @@
+package com.fjhx.websocket.entity;
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.Data;
+
+import java.util.Date;
+
+@Data
+public class MessageEntity {
+
+    /**
+     * 用户id
+     */
+    private String userId;
+
+    /**
+     * 会话id
+     */
+    private String sessionId;
+
+    /**
+     * 消息创建时间
+     */
+    private Date createTime;
+
+    /**
+     * 类型
+     */
+    private Integer type;
+
+    /**
+     * 消息主体
+     */
+    private JSONObject data;
+
+}

+ 22 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/websocket/event/WebSocketOnCloseEvent.java

@@ -0,0 +1,22 @@
+package com.fjhx.websocket.event;
+
+import com.fjhx.websocket.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock关闭连接事件
+ */
+@Getter
+public class WebSocketOnCloseEvent extends ApplicationEvent {
+
+    public WebSocketOnCloseEvent(Object source) {
+        super(source);
+    }
+
+    @Override
+    public WebSocketServer getSource() {
+        return (WebSocketServer) super.getSource();
+    }
+
+}

+ 20 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/websocket/event/WebSocketOnErrorEvent.java

@@ -0,0 +1,20 @@
+package com.fjhx.websocket.event;
+
+import com.fjhx.websocket.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock推送的用户离线事件
+ */
+@Getter
+public class WebSocketOnErrorEvent extends ApplicationEvent {
+
+    private final Throwable throwable;
+
+    public WebSocketOnErrorEvent(WebSocketServer source, Throwable throwable) {
+        super(source);
+        this.throwable = throwable;
+    }
+
+}

+ 25 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/websocket/event/WebSocketOnMessageEvent.java

@@ -0,0 +1,25 @@
+package com.fjhx.websocket.event;
+
+import com.fjhx.websocket.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock接收消息事件
+ */
+@Getter
+public class WebSocketOnMessageEvent extends ApplicationEvent {
+
+    private final String message;
+
+    public WebSocketOnMessageEvent(WebSocketServer source, String message) {
+        super(source);
+        this.message = message;
+    }
+
+    @Override
+    public WebSocketServer getSource() {
+        return (WebSocketServer) super.getSource();
+    }
+
+}

+ 22 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/websocket/event/WebSocketOnOpenEvent.java

@@ -0,0 +1,22 @@
+package com.fjhx.websocket.event;
+
+import com.fjhx.websocket.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock建立连接事件
+ */
+@Getter
+public class WebSocketOnOpenEvent extends ApplicationEvent {
+
+    public WebSocketOnOpenEvent(WebSocketServer source) {
+        super(source);
+    }
+
+    @Override
+    public WebSocketServer getSource() {
+        return (WebSocketServer)super.getSource();
+    }
+
+}

+ 26 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/websocket/event/WebSocketSendFailEvent.java

@@ -0,0 +1,26 @@
+package com.fjhx.websocket.event;
+
+import com.fjhx.websocket.entity.MessageEntity;
+import com.fjhx.websocket.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock推送消息失败事件
+ */
+@Getter
+public class WebSocketSendFailEvent extends ApplicationEvent {
+
+    private final MessageEntity messageEntity;
+
+    public WebSocketSendFailEvent(WebSocketServer source, MessageEntity messageEntity) {
+        super(source);
+        this.messageEntity = messageEntity;
+    }
+
+    @Override
+    public WebSocketServer getSource() {
+        return (WebSocketServer)super.getSource();
+    }
+
+}

+ 26 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/websocket/event/WebSocketSendSuccessEvent.java

@@ -0,0 +1,26 @@
+package com.fjhx.websocket.event;
+
+import com.fjhx.websocket.entity.MessageEntity;
+import com.fjhx.websocket.service.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock推送消息成功事件
+ */
+@Getter
+public class WebSocketSendSuccessEvent extends ApplicationEvent {
+
+    private final MessageEntity messageEntity;
+
+    public WebSocketSendSuccessEvent(WebSocketServer source, MessageEntity messageEntity) {
+        super(source);
+        this.messageEntity = messageEntity;
+    }
+
+    @Override
+    public WebSocketServer getSource() {
+        return (WebSocketServer) super.getSource();
+    }
+
+}

+ 25 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/websocket/event/WebSocketUserOfflineEvent.java

@@ -0,0 +1,25 @@
+package com.fjhx.websocket.event;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fjhx.websocket.entity.MessageEntity;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+import java.util.Date;
+
+/**
+ * webStock推送的用户离线事件
+ */
+@Getter
+public class WebSocketUserOfflineEvent extends ApplicationEvent {
+
+    private final MessageEntity messageEntity = new MessageEntity();
+
+    public WebSocketUserOfflineEvent(String userId, JSONObject message) {
+        super(userId);
+        this.messageEntity.setUserId(userId);
+        this.messageEntity.setData(message);
+        this.messageEntity.setCreateTime(new Date());
+    }
+
+}

+ 207 - 0
hx-service/victoriatourist/src/main/java/com/fjhx/websocket/service/WebSocketServer.java

@@ -0,0 +1,207 @@
+package com.fjhx.websocket.service;
+
+import cn.hutool.extra.spring.SpringUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.toolkit.IdWorker;
+import com.fjhx.websocket.entity.MessageEntity;
+import com.fjhx.websocket.event.*;
+import io.jsonwebtoken.Claims;
+import io.undertow.servlet.spec.HttpServletRequestImpl;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springblade.core.launch.constant.TokenConstant;
+import org.springblade.core.secure.utils.AuthUtil;
+import org.springblade.core.tool.utils.Func;
+import org.springblade.core.tool.utils.WebUtil;
+import org.springframework.context.ApplicationContext;
+import org.springframework.http.HttpRequest;
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.RequestBody;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+
+@ServerEndpoint("/webStock/{token}")
+@Component
+@Slf4j
+@Getter
+public class WebSocketServer {
+
+    private static final ApplicationContext applicationContext = SpringUtil.getApplicationContext();
+
+    /**
+     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
+     */
+    private static final ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
+    /**
+     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
+     */
+    private Session session;
+    /**
+     * 接收userId
+     */
+    private String userId;
+    /**
+     * sessionId
+     */
+    private String sessionId;
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(@RequestBody Session session, @PathParam("token") String token) {
+        this.session = session;
+        this.sessionId = IdWorker.getIdStr();
+        //根据Token获取userid
+        Claims claims = AuthUtil.parseJWT(token);
+        String userId = Func.toStr(claims.get(TokenConstant.USER_ID));
+        this.userId = userId;
+
+        synchronized (this) {
+            ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap =
+                    webSocketMap.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
+
+            sessionIdWebSocketServerMap.put(sessionId, this);
+        }
+
+        // 发布建立连接连接事件
+        applicationContext.publishEvent(new WebSocketOnOpenEvent(this));
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        synchronized (this) {
+            ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
+            if (sessionIdWebSocketServerMap != null) {
+                sessionIdWebSocketServerMap.remove(sessionId);
+
+                if (sessionIdWebSocketServerMap.size() == 0) {
+                    webSocketMap.remove(userId);
+                }
+
+            }
+        }
+
+        // 发布建立关闭连接事件
+        applicationContext.publishEvent(new WebSocketOnCloseEvent(this));
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message 客户端发送过来的消息
+     */
+    @OnMessage
+    public void onMessage(String message) {
+        // 推送消息事件
+        applicationContext.publishEvent(new WebSocketOnMessageEvent(this, message));
+    }
+
+    /**
+     * @param error 异常
+     */
+    @OnError
+    public void onError(Throwable error) {
+        // 发布连接异常事件
+        applicationContext.publishEvent(new WebSocketOnErrorEvent(this, error));
+    }
+
+    /**
+     * 实现服务器主动推送
+     *
+     * @param message 消息内容
+     */
+    public void sendMessage(Integer type, JSONObject message) {
+        MessageEntity sendEntity = new MessageEntity();
+        sendEntity.setUserId(userId);
+        sendEntity.setSessionId(sessionId);
+        sendEntity.setCreateTime(new Date());
+        sendEntity.setType(type);
+        sendEntity.setData(message);
+
+        try {
+            this.session.getBasicRemote().sendText(JSON.toJSONString(sendEntity));
+            applicationContext.publishEvent(new WebSocketSendSuccessEvent(this, sendEntity));
+        } catch (IOException e) {
+            applicationContext.publishEvent(new WebSocketSendFailEvent(this, sendEntity));
+        }
+    }
+
+    /**
+     * 发送自定义消息
+     *
+     * @param userId  发送用户
+     * @param type    消息类型
+     * @param message 消息内容
+     */
+    public static void sendInfo(String userId, Integer type, JSONObject message) {
+        ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
+
+        if (sessionIdWebSocketServerMap == null) {
+            applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
+            return;
+        }
+
+        for (WebSocketServer webSocketServer : sessionIdWebSocketServerMap.values()) {
+            webSocketServer.sendMessage(type, message);
+        }
+
+    }
+
+    /**
+     * 发送自定义消息
+     *
+     * @param userId  发送用户
+     * @param type    消息类型
+     * @param message 消息内容
+     */
+    public static void sendInfo(String userId, String sessionId, Integer type, JSONObject message) {
+        ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
+
+        if (sessionIdWebSocketServerMap == null) {
+            applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
+            return;
+        }
+
+        WebSocketServer webSocketServer = sessionIdWebSocketServerMap.get(sessionId);
+        if (webSocketServer != null) {
+            webSocketServer.sendMessage(type, message);
+        } else {
+            applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
+        }
+
+    }
+
+
+    /**
+     * 发送自定义消息--群发
+     *
+     * @param message 消息内容
+     */
+    public static void sendInfoGroup(Integer type, JSONObject message) {
+        if (webSocketMap.size() > 0) {
+            for (String userId : WebSocketServer.webSocketMap.keySet()) {
+                sendInfo(userId, type, message);
+            }
+        }
+    }
+
+    /**
+     * 在线人数
+     */
+    public static synchronized int getOnlineCount() {
+        return webSocketMap.size();
+    }
+
+}