123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- package com.fjhx.service;
- import cn.hutool.extra.spring.SpringUtil;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.baomidou.mybatisplus.core.toolkit.IdWorker;
- import com.fjhx.entity.MessageEntity;
- import com.fjhx.event.*;
- import lombok.Getter;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.context.ApplicationContext;
- import org.springframework.stereotype.Component;
- import org.springframework.web.bind.annotation.RequestBody;
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.Date;
- import java.util.concurrent.ConcurrentHashMap;
- @ServerEndpoint("/webStock/{userId}")
- @Component
- @Slf4j
- @Getter
- public class WebSocketServer {
- private static final ApplicationContext applicationContext = SpringUtil.getApplicationContext();
- /**
- * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
- */
- private static final ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
- /**
- * 与某个客户端的连接会话,需要通过它来给客户端发送数据
- */
- private Session session;
- /**
- * 接收userId
- */
- private String userId;
- /**
- * sessionId
- */
- private String sessionId;
- /**
- * 连接建立成功调用的方法
- */
- @OnOpen
- public void onOpen(@RequestBody Session session, @PathParam("userId") String userId) {
- this.session = session;
- this.sessionId = IdWorker.getIdStr();
- this.userId = userId;
- synchronized (this) {
- ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap =
- webSocketMap.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
- sessionIdWebSocketServerMap.put(sessionId, this);
- }
- // 发布建立连接连接事件
- applicationContext.publishEvent(new WebSocketOnOpenEvent(this));
- }
- /**
- * 连接关闭调用的方法
- */
- @OnClose
- public void onClose() {
- synchronized (this) {
- ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
- if (sessionIdWebSocketServerMap != null) {
- sessionIdWebSocketServerMap.remove(sessionId);
- if (sessionIdWebSocketServerMap.size() == 0) {
- webSocketMap.remove(userId);
- }
- }
- }
- // 发布建立关闭连接事件
- applicationContext.publishEvent(new WebSocketOnCloseEvent(this));
- }
- /**
- * 收到客户端消息后调用的方法
- *
- * @param message 客户端发送过来的消息
- */
- @OnMessage
- public void onMessage(String message) {
- // 推送消息事件
- applicationContext.publishEvent(new WebSocketOnMessageEvent(this, message));
- }
- /**
- * @param error 异常
- */
- @OnError
- public void onError(Throwable error) {
- // 发布连接异常事件
- applicationContext.publishEvent(new WebSocketOnErrorEvent(this, error));
- }
- /**
- * 实现服务器主动推送
- *
- * @param message 消息内容
- */
- public void sendMessage(Integer type, JSONObject message) {
- MessageEntity sendEntity = new MessageEntity();
- sendEntity.setUserId(userId);
- sendEntity.setSessionId(sessionId);
- sendEntity.setCreateTime(new Date());
- sendEntity.setType(type);
- sendEntity.setData(message);
- try {
- this.session.getBasicRemote().sendText(JSON.toJSONString(sendEntity));
- applicationContext.publishEvent(new WebSocketSendSuccessEvent(this, sendEntity));
- } catch (IOException e) {
- applicationContext.publishEvent(new WebSocketSendFailEvent(this, sendEntity));
- }
- }
- /**
- * 发送自定义消息
- *
- * @param userId 发送用户
- * @param type 消息类型
- * @param message 消息内容
- */
- public static void sendInfo(String userId, Integer type, JSONObject message) {
- ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
- if (sessionIdWebSocketServerMap == null) {
- applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
- return;
- }
- for (WebSocketServer webSocketServer : sessionIdWebSocketServerMap.values()) {
- webSocketServer.sendMessage(type, message);
- }
- }
- /**
- * 发送自定义消息
- *
- * @param userId 发送用户
- * @param type 消息类型
- * @param message 消息内容
- */
- public static void sendInfo(String userId, String sessionId, Integer type, JSONObject message) {
- ConcurrentHashMap<String, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
- if (sessionIdWebSocketServerMap == null) {
- applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
- return;
- }
- WebSocketServer webSocketServer = sessionIdWebSocketServerMap.get(sessionId);
- if (webSocketServer != null) {
- webSocketServer.sendMessage(type, message);
- } else {
- applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message));
- }
- }
- /**
- * 发送自定义消息--群发
- *
- * @param message 消息内容
- */
- public static void sendInfoGroup(Integer type, JSONObject message) {
- if (webSocketMap.size() > 0) {
- for (String userId : WebSocketServer.webSocketMap.keySet()) {
- sendInfo(userId, type, message);
- }
- }
- }
- /**
- * 在线人数
- */
- public static synchronized int getOnlineCount() {
- return webSocketMap.size();
- }
- }
|