|
@@ -0,0 +1,167 @@
|
|
|
+package com.fjhx.common.service.send.impl;
|
|
|
+
|
|
|
+import cn.hutool.core.bean.BeanUtil;
|
|
|
+import cn.hutool.core.convert.Convert;
|
|
|
+import cn.hutool.core.date.DateUtil;
|
|
|
+import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
|
|
|
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
|
|
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
+import com.fjhx.common.constant.SourceConstant;
|
|
|
+import com.fjhx.common.entity.send.dto.SendMegDto;
|
|
|
+import com.fjhx.common.entity.send.dto.SendMegSelectDto;
|
|
|
+import com.fjhx.common.entity.send.po.SendMeg;
|
|
|
+import com.fjhx.common.entity.send.vo.SendMegVo;
|
|
|
+import com.fjhx.common.mapper.send.SendMegMapper;
|
|
|
+import com.fjhx.common.service.send.SendMegService;
|
|
|
+import com.fjhx.socket.entity.MessageEntity;
|
|
|
+import com.fjhx.socket.event.WebSocketUserOfflineEvent;
|
|
|
+import com.fjhx.socket.service.WebSocketServer;
|
|
|
+import com.ruoyi.common.constant.StatusConstant;
|
|
|
+import com.ruoyi.common.core.domain.entity.SysUser;
|
|
|
+import com.ruoyi.common.core.redis.RedisCache;
|
|
|
+import com.ruoyi.common.utils.wrapper.IWrapper;
|
|
|
+import com.ruoyi.framework.mybatis.holder.TenantHolder;
|
|
|
+import com.ruoyi.system.service.ISysUserService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.context.event.EventListener;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Set;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * <p>
|
|
|
+ * 服务实现类
|
|
|
+ * </p>
|
|
|
+ *
|
|
|
+ * @author
|
|
|
+ * @since 2023-05-22
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class SendMegServiceImpl extends ServiceImpl<SendMegMapper, SendMeg> implements SendMegService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ISysUserService sysUserService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private RedisCache redisCache;
|
|
|
+
|
|
|
+ private static final String sendMessageKey = "sendMessage:";
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Page<SendMegVo> getPage(SendMegSelectDto dto) {
|
|
|
+ IWrapper<SendMeg> wrapper = getWrapper();
|
|
|
+ wrapper.eq("sm", SendMeg::getType, dto.getType());
|
|
|
+ wrapper.eq("sm", SendMeg::getStatus, dto.getStatus());
|
|
|
+ wrapper.orderByAsc("sm", SendMeg::getStatus);
|
|
|
+ wrapper.orderByDesc("sm", SendMeg::getId);
|
|
|
+ Page<SendMegVo> page = this.baseMapper.getPage(dto.getPage(), wrapper);
|
|
|
+ return page;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public SendMegVo detail(Long id) {
|
|
|
+ SendMeg SendMeg = this.getById(id);
|
|
|
+ SendMegVo result = BeanUtil.toBean(SendMeg, SendMegVo.class);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void add(SendMegDto sendMegDto) {
|
|
|
+ Date sendTime = sendMegDto.getSendTime();
|
|
|
+ if (sendTime.before(DateUtil.endOfMinute(new Date()))) {
|
|
|
+ sendMegDto.setStatus(StatusConstant.YES);
|
|
|
+ new Thread(() -> send(sendMegDto)).start();
|
|
|
+ } else {
|
|
|
+ sendMegDto.setStatus(StatusConstant.NO);
|
|
|
+ }
|
|
|
+
|
|
|
+ this.save(sendMegDto);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void edit(SendMegDto sendMegDto) {
|
|
|
+ sendMegDto.setStatus(StatusConstant.YES);
|
|
|
+ this.updateById(sendMegDto);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void delete(Long id) {
|
|
|
+ this.removeById(id);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void send(SendMeg sendMeg) {
|
|
|
+ DynamicDataSourceContextHolder.push(SourceConstant.BASE);
|
|
|
+ TenantHolder.setIgnore(true);
|
|
|
+ List<SysUser> list = sysUserService.list();
|
|
|
+ DynamicDataSourceContextHolder.poll();
|
|
|
+
|
|
|
+ for (SysUser sysUser : list) {
|
|
|
+ WebSocketServer.sendInfo(sysUser.getUserId(), 0, sendMeg);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void executeSend() {
|
|
|
+
|
|
|
+ Set<String> scan = redisCache.scan(sendMessageKey + "**");
|
|
|
+ for (String key : scan) {
|
|
|
+ String[] split = key.split(":");
|
|
|
+ if (split.length != 3) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String userIdStr = split[2];
|
|
|
+ Long userId = Convert.toLong(userIdStr);
|
|
|
+ if (userId == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ SendMeg cacheObject = redisCache.getCacheObject(key);
|
|
|
+ redisCache.deleteObject(key);
|
|
|
+ WebSocketServer.sendInfo(userId, 0, cacheObject);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("推送异常", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ List<SendMeg> list = list(q -> q
|
|
|
+ .eq(SendMeg::getStatus, StatusConstant.NO)
|
|
|
+ .le(SendMeg::getSendTime, DateUtil.endOfMinute(new Date())));
|
|
|
+
|
|
|
+ for (SendMeg sendMeg : list) {
|
|
|
+ send(sendMeg);
|
|
|
+ sendMeg.setStatus(StatusConstant.YES);
|
|
|
+ }
|
|
|
+ updateBatchById(list);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @EventListener
|
|
|
+ public void userOfflineEvent(WebSocketUserOfflineEvent webSocketUserOfflineEvent) {
|
|
|
+ MessageEntity messageEntity = webSocketUserOfflineEvent.getMessageEntity();
|
|
|
+
|
|
|
+ Object data = messageEntity.getData();
|
|
|
+
|
|
|
+ if (messageEntity.getType() != 0 || !(data instanceof SendMeg)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.error("用户{}不在线", messageEntity.getUserId());
|
|
|
+ String key = sendMessageKey + ((SendMeg) data).getId() + ":" + messageEntity.getUserId();
|
|
|
+ if (((SendMeg) data).getEndTime().after(new Date())) {
|
|
|
+ redisCache.setCacheObject(key, data);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+}
|