瀏覽代碼

webSocket

home 2 年之前
父節點
當前提交
72b9a7153c

+ 31 - 0
hx-common/common-websocket/src/main/java/com/fjhx/entity/MessageEntity.java

@@ -0,0 +1,31 @@
+package com.fjhx.entity;
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.Data;
+
+import java.util.Date;
+
+@Data
+public class MessageEntity {
+
+    /**
+     * 用户id
+     */
+    private String userId;
+
+    /**
+     * 会话id
+     */
+    private String sessionId;
+
+    /**
+     * 消息创建时间
+     */
+    private Date createTime;
+
+    /**
+     * 消息主体
+     */
+    private JSONObject data;
+
+}

+ 0 - 30
hx-common/common-websocket/src/main/java/com/fjhx/entity/SendEntity.java

@@ -1,30 +0,0 @@
-package com.fjhx.entity;
-
-import com.fasterxml.jackson.annotation.JsonFormat;
-import lombok.Data;
-import org.springblade.core.tool.utils.DateUtil;
-import org.springframework.format.annotation.DateTimeFormat;
-
-import java.util.Date;
-
-@Data
-public class SendEntity {
-
-    /**
-     * 消息创建时间
-     */
-    @DateTimeFormat(pattern = DateUtil.PATTERN_DATETIME)
-    @JsonFormat(pattern = DateUtil.PATTERN_DATETIME)
-    private Date createTime;
-
-    /**
-     * 消息发送人
-     */
-    private String userId;
-
-    /**
-     * 消息主体
-     */
-    private String message;
-
-}

+ 2 - 5
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketOnCloseEvent.java

