24282 2 gadi atpakaļ
vecāks
revīzija
771ccceb0b

+ 1 - 1
pom.xml

@@ -91,7 +91,7 @@
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
-            <version>2.0.21</version>
+            <version>1.2.76</version>
         </dependency>
 
     </dependencies>

+ 1 - 2
src/main/java/com/fjhx/config/redis/FastJsonRedisSerialize.java

@@ -42,8 +42,7 @@ public class FastJsonRedisSerialize<T> implements RedisSerializer<T> {
         if (null == bytes || bytes.length == 0) {
             return null;
         }
-        String str = new String(bytes, DEFAULT_CHARSET);
-        return JSON.parseObject(str, clazz);
+        return JSON.parseObject(new String(bytes, DEFAULT_CHARSET), clazz);
     }
 
 }

+ 44 - 0
src/main/java/com/fjhx/config/redis/RedisConfig.java

@@ -0,0 +1,44 @@
+package com.fjhx.config.redis;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+/**
+ * redis配置
+ */
+
+@Configuration
+@SuppressWarnings(value = "All")
+public class RedisConfig {
+
+    @Bean
+    public RedisSerializer<Object> fastJson2JsonRedisSerialize() {
+        return new FastJsonRedisSerialize<>(Object.class);
+    }
+
+    @Bean
+    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
+
+        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
+        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
+
+        redisTemplate.setConnectionFactory(redisConnectionFactory);
+        // key采用String的序列化方式
+        redisTemplate.setKeySerializer(stringRedisSerializer);
+        // hash的key也采用String的序列化方式
+        redisTemplate.setHashKeySerializer(stringRedisSerializer);
+        // value序列化方式采用fastJson
+        redisTemplate.setValueSerializer(fastJson2JsonRedisSerialize());
+        // hash的value序列化方式采用fastJson
+        redisTemplate.setHashValueSerializer(fastJson2JsonRedisSerialize());
+
+        redisTemplate.afterPropertiesSet();
+
+        return redisTemplate;
+    }
+
+}

+ 10 - 0
src/main/java/com/fjhx/controller/AccountController.java

@@ -6,6 +6,7 @@ import com.fjhx.constants.RedisConstant;
 import com.fjhx.entity.EmailInfo;
 import com.fjhx.service.IAccountService;
 import com.fjhx.vo.BindingVo;
+import com.fjhx.vo.ListenerVo;
 import com.fjhx.vo.ProgressVo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
@@ -32,7 +33,16 @@ public class AccountController {
     @GetMapping("getAsyncProgress")
     public R getAsyncProgress(@RequestParam("email") String email) {
         ProgressVo progressVo = RedisCache.get(RedisConstant.PROGRESS_KEY + email);
+        if (progressVo.getPercentage() >= 100) {
+            RedisCache.delete(RedisConstant.PROGRESS_KEY + email);
+        }
         return R.ok(progressVo);
     }
 
+    @PostMapping("/listener")
+    public R post(@RequestBody ListenerVo listenerVo) {
+        accountService.listener(listenerVo);
+        return R.ok();
+    }
+
 }

+ 39 - 0
src/main/java/com/fjhx/enums/SendEventEnum.java

@@ -0,0 +1,39 @@
+package com.fjhx.enums;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+
+@Getter
+public enum SendEventEnum {
+
+    /**
+     * 收到新邮件
+     */
+    MESSAGE_NEW("messageNew"),
+
+
+    ;
+
+    private final String type;
+
+    private static final HashMap<String, SendEventEnum> map = new HashMap<>();
+
+    SendEventEnum(String type) {
+        this.type = type;
+    }
+
+    static {
+        for (SendEventEnum value : SendEventEnum.values()) {
+            map.put(value.getType(), value);
+        }
+    }
+
+    /**
+     * 根据type获取枚举
+     */
+    public static SendEventEnum get(String type) {
+        return map.get(type);
+    }
+
+}

+ 8 - 0
src/main/java/com/fjhx/service/IAccountService.java

@@ -2,10 +2,18 @@ package com.fjhx.service;
 
 import com.fjhx.entity.EmailInfo;
 import com.fjhx.vo.BindingVo;
+import com.fjhx.vo.ListenerVo;
 
 public interface IAccountService {
 
+    /**
+     * 绑定邮箱
+     */
     EmailInfo binding(BindingVo bindingVo);
 
+    /**
+     * 监听邮箱
+     */
+    void listener(ListenerVo listenerVo);
 
 }

+ 67 - 8
src/main/java/com/fjhx/service/impl/AccountServiceImpl.java

@@ -2,7 +2,10 @@ package com.fjhx.service.impl;
 
 import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.lang.Assert;
 import cn.hutool.core.thread.ThreadUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
@@ -11,6 +14,7 @@ import com.fjhx.config.redis.RedisCache;
 import com.fjhx.constants.RedisConstant;
 import com.fjhx.constants.SendConstant;
 import com.fjhx.entity.*;
+import com.fjhx.enums.SendEventEnum;
 import com.fjhx.service.*;
 import com.fjhx.utils.EmailEngineUtil;
 import com.fjhx.utils.RetryUtil;
