WebSocketServer.java 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package com.fjhx.service;
  2. import cn.hutool.extra.spring.SpringUtil;
  3. import com.alibaba.fastjson.JSON;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.baomidou.mybatisplus.core.toolkit.IdWorker;
  6. import com.fjhx.entity.MessageEntity;
  7. import com.fjhx.event.*;
  8. import lombok.Getter;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.context.ApplicationContext;
  11. import org.springframework.stereotype.Component;
  12. import org.springframework.web.bind.annotation.RequestBody;
  13. import javax.websocket.*;
  14. import javax.websocket.server.PathParam;
  15. import javax.websocket.server.ServerEndpoint;
  16. import java.io.IOException;
  17. import java.util.Date;
  18. import java.util.concurrent.ConcurrentHashMap;
  19. @ServerEndpoint("/webStock/{userId}")
  20. @Component
  21. @Slf4j
  22. @Getter
  23. public class WebSocketServer {
  24. private static final ApplicationContext applicationContext = SpringUtil.getApplicationContext();
  25. /**
  26. * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
  27. */
  28. private static final ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
  29. /**
  30. * 与某个客户端的连接会话,需要通过它来给客户端发送数据
  31. */
  32. private Session session;
  33. /**
  34. * 接收userId
  35. */
  36. private String userId;
  37. /**
  38. * sessionId
  39. */
  40. private String sessionId;
  41. /**
  42. * 连接建立成功调用的方法
  43. */
  44. @OnOpen
  45. public void onOpen(@RequestBody Session session, @PathParam("userId") String userId) {
  46. this.session = session;
  47. this.sessionId = IdWorker.getIdStr();
  48. this.userId = userId;
  49. synchronized (this) {
  50. ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap =
  51. webSocketMap.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
  52. sessionIdWebSocketServerMap.put(sessionId, this);
  53. }
  54. // 发布建立连接连接事件
  55. applicationContext.publishEvent(new WebSocketOnOpenEvent(this));
  56. }
  57. /**
  58. * 连接关闭调用的方法
  59. */
  60. @OnClose
  61. public void onClose() {
  62. synchronized (this) {
  63. ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
  64. if (sessionIdWebSocketServerMap != null) {
  65. sessionIdWebSocketServerMap.remove(sessionId);
  66. if (sessionIdWebSocketServerMap.size() == 0) {
  67. webSocketMap.remove(userId);
  68. }
  69. }
  70. }
  71. // 发布建立关闭连接事件
  72. applicationContext.publishEvent(new WebSocketOnCloseEvent(this));
  73. }
  74. /**
  75. * 收到客户端消息后调用的方法
  76. *
  77. * @param message 客户端发送过来的消息
  78. */
  79. @OnMessage
  80. public void onMessage(String message) {
  81. // 推送消息事件
  82. applicationContext.publishEvent(new WebSocketOnMessageEvent(this, message));
  83. }
  84. /**
  85. * @param error 异常
  86. */
  87. @OnError
  88. public void onError(Throwable error) {
  89. // 发布连接异常事件
  90. applicationContext.publishEvent(new WebSocketOnErrorEvent(this, error));
  91. }
  92. /**
  93. * 实现服务器主动推送
  94. *
  95. * @param message 消息内容
  96. */
  97. public void sendMessage(Integer type, JSONObject message) {
  98. MessageEntity sendEntity = new MessageEntity();
  99. sendEntity.setUserId(userId);
  100. sendEntity.setSessionId(sessionId);
  101. sendEntity.setCreateTime(new Date());
  102. sendEntity.setType(type);
  103. sendEntity.setData(message);
  104. try {
  105. this.session.getBasicRemote().sendText(JSON.toJSONString(sendEntity));
  106. applicationContext.publishEvent(new WebSocketSendSuccessEvent(this, sendEntity));
  107. } catch (IOException e) {
  108. applicationContext.publishEvent(new WebSocketSendFailEvent(this, sendEntity));
  109. }
  110. }
  111. /**
  112. * 发送自定义消息
  113. *
  114. * @param userId 发送用户
  115. * @param type 消息类型
  116. * @param message 消息内容
  117. */
  118. public static void sendInfo(String userId, Integer type, JSONObject message) {
  119. ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
  120. if (sessionIdWebSocketServerMap == null) {
  121. applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
  122. return;
  123. }
  124. for (WebSocketServer webSocketServer : sessionIdWebSocketServerMap.values()) {
  125. webSocketServer.sendMessage(type, message);
  126. }
  127. }
  128. /**
  129. * 发送自定义消息
  130. *
  131. * @param userId 发送用户
  132. * @param type 消息类型
  133. * @param message 消息内容
  134. */
  135. public static void sendInfo(String userId, String sessionId, Integer type, JSONObject message) {
  136. ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
  137. if (sessionIdWebSocketServerMap == null) {
  138. applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
  139. return;
  140. }
  141. WebSocketServer webSocketServer = sessionIdWebSocketServerMap.get(sessionId);
  142. if (webSocketServer != null) {
  143. webSocketServer.sendMessage(type, message);
  144. } else {
  145. applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
  146. }
  147. }
  148. /**
  149. * 发送自定义消息--群发
  150. *
  151. * @param message 消息内容
  152. */
  153. public static void sendInfoGroup(Integer type, JSONObject message) {
  154. if (webSocketMap.size() > 0) {
  155. for (String userId : WebSocketServer.webSocketMap.keySet()) {
  156. sendInfo(userId, type, message);
  157. }
  158. }
  159. }
  160. /**
  161. * 在线人数
  162. */
  163. public static synchronized int getOnlineCount() {
  164. return webSocketMap.size();
  165. }
  166. }