浏览代码

消息推送

24282 1 年之前
父节点
当前提交
fb019d3f5b
共有 39 个文件被更改,包括 1336 次插入449 次删除
  1. 1 1
      hx-base/pom.xml
  2. 5 5
      hx-file/src/test/java/Main.java
  3. 68 0
      hx-socket/src/main/java/com/fjhx/socket/controller/push/PushAnnouncementController.java
  4. 51 0
      hx-socket/src/main/java/com/fjhx/socket/controller/push/PushInfoController.java
  5. 55 0
      hx-socket/src/main/java/com/fjhx/socket/core/PushParam.java
  6. 60 0
      hx-socket/src/main/java/com/fjhx/socket/core/PushTypeEnum.java
  7. 117 0
      hx-socket/src/main/java/com/fjhx/socket/core/WebSocketListener.java
  8. 130 0
      hx-socket/src/main/java/com/fjhx/socket/core/WebSocketPush.java
  9. 186 0
      hx-socket/src/main/java/com/fjhx/socket/core/WebSocketServer.java
  10. 22 0
      hx-socket/src/main/java/com/fjhx/socket/core/event/WebSocketOnMessageEvent.java
  11. 20 0
      hx-socket/src/main/java/com/fjhx/socket/core/event/WebSocketOnOpenEvent.java
  12. 24 0
      hx-socket/src/main/java/com/fjhx/socket/core/event/WebSocketSendFailEvent.java
  13. 24 0
      hx-socket/src/main/java/com/fjhx/socket/core/event/WebSocketSendSuccessEvent.java
  14. 23 0
      hx-socket/src/main/java/com/fjhx/socket/core/event/WebSocketUserOfflineEvent.java
  15. 0 37
      hx-socket/src/main/java/com/fjhx/socket/entity/MessageEntity.java
  16. 17 0
      hx-socket/src/main/java/com/fjhx/socket/entity/push/dto/PushAnnouncementDto.java
  17. 17 0
      hx-socket/src/main/java/com/fjhx/socket/entity/push/dto/PushAnnouncementSelectDto.java
  18. 17 0
      hx-socket/src/main/java/com/fjhx/socket/entity/push/dto/PushInfoDto.java
  19. 30 0
      hx-socket/src/main/java/com/fjhx/socket/entity/push/dto/PushInfoSelectDto.java
  20. 47 0
      hx-socket/src/main/java/com/fjhx/socket/entity/push/po/PushAnnouncement.java
  21. 63 0
      hx-socket/src/main/java/com/fjhx/socket/entity/push/po/PushInfo.java
  22. 22 0
      hx-socket/src/main/java/com/fjhx/socket/entity/push/vo/PushAnnouncementVo.java
  23. 17 0
      hx-socket/src/main/java/com/fjhx/socket/entity/push/vo/PushInfoVo.java
  24. 0 22
      hx-socket/src/main/java/com/fjhx/socket/event/WebSocketOnCloseEvent.java
  25. 0 20
      hx-socket/src/main/java/com/fjhx/socket/event/WebSocketOnErrorEvent.java
  26. 0 25
      hx-socket/src/main/java/com/fjhx/socket/event/WebSocketOnMessageEvent.java
  27. 0 22
      hx-socket/src/main/java/com/fjhx/socket/event/WebSocketOnOpenEvent.java
  28. 0 27
      hx-socket/src/main/java/com/fjhx/socket/event/WebSocketSendFailEvent.java
  29. 0 26
      hx-socket/src/main/java/com/fjhx/socket/event/WebSocketSendSuccessEvent.java
  30. 0 25
      hx-socket/src/main/java/com/fjhx/socket/event/WebSocketUserOfflineEvent.java
  31. 26 0
      hx-socket/src/main/java/com/fjhx/socket/mapper/push/PushAnnouncementMapper.java
  32. 26 0
      hx-socket/src/main/java/com/fjhx/socket/mapper/push/PushInfoMapper.java
  33. 0 239
      hx-socket/src/main/java/com/fjhx/socket/service/WebSocketServer.java
  34. 46 0
      hx-socket/src/main/java/com/fjhx/socket/service/push/PushAnnouncementService.java
  35. 38 0
      hx-socket/src/main/java/com/fjhx/socket/service/push/PushInfoService.java
  36. 80 0
      hx-socket/src/main/java/com/fjhx/socket/service/push/impl/PushAnnouncementServiceImpl.java
  37. 60 0
      hx-socket/src/main/java/com/fjhx/socket/service/push/impl/PushInfoServiceImpl.java
  38. 20 0
      hx-socket/src/main/resources/mapper/push/PushAnnouncementMapper.xml
  39. 24 0
      hx-socket/src/main/resources/mapper/push/PushInfoMapper.xml

+ 1 - 1
hx-base/pom.xml

@@ -11,7 +11,7 @@
 
     <groupId>com.fjhx</groupId>
     <artifactId>hx-base</artifactId>
-    <version>1.0.1</version>
+    <version>1.0.2</version>
 
     <properties>
         <maven.compiler.source>8</maven.compiler.source>

+ 5 - 5
hx-file/src/test/java/Main.java