@@ -10,16 +10,13 @@ import org.springframework.context.ApplicationEvent;
 @Getter
 public class WebSocketOnCloseEvent extends ApplicationEvent {
 
-    private final String userId;
-
-    public WebSocketOnCloseEvent(WebSocketServer source, String userId) {
+    public WebSocketOnCloseEvent(Object source) {
         super(source);
-        this.userId = userId;
     }
 
     @Override
     public WebSocketServer getSource() {
-        return (WebSocketServer)super.getSource();
+        return (WebSocketServer) super.getSource();
     }
 
 }

+ 1 - 3
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketOnErrorEvent.java

@@ -10,12 +10,10 @@ import org.springframework.context.ApplicationEvent;
 @Getter
 public class WebSocketOnErrorEvent extends ApplicationEvent {
 
-    private final String userId;
     private final Throwable throwable;
 
-    public WebSocketOnErrorEvent(WebSocketServer source, String userId, Throwable throwable) {
+    public WebSocketOnErrorEvent(WebSocketServer source, Throwable throwable) {
         super(source);
-        this.userId = userId;
         this.throwable = throwable;
     }
 

+ 1 - 18
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketOnMessageEvent.java

@@ -1,8 +1,6 @@
 package com.fjhx.event;
 
-import com.alibaba.fastjson.JSONObject;
 import com.fjhx.service.WebSocketServer;
-import com.fjhx.utils.Assert;
 import lombok.Getter;
 import org.springframework.context.ApplicationEvent;
 
@@ -12,12 +10,10 @@ import org.springframework.context.ApplicationEvent;
 @Getter
 public class WebSocketOnMessageEvent extends ApplicationEvent {
 
-    private final String userId;
     private final String message;
 
-    public WebSocketOnMessageEvent(WebSocketServer source, String userId, String message) {
+    public WebSocketOnMessageEvent(WebSocketServer source, String message) {
         super(source);
-        this.userId = userId;
         this.message = message;
     }
 
@@ -26,17 +22,4 @@ public class WebSocketOnMessageEvent extends ApplicationEvent {
         return (WebSocketServer) super.getSource();
     }
 
-    public JSONObject getMessageObj() {
-        return JSONObject.parseObject(message);
-    }
-
-    /**
-     * 消息类型
-     */
-    public Integer getType() {
-        Integer type = getMessageObj().getInteger("type");
-        Assert.notEmpty(type, "消息类型不能为空");
-        return type;
-    }
-
 }

+ 1 - 4
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketOnOpenEvent.java

@@ -10,11 +10,8 @@ import org.springframework.context.ApplicationEvent;
 @Getter
 public class WebSocketOnOpenEvent extends ApplicationEvent {
 
-    private final String userId;
-
-    public WebSocketOnOpenEvent(WebSocketServer source, String userId) {
+    public WebSocketOnOpenEvent(WebSocketServer source) {
         super(source);
-        this.userId = userId;
     }
 
     @Override

+ 4 - 4
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketSendFailEvent.java

@@ -1,6 +1,6 @@
 package com.fjhx.event;
 
-import com.fjhx.entity.SendEntity;
+import com.fjhx.entity.MessageEntity;
 import com.fjhx.service.WebSocketServer;
 import lombok.Getter;
 import org.springframework.context.ApplicationEvent;
@@ -11,11 +11,11 @@ import org.springframework.context.ApplicationEvent;
 @Getter
 public class WebSocketSendFailEvent extends ApplicationEvent {
 
-    private final SendEntity sendEntity;
+    private final MessageEntity messageEntity;
 
-    public WebSocketSendFailEvent(WebSocketServer source, SendEntity sendEntity) {
+    public WebSocketSendFailEvent(WebSocketServer source, MessageEntity messageEntity) {
         super(source);
-        this.sendEntity = sendEntity;
+        this.messageEntity = messageEntity;
     }
 
     @Override

+ 4 - 4
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketSendSuccessEvent.java

@@ -1,6 +1,6 @@
 package com.fjhx.event;
 
-import com.fjhx.entity.SendEntity;
+import com.fjhx.entity.MessageEntity;
 import com.fjhx.service.WebSocketServer;
 import lombok.Getter;
 import org.springframework.context.ApplicationEvent;
@@ -11,11 +11,11 @@ import org.springframework.context.ApplicationEvent;
 @Getter
 public class WebSocketSendSuccessEvent extends ApplicationEvent {
 
-    private final SendEntity sendEntity;
+    private final MessageEntity messageEntity;
 
-    public WebSocketSendSuccessEvent(WebSocketServer source, SendEntity sendEntity) {
+    public WebSocketSendSuccessEvent(WebSocketServer source, MessageEntity messageEntity) {
         super(source);
-        this.sendEntity = sendEntity;
+        this.messageEntity = messageEntity;
     }
 
     @Override

+ 7 - 6
hx-common/common-websocket/src/main/java/com/fjhx/event/WebSocketUserOfflineEvent.java

@@ -1,6 +1,7 @@
 package com.fjhx.event;
 
-import com.fjhx.entity.SendEntity;
+import com.alibaba.fastjson.JSONObject;
+import com.fjhx.entity.MessageEntity;
 import lombok.Getter;
 import org.springframework.context.ApplicationEvent;
 
@@ -12,13 +13,13 @@ import java.util.Date;
 @Getter
 public class WebSocketUserOfflineEvent extends ApplicationEvent {
 
-    private final SendEntity sendEntity = new SendEntity();
+    private final MessageEntity messageEntity = new MessageEntity();
 
-    public WebSocketUserOfflineEvent(String userId, String message) {
+    public WebSocketUserOfflineEvent(String userId, JSONObject message) {
         super(userId);
-        this.sendEntity.setUserId(userId);
-        this.sendEntity.setMessage(message);
-        this.sendEntity.setCreateTime(new Date());
+        this.messageEntity.setUserId(userId);
+        this.messageEntity.setData(message);
+        this.messageEntity.setCreateTime(new Date());
     }
 
 }

+ 33 - 16
hx-common/common-websocket/src/main/java/com/fjhx/service/WebSocketServer.java

@@ -2,8 +2,10 @@ package com.fjhx.service;
 
 import cn.hutool.extra.spring.SpringUtil;
 import com.alibaba.fastjson.JSON;
-import com.fjhx.entity.SendEntity;
+import com.alibaba.fastjson.JSONObject;
+import com.fjhx.entity.MessageEntity;
 import com.fjhx.event.*;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.ApplicationContext;
 import org.springframework.stereotype.Component;
@@ -19,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
 @ServerEndpoint("/webStock/{userId}")
 @Component
 @Slf4j
+@Getter
 public class WebSocketServer {
 
     private static final ApplicationContext applicationContext = SpringUtil.getApplicationContext();
@@ -26,7 +29,7 @@ public class WebSocketServer {
     /**
      * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
      */
-    private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
+    private static final ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
     /**
      * 与某个客户端的连接会话,需要通过它来给客户端发送数据
      */
@@ -35,6 +38,10 @@ public class WebSocketServer {
      * 接收userId
      */
     private String userId;
+    /**
+     * sessionId
+     */
+    private String sessionId;
 
     /**
      * 连接建立成功调用的方法
@@ -42,12 +49,18 @@ public class WebSocketServer {
     @OnOpen
     public void onOpen(@RequestBody Session session, @PathParam("userId") String userId) {
         this.session = session;
+        this.sessionId = session.getId();
         this.userId = userId;
-        webSocketMap.remove(userId);
-        webSocketMap.put(userId, this);
+
+        synchronized (this) {
+            ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap =
+                    webSocketMap.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
+
+            sessionIdWebSocketServerMap.put(sessionId, this);
+        }
 
         // 发布建立连接连接事件
-        applicationContext.publishEvent(new WebSocketOnOpenEvent(this, userId));
+        applicationContext.publishEvent(new WebSocketOnOpenEvent(this));
     }
 
     /**
@@ -58,7 +71,7 @@ public class WebSocketServer {
         webSocketMap.remove(userId);
 
         // 发布建立关闭连接事件
-        applicationContext.publishEvent(new WebSocketOnCloseEvent(this, userId));
+        applicationContext.publishEvent(new WebSocketOnCloseEvent(this));
     }
 
     /**
@@ -69,7 +82,7 @@ public class WebSocketServer {
     @OnMessage
     public void onMessage(String message) {
         // 推送消息事件
-        applicationContext.publishEvent(new WebSocketOnMessageEvent(this, userId, message));
+        applicationContext.publishEvent(new WebSocketOnMessageEvent(this, message));
     }
 
     /**
@@ -78,7 +91,7 @@ public class WebSocketServer {
     @OnError
     public void onError(Throwable error) {
         // 发布连接异常事件
-        applicationContext.publishEvent(new WebSocketOnErrorEvent(this, userId, error));
+        applicationContext.publishEvent(new WebSocketOnErrorEvent(this, error));
     }
 
     /**
@@ -86,11 +99,12 @@ public class WebSocketServer {
      *
      * @param message 消息内容
      */
-    public void sendMessage(String message) {
-        SendEntity sendEntity = new SendEntity();
+    public void sendMessage(JSONObject message) {
+        MessageEntity sendEntity = new MessageEntity();
         sendEntity.setUserId(userId);
+        sendEntity.setSessionId(sessionId);
         sendEntity.setCreateTime(new Date());
-        sendEntity.setMessage(message);
+        sendEntity.setData(message);
 
         try {
             this.session.getBasicRemote().sendText(JSON.toJSONString(sendEntity));
@@ -106,15 +120,18 @@ public class WebSocketServer {
      * @param userId  发送用户
      * @param message 消息内容
      */
-    public static void sendInfo(String userId, String message) {
-        WebSocketServer webSocketServer = webSocketMap.get(userId);
+    public static void sendInfo(String userId, JSONObject message) {
+        ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
 
-        if (webSocketServer == null) {
+        if (sessionIdWebSocketServerMap == null) {
             applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
             return;
         }
 
-        webSocketServer.sendMessage(message);
+        for (WebSocketServer webSocketServer : sessionIdWebSocketServerMap.values()) {
+            webSocketServer.sendMessage(message);
+        }
+
     }
 
     /**
@@ -122,7 +139,7 @@ public class WebSocketServer {
      *
      * @param message 消息内容
      */
-    public static void sendInfoGroup(String message) {
+    public static void sendInfoGroup(JSONObject message) {
         if (webSocketMap.size() > 0) {
             for (String userId : WebSocketServer.webSocketMap.keySet()) {
                 sendInfo(userId, message);