package com.fjhx.service.impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.io.FileUtil; import cn.hutool.core.lang.Assert; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.ObjectUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.toolkit.ObjectUtils; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.fjhx.config.TaskPoolConfig; import com.fjhx.config.exception.EmailEngineException; import com.fjhx.config.redis.RedisCache; import com.fjhx.constants.RedisConstant; import com.fjhx.entity.EmailInfo; import com.fjhx.entity.EmailMessage; import com.fjhx.entity.EmailMessageAttachment; import com.fjhx.entity.EmailMessageContent; import com.fjhx.enums.SendEventEnum; import com.fjhx.service.*; import com.fjhx.utils.EmailEngineUtil; import com.fjhx.utils.RetryUtil; import com.fjhx.vo.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.context.ServletContextAware; import javax.servlet.ServletContext; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; import java.util.stream.Collectors; @Slf4j @Service public class AccountServiceImpl implements IAccountService, ServletContextAware { @Autowired private IEmailInfoService emailInfoService; @Autowired private IEmailMessageService emailMessageService; @Autowired private IEmailMessageSendService emailMessageSendService; @Autowired private IEmailMessageAttachmentService emailMessageAttachmentService; @Autowired private IEmailMessageContentService emailMessageContentService; @Qualifier(TaskPoolConfig.SYNC_MAILBOX_EXECUTOR) @Autowired private Executor syncMailboxExecutor; @Qualifier(TaskPoolConfig.EXECUTOR) @Autowired private Executor syncExecutor; List syncEmailMessageList = Collections.synchronizedList(new ArrayList<>(100)); List syncMessageAttachmentList = Collections.synchronizedList(new ArrayList<>(100)); /** * 每页获取多少封邮件 */ private static final int SIZE = 50; @Override public void setServletContext(ServletContext servletContext) { syncExecutor.execute(this::syncMessageContent); syncExecutor.execute(this::syncMessageAttachment); } /** * 同步正文 */ private void syncMessageContent() { List collect = emailMessageService.list(Wrappers.lambdaQuery() .select(EmailMessage::getEmail, EmailMessage::getMessageId) .eq(EmailMessage::getContentSync, false) .orderByDesc(EmailMessage::getFromDate).last("limit 50")) .stream() .map(item -> item.getEmail() + "," + item.getMessageId()) .collect(Collectors.toList()); for (String item : collect) { if (!syncEmailMessageList.contains(item)) { syncEmailMessageList.add(item); } } while (syncEmailMessageList.size() > 0) { String remove = null; try { synchronized (this) { log.info("下载正文"); remove = syncEmailMessageList.remove(0); if (ObjectUtils.isEmpty(remove)) { continue; } String[] split = remove.split(","); String email = split[0]; String messageId = split[1]; // 正文内容 MessageDetailVo messageDetail = EmailEngineUtil.getMessageDetail(email, messageId); MessageDetailVo.TextDTO text = messageDetail.getText(); if (ObjectUtils.isNotEmpty(text)) { EmailMessageContent emailMessageContent = new EmailMessageContent(); emailMessageContent.setMessageId(messageId); emailMessageContent.setHtmlContent(text.getHtml()); emailMessageContentService.saveOrUpdate(emailMessageContent); EmailMessage emailMessage = new EmailMessage(); emailMessage.setMessageId(messageId); emailMessage.setContentSync(true); emailMessageService.updateById(emailMessage); } log.info("下载正文成功"); } } catch (Exception e) { log.error("同步正文发生异常:{}", remove, e); } } ThreadUtil.sleep(60 * 1000); this.syncMessageContent(); } private void syncMessageAttachment() { List collect = emailMessageAttachmentService.list(Wrappers.lambdaQuery() .select(EmailMessageAttachment::getAttachmentId, EmailMessageAttachment::getEmail, EmailMessageAttachment::getPath) .eq(EmailMessageAttachment::getIsDownload, false) .orderByDesc(EmailMessageAttachment::getCreateTime) .last("limit 50")) .stream() .map(item -> item.getAttachmentId() + "," + item.getEmail() + "," + item.getPath()) .collect(Collectors.toList()); for (String item : collect) { if (!syncMessageAttachmentList.contains(item)) { syncMessageAttachmentList.add(item); } } while (syncMessageAttachmentList.size() > 0) { String remove = null; try { synchronized (this) { log.info("下载附件"); remove = syncMessageAttachmentList.remove(0); if (ObjectUtil.isEmpty(remove)) { continue; } String[] split = remove.split(","); String attachmentId = split[0]; String email = split[1]; String path = split[2]; // 下载附件 RetryUtil.execute(() -> EmailEngineUtil.downloadAttachment(email, attachmentId, path)); EmailMessageAttachment attachment = new EmailMessageAttachment(); attachment.setAttachmentId(attachmentId); attachment.setIsDownload(true); emailMessageAttachmentService.updateById(attachment); log.info("下载附件成功"); } } catch (Exception e) { log.error("下载附件发生异常:{}", remove, e); } } ThreadUtil.sleep(60 * 1000); this.syncMessageAttachment(); } @Transactional(rollbackFor = Exception.class) @Override public EmailInfo binding(BindingVo bindingVo) { String email = bindingVo.getEmail(); EmailInfo emailInfo; try { emailInfo = emailInfoService.getOne(Wrappers.lambdaQuery().eq(EmailInfo::getEmail, email)); // 如果存在,直接返回邮箱信息 if (emailInfo != null) { return emailInfo; } // 保存账号信息到数据库 emailInfo = this.saveEmailInfo(bindingVo); // 添加账号 EmailEngineUtil.createAccount(bindingVo); // redis添加同步进度 this.progressInitialization(bindingVo); } catch (Exception e) { EmailEngineUtil.deleteAccount(bindingVo.getEmail()); throw e; } // 异步遍历文件夹下的所有邮件 this.asyncReadEmail(emailInfo.getId(), bindingVo, 0); return emailInfo; } @Transactional(rollbackFor = Exception.class) @Override public void listener(ListenerVo listenerVo) { log.info("监听到新消息:{}", JSON.toJSONString(listenerVo)); switch (SendEventEnum.get(listenerVo.getEvent())) { case MESSAGE_NEW: this.handleMessageNewEvent(listenerVo); break; case MESSAGE_DELETED: this.handleMessageDeletedEvent(listenerVo); break; case MESSAGE_UPDATED: this.handleMessageUpdatedEvent(listenerVo); break; default: log.error("监听到未知事件:{}", JSONObject.toJSONString(listenerVo)); } } @Override public void submit(SubmitVo submitVo) { EmailEngineUtil.submit(submitVo); } /** * 添加邮件信息 */ private EmailInfo saveEmailInfo(BindingVo bindingVo) { EmailInfo emailInfo = BeanUtil.copyProperties(bindingVo, EmailInfo.class); emailInfoService.save(emailInfo); return emailInfo; } /** * 同步邮件进度初始化 */ private void progressInitialization(BindingVo bindingVo) { String email = bindingVo.getEmail(); int pages = bindingVo.getPages(); MessageVo messageVo = EmailEngineUtil.getMessageList(email, bindingVo.getPath(), 0, 1); ProgressVo progressVo = new ProgressVo(); progressVo.setEmail(email); progressVo.setTotalMessageCount(messageVo.getTotal() > pages * SIZE ? pages * SIZE : messageVo.getTotal()); progressVo.setCompleteMessageCount(0); progressVo.setPercentage(0); RedisCache.set(RedisConstant.PROGRESS_KEY + email, progressVo); } /** * 异步读取文件 */ private void asyncReadEmail(Long emailInfoId, BindingVo bindingVo, Integer pageFlag) { String email = bindingVo.getEmail(); String mailbox = bindingVo.getPath(); int pages = bindingVo.getPages(); syncMailboxExecutor.execute(() -> { int page = pageFlag; try { while (page < pages) { // 分页获取文件夹邮件 MessageVo result = EmailEngineUtil.getMessageList(email, mailbox, page, SIZE); // 邮件信息 List messagesDTOList = result.getMessages(); // 批量保存邮件信息 this.saveBatchMessage(emailInfoId, email, messagesDTOList); // 更新同步邮件进度 this.synchronizationProgress(email, messagesDTOList); page++; } } catch (EmailEngineException e) { log.error("下载失败,等待10分钟"); ThreadUtil.sleep(1000 * 60 * 10); asyncReadEmail(emailInfoId, bindingVo, page); } catch (Exception e) { log.error("未知原因同步邮件失败:{}", email, e); } }); String redisKey = RedisConstant.PROGRESS_KEY + email; ProgressVo progressVo = RedisCache.get(redisKey); progressVo.setPercentage(100); RedisCache.set(redisKey, progressVo); } /** * 批量保存邮件信息 */ private void saveBatchMessage(Long emailInfoId, String email, List messagesDTOList) { List emailMessageList = new ArrayList<>(); List emailMessageAttachmentList = new ArrayList<>(); for (MessageVo.MessagesDTO messagesDTO : messagesDTOList) { EmailMessage message = this.createMessage(emailInfoId, email, messagesDTO); emailMessageList.add(message); List attachments = messagesDTO.getAttachments(); if (attachments != null && attachments.size() > 0) { for (MessageVo.MessagesDTO.AttachmentsDTO attachment : attachments) { emailMessageAttachmentList.add(this.createMessageAttachment(message, attachment)); } } } // 保存邮件 emailMessageService.saveOrUpdateBatch(emailMessageList); // 保存邮箱附件 emailMessageAttachmentService.saveOrUpdateBatch(emailMessageAttachmentList); } /** * 生成邮件实体 */ private EmailMessage createMessage(Long emailInfoId, String email, MessageVo.MessagesDTO messagesDTO) { EmailMessage emailMessage = new EmailMessage(); emailMessage.setMessageId(messagesDTO.getId()); emailMessage.setEmailInfoId(emailInfoId); emailMessage.setEmail(email); emailMessage.setUnseen(messagesDTO.getUnseen()); emailMessage.setFlagged(messagesDTO.getFlagged()); emailMessage.setSubject(messagesDTO.getSubject()); emailMessage.setFromDate(messagesDTO.getDate()); emailMessage.setContentSync(false); // 发件人 MessageVo.MessagesDTO.ToDTO from = messagesDTO.getFrom(); if (ObjectUtils.isNotEmpty(from)) { emailMessage.setFromName(from.getName()); emailMessage.setFromAddress(from.getAddress()); } return emailMessage; } /** * 生成附件实体 */ private EmailMessageAttachment createMessageAttachment(EmailMessage message, MessageVo.MessagesDTO.AttachmentsDTO attachment) { String newFileName = attachment.getId() + "." + FileUtil.getSuffix(attachment.getFilename()); EmailMessageAttachment emailMessageAttachment = new EmailMessageAttachment(); emailMessageAttachment.setAttachmentId(attachment.getId()); emailMessageAttachment.setMessageId(message.getMessageId()); emailMessageAttachment.setEmail(message.getEmail()); emailMessageAttachment.setName(attachment.getFilename()); emailMessageAttachment.setSize(attachment.getEncodedSize()); emailMessageAttachment.setPath(message.getEmail() + "\\" + newFileName); emailMessageAttachment.setIsDownload(false); return emailMessageAttachment; } /** * 同步邮件进度 */ private void synchronizationProgress(String email, List messagesDTOList) { String redisKey = RedisConstant.PROGRESS_KEY + email; ProgressVo progressVo = RedisCache.get(redisKey); int completeMessageCount = progressVo.getCompleteMessageCount() + messagesDTOList.size(); progressVo.setCompleteMessageCount(completeMessageCount); progressVo.setPercentage(100 * completeMessageCount / progressVo.getTotalMessageCount()); RedisCache.set(redisKey, progressVo); } /** * 处理收到新邮件事件 */ private void handleMessageNewEvent(ListenerVo listenerVo) { // 查询邮箱 String account = listenerVo.getAccount(); EmailInfo emailInfo; try { emailInfo = emailInfoService.getOne(Wrappers.lambdaQuery().eq(EmailInfo::getEmail, account)); Assert.notNull(emailInfo, "未找到邮箱信息"); } catch (Exception e) { log.error("查找邮箱失败: {}", account, e); return; } // 获取邮件详情 MessageVo.MessagesDTO messagesDTO = listenerVo.getData().toJavaObject(MessageVo.MessagesDTO.class); // 保存邮件 EmailMessage emailMessage = this.createMessage(emailInfo.getId(), emailInfo.getEmail(), messagesDTO); emailMessageService.saveOrUpdate(emailMessage); // 保存邮箱附件 List emailMessageAttachmentList; List attachments = messagesDTO.getAttachments(); if (attachments != null && attachments.size() > 0) { emailMessageAttachmentList = attachments.stream() .map(attachment -> this.createMessageAttachment(emailMessage, attachment)) .collect(Collectors.toList()); emailMessageAttachmentService.saveOrUpdateBatch(emailMessageAttachmentList); for (EmailMessageAttachment attachment : emailMessageAttachmentList) { String value = attachment.getAttachmentId() + "," + attachment.getEmail() + "," + attachment.getPath(); syncMessageAttachmentList.add(0, value); } } syncEmailMessageList.add(0, emailMessage.getEmail() + "," + emailMessage.getMessageId()); } /** * 处理删除邮件事件 */ private void handleMessageDeletedEvent(ListenerVo listenerVo) { MessageDeletedEventVo vo = listenerVo.getData().toJavaObject(MessageDeletedEventVo.class); emailMessageService.remove(Wrappers.lambdaQuery().eq(EmailMessage::getMessageId, vo.getId())); } /** * 邮件flag被改变事件 */ private void handleMessageUpdatedEvent(ListenerVo listenerVo) { log.error("邮件flag改变:{}", JSON.toJSONString(listenerVo)); } }