@@ -4,12 +4,12 @@ public class Main {
 
     public static void main(String[] args) {
         GeneratorApplication.builder()
-                .url("jdbc:mysql://36.134.91.96:17330/bytesailing_kd100?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true")
-                .username("fjhx2012mysql")
-                .password("3PN-Mzn#vnP&q6d")
+                .url("jdbc:mysql://36.134.91.96:12333/bytesailing_base?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&nullCatalogMeansCurrent=true")
+                .username("root")
+                .password("5fWD*oa^nso@kmKa")
                 .port(9989)
-                .module("hx-kd100")
-                .parent("com.fjhx.kd100")
+                .module("hx-socket")
+                .parent("com.fjhx.socket")
                 .superServiceClass("com.ruoyi.common.core.service.BaseService")
                 .build();
     }

+ 68 - 0
hx-socket/src/main/java/com/fjhx/socket/controller/push/PushAnnouncementController.java

@@ -0,0 +1,68 @@
+package com.fjhx.socket.controller.push;
+
+import org.springframework.web.bind.annotation.*;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fjhx.socket.entity.push.vo.PushAnnouncementVo;
+import com.fjhx.socket.entity.push.dto.PushAnnouncementSelectDto;
+import com.fjhx.socket.entity.push.dto.PushAnnouncementDto;
+import com.ruoyi.common.core.domain.BaseSelectDto;
+import com.fjhx.socket.service.push.PushAnnouncementService;
+import org.springframework.beans.factory.annotation.Autowired;
+
+
+/**
+ * <p>
+ * 公告 前端控制器
+ * </p>
+ *
+ * @author
+ * @since 2023-07-13
+ */
+@RestController
+@RequestMapping("/pushAnnouncement")
+public class PushAnnouncementController {
+
+    @Autowired
+    private PushAnnouncementService pushAnnouncementService;
+
+    /**
+     * 公告分页
+     */
+    @PostMapping("/page")
+    public Page<PushAnnouncementVo> page(@RequestBody PushAnnouncementSelectDto dto) {
+        return pushAnnouncementService.getPage(dto);
+    }
+
+    /**
+     * 公告明细
+     */
+    @PostMapping("/detail")
+    public PushAnnouncementVo detail(@RequestBody BaseSelectDto dto) {
+        return pushAnnouncementService.detail(dto.getId());
+    }
+
+    /**
+     * 公告新增
+     */
+    @PostMapping("/add")
+    public void add(@RequestBody PushAnnouncementDto pushAnnouncementDto) {
+        pushAnnouncementService.add(pushAnnouncementDto);
+    }
+
+    /**
+     * 公告编辑
+     */
+    @PostMapping("/edit")
+    public void edit(@RequestBody PushAnnouncementDto pushAnnouncementDto) {
+        pushAnnouncementService.edit(pushAnnouncementDto);
+    }
+
+    /**
+     * 公告删除
+     */
+    @PostMapping("/delete")
+    public void delete(@RequestBody BaseSelectDto dto) {
+        pushAnnouncementService.delete(dto.getId());
+    }
+
+}

+ 51 - 0
hx-socket/src/main/java/com/fjhx/socket/controller/push/PushInfoController.java

@@ -0,0 +1,51 @@
+package com.fjhx.socket.controller.push;
+
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fjhx.socket.entity.push.dto.PushInfoSelectDto;
+import com.fjhx.socket.entity.push.vo.PushInfoVo;
+import com.fjhx.socket.service.push.PushInfoService;
+import com.ruoyi.common.core.domain.BaseSelectDto;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+
+/**
+ * <p>
+ * 消息推送 前端控制器
+ * </p>
+ *
+ * @author
+ * @since 2023-07-13
+ */
+@RestController
+@RequestMapping("/pushInfo")
+public class PushInfoController {
+
+    @Autowired
+    private PushInfoService pushInfoService;
+
+    /**
+     * 消息推送分页
+     */
+    @PostMapping("/page")
+    public Page<PushInfoVo> page(@RequestBody PushInfoSelectDto dto) {
+        return pushInfoService.getPage(dto);
+    }
+
+    /**
+     * 消息推送明细
+     */
+    @PostMapping("/detail")
+    public PushInfoVo detail(@RequestBody BaseSelectDto dto) {
+        return pushInfoService.detail(dto.getId());
+    }
+
+    @PostMapping("/read")
+    public void read(@RequestBody PushInfoSelectDto dto) {
+        pushInfoService.read(dto.getIdList());
+    }
+
+}

+ 55 - 0
hx-socket/src/main/java/com/fjhx/socket/core/PushParam.java

@@ -0,0 +1,55 @@
+package com.fjhx.socket.core;
+
+import lombok.Data;
+
+import java.util.Date;
+
+@Data
+public class PushParam {
+
+    /**
+     * 推送id
+     */
+    private Long pushId;
+
+    /**
+     * 用户id
+     */
+    private Long userId;
+
+    /**
+     * 推送类型
+     */
+    private Integer type;
+
+    /**
+     * 标题
+     */
+    private String title;
+
+    /**
+     * 公告id
+     */
+    private Long announcementId;
+
+    /**
+     * 业务类型
+     */
+    private Integer businessType;
+
+    /**
+     * 业务数据
+     */
+    private Object businessData;
+
+    /**
+     * 是否是新消息
+     */
+    private Boolean newMsg;
+
+    /**
+     * 消息创建时间
+     */
+    private Date createTime;
+
+}

+ 60 - 0
hx-socket/src/main/java/com/fjhx/socket/core/PushTypeEnum.java

@@ -0,0 +1,60 @@
+package com.fjhx.socket.core;
+
+import com.ruoyi.common.exception.ServiceException;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Getter
+@AllArgsConstructor
+public enum PushTypeEnum {
+
+    /**
+     * 系统提示
+     */
+    SYSTEM(-1, "系统", false),
+    /**
+     * 推送完整未读消息列表
+     */
+    ANNOUNCEMENT(1, "公告", true),
+    /**
+     * 推送未读数量
+     */
+    MESSAGE(2, "消息", true),
+    /**
+     * 推送消息对象
+     */
+    PUSH_NOTIFICATION(3, "闪动消息", false);
+
+    /**
+     * 消息类型
+     */
+    private final int type;
+    /**
+     * 消息说明
+     */
+    private final String remark;
+    /**
+     * 是否保存到数据库
+     */
+    private final boolean save;
+
+    private static final Map<Integer, PushTypeEnum> map = new HashMap<>();
+
+    static {
+        for (PushTypeEnum pushTypeEnum : PushTypeEnum.values()) {
+            map.put(pushTypeEnum.getType(), pushTypeEnum);
+        }
+    }
+
+    public static PushTypeEnum getEnum(Integer key) {
+        PushTypeEnum pushTypeEnum = map.get(key);
+        if (pushTypeEnum == null) {
+            throw new ServiceException("未知推送类型");
+        }
+        return pushTypeEnum;
+    }
+
+}

+ 117 - 0
hx-socket/src/main/java/com/fjhx/socket/core/WebSocketListener.java

@@ -0,0 +1,117 @@
+package com.fjhx.socket.core;
+
+import cn.hutool.core.util.ObjectUtil;
+import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
+import com.fjhx.socket.core.event.WebSocketOnOpenEvent;
+import com.fjhx.socket.entity.push.dto.PushAnnouncementDto;
+import com.fjhx.socket.entity.push.po.PushInfo;
+import com.fjhx.socket.service.push.PushAnnouncementService;
+import com.fjhx.socket.service.push.PushInfoService;
+import com.fjhx.socket.service.push.impl.PushAnnouncementServiceImpl;
+import com.ruoyi.common.constant.BaseSourceConstant;
+import com.ruoyi.common.constant.StatusConstant;
+import com.ruoyi.common.core.domain.entity.SysUser;
+import com.ruoyi.common.core.redis.RedisCache;
+import com.ruoyi.framework.mybatis.holder.TenantHolder;
+import com.ruoyi.system.service.ISysUserService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Component
+public class WebSocketListener {
+
+    @Autowired
+    private RedisCache redisCache;
+
+    @Autowired
+    private PushInfoService pushInfoService;
+
+    @Autowired
+    private ISysUserService sysUserService;
+
+    @Autowired
+    private PushAnnouncementService pushAnnouncementService;
+
+    /**
+     * 用户连接上webSocket
+     */
+    @EventListener
+    public void open(WebSocketOnOpenEvent event) {
+        WebSocketServer webSocketServer = event.getWebSocketServer();
+
+        PushParam pushParam = new PushParam();
+        pushParam.setUserId(webSocketServer.getUserId());
+        webSocketServer.sendMessage(PushTypeEnum.ANNOUNCEMENT, pushParam);
+        webSocketServer.sendMessage(PushTypeEnum.MESSAGE, pushParam);
+    }
+
+    /**
+     * 每5分钟同步查看一次消息公告
+     */
+    @Scheduled(fixedDelay = 3 * 60 * 1000)
+    private void syncOrder() {
+
+        DynamicDataSourceContextHolder.push(BaseSourceConstant.BASE);
+        Boolean oldIgnore = TenantHolder.isIgnore();
+        TenantHolder.setIgnore(true);
+        List<SysUser> sysUserList = sysUserService.list();
+        List<Long> userIdList = sysUserList.stream().map(SysUser::getUserId).collect(Collectors.toList());
+        TenantHolder.setIgnore(oldIgnore);
+        DynamicDataSourceContextHolder.poll();
+
+        Set<String> redisKeySet = redisCache.scan(PushAnnouncementServiceImpl.PUSH_ANNOUNCEMENT_REDIS_KEY + "*");
+
+        if (ObjectUtil.isEmpty(redisKeySet)) {
+            return;
+        }
+
+        for (String redisKey : redisKeySet) {
+
+            PushAnnouncementDto pushAnnouncementDto = redisCache.getCacheObject(redisKey);
+            Date sendTime = pushAnnouncementDto.getSendTime();
+            Date endTime = pushAnnouncementDto.getEndTime();
+
+            if (sendTime != null && sendTime.after(new Date())) {
+                continue;
+            }
+
+            List<PushInfo> pushInfoList = userIdList.stream().map(item -> {
+                PushInfo pushInfo = new PushInfo();
+                pushInfo.setAnnouncementId(pushAnnouncementDto.getId());
+                pushInfo.setTitle(pushAnnouncementDto.getTitle());
+                pushInfo.setBusinessType(0);
+                pushInfo.setBusinessData(pushAnnouncementDto.getContent());
+                pushInfo.setPushUserId(item);
+                pushInfo.setPushRead(StatusConstant.NO);
+                pushInfo.setFailureTime(endTime);
+                return pushInfo;
+            }).collect(Collectors.toList());
+
+            pushAnnouncementDto.setStatus(StatusConstant.YES);
+
+            redisCache.deleteObject(redisKey);
+            pushInfoService.saveBatch(pushInfoList);
+            pushAnnouncementService.updateById(pushAnnouncementDto);
+
+        }
+
+        WebSocketServer.webSocketMap.forEach((k, v) -> {
+            PushParam pushParam = new PushParam();
+            if (ObjectUtil.isNotEmpty(v)) {
+                for (WebSocketServer webSocketServer : v) {
+                    pushParam.setUserId(webSocketServer.getUserId());
+                    webSocketServer.sendMessage(PushTypeEnum.ANNOUNCEMENT, pushParam);
+                }
+            }
+        });
+
+    }
+
+}

+ 130 - 0
hx-socket/src/main/java/com/fjhx/socket/core/WebSocketPush.java

@@ -0,0 +1,130 @@
+package com.fjhx.socket.core;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.extra.spring.SpringUtil;
+import com.alibaba.fastjson2.JSONObject;
+import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
+import com.baomidou.mybatisplus.core.toolkit.IdWorker;
+import com.baomidou.mybatisplus.core.toolkit.StringPool;
+import com.fjhx.socket.core.event.WebSocketUserOfflineEvent;
+import com.fjhx.socket.entity.push.po.PushInfo;
+import com.fjhx.socket.service.push.PushInfoService;
+import com.ruoyi.common.constant.BaseSourceConstant;
+import com.ruoyi.common.constant.StatusConstant;
+import com.ruoyi.common.core.domain.entity.SysUser;
+import com.ruoyi.framework.mybatis.holder.TenantHolder;
+import com.ruoyi.system.service.ISysUserService;
+import org.springframework.context.ApplicationContext;
+
+import java.util.Date;
+import java.util.List;
+
+public class WebSocketPush {
+
+    private static final ApplicationContext applicationContext = SpringUtil.getApplicationContext();
+    private static final ISysUserService sysUserService = SpringUtil.getBean(ISysUserService.class);
+    private static final PushInfoService pushInfoService = SpringUtil.getBean(PushInfoService.class);
+
+    /**
+     * 发送用户消息
+     */
+    public static void byUser(PushTypeEnum pushTypeEnum, Long userId, String title, Integer businessType) {
+        byUser(pushTypeEnum, userId, title, businessType, StringPool.EMPTY);
+    }
+
+    /**
+     * 发送用户消息
+     */
+    public static void byUser(PushTypeEnum pushTypeEnum, Long userId, String title, Integer businessType, Object businessData) {
+        PushParam pushParam = new PushParam();
+        pushParam.setPushId(IdWorker.getId());
+        pushParam.setUserId(userId);
+        pushParam.setType(pushTypeEnum.getType());
+        pushParam.setTitle(title);
+        pushParam.setBusinessType(businessType);
+        pushParam.setBusinessData(businessData);
+        pushParam.setNewMsg(true);
+        pushParam.setCreateTime(new Date());
+        byUser(pushTypeEnum, userId, pushParam);
+    }
+
+    /**
+     * 发送用户消息
+     */
+    protected static void byUser(PushTypeEnum pushTypeEnum, Long userId, PushParam pushParam) {
+
+        // 保存用户消息
+        if (pushTypeEnum.isSave()) {
+            savePushInfo(pushTypeEnum, pushParam);
+        }
+
+        List<WebSocketServer> webSocketServerList = WebSocketServer.webSocketMap.get(userId);
+        if (ObjectUtil.isEmpty(webSocketServerList)) {
+            applicationContext.publishEvent(new WebSocketUserOfflineEvent(pushTypeEnum, pushParam));
+            return;
+        }
+
+        for (WebSocketServer webSocketServer : webSocketServerList) {
+            webSocketServer.sendMessage(pushTypeEnum, pushParam);
+        }
+
+    }
+
+    /**
+     * 发送租户消息
+     */
+    public static void byTenant(PushTypeEnum pushTypeEnum, String title, Integer businessType) {
+        byTenant(pushTypeEnum, title, businessType, StringPool.EMPTY);
+    }
+
+    /**
+     * 发送租户消息
+     */
+    public static void byTenant(PushTypeEnum pushTypeEnum, String title, Integer businessType, Object businessData) {
+        DynamicDataSourceContextHolder.push(BaseSourceConstant.BASE);
+        List<SysUser> sysUserList = sysUserService.list();
+        for (SysUser sysUser : sysUserList) {
+            byUser(pushTypeEnum, sysUser.getUserId(), title, businessType, businessData);
+        }
+        DynamicDataSourceContextHolder.poll();
+    }
+
+    /**
+     * 发送全部消息
+     */
+    public static void all(PushTypeEnum pushTypeEnum, String title, Integer businessType) {
+        all(pushTypeEnum, title, businessType, StringPool.EMPTY);
+    }
+
+    /**
+     * 发送全部消息
+     */
+    public static void all(PushTypeEnum pushTypeEnum, String title, Integer businessType, Object businessData) {
+        DynamicDataSourceContextHolder.push(BaseSourceConstant.BASE);
+        Boolean oldIgnore = TenantHolder.isIgnore();
+        TenantHolder.setIgnore(true);
+        List<SysUser> sysUserList = sysUserService.list();
+        for (SysUser sysUser : sysUserList) {
+            byUser(pushTypeEnum, sysUser.getUserId(), title, businessType, businessData);
+        }
+        TenantHolder.setIgnore(oldIgnore);
+        DynamicDataSourceContextHolder.poll();
+    }
+
+    /**
+     * 保存消息
+     */
+    private static void savePushInfo(PushTypeEnum pushTypeEnum, PushParam pushParam) {
+        PushInfo pushInfo = new PushInfo();
+        pushInfo.setId(pushParam.getPushId());
+        pushInfo.setType(pushTypeEnum.getType());
+        pushInfo.setAnnouncementId(pushParam.getAnnouncementId());
+        pushInfo.setTitle(pushParam.getTitle());
+        pushInfo.setBusinessType(pushParam.getBusinessType());
+        pushInfo.setBusinessData(JSONObject.toJSONString(pushParam.getBusinessData()));
+        pushInfo.setPushUserId(pushParam.getUserId());
+        pushInfo.setPushRead(StatusConstant.NO);
+        pushInfoService.save(pushInfo);
+    }
+
+}

+ 186 - 0
hx-socket/src/main/java/com/fjhx/socket/core/WebSocketServer.java

@@ -0,0 +1,186 @@
+package com.fjhx.socket.core;
+
+import cn.hutool.extra.spring.SpringUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.toolkit.IdWorker;
+import com.fjhx.socket.core.event.WebSocketOnMessageEvent;
+import com.fjhx.socket.core.event.WebSocketOnOpenEvent;
+import com.fjhx.socket.core.event.WebSocketSendFailEvent;
+import com.fjhx.socket.core.event.WebSocketSendSuccessEvent;
+import com.fjhx.socket.entity.push.po.PushInfo;
+import com.fjhx.socket.service.push.PushInfoService;
+import com.ruoyi.common.constant.StatusConstant;
+import com.ruoyi.common.core.domain.entity.SysUser;
+import com.ruoyi.common.core.domain.model.LoginUser;
+import com.ruoyi.framework.web.service.TokenService;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.RequestBody;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@ServerEndpoint("/webStock/{token}")
+@Component
+@Slf4j
+@Getter
+public class WebSocketServer {
+
+    /**
+     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象
+     */
+    protected static final Map<Long, List<WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
+
+    private static final ApplicationContext applicationContext = SpringUtil.getApplicationContext();
+    private static final TokenService tokenService = SpringUtil.getBean(TokenService.class);
+    private static final PushInfoService pushInfoService = SpringUtil.getBean(PushInfoService.class);
+
+    /**
+     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
+     */
+    private Session session;
+
+    /**
+     * sessionId
+     */
+    private Long sessionId;
+
+    /**
+     * 租户id
+     */
+    private String tenantId;
+
+    /**
+     * userId
+     */
+    private Long userId;
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(@RequestBody Session session, @PathParam("token") String token) {
+
+        // 解析token
+        LoginUser loginUser = tokenService.getLoginUser(token);
+
+        // token错误
+        if (loginUser == null) {
+            try {
+                PushParam pushParam = new PushParam();
+                pushParam.setTitle("token错误");
+                pushParam.setCreateTime(new Date());
+                this.sendMessage(PushTypeEnum.SYSTEM, pushParam);
+                this.session.close();
+            } catch (Exception e) {
+                log.error("webSocket连接失败", e);
+            }
+            return;
+        }
+
+        SysUser user = loginUser.getUser();
+        this.session = session;
+        this.sessionId = IdWorker.getId();
+        this.userId = user.getUserId();
+        this.tenantId = user.getTenantId();
+
+        synchronized (WebSocketServer.class) {
+            List<WebSocketServer> webSocketServerList = webSocketMap.computeIfAbsent(userId, k -> new ArrayList<>());
+            webSocketServerList.add(this);
+        }
+
+        // 发布建立连接连接事件
+        applicationContext.publishEvent(new WebSocketOnOpenEvent(this));
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose() {
+        synchronized (WebSocketServer.class) {
+            List<WebSocketServer> webSocketServerList = webSocketMap.get(userId);
+            webSocketServerList.remove(this);
+
+            if (webSocketServerList.size() == 0) {
+                webSocketMap.remove(userId);
+            }
+        }
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message 客户端发送过来的消息
+     */
+    @OnMessage
+    public void onMessage(String message) {
+        // 推送消息事件
+        applicationContext.publishEvent(new WebSocketOnMessageEvent(this, message));
+    }
+
+    /**
+     * @param error 异常
+     */
+    @OnError
+    public void onError(Throwable error) {
+        log.error("webSocket连接异常", error);
+    }
+
+    /**
+     * 实现服务器主动推送
+     */
+    protected void sendMessage(PushTypeEnum pushTypeEnum, PushParam pushParam) {
+
+        JSONObject msg = new JSONObject();
+        msg.put("type", pushTypeEnum.getType());
+
+        switch (pushTypeEnum) {
+            case SYSTEM:
+                msg.put("content", pushParam.getTitle());
+                break;
+            case ANNOUNCEMENT:
+                List<PushInfo> list = pushInfoService.list(q -> q
+                        .eq(PushInfo::getType, PushTypeEnum.ANNOUNCEMENT.getType())
+                        .eq(PushInfo::getPushUserId, pushParam.getUserId())
+                        .eq(PushInfo::getPushRead, StatusConstant.NO)
+                        .and(s -> s.isNull(PushInfo::getFailureTime).or().le(PushInfo::getFailureTime, new Date()))
+                );
+                msg.put("list", list);
+                break;
+            case MESSAGE:
+                Long count = pushInfoService.count(q -> q
+                        .eq(PushInfo::getType, PushTypeEnum.MESSAGE.getType())
+                        .eq(PushInfo::getPushUserId, pushParam.getUserId())
+                        .eq(PushInfo::getPushRead, StatusConstant.NO)
+                        .and(s -> s.isNull(PushInfo::getFailureTime).or().le(PushInfo::getFailureTime, new Date()))
+                );
+                msg.put("count", count);
+                break;
+            case PUSH_NOTIFICATION:
+                msg.put("title", pushParam.getTitle());
+                msg.put("businessType", pushParam.getBusinessType());
+                msg.put("businessData", pushParam.getBusinessData());
+                break;
+        }
+
+        try {
+            this.session.getBasicRemote().sendText(JSON.toJSONString(msg.toJSONString()));
+            applicationContext.publishEvent(new WebSocketSendSuccessEvent(pushTypeEnum, pushParam));
+        } catch (IOException e) {
+            applicationContext.publishEvent(new WebSocketSendFailEvent(pushTypeEnum, pushParam));
+        }
+
+    }
+
+}

+ 22 - 0
hx-socket/src/main/java/com/fjhx/socket/core/event/WebSocketOnMessageEvent.java

@@ -0,0 +1,22 @@
+package com.fjhx.socket.core.event;
+
+import com.fjhx.socket.core.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock接收消息事件
+ */
+@Getter
+public class WebSocketOnMessageEvent extends ApplicationEvent {
+
+    private final WebSocketServer webSocketServer;
+    private final String message;
+
+    public WebSocketOnMessageEvent(WebSocketServer webSocketServer, String message) {
+        super(WebSocketOnMessageEvent.class);
+        this.webSocketServer = webSocketServer;
+        this.message = message;
+    }
+
+}

+ 20 - 0
hx-socket/src/main/java/com/fjhx/socket/core/event/WebSocketOnOpenEvent.java

@@ -0,0 +1,20 @@
+package com.fjhx.socket.core.event;
+
+import com.fjhx.socket.core.WebSocketServer;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock建立连接事件
+ */
+@Getter
+public class WebSocketOnOpenEvent extends ApplicationEvent {
+
+    private final WebSocketServer webSocketServer;
+
+    public WebSocketOnOpenEvent(WebSocketServer webSocketServer) {
+        super(WebSocketOnOpenEvent.class);
+        this.webSocketServer = webSocketServer;
+    }
+
+}

+ 24 - 0
hx-socket/src/main/java/com/fjhx/socket/core/event/WebSocketSendFailEvent.java

@@ -0,0 +1,24 @@
+package com.fjhx.socket.core.event;
+
+
+import com.fjhx.socket.core.PushParam;
+import com.fjhx.socket.core.PushTypeEnum;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock推送消息失败事件
+ */
+@Getter
+public class WebSocketSendFailEvent extends ApplicationEvent {
+
+    private final PushTypeEnum pushTypeEnum;
+    private final PushParam pushParam;
+
+    public WebSocketSendFailEvent(PushTypeEnum pushTypeEnum, PushParam pushParam) {
+        super(WebSocketSendFailEvent.class);
+        this.pushTypeEnum = pushTypeEnum;
+        this.pushParam = pushParam;
+    }
+
+}

+ 24 - 0
hx-socket/src/main/java/com/fjhx/socket/core/event/WebSocketSendSuccessEvent.java

@@ -0,0 +1,24 @@
+package com.fjhx.socket.core.event;
+
+import com.fjhx.socket.core.PushParam;
+import com.fjhx.socket.core.PushTypeEnum;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock推送消息成功事件
+ */
+@Getter
+public class WebSocketSendSuccessEvent extends ApplicationEvent {
+
+    private final PushTypeEnum pushTypeEnum;
+    private final PushParam pushParam;
+
+    public WebSocketSendSuccessEvent(PushTypeEnum pushTypeEnum, PushParam pushParam) {
+        super(WebSocketSendSuccessEvent.class);
+        this.pushTypeEnum = pushTypeEnum;
+        this.pushParam = pushParam;
+    }
+
+
+}

+ 23 - 0
hx-socket/src/main/java/com/fjhx/socket/core/event/WebSocketUserOfflineEvent.java

@@ -0,0 +1,23 @@
+package com.fjhx.socket.core.event;
+
+import com.fjhx.socket.core.PushParam;
+import com.fjhx.socket.core.PushTypeEnum;
+import lombok.Getter;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * webStock推送的用户离线事件
+ */
+@Getter
+public class WebSocketUserOfflineEvent extends ApplicationEvent {
+
+    private final PushTypeEnum pushTypeEnum;
+    private final PushParam pushParam;
+
+    public WebSocketUserOfflineEvent(PushTypeEnum pushTypeEnum, PushParam pushParam) {
+        super(WebSocketUserOfflineEvent.class);
+        this.pushTypeEnum = pushTypeEnum;
+        this.pushParam = pushParam;
+    }
+
+}

+ 0 - 37
hx-socket/src/main/java/com/fjhx/socket/entity/MessageEntity.java

@@ -1,37 +0,0 @@
-package com.fjhx.socket.entity;
-
-import lombok.Data;
-
-import java.util.Date;
-
-@Data
-public class MessageEntity {
-
-    /**
-     * 用户id
-     */
-    private Long userId;
-
-    /**
-     * 会话id
-     */
-    private Long sessionId;
-
-    /**
-     * 消息创建时间
-     */
-    private Date createTime;
-
-    /**
-     * 类型
-     *
-     * @remark -1:websocket链接失败  其他状态根据业务自定义
-     */
-    private Integer type;
-
-    /**
-     * 消息主体
-     */
-    private Object data;
-
-}

+ 17 - 0
hx-socket/src/main/java/com/fjhx/socket/entity/push/dto/PushAnnouncementDto.java

@@ -0,0 +1,17 @@
+package com.fjhx.socket.entity.push.dto;
+
+import com.fjhx.socket.entity.push.po.PushAnnouncement;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * 公告新增编辑入参实体
+ *
+ * @author 
+ * @since 2023-07-13
+ */
+@Getter
+@Setter
+public class PushAnnouncementDto extends PushAnnouncement {
+
+}

+ 17 - 0
hx-socket/src/main/java/com/fjhx/socket/entity/push/dto/PushAnnouncementSelectDto.java

@@ -0,0 +1,17 @@
+package com.fjhx.socket.entity.push.dto;
+
+import com.ruoyi.common.core.domain.BaseSelectDto;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * 公告列表查询入参实体
+ *
+ * @author 
+ * @since 2023-07-13
+ */
+@Getter
+@Setter
+public class PushAnnouncementSelectDto extends BaseSelectDto {
+
+}

+ 17 - 0
hx-socket/src/main/java/com/fjhx/socket/entity/push/dto/PushInfoDto.java

@@ -0,0 +1,17 @@
+package com.fjhx.socket.entity.push.dto;
+
+import com.fjhx.socket.entity.push.po.PushInfo;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * 消息推送新增编辑入参实体
+ *
+ * @author 
+ * @since 2023-07-13
+ */
+@Getter
+@Setter
+public class PushInfoDto extends PushInfo {
+
+}

+ 30 - 0
hx-socket/src/main/java/com/fjhx/socket/entity/push/dto/PushInfoSelectDto.java

@@ -0,0 +1,30 @@
+package com.fjhx.socket.entity.push.dto;
+
+import com.ruoyi.common.core.domain.BaseSelectDto;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.List;
+
+/**
+ * 消息推送列表查询入参实体
+ *
+ * @author
+ * @since 2023-07-13
+ */
+@Getter
+@Setter
+public class PushInfoSelectDto extends BaseSelectDto {
+
+    /**
+     * 推送消息id
+     */
+    private List<Long> idList;
+
+    /**
+     * 是否已读 1是 0否
+     */
+    private Integer pushRead;
+
+
+}

+ 47 - 0
hx-socket/src/main/java/com/fjhx/socket/entity/push/po/PushAnnouncement.java

@@ -0,0 +1,47 @@
+package com.fjhx.socket.entity.push.po;
+
+import com.ruoyi.common.core.domain.BasePo;
+import com.baomidou.mybatisplus.annotation.TableName;
+import java.util.Date;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * <p>
+ * 公告
+ * </p>
+ *
+ * @author 
+ * @since 2023-07-13
+ */
+@Getter
+@Setter
+@TableName("push_announcement")
+public class PushAnnouncement extends BasePo {
+
+    /**
+     * 标题
+     */
+    private String title;
+
+    /**
+     * 内容
+     */
+    private String content;
+
+    /**
+     * 发送时间
+     */
+    private Date sendTime;
+
+    /**
+     * 结束时间
+     */
+    private Date endTime;
+
+    /**
+     * 状态 0待发送 1已发送
+     */
+    private Integer status;
+
+}

+ 63 - 0
hx-socket/src/main/java/com/fjhx/socket/entity/push/po/PushInfo.java

@@ -0,0 +1,63 @@
+package com.fjhx.socket.entity.push.po;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.ruoyi.common.core.domain.BasePo;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Date;
+
+/**
+ * <p>
+ * 消息推送
+ * </p>
+ *
+ * @author
+ * @since 2023-07-13
+ */
+@Getter
+@Setter
+@TableName("push_info")
+public class PushInfo extends BasePo {
+
+    /**
+     * 推送类型 1公告 2消息
+     */
+    private Integer type;
+
+    /**
+     * 公告id
+     */
+    private Long announcementId;
+
+    /**
+     * 标题
+     */
+    private String title;
+
+    /**
+     * 业务类型
+     */
+    private Integer businessType;
+
+    /**
+     * 业务消息
+     */
+    private String businessData;
+
+    /**
+     * 推送用户id
+     */
+    private Long pushUserId;
+
+    /**
+     * 是否已读 1是 0否
+     */
+    private Integer pushRead;
+
+    /**
+     * 失效时间
+     */
+    private Date failureTime;
+
+}

+ 22 - 0
hx-socket/src/main/java/com/fjhx/socket/entity/push/vo/PushAnnouncementVo.java

@@ -0,0 +1,22 @@
+package com.fjhx.socket.entity.push.vo;
+
+import com.fjhx.socket.entity.push.po.PushAnnouncement;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * 公告列表查询返回值实体
+ *
+ * @author
+ * @since 2023-07-13
+ */
+@Getter
+@Setter
+public class PushAnnouncementVo extends PushAnnouncement {
+
+    /**
+     * 创建人名称
+     */
+    private String createUserName;
+
+}

+ 17 - 0
hx-socket/src/main/java/com/fjhx/socket/entity/push/vo/PushInfoVo.java

@@ -0,0 +1,17 @@
+package com.fjhx.socket.entity.push.vo;
+
+import com.fjhx.socket.entity.push.po.PushInfo;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * 消息推送列表查询返回值实体
+ *
+ * @author 
+ * @since 2023-07-13
+ */
+@Getter
+@Setter
+public class PushInfoVo extends PushInfo {
+
+}

+ 0 - 22
hx-socket/src/main/java/com/fjhx/socket/event/WebSocketOnCloseEvent.java

@@ -1,22 +0,0 @@
-package com.fjhx.socket.event;
-
-import com.fjhx.socket.service.WebSocketServer;
-import lombok.Getter;
-import org.springframework.context.ApplicationEvent;
-
-/**
- * webStock关闭连接事件
- */
-@Getter
-public class WebSocketOnCloseEvent extends ApplicationEvent {
-
-    public WebSocketOnCloseEvent(Object source) {
-        super(source);
-    }
-
-    @Override
-    public WebSocketServer getSource() {
-        return (WebSocketServer) super.getSource();
-    }
-
-}

+ 0 - 20
hx-socket/src/main/java/com/fjhx/socket/event/WebSocketOnErrorEvent.java

@@ -1,20 +0,0 @@
-package com.fjhx.socket.event;
-
-import com.fjhx.socket.service.WebSocketServer;
-import lombok.Getter;
-import org.springframework.context.ApplicationEvent;
-
-/**
- * webStock发送异常
- */
-@Getter
-public class WebSocketOnErrorEvent extends ApplicationEvent {
-
-    private final Throwable throwable;
-
-    public WebSocketOnErrorEvent(WebSocketServer source, Throwable throwable) {
-        super(source);
-        this.throwable = throwable;
-    }
-
-}

+ 0 - 25
hx-socket/src/main/java/com/fjhx/socket/event/WebSocketOnMessageEvent.java

@@ -1,25 +0,0 @@
-package com.fjhx.socket.event;
-
-import com.fjhx.socket.service.WebSocketServer;
-import lombok.Getter;
-import org.springframework.context.ApplicationEvent;
-
-/**
- * webStock接收消息事件
- */
-@Getter
-public class WebSocketOnMessageEvent extends ApplicationEvent {
-
-    private final String message;
-
-    public WebSocketOnMessageEvent(WebSocketServer source, String message) {
-        super(source);
-        this.message = message;
-    }
-
-    @Override
-    public WebSocketServer getSource() {
-        return (WebSocketServer) super.getSource();
-    }
-
-}

+ 0 - 22
hx-socket/src/main/java/com/fjhx/socket/event/WebSocketOnOpenEvent.java

@@ -1,22 +0,0 @@
-package com.fjhx.socket.event;
-
-import com.fjhx.socket.service.WebSocketServer;
-import lombok.Getter;
-import org.springframework.context.ApplicationEvent;
-
-/**
- * webStock建立连接事件
- */
-@Getter
-public class WebSocketOnOpenEvent extends ApplicationEvent {
-
-    public WebSocketOnOpenEvent(WebSocketServer source) {
-        super(source);
-    }
-
-    @Override
-    public WebSocketServer getSource() {
-        return (WebSocketServer) super.getSource();
-    }
-
-}

+ 0 - 27
hx-socket/src/main/java/com/fjhx/socket/event/WebSocketSendFailEvent.java

@@ -1,27 +0,0 @@
-package com.fjhx.socket.event;
-
-
-import com.fjhx.socket.entity.MessageEntity;
-import com.fjhx.socket.service.WebSocketServer;
-import lombok.Getter;
-import org.springframework.context.ApplicationEvent;
-
-/**
- * webStock推送消息失败事件
- */
-@Getter
-public class WebSocketSendFailEvent extends ApplicationEvent {
-
-    private final MessageEntity messageEntity;
-
-    public WebSocketSendFailEvent(WebSocketServer source, MessageEntity messageEntity) {
-        super(source);
-        this.messageEntity = messageEntity;
-    }
-
-    @Override
-    public WebSocketServer getSource() {
-        return (WebSocketServer) super.getSource();
-    }
-
-}

+ 0 - 26
hx-socket/src/main/java/com/fjhx/socket/event/WebSocketSendSuccessEvent.java

@@ -1,26 +0,0 @@
-package com.fjhx.socket.event;
-
-import com.fjhx.socket.entity.MessageEntity;
-import com.fjhx.socket.service.WebSocketServer;
-import lombok.Getter;
-import org.springframework.context.ApplicationEvent;
-
-/**
- * webStock推送消息成功事件
- */
-@Getter
-public class WebSocketSendSuccessEvent extends ApplicationEvent {
-
-    private final MessageEntity messageEntity;
-
-    public WebSocketSendSuccessEvent(WebSocketServer source, MessageEntity messageEntity) {
-        super(source);
-        this.messageEntity = messageEntity;
-    }
-
-    @Override
-    public WebSocketServer getSource() {
-        return (WebSocketServer) super.getSource();
-    }
-
-}

+ 0 - 25
hx-socket/src/main/java/com/fjhx/socket/event/WebSocketUserOfflineEvent.java

@@ -1,25 +0,0 @@
-package com.fjhx.socket.event;
-
-import com.fjhx.socket.entity.MessageEntity;
-import lombok.Getter;
-import org.springframework.context.ApplicationEvent;
-
-import java.util.Date;
-
-/**
- * webStock推送的用户离线事件
- */
-@Getter
-public class WebSocketUserOfflineEvent extends ApplicationEvent {
-
-    private final MessageEntity messageEntity = new MessageEntity();
-
-    public WebSocketUserOfflineEvent(Long userId, Integer type, Object message) {
-        super(userId);
-        this.messageEntity.setType(type);
-        this.messageEntity.setUserId(userId);
-        this.messageEntity.setData(message);
-        this.messageEntity.setCreateTime(new Date());
-    }
-
-}

+ 26 - 0
hx-socket/src/main/java/com/fjhx/socket/mapper/push/PushAnnouncementMapper.java

@@ -0,0 +1,26 @@
+package com.fjhx.socket.mapper.push;
+
+import com.fjhx.socket.entity.push.po.PushAnnouncement;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fjhx.socket.entity.push.vo.PushAnnouncementVo;
+import com.ruoyi.common.utils.wrapper.IWrapper;
+import org.apache.ibatis.annotations.Param;
+
+
+/**
+ * <p>
+ * 公告 Mapper 接口
+ * </p>
+ *
+ * @author 
+ * @since 2023-07-13
+ */
+public interface PushAnnouncementMapper extends BaseMapper<PushAnnouncement> {
+
+    /**
+     * 公告分页
+     */
+    Page<PushAnnouncementVo> getPage(@Param("page") Page<Object> page, @Param("ew") IWrapper<PushAnnouncement> wrapper);
+
+}

+ 26 - 0
hx-socket/src/main/java/com/fjhx/socket/mapper/push/PushInfoMapper.java

@@ -0,0 +1,26 @@
+package com.fjhx.socket.mapper.push;
+
+import com.fjhx.socket.entity.push.po.PushInfo;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fjhx.socket.entity.push.vo.PushInfoVo;
+import com.ruoyi.common.utils.wrapper.IWrapper;
+import org.apache.ibatis.annotations.Param;
+
+
+/**
+ * <p>
+ * 消息推送 Mapper 接口
+ * </p>
+ *
+ * @author 
+ * @since 2023-07-13
+ */
+public interface PushInfoMapper extends BaseMapper<PushInfo> {
+
+    /**
+     * 消息推送分页
+     */
+    Page<PushInfoVo> getPage(@Param("page") Page<Object> page, @Param("ew") IWrapper<PushInfo> wrapper);
+
+}

+ 0 - 239
hx-socket/src/main/java/com/fjhx/socket/service/WebSocketServer.java

@@ -1,239 +0,0 @@
-package com.fjhx.socket.service;
-
-import cn.hutool.extra.spring.SpringUtil;
-import com.alibaba.fastjson.JSON;
-import com.baomidou.mybatisplus.core.toolkit.IdWorker;
-import com.fjhx.socket.entity.MessageEntity;
-import com.fjhx.socket.event.*;
-import com.ruoyi.common.core.domain.entity.SysUser;
-import com.ruoyi.common.core.domain.model.LoginUser;
-import com.ruoyi.common.utils.SecurityUtils;
-import com.ruoyi.framework.web.service.TokenService;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.ApplicationContext;
-import org.springframework.stereotype.Component;
-import org.springframework.web.bind.annotation.RequestBody;
-
-import javax.websocket.*;
-import javax.websocket.server.PathParam;
-import javax.websocket.server.ServerEndpoint;
-import java.io.IOException;
-import java.util.Date;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-@ServerEndpoint("/webStock/{token}")
-@Component
-@Slf4j
-@Getter
-public class WebSocketServer {
-
-    private static final ApplicationContext applicationContext = SpringUtil.getApplicationContext();
-
-    /**
-     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
-     */
-    private static final Map<Long, Map<Long, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
-    private static final TokenService tokenService = SpringUtil.getBean(TokenService.class);
-
-    /**
-     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
-     */
-    private Session session;
-
-    /**
-     * 接收userId
-     */
-    private Long userId;
-
-    /**
-     * sessionId
-     */
-    private Long sessionId;
-
-    /**
-     * 租户id
-     */
-    private String tenantId;
-
-    /**
-     * 发送自定义消息
-     *
-     * @param userId  发送用户
-     * @param type    消息类型
-     * @param message 消息内容
-     */
-    public static void sendInfo(Long userId, Integer type, Object message) {
-
-        String tenantId = SecurityUtils.getTenantId();
-
-        Map<Long, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
-
-        if (sessionIdWebSocketServerMap == null) {
-            applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, type, message));
-            return;
-        }
-
-        for (WebSocketServer webSocketServer : sessionIdWebSocketServerMap.values()) {
-
-            String itemTenantId = webSocketServer.getTenantId();
-
-            if (Objects.equals(tenantId, itemTenantId)) {
-                webSocketServer.sendMessage(type, message);
-            }
-
-        }
-
-    }
-
-    /**
-     * 发送自定义消息
-     *
-     * @param userId    发送用户
-     * @param sessionId sessionId
-     * @param type      消息类型
-     * @param message   消息内容
-     */
-    public static void sendInfo(Long userId, Long sessionId, Integer type, Object message) {
-        Map<Long, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
-
-        if (sessionIdWebSocketServerMap == null) {
-            applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, type, message));
-            return;
-        }
-
-        WebSocketServer webSocketServer = sessionIdWebSocketServerMap.get(sessionId);
-        if (webSocketServer != null) {
-            webSocketServer.sendMessage(type, message);
-        } else {
-            applicationContext.publishEvent(new WebSocketUserOfflineEvent(userId, type, message));
-        }
-
-    }
-
-    /**
-     * 发送自定义消息--群发
-     *
-     * @param message 消息内容
-     */
-    public static void sendInfoGroup(Integer type, Object message) {
-        if (webSocketMap.size() > 0) {
-            Set<Long> keySet = WebSocketServer.webSocketMap.keySet();
-            for (Long userId : keySet) {
-                sendInfo(userId, type, message);
-            }
-        }
-    }
-
-    /**
-     * 在线人数
-     */
-    public static int getOnlineCount() {
-        return webSocketMap.size();
-    }
-
-    /**
-     * 连接建立成功调用的方法
-     */
-    @OnOpen
-    public void onOpen(@RequestBody Session session, @PathParam("token") String token) {
-
-        // 解析token
-        LoginUser loginUser = tokenService.getLoginUser(token);
-
-        if (loginUser == null) {
-            try {
-                this.sendMessage(-1, "token错误");
-                this.session.close();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-            return;
-        }
-
-        SysUser user = loginUser.getUser();
-
-        this.session = session;
-        this.sessionId = IdWorker.getId();
-        this.userId = user.getUserId();
-        this.tenantId = user.getTenantId();
-
-        synchronized (this) {
-            Map<Long, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
-            sessionIdWebSocketServerMap.put(sessionId, this);
-        }
-
-        // 发布建立连接连接事件
-        applicationContext.publishEvent(new WebSocketOnOpenEvent(this));
-    }
-
-    /**
-     * 连接关闭调用的方法
-     */
-    @OnClose
-    public void onClose() {
-        synchronized (this) {
-            if (userId != null) {
-                Map<Long, WebSocketServer> sessionIdWebSocketServerMap = webSocketMap.get(userId);
-                if (sessionIdWebSocketServerMap != null) {
-                    sessionIdWebSocketServerMap.remove(sessionId);
-
-                    if (sessionIdWebSocketServerMap.size() == 0) {
-                        webSocketMap.remove(userId);
-                    }
-
-                }
-            }
-        }
-
-        // 发布建立关闭连接事件
-        applicationContext.publishEvent(new WebSocketOnCloseEvent(this));
-    }
-
-    /**
-     * 收到客户端消息后调用的方法
-     *
-     * @param message 客户端发送过来的消息
-     */
-    @OnMessage
-    public void onMessage(String message) {
-        // 推送消息事件
-        applicationContext.publishEvent(new WebSocketOnMessageEvent(this, message));
-    }
-
-    /**
-     * @param error 异常
-     */
-    @OnError
-    public void onError(Throwable error) {
-        // 发布连接异常事件
-        applicationContext.publishEvent(new WebSocketOnErrorEvent(this, error));
-    }
-
-    /**
-     * 实现服务器主动推送
-     *
-     * @param message 消息内容
-     */
-    public void sendMessage(Integer type, Object message) {
-
-        MessageEntity sendEntity = new MessageEntity();
-        sendEntity.setUserId(userId);
-        sendEntity.setSessionId(sessionId);
-        sendEntity.setCreateTime(new Date());
-        sendEntity.setType(type);
-        sendEntity.setData(message);
-
-        try {
-            this.session.getBasicRemote().sendText(JSON.toJSONString(sendEntity));
-            applicationContext.publishEvent(new WebSocketSendSuccessEvent(this, sendEntity));
-        } catch (IOException e) {
-            applicationContext.publishEvent(new WebSocketSendFailEvent(this, sendEntity));
-        }
-
-    }
-
-}

