24282 2 жил өмнө
parent
commit
972ca6c286

+ 49 - 0
src/main/java/com/fjhx/config/redis/FastJsonRedisSerialize.java

@@ -0,0 +1,49 @@
+package com.fjhx.config.redis;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.parser.ParserConfig;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import org.springframework.data.redis.serializer.RedisSerializer;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+public class FastJsonRedisSerialize<T> implements RedisSerializer<T> {
+
+    public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+
+    static {
+        ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
+    }
+
+    private final Class<T> clazz;
+
+    public FastJsonRedisSerialize(Class<T> clazz) {
+        super();
+        this.clazz = clazz;
+    }
+
+    /**
+     * 序列化
+     */
+    @Override
+    public byte[] serialize(T t) {
+        if (null == t) {
+            return new byte[0];
+        }
+        return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
+    }
+
+    /**
+     * 反序列化
+     */
+    @Override
+    public T deserialize(byte[] bytes) {
+        if (null == bytes || bytes.length == 0) {
+            return null;
+        }
+        String str = new String(bytes, DEFAULT_CHARSET);
+        return JSON.parseObject(str, clazz);
+    }
+
+}

+ 82 - 0
src/main/java/com/fjhx/config/redis/RedisCache.java

@@ -0,0 +1,82 @@
+package com.fjhx.config.redis;
+
+import cn.hutool.extra.spring.SpringUtil;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ValueOperations;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * redis缓存 工具类
+ **/
+@SuppressWarnings(value = {"unchecked", "rawtypes"})
+public class RedisCache {
+
+    private static final RedisTemplate redisTemplate = SpringUtil.getBean("redisTemplate");
+
+    /**
+     * 设置缓存(默认缓存30分钟)
+     *
+     * @param key   缓存的键值
+     * @param value 缓存的值
+     */
+    public static void set(String key, Object value) {
+        redisTemplate.opsForValue().set(key, value);
+    }
+
+    /**
+     * 设置缓存,并指定过期时间(分钟)
+     *
+     * @param key     缓存的键值
+     * @param value   缓存的值
+     * @param timeout 时间
+     */
+    public static void set(String key, Object value, Integer timeout) {
+        set(key, value, timeout, TimeUnit.MINUTES);
+    }
+
+    /**
+     * 设置缓存
+     *
+     * @param key      缓存的键值
+     * @param value    缓存的值
+     * @param timeout  时间
+     * @param timeUnit 时间颗粒度
+     */
+    public static void set(String key, Object value, Integer timeout, TimeUnit timeUnit) {
+        redisTemplate.opsForValue().set(key, value, timeout, timeUnit);
+    }
+
+    /**
+     * 获取缓存
+     *
+     * @param key 缓存键值
+     * @param <T> 放回对象类型
+     * @return 缓存键值对应的数据
+     */
+    public static <T> T get(String key) {
+        ValueOperations<String, T> operation = redisTemplate.opsForValue();
+        return operation.get(key);
+    }
+
+    /**
+     * 删除缓存
+     *
+     * @param key 缓存key
+     */
+    public static void delete(String key) {
+        redisTemplate.delete(key);
+    }
+
+    /**
+     * 获得缓存的基本对象列表
+     *
+     * @param pattern 字符串前缀
+     * @return 对象列表
+     */
+    public static Collection<String> keys(final String pattern) {
+        return redisTemplate.keys(pattern);
+    }
+
+}

+ 12 - 0
src/main/java/com/fjhx/constants/RedisConstant.java

@@ -0,0 +1,12 @@
+package com.fjhx.constants;
+
+public interface RedisConstant {
+
+    String PREFIX = "email:";
+
+    /**
+     * 邮箱同步进度redisKey
+     */
+    String PROGRESS_KEY = PREFIX + "progressKey:";
+
+}

+ 1 - 1
src/main/java/com/fjhx/constants/SendConstants.java → src/main/java/com/fjhx/constants/SendConstant.java

@@ -1,6 +1,6 @@
 package com.fjhx.constants;
 