@@ -55,6 +59,9 @@ public class AccountServiceImpl implements IAccountService {
 
         String email = bindingVo.getEmail();
 
+        ProgressVo progressVo = RedisCache.get(RedisConstant.PROGRESS_KEY + email);
+        Assert.isNull(progressVo, "邮箱正在同步中,请勿重复操作");
+
         EmailInfo emailInfo = emailInfoService.getOne(Wrappers.<EmailInfo>lambdaQuery().eq(EmailInfo::getEmail, email));
         // 如果存在,直接返回邮箱信息
         if (emailInfo != null) {
@@ -69,16 +76,32 @@ public class AccountServiceImpl implements IAccountService {
         List<EmailMailboxVo.MailboxesDTO> mailboxList = EmailEngineUtil.getMailboxList(email);
         // 添加邮箱文件夹
         List<EmailMailbox> emailMailboxList = this.saveEmailMailbox(mailboxList, emailInfo.getId(), email);
-
         // redis添加同步进度
         this.synchronizationProgressInitialization(email, mailboxList);
-
         // 异步遍历文件夹下的所有邮件
         this.asyncReadEmail(email, emailMailboxList);
 
         return emailInfo;
     }
 
+    @Transactional(rollbackFor = Exception.class)
+    @Override
+    public void listener(ListenerVo listenerVo) {
+
+        log.info("监听到新消息:{}", JSON.toJSONString(listenerVo));
+
+        switch (SendEventEnum.get(listenerVo.getEvent())) {
+            case MESSAGE_NEW:
+                this.handleMessageNewEvent(listenerVo);
+                break;
+
+
+            default:
+                log.error("监听到未知事件:{}", JSONObject.toJSONString(listenerVo));
+        }
+
+    }
+
     /**
      * 添加邮件信息
      */
@@ -119,10 +142,8 @@ public class AccountServiceImpl implements IAccountService {
 
             // 遍历每个邮箱
             for (EmailMailbox emailMailbox : emailMailboxList) {
-
                 int page = 0;
                 int pages;
-
                 do {
                     MessageVo result = EmailEngineUtil.handleMessageList(email, emailMailbox.getPath(), page);
                     List<MessageVo.MessagesDTO> messagesDTOList = result.getMessages();
@@ -130,13 +151,10 @@ public class AccountServiceImpl implements IAccountService {
                         this.saveBatchMessage(email, lastMessageIdList, emailMailbox, messagesDTOList);
                         this.synchronizationProgress(email, messagesDTOList, emailMailbox);
                     }
-
                     pages = result.getPages();
                     page++;
                 } while (page < pages);
-
             }
-
         });
     }
 
@@ -296,7 +314,6 @@ public class AccountServiceImpl implements IAccountService {
         }
     }
 
-
     /**
      * 同步邮件进度初始化
      */
@@ -344,4 +361,46 @@ public class AccountServiceImpl implements IAccountService {
 
         RedisCache.set(RedisConstant.PROGRESS_KEY + email, progressVo);
     }
+
+    /**
+     * 处理收到新邮件事件
+     */
+    private void handleMessageNewEvent(ListenerVo listenerVo) {
+        // 查询邮箱
+        String account = listenerVo.getAccount();
+        String path = listenerVo.getPath();
+
+        EmailMailbox emailMailbox;
+        try {
+            emailMailbox = RetryUtil.execute(() -> {
+                EmailMailbox tempEmailMailbox = emailMailboxService.getOne(Wrappers.<EmailMailbox>lambdaQuery()
+                        .eq(EmailMailbox::getPath, path)
+                        .eq(EmailMailbox::getEmail, account)
+                );
+                Assert.notNull(tempEmailMailbox, "未找到邮箱信息");
+                return tempEmailMailbox;
+            });
+        } catch (Exception e) {
+            log.error("查找邮箱失败", e);
+            return;
+        }
+
+        // 获取附件详情
+        MessageVo.MessagesDTO messagesDTO = listenerVo.getData().toJavaObject(MessageVo.MessagesDTO.class);
+
+        // 保存邮件
+        EmailMessage emailMessage = this.createMessage(account, emailMailbox, messagesDTO);
+        emailMessageService.save(emailMessage);
+
+        // 保存推送人信息
+        List<EmailMessageSend> emailMessageSendList = new ArrayList<>();
+        this.addMessageSend(emailMessageSendList, messagesDTO, emailMessage.getId());
+        emailMessageSendService.saveBatch(emailMessageSendList);
+
+        // 保存附件信息
+        List<EmailMessageAttachment> emailMessageAttachmentList = new ArrayList<>();
+        this.addMessageAttachment(account, emailMessageAttachmentList, messagesDTO, emailMessage.getId());
+        emailMessageAttachmentService.saveBatch(emailMessageAttachmentList);
+    }
+
 }

+ 1 - 1
src/main/java/com/fjhx/utils/RetryUtil.java

@@ -36,7 +36,7 @@ public class RetryUtil {
             if (executeCount >= maxCount) {
                 throw e;
             }
-            log.warn("任务执行失败,已失败{}次,最多允许失败{}次{}秒后开始重试", executeCount + 1, maxCount, sleep / 1000, e);
+            log.warn("任务执行失败,已失败{}次,最多允许失败{}次{}秒后开始重试", executeCount + 1, maxCount, sleep / 1000, e);
             ThreadUtil.sleep(sleep);
             return execute(runnable, ++executeCount, maxCount, sleep);
         }

+ 21 - 0
src/main/java/com/fjhx/vo/ListenerVo.java

@@ -0,0 +1,21 @@
+package com.fjhx.vo;
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.Data;
+
+@Data
+public class ListenerVo {
+
+    private String path;
+
+    private String serviceUrl;
+
+    private String specialUse;
+
+    private String event;
+
+    private String account;
+
+    private JSONObject data;
+
+}