+ 46 - 0
hx-socket/src/main/java/com/fjhx/socket/service/push/PushAnnouncementService.java

@@ -0,0 +1,46 @@
+package com.fjhx.socket.service.push;
+
+import com.fjhx.socket.entity.push.po.PushAnnouncement;
+import com.ruoyi.common.core.service.BaseService;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fjhx.socket.entity.push.vo.PushAnnouncementVo;
+import com.fjhx.socket.entity.push.dto.PushAnnouncementSelectDto;
+import com.fjhx.socket.entity.push.dto.PushAnnouncementDto;
+
+
+/**
+ * <p>
+ * 公告 服务类
+ * </p>
+ *
+ * @author 
+ * @since 2023-07-13
+ */
+public interface PushAnnouncementService extends BaseService<PushAnnouncement> {
+
+    /**
+     * 公告分页
+     */
+    Page<PushAnnouncementVo> getPage(PushAnnouncementSelectDto dto);
+
+    /**
+     * 公告明细
+     */
+    PushAnnouncementVo detail(Long id);
+
+    /**
+     * 公告新增
+     */
+    void add(PushAnnouncementDto pushAnnouncementDto);
+
+    /**
+     * 公告编辑
+     */
+    void edit(PushAnnouncementDto pushAnnouncementDto);
+
+    /**
+     * 公告删除
+     */
+    void delete(Long id);
+
+}