-public interface SendConstants {
+public interface SendConstant {
 
     /**
      * 收件人

+ 13 - 4
src/main/java/com/fjhx/controller/AccountController.java

@@ -1,14 +1,14 @@
 package com.fjhx.controller;
 
 import com.fjhx.base.R;
+import com.fjhx.config.redis.RedisCache;
+import com.fjhx.constants.RedisConstant;
 import com.fjhx.entity.EmailInfo;
 import com.fjhx.service.IAccountService;
 import com.fjhx.vo.BindingVo;
+import com.fjhx.vo.ProgressVo;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
 
 @RestController
 @RequestMapping("/account")
@@ -26,4 +26,13 @@ public class AccountController {
         return R.ok(emailInfo);
     }
 
+    /**
+     * 查看用户账号同步进度
+     */
+    @GetMapping("getAsyncProgress")
+    public R getAsyncProgress(@RequestParam("email") String email) {
+        ProgressVo progressVo = RedisCache.get(RedisConstant.PROGRESS_KEY + email);
+        return R.ok(progressVo);
+    }
+
 }

+ 89 - 24
src/main/java/com/fjhx/service/impl/AccountServiceImpl.java

@@ -2,19 +2,19 @@ package com.fjhx.service.impl;
 
 import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.thread.ThreadUtil;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.fjhx.config.TaskPoolConfig;
-import com.fjhx.constants.SendConstants;
+import com.fjhx.config.redis.RedisCache;
+import com.fjhx.constants.RedisConstant;
+import com.fjhx.constants.SendConstant;
 import com.fjhx.entity.*;
 import com.fjhx.service.*;
 import com.fjhx.utils.EmailEngineUtil;
 import com.fjhx.utils.RetryUtil;
-import com.fjhx.vo.BindingVo;
-import com.fjhx.vo.EmailMailboxVo;
-import com.fjhx.vo.MessageDetailVo;
-import com.fjhx.vo.MessageVo;
+import com.fjhx.vo.*;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -63,14 +63,19 @@ public class AccountServiceImpl implements IAccountService {
 
         // 添加账号
         EmailEngineUtil.createAccount(bindingVo);
-        // 保存账号数据
-        emailInfo = saveEmailInfo(bindingVo);
+        // 保存账号信息到数据
+        emailInfo = this.saveEmailInfo(bindingVo);
         // 查询邮箱文件夹
         List<EmailMailboxVo.MailboxesDTO> mailboxList = EmailEngineUtil.getMailboxList(email);
         // 添加邮箱文件夹
-        List<EmailMailbox> emailMailboxList = saveEmailMailbox(mailboxList, emailInfo.getId(), email);
+        List<EmailMailbox> emailMailboxList = this.saveEmailMailbox(mailboxList, emailInfo.getId(), email);
+
+        // redis添加同步进度
+        this.synchronizationProgressInitialization(email, mailboxList);
+
         // 异步遍历文件夹下的所有邮件
-        asyncReadEmail(email, emailMailboxList);
+        this.asyncReadEmail(email, emailMailboxList);
+
         return emailInfo;
     }
 
@@ -107,19 +112,29 @@ public class AccountServiceImpl implements IAccountService {
      */
     private void asyncReadEmail(String email, List<EmailMailbox> emailMailboxList) {
         executor.execute(() -> {
-            // ThreadUtil.sleep(5000);
+            ThreadUtil.sleep(3000);
 
             // 记录上次消息id,防止循环查询分页过程中接收到了新邮件,出现邮件重复问题
             List<String> lastMessageIdList = new ArrayList<>();
 
             // 遍历每个邮箱
             for (EmailMailbox emailMailbox : emailMailboxList) {
-                // 保存邮箱中的邮件
-                EmailEngineUtil.handleMessageList(
-                        email,
-                        emailMailbox.getPath(),
-                        messageList -> this.saveBatchMessage(email, lastMessageIdList, emailMailbox, messageList)
-                );
+
+                int page = 0;
+                int pages;
+
+                do {
+                    MessageVo result = EmailEngineUtil.handleMessageList(email, emailMailbox.getPath(), page);
+                    List<MessageVo.MessagesDTO> messagesDTOList = result.getMessages();
+                    if (messagesDTOList.size() > 0) {
+                        this.saveBatchMessage(email, lastMessageIdList, emailMailbox, messagesDTOList);
+                        this.synchronizationProgress(email, messagesDTOList, emailMailbox);
+                    }
+
+                    pages = result.getPages();
+                    page++;
+                } while (page < pages);
+
             }
 
         });
@@ -135,25 +150,27 @@ public class AccountServiceImpl implements IAccountService {
         List<EmailMessageAttachment> emailMessageAttachmentList = new ArrayList<>();
 
         for (MessageVo.MessagesDTO messagesDTO : messageList) {
+            // 已添加过的邮件则跳过
             if (lastMessageId.contains(messagesDTO.getId())) {
                 continue;
             }
 
             // 生成邮件实体
-            EmailMessage emailMessage = createMessage(email, emailMailbox, messagesDTO);
-
+            EmailMessage emailMessage = this.createMessage(email, emailMailbox, messagesDTO);
             emailMessageList.add(emailMessage);
 
             // 添加推送信息
-            addMessageSend(emailMessageSendList, messagesDTO, emailMessage.getId());
+            this.addMessageSend(emailMessageSendList, messagesDTO, emailMessage.getId());
 
             // 下载并添加附件信息
-            addMessageAttachment(email, emailMessageAttachmentList, messagesDTO, emailMessage.getId());
-
+            this.addMessageAttachment(email, emailMessageAttachmentList, messagesDTO, emailMessage.getId());
         }
 
+        // 保存邮件
         emailMessageService.saveBatch(emailMessageList);
+        // 保存推送人信息
         emailMessageSendService.saveBatch(emailMessageSendList);
+        // 保存附件信息
         emailMessageAttachmentService.saveBatch(emailMessageAttachmentList);
 
         lastMessageId.clear();
@@ -207,7 +224,7 @@ public class AccountServiceImpl implements IAccountService {
                 EmailMessageSend emailMessageSend = new EmailMessageSend();
                 emailMessageSend.setEmailMessageId(emailMessageId);
                 emailMessageSend.setMessageId(messagesDTO.getId());
-                emailMessageSend.setType(SendConstants.TO);
+                emailMessageSend.setType(SendConstant.TO);
                 emailMessageSend.setName(toDTO.getName());
                 emailMessageSend.setAddress(toDTO.getAddress());
                 emailMessageSendList.add(emailMessageSend);
@@ -221,7 +238,7 @@ public class AccountServiceImpl implements IAccountService {
                 EmailMessageSend emailMessageSend = new EmailMessageSend();
                 emailMessageSend.setEmailMessageId(emailMessageId);
                 emailMessageSend.setMessageId(messagesDTO.getId());
-                emailMessageSend.setType(SendConstants.CC);
+                emailMessageSend.setType(SendConstant.CC);
                 emailMessageSend.setName(toDTO.getName());
                 emailMessageSend.setAddress(toDTO.getAddress());
                 emailMessageSendList.add(emailMessageSend);
@@ -235,7 +252,7 @@ public class AccountServiceImpl implements IAccountService {
                 EmailMessageSend emailMessageSend = new EmailMessageSend();
                 emailMessageSend.setEmailMessageId(emailMessageId);
                 emailMessageSend.setMessageId(messagesDTO.getId());
-                emailMessageSend.setType(SendConstants.BCC);
+                emailMessageSend.setType(SendConstant.BCC);
                 emailMessageSend.setName(toDTO.getName());
                 emailMessageSend.setAddress(toDTO.getAddress());
                 emailMessageSendList.add(emailMessageSend);
@@ -279,4 +296,52 @@ public class AccountServiceImpl implements IAccountService {
         }
     }
 
+
+    /**
+     * 同步邮件进度初始化
+     */
+    private void synchronizationProgressInitialization(String email, List<EmailMailboxVo.MailboxesDTO> mailboxList) {
+
+        ProgressVo progressVo = new ProgressVo();
+
+
+        List<ProgressVo.Details> detailsList = BeanUtil.copyToList(mailboxList, ProgressVo.Details.class);
+
+        int totalMessageCount = 0;
+
+        for (ProgressVo.Details detail : detailsList) {
+            detail.setCompleteMessageCount(0);
+            detail.setPercentage(0);
+            totalMessageCount += detail.getMessages();
+        }
+
+        progressVo.setTotalMessageCount(totalMessageCount);
+        progressVo.setCompleteMessageCount(0);
+        progressVo.setPercentage(0);
+        progressVo.setDetailsList(detailsList);
+
+        RedisCache.set(RedisConstant.PROGRESS_KEY + email, progressVo);
+    }
+
+    /**
+     * 同步邮件进度
+     */
+    private void synchronizationProgress(String email, List<MessageVo.MessagesDTO> messagesDTOList, EmailMailbox emailMailbox) {
+
+        ProgressVo progressVo = RedisCache.get(RedisConstant.PROGRESS_KEY + email);
+        int completeMessageCount = progressVo.getCompleteMessageCount() + messagesDTOList.size();
+        progressVo.setCompleteMessageCount(completeMessageCount);
+        progressVo.setPercentage(100 * completeMessageCount / progressVo.getTotalMessageCount());
+
+        for (ProgressVo.Details details : progressVo.getDetailsList()) {
+            if (details.getPath().equals(emailMailbox.getPath())) {
+                int detailsCompleteMessageCount = details.getCompleteMessageCount() + messagesDTOList.size();
+                details.setCompleteMessageCount(detailsCompleteMessageCount);
+                details.setPercentage(100 * detailsCompleteMessageCount / details.getMessages());
+                break;
+            }
+        }
+
+        RedisCache.set(RedisConstant.PROGRESS_KEY + email, progressVo);
+    }
 }

+ 5 - 22
src/main/java/com/fjhx/utils/EmailEngineUtil.java

@@ -14,9 +14,7 @@ import org.springframework.web.client.ResponseErrorHandler;
 import org.springframework.web.client.RestTemplate;
 
 import java.io.*;
-import java.rmi.RemoteException;
 import java.util.List;
-import java.util.function.Consumer;
 
 @Slf4j
 @Component
@@ -40,8 +38,8 @@ public class EmailEngineUtil {
             public void handleError(@NotNull ClientHttpResponse response) throws IOException {
                 InputStream body = response.getBody();
                 byte[] bytes = new byte[body.available()];
-                String result = new String(bytes);
-                throw new RemoteException(result);
+                body.read(bytes);
+                throw new RuntimeException(new String(bytes));
             }
         });
     }
@@ -86,24 +84,9 @@ public class EmailEngineUtil {
     /**
      * 查看文件夹的所有邮件
      */
-    public static void handleMessageList(String email, String mailboxName, Consumer<List<MessageVo.MessagesDTO>> consumer) {
-        int page = 0;
-        int pages;
-
-        do {
-            String url = "/v1/account/" + email + "/messages?path=" + mailboxName + "&page=" + page + "&pageSize=100&documentStore=false";
-
-            MessageVo result = get(url, MessageVo.class);
-
-            List<MessageVo.MessagesDTO> messages = result.getMessages();
-            if (messages.size() > 0) {
-                consumer.accept(messages);
-            }
-
-            pages = result.getPages();
-            page++;
-        } while (page < pages);
-
+    public static MessageVo handleMessageList(String email, String path, int page) {
+        String url = "/v1/account/" + email + "/messages?path=" + path + "&page=" + page + "&pageSize=100&documentStore=false";
+        return get(url, MessageVo.class);
     }
 
     /**

+ 12 - 2
src/main/java/com/fjhx/utils/RetryUtil.java

@@ -15,18 +15,28 @@ public class RetryUtil {
         return execute(runnable, 0, 3, 1000L);
     }
 
+    /**
+     * 任务发生异常开启重试
+     *
+     * @param runnable 任务
+     * @param maxCount 最大重试次数
+     * @param sleep    重试间隔
+     * @param <T>      返回值类型
+     * @return runnable执行结果
+     * @throws Exception runnable抛出的异常
+     */
     public static <T> T execute(RetryRunnable<T> runnable, int maxCount, Long sleep) throws Exception {
         return execute(runnable, 0, maxCount, sleep);
     }
 
-    public static <T> T execute(RetryRunnable<T> runnable, int executeCount, int maxCount, Long sleep) throws Exception {
+    private static <T> T execute(RetryRunnable<T> runnable, int executeCount, int maxCount, Long sleep) throws Exception {
         try {
             return runnable.run();
         } catch (Exception e) {
             if (executeCount >= maxCount) {
                 throw e;
             }
-            log.warn("任务执行失败,最多允许失败{}次,已失败{}次,{}秒后开始重试", maxCount, executeCount + 1, sleep / 1000, e);
+            log.warn("任务执行失败,已失败{}次,,最多允许失败{}次{}秒后开始重试", executeCount + 1, maxCount, sleep / 1000, e);
             ThreadUtil.sleep(sleep);
             return execute(runnable, ++executeCount, maxCount, sleep);
         }

+ 2 - 0
src/main/java/com/fjhx/vo/EmailMailboxVo.java

@@ -19,6 +19,8 @@ public class EmailMailboxVo {
 
         private String name;
 
+        private Integer messages;
+
     }
 
 }

+ 53 - 0
src/main/java/com/fjhx/vo/ProgressVo.java

@@ -0,0 +1,53 @@
+package com.fjhx.vo;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class ProgressVo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 所有需要同步的邮件数量
+     */
+    private Integer totalMessageCount;
+
+    /**
+     * 已同步的数量
+     */
+    private Integer completeMessageCount;
+
+    /**
+     * 完成百分比
+     */
+    private Integer percentage;
+
+    /**
+     * 明细
+     */
+    private List<Details> detailsList;
+
+    /**
+     * 明细类
+     */
+    @Data
+    public static class Details implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String path;
+
+        private String name;
+
+        private Integer messages;
+
+        private Integer completeMessageCount;
+
+        private Integer percentage;
+
+    }
+
+
+}