|
- package com.fjhx.service.impl;
- import cn.hutool.core.bean.BeanUtil;
- import cn.hutool.core.io.FileUtil;
- import cn.hutool.core.lang.Assert;
- import cn.hutool.core.thread.ThreadUtil;
- import cn.hutool.core.util.ObjectUtil;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import com.fjhx.config.TaskPoolConfig;
- import com.fjhx.config.exception.EmailEngineException;
- import com.fjhx.config.redis.RedisCache;
- import com.fjhx.constants.RedisConstant;
- import com.fjhx.entity.EmailInfo;
- import com.fjhx.entity.EmailMessage;
- import com.fjhx.entity.EmailMessageAttachment;
- import com.fjhx.entity.EmailMessageContent;
- import com.fjhx.enums.SendEventEnum;
- import com.fjhx.service.*;
- import com.fjhx.utils.EmailEngineUtil;
- import com.fjhx.utils.RetryUtil;
- import com.fjhx.vo.*;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import org.springframework.web.context.ServletContextAware;
- import javax.servlet.ServletContext;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.Executor;
- import java.util.stream.Collectors;
- @Slf4j
- @Service
- public class AccountServiceImpl implements IAccountService, ServletContextAware {
- @Autowired
- private IEmailInfoService emailInfoService;
- @Autowired
- private IEmailMessageService emailMessageService;
- @Autowired
- private IEmailMessageSendService emailMessageSendService;
- @Autowired
- private IEmailMessageAttachmentService emailMessageAttachmentService;
- @Autowired
- private IEmailMessageContentService emailMessageContentService;
- @Qualifier(TaskPoolConfig.SYNC_MAILBOX_EXECUTOR)
- @Autowired
- private Executor syncMailboxExecutor;
- @Qualifier(TaskPoolConfig.EXECUTOR)
- @Autowired
- private Executor syncExecutor;
- List<String> syncEmailMessageList = Collections.synchronizedList(new ArrayList<>(100));
- List<String> syncMessageAttachmentList = Collections.synchronizedList(new ArrayList<>(100));
- /**
- * 每页获取多少封邮件
- */
- private static final int SIZE = 50;
- @Override
- public void setServletContext(ServletContext servletContext) {
- syncExecutor.execute(this::syncMessageContent);
- syncExecutor.execute(this::syncMessageAttachment);
- }
- /**
- * 同步正文
- */
- private void syncMessageContent() {
- List<String> collect = emailMessageService.list(Wrappers.<EmailMessage>lambdaQuery()
- .select(EmailMessage::getEmail, EmailMessage::getMessageId)
- .eq(EmailMessage::getContentSync, false)
- .orderByDesc(EmailMessage::getFromDate).last("limit 50"))
- .stream()
- .map(item -> item.getEmail() + "," + item.getMessageId())
- .collect(Collectors.toList());
- for (String item : collect) {
- if (!syncEmailMessageList.contains(item)) {
- syncEmailMessageList.add(item);
- }
- }
- while (syncEmailMessageList.size() > 0) {
- String remove = null;
- try {
- synchronized (this) {
- log.info("下载正文");
- remove = syncEmailMessageList.remove(0);
- if (ObjectUtils.isEmpty(remove)) {
- continue;
- }
- String[] split = remove.split(",");
- String email = split[0];
- String messageId = split[1];
- // 正文内容
- MessageDetailVo messageDetail = EmailEngineUtil.getMessageDetail(email, messageId);
- MessageDetailVo.TextDTO text = messageDetail.getText();
- if (ObjectUtils.isNotEmpty(text)) {
- EmailMessageContent emailMessageContent = new EmailMessageContent();
- emailMessageContent.setMessageId(messageId);
- emailMessageContent.setHtmlContent(text.getHtml());
- emailMessageContentService.saveOrUpdate(emailMessageContent);
- EmailMessage emailMessage = new EmailMessage();
- emailMessage.setMessageId(messageId);
- emailMessage.setContentSync(true);
- emailMessageService.updateById(emailMessage);
- }
- log.info("下载正文成功");
- }
- } catch (Exception e) {
- log.error("同步正文发生异常:{}", remove, e);
- }
- }
- ThreadUtil.sleep(60 * 1000);
- this.syncMessageContent();
- }
- private void syncMessageAttachment() {
- List<String> collect = emailMessageAttachmentService.list(Wrappers.<EmailMessageAttachment>lambdaQuery()
- .select(EmailMessageAttachment::getAttachmentId, EmailMessageAttachment::getEmail, EmailMessageAttachment::getPath)
- .eq(EmailMessageAttachment::getIsDownload, false)
- .orderByDesc(EmailMessageAttachment::getCreateTime)
- .last("limit 50"))
- .stream()
- .map(item -> item.getAttachmentId() + "," + item.getEmail() + "," + item.getPath())
- .collect(Collectors.toList());
- for (String item : collect) {
- if (!syncMessageAttachmentList.contains(item)) {
- syncMessageAttachmentList.add(item);
- }
- }
- while (syncMessageAttachmentList.size() > 0) {
- String remove = null;
- try {
- synchronized (this) {
- log.info("下载附件");
- remove = syncMessageAttachmentList.remove(0);
- if (ObjectUtil.isEmpty(remove)) {
- continue;
- }
- String[] split = remove.split(",");
- String attachmentId = split[0];
- String email = split[1];
- String path = split[2];
- // 下载附件
- RetryUtil.execute(() -> EmailEngineUtil.downloadAttachment(email, attachmentId, path));
- EmailMessageAttachment attachment = new EmailMessageAttachment();
- attachment.setAttachmentId(attachmentId);
- attachment.setIsDownload(true);
- emailMessageAttachmentService.updateById(attachment);
- log.info("下载附件成功");
- }
- } catch (Exception e) {
- log.error("下载附件发生异常:{}", remove, e);
- }
- }
- ThreadUtil.sleep(60 * 1000);
- this.syncMessageAttachment();
- }
- @Transactional(rollbackFor = Exception.class)
- @Override
- public EmailInfo binding(BindingVo bindingVo) {
- String email = bindingVo.getEmail();
- EmailInfo emailInfo;
- try {
- emailInfo = emailInfoService.getOne(Wrappers.<EmailInfo>lambdaQuery().eq(EmailInfo::getEmail, email));
- // 如果存在,直接返回邮箱信息
- if (emailInfo != null) {
- return emailInfo;
- }
- // 保存账号信息到数据库
- emailInfo = this.saveEmailInfo(bindingVo);
- // 添加账号
- EmailEngineUtil.createAccount(bindingVo);
- // redis添加同步进度
- this.progressInitialization(bindingVo);
- } catch (Exception e) {
- EmailEngineUtil.deleteAccount(bindingVo.getEmail());
- throw e;
- }
- // 异步遍历文件夹下的所有邮件
- this.asyncReadEmail(emailInfo.getId(), bindingVo, 0);
- return emailInfo;
- }
- @Transactional(rollbackFor = Exception.class)
- @Override
- public void listener(ListenerVo listenerVo) {
- log.info("监听到新消息:{}", JSON.toJSONString(listenerVo));
- switch (SendEventEnum.get(listenerVo.getEvent())) {
- case MESSAGE_NEW:
- this.handleMessageNewEvent(listenerVo);
- break;
- case MESSAGE_DELETED:
- this.handleMessageDeletedEvent(listenerVo);
- break;
- case MESSAGE_UPDATED:
- this.handleMessageUpdatedEvent(listenerVo);
- break;
- default:
- log.error("监听到未知事件:{}", JSONObject.toJSONString(listenerVo));
- }
- }
- @Override
- public void submit(SubmitVo submitVo) {
- EmailEngineUtil.submit(submitVo);
- }
- /**
- * 添加邮件信息
- */
- private EmailInfo saveEmailInfo(BindingVo bindingVo) {
- EmailInfo emailInfo = BeanUtil.copyProperties(bindingVo, EmailInfo.class);
- emailInfoService.save(emailInfo);
- return emailInfo;
- }
- /**
- * 同步邮件进度初始化
- */
- private void progressInitialization(BindingVo bindingVo) {
- String email = bindingVo.getEmail();
- int pages = bindingVo.getPages();
- MessageVo messageVo = EmailEngineUtil.getMessageList(email, bindingVo.getPath(), 0, 1);
- ProgressVo progressVo = new ProgressVo();
- progressVo.setEmail(email);
- progressVo.setTotalMessageCount(messageVo.getTotal() > pages * SIZE ? pages * SIZE : messageVo.getTotal());
- progressVo.setCompleteMessageCount(0);
- progressVo.setPercentage(0);
- RedisCache.set(RedisConstant.PROGRESS_KEY + email, progressVo);
- }
- /**
- * 异步读取文件
- */
- private void asyncReadEmail(Long emailInfoId, BindingVo bindingVo, Integer pageFlag) {
- String email = bindingVo.getEmail();
- String mailbox = bindingVo.getPath();
- int pages = bindingVo.getPages();
- syncMailboxExecutor.execute(() -> {
- int page = pageFlag;
- try {
- while (page < pages) {
- // 分页获取文件夹邮件
- MessageVo result = EmailEngineUtil.getMessageList(email, mailbox, page, SIZE);
- // 邮件信息
- List<MessageVo.MessagesDTO> messagesDTOList = result.getMessages();
- // 批量保存邮件信息
- this.saveBatchMessage(emailInfoId, email, messagesDTOList);
- // 更新同步邮件进度
- this.synchronizationProgress(email, messagesDTOList);
- page++;
- }
- } catch (EmailEngineException e) {
- log.error("下载失败,等待10分钟");
- ThreadUtil.sleep(1000 * 60 * 10);
- asyncReadEmail(emailInfoId, bindingVo, page);
- } catch (Exception e) {
- log.error("未知原因同步邮件失败:{}", email, e);
- }
- });
- String redisKey = RedisConstant.PROGRESS_KEY + email;
- ProgressVo progressVo = RedisCache.get(redisKey);
- progressVo.setPercentage(100);
- RedisCache.set(redisKey, progressVo);
- }
- /**
- * 批量保存邮件信息
- */
- private void saveBatchMessage(Long emailInfoId, String email, List<MessageVo.MessagesDTO> messagesDTOList) {
- List<EmailMessage> emailMessageList = new ArrayList<>();
- List<EmailMessageAttachment> emailMessageAttachmentList = new ArrayList<>();
- for (MessageVo.MessagesDTO messagesDTO : messagesDTOList) {
- EmailMessage message = this.createMessage(emailInfoId, email, messagesDTO);
- emailMessageList.add(message);
- List<MessageVo.MessagesDTO.AttachmentsDTO> attachments = messagesDTO.getAttachments();
- if (attachments != null && attachments.size() > 0) {
- for (MessageVo.MessagesDTO.AttachmentsDTO attachment : attachments) {
- emailMessageAttachmentList.add(this.createMessageAttachment(message, attachment));
- }
- }
- }
- // 保存邮件
- emailMessageService.saveOrUpdateBatch(emailMessageList);
- // 保存邮箱附件
- emailMessageAttachmentService.saveOrUpdateBatch(emailMessageAttachmentList);
- }
- /**
- * 生成邮件实体
- */
- private EmailMessage createMessage(Long emailInfoId, String email, MessageVo.MessagesDTO messagesDTO) {
- EmailMessage emailMessage = new EmailMessage();
- emailMessage.setMessageId(messagesDTO.getId());
- emailMessage.setEmailInfoId(emailInfoId);
- emailMessage.setEmail(email);
- emailMessage.setUnseen(messagesDTO.getUnseen());
- emailMessage.setFlagged(messagesDTO.getFlagged());
- emailMessage.setSubject(messagesDTO.getSubject());
- emailMessage.setFromDate(messagesDTO.getDate());
- emailMessage.setContentSync(false);
- // 发件人
- MessageVo.MessagesDTO.ToDTO from = messagesDTO.getFrom();
- if (ObjectUtils.isNotEmpty(from)) {
- emailMessage.setFromName(from.getName());
- emailMessage.setFromAddress(from.getAddress());
- }
- return emailMessage;
- }
- /**
- * 生成附件实体
- */
- private EmailMessageAttachment createMessageAttachment(EmailMessage message, MessageVo.MessagesDTO.AttachmentsDTO attachment) {
- String newFileName = attachment.getId() + "." + FileUtil.getSuffix(attachment.getFilename());
- EmailMessageAttachment emailMessageAttachment = new EmailMessageAttachment();
- emailMessageAttachment.setAttachmentId(attachment.getId());
- emailMessageAttachment.setMessageId(message.getMessageId());
- emailMessageAttachment.setEmail(message.getEmail());
- emailMessageAttachment.setName(attachment.getFilename());
- emailMessageAttachment.setSize(attachment.getEncodedSize());
- emailMessageAttachment.setPath(message.getEmail() + "\\" + newFileName);
- emailMessageAttachment.setIsDownload(false);
- return emailMessageAttachment;
- }
- /**
- * 同步邮件进度
- */
- private void synchronizationProgress(String email, List<MessageVo.MessagesDTO> messagesDTOList) {
- String redisKey = RedisConstant.PROGRESS_KEY + email;
- ProgressVo progressVo = RedisCache.get(redisKey);
- int completeMessageCount = progressVo.getCompleteMessageCount() + messagesDTOList.size();
- progressVo.setCompleteMessageCount(completeMessageCount);
- progressVo.setPercentage(100 * completeMessageCount / progressVo.getTotalMessageCount());
- RedisCache.set(redisKey, progressVo);
- }
- /**
- * 处理收到新邮件事件
- */
- private void handleMessageNewEvent(ListenerVo listenerVo) {
- // 查询邮箱
- String account = listenerVo.getAccount();
- EmailInfo emailInfo;
- try {
- emailInfo = emailInfoService.getOne(Wrappers.<EmailInfo>lambdaQuery().eq(EmailInfo::getEmail, account));
- Assert.notNull(emailInfo, "未找到邮箱信息");
- } catch (Exception e) {
- log.error("查找邮箱失败: {}", account, e);
- return;
- }
- // 获取邮件详情
- MessageVo.MessagesDTO messagesDTO = listenerVo.getData().toJavaObject(MessageVo.MessagesDTO.class);
- // 保存邮件
- EmailMessage emailMessage = this.createMessage(emailInfo.getId(), emailInfo.getEmail(), messagesDTO);
- emailMessageService.saveOrUpdate(emailMessage);
- // 保存邮箱附件
- List<EmailMessageAttachment> emailMessageAttachmentList;
- List<MessageVo.MessagesDTO.AttachmentsDTO> attachments = messagesDTO.getAttachments();
- if (attachments != null && attachments.size() > 0) {
- emailMessageAttachmentList = attachments.stream()
- .map(attachment -> this.createMessageAttachment(emailMessage, attachment))
- .collect(Collectors.toList());
- emailMessageAttachmentService.saveOrUpdateBatch(emailMessageAttachmentList);
- for (EmailMessageAttachment attachment : emailMessageAttachmentList) {
- String value = attachment.getAttachmentId() + "," + attachment.getEmail() + "," + attachment.getPath();
- syncMessageAttachmentList.add(0, value);
- }
- }
- syncEmailMessageList.add(0, emailMessage.getEmail() + "," + emailMessage.getMessageId());
- }
- /**
- * 处理删除邮件事件
- */
- private void handleMessageDeletedEvent(ListenerVo listenerVo) {
- MessageDeletedEventVo vo = listenerVo.getData().toJavaObject(MessageDeletedEventVo.class);
- emailMessageService.remove(Wrappers.<EmailMessage>lambdaQuery().eq(EmailMessage::getMessageId, vo.getId()));
- }
- /**
- * 邮件flag被改变事件
- */
- private void handleMessageUpdatedEvent(ListenerVo listenerVo) {
- log.error("邮件flag改变:{}", JSON.toJSONString(listenerVo));
- }
- }
|