+ 38 - 0
hx-socket/src/main/java/com/fjhx/socket/service/push/PushInfoService.java

@@ -0,0 +1,38 @@
+package com.fjhx.socket.service.push;
+
+import com.fjhx.socket.entity.push.po.PushInfo;
+import com.ruoyi.common.core.service.BaseService;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fjhx.socket.entity.push.vo.PushInfoVo;
+import com.fjhx.socket.entity.push.dto.PushInfoSelectDto;
+import com.fjhx.socket.entity.push.dto.PushInfoDto;
+
+import java.util.List;
+
+
+/**
+ * <p>
+ * 消息推送 服务类
+ * </p>
+ *
+ * @author
+ * @since 2023-07-13
+ */
+public interface PushInfoService extends BaseService<PushInfo> {
+
+    /**
+     * 消息推送分页
+     */
+    Page<PushInfoVo> getPage(PushInfoSelectDto dto);
+
+    /**
+     * 消息推送明细
+     */
+    PushInfoVo detail(Long id);
+
+    /**
+     * 设置已读
+     */
+    void read(List<Long> idList);
+
+}

+ 80 - 0
hx-socket/src/main/java/com/fjhx/socket/service/push/impl/PushAnnouncementServiceImpl.java

@@ -0,0 +1,80 @@
+package com.fjhx.socket.service.push.impl;
+
+import cn.hutool.core.bean.BeanUtil;
+import com.baomidou.mybatisplus.core.toolkit.IdWorker;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fjhx.socket.entity.push.dto.PushAnnouncementDto;
+import com.fjhx.socket.entity.push.dto.PushAnnouncementSelectDto;
+import com.fjhx.socket.entity.push.po.PushAnnouncement;
+import com.fjhx.socket.entity.push.vo.PushAnnouncementVo;
+import com.fjhx.socket.mapper.push.PushAnnouncementMapper;
+import com.fjhx.socket.service.push.PushAnnouncementService;
+import com.ruoyi.common.core.domain.BasePo;
+import com.ruoyi.common.core.redis.RedisCache;
+import com.ruoyi.common.utils.wrapper.IWrapper;
+import com.ruoyi.system.utils.UserUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+
+/**
+ * <p>
+ * 公告 服务实现类
+ * </p>
+ *
+ * @author
+ * @since 2023-07-13
+ */
+@Service
+public class PushAnnouncementServiceImpl extends ServiceImpl<PushAnnouncementMapper, PushAnnouncement> implements PushAnnouncementService {
+
+    @Autowired
+    private RedisCache redisCache;
+
+    public static final String PUSH_ANNOUNCEMENT_REDIS_KEY = "pushAnnouncement:";
+
+    @Override
+    public Page<PushAnnouncementVo> getPage(PushAnnouncementSelectDto dto) {
+        IWrapper<PushAnnouncement> wrapper = getWrapper();
+        wrapper.orderByDesc("pa", PushAnnouncement::getId);
+        wrapper.eq("pa", PushAnnouncement::getTitle, dto.getKeyword());
+        Page<PushAnnouncementVo> page = this.baseMapper.getPage(dto.getPage(), wrapper);
+        List<PushAnnouncementVo> records = page.getRecords();
+        if (records.size() == 0) {
+            return page;
+        }
+
+        UserUtil.assignmentNickName(records, BasePo::getCreateUser, PushAnnouncementVo::setCreateUserName);
+        return page;
+    }
+
+    @Override
+    public PushAnnouncementVo detail(Long id) {
+        PushAnnouncement PushAnnouncement = this.getById(id);
+        PushAnnouncementVo result = BeanUtil.toBean(PushAnnouncement, PushAnnouncementVo.class);
+        return result;
+    }
+
+    @Override
+    public void add(PushAnnouncementDto pushAnnouncementDto) {
+        pushAnnouncementDto.setId(IdWorker.getId());
+        redisCache.setCacheObject(PUSH_ANNOUNCEMENT_REDIS_KEY + pushAnnouncementDto.getId(), pushAnnouncementDto);
+        this.save(pushAnnouncementDto);
+    }
+
+    @Override
+    public void edit(PushAnnouncementDto pushAnnouncementDto) {
+        redisCache.setCacheObject(PUSH_ANNOUNCEMENT_REDIS_KEY + pushAnnouncementDto.getId(), pushAnnouncementDto);
+        this.updateById(pushAnnouncementDto);
+    }
+
+    @Override
+    public void delete(Long id) {
+        redisCache.deleteObject(PUSH_ANNOUNCEMENT_REDIS_KEY + id);
+        this.removeById(id);
+    }
+
+}

