package com.fjhx.service; import cn.hutool.extra.spring.SpringUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.toolkit.IdWorker; import com.fjhx.entity.MessageEntity; import com.fjhx.event.*; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestBody; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/webStock/{userId}") @Component @Slf4j @Getter public class WebSocketServer { private static final ApplicationContext applicationContext = SpringUtil.getApplicationContext(); /** * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 */ private static final ConcurrentHashMap> webSocketMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 接收userId */ private String userId; /** * sessionId */ private String sessionId; /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(@RequestBody Session session, @PathParam("userId") String userId) { this.session = session; this.sessionId = IdWorker.getIdStr(); this.userId = userId; synchronized (this) { ConcurrentHashMap sessionIdWebSocketServerMap = webSocketMap.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); sessionIdWebSocketServerMap.put(sessionId, this); } // 发布建立连接连接事件 applicationContext.publishEvent(new WebSocketOnOpenEvent(this)); } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { synchronized (this) { ConcurrentHashMap sessionIdWebSocketServerMap = webSocketMap.get(userId); if (sessionIdWebSocketServerMap != null) { sessionIdWebSocketServerMap.remove(sessionId); if (sessionIdWebSocketServerMap.size() == 0) { webSocketMap.remove(userId); } } } // 发布建立关闭连接事件 applicationContext.publishEvent(new WebSocketOnCloseEvent(this)); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message) { // 推送消息事件 applicationContext.publishEvent(new WebSocketOnMessageEvent(this, message)); } /** * @param error 异常 */ @OnError public void onError(Throwable error) { // 发布连接异常事件 applicationContext.publishEvent(new WebSocketOnErrorEvent(this, error)); } /** * 实现服务器主动推送 * * @param message 消息内容 */ public void sendMessage(Integer type, JSONObject message) { MessageEntity sendEntity = new MessageEntity(); sendEntity.setUserId(userId); sendEntity.setSessionId(sessionId); sendEntity.setCreateTime(new Date()); sendEntity.setType(type); sendEntity.setData(message); try { this.session.getBasicRemote().sendText(JSON.toJSONString(sendEntity)); applicationContext.publishEvent(new WebSocketSendSuccessEvent(this, sendEntity)); } catch (IOException e) { applicationContext.publishEvent(new WebSocketSendFailEvent(this, sendEntity)); } } /** * 发送自定义消息 * * @param userId 发送用户 * @param type 消息类型 * @param message 消息内容 */ public static void sendInfo(String userId, Integer type, JSONObject message) { ConcurrentHashMap sessionIdWebSocketServerMap = webSocketMap.get(userId); if (sessionIdWebSocketServerMap == null) { applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message)); return; } for (WebSocketServer webSocketServer : sessionIdWebSocketServerMap.values()) { webSocketServer.sendMessage(type, message); } } /** * 发送自定义消息 * * @param userId 发送用户 * @param type 消息类型 * @param message 消息内容 */ public static void sendInfo(String userId, String sessionId, Integer type, JSONObject message) { ConcurrentHashMap sessionIdWebSocketServerMap = webSocketMap.get(userId); if (sessionIdWebSocketServerMap == null) { applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message)); return; } WebSocketServer webSocketServer = sessionIdWebSocketServerMap.get(sessionId); if (webSocketServer != null) { webSocketServer.sendMessage(type, message); } else { applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, message)); } } /** * 发送自定义消息--群发 * * @param message 消息内容 */ public static void sendInfoGroup(Integer type, JSONObject message) { if (webSocketMap.size() > 0) { for (String userId : WebSocketServer.webSocketMap.keySet()) { sendInfo(userId, type, message); } } } /** * 在线人数 */ public static synchronized int getOnlineCount() { return webSocketMap.size(); } }