+ 60 - 0
hx-socket/src/main/java/com/fjhx/socket/service/push/impl/PushInfoServiceImpl.java

@@ -0,0 +1,60 @@
+package com.fjhx.socket.service.push.impl;
+
+import cn.hutool.core.bean.BeanUtil;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fjhx.socket.entity.push.dto.PushInfoDto;
+import com.fjhx.socket.entity.push.dto.PushInfoSelectDto;
+import com.fjhx.socket.entity.push.po.PushInfo;
+import com.fjhx.socket.entity.push.vo.PushInfoVo;
+import com.fjhx.socket.mapper.push.PushInfoMapper;
+import com.fjhx.socket.service.push.PushInfoService;
+import com.ruoyi.common.constant.StatusConstant;
+import com.ruoyi.common.core.domain.BaseIdPo;
+import com.ruoyi.common.core.domain.BasePo;
+import com.ruoyi.common.utils.SecurityUtils;
+import com.ruoyi.common.utils.wrapper.IWrapper;
+import org.springframework.stereotype.Service;
+
+import java.util.Date;
+import java.util.List;
+
+
+/**
+ * <p>
+ * 消息推送 服务实现类
+ * </p>
+ *
+ * @author
+ * @since 2023-07-13
+ */
+@Service
+public class PushInfoServiceImpl extends ServiceImpl<PushInfoMapper, PushInfo> implements PushInfoService {
+
+    @Override
+    public Page<PushInfoVo> getPage(PushInfoSelectDto dto) {
+        IWrapper<PushInfo> wrapper = getWrapper();
+        wrapper.orderByDesc("pi", PushInfo::getId);
+        wrapper.eq("pi", PushInfo::getPushRead, dto.getPushRead());
+        wrapper.eq("pi", PushInfo::getPushUserId, SecurityUtils.getUserId());
+        Page<PushInfoVo> page = this.baseMapper.getPage(dto.getPage(), wrapper);
+        return page;
+    }
+
+    @Override
+    public PushInfoVo detail(Long id) {
+        PushInfo PushInfo = this.getById(id);
+        PushInfoVo result = BeanUtil.toBean(PushInfo, PushInfoVo.class);
+        return result;
+    }
+
+    @Override
+    public void read(List<Long> idList) {
+        update(q -> q.in(BaseIdPo::getId, idList)
+                .set(PushInfo::getPushRead, StatusConstant.YES)
+                .set(BasePo::getUpdateTime, new Date())
+                .set(BasePo::getUpdateUser, SecurityUtils.getUserId())
+        );
+    }
+
+}

+ 20 - 0
hx-socket/src/main/resources/mapper/push/PushAnnouncementMapper.xml

@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.fjhx.socket.mapper.push.PushAnnouncementMapper">
+    <select id="getPage" resultType="com.fjhx.socket.entity.push.vo.PushAnnouncementVo">
+        select
+            pa.id,
+            pa.title,
+            pa.content,
+            pa.send_time,
+            pa.end_time,
+            pa.status,
+            pa.create_user,
+            pa.create_time,
+            pa.update_user,
+            pa.update_time
+        from push_announcement pa
+            ${ew.customSqlSegment}
+    </select>
+
+</mapper>

+ 24 - 0
hx-socket/src/main/resources/mapper/push/PushInfoMapper.xml

@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.fjhx.socket.mapper.push.PushInfoMapper">
+    <select id="getPage" resultType="com.fjhx.socket.entity.push.vo.PushInfoVo">
+        select
+            pi.id,
+            pi.type,
+            pi.announcement_id,
+            pi.title,
+            pi.business_type,
+            pi.business_data,
+            pi.push_user_id,
+            pi.push_read,
+            pi.push_start_time,
+            pi.push_end_time,
+            pi.create_user,
+            pi.create_time,
+            pi.update_user,
+            pi.update_time
+        from push_info pi
+            ${ew.customSqlSegment}
+    </select>
+
+</mapper>