|
@@ -0,0 +1,471 @@
|
|
|
+package com.fjhx.email.service.impl;
|
|
|
+
|
|
|
+import cn.hutool.core.date.DateTime;
|
|
|
+import cn.hutool.core.date.DateUtil;
|
|
|
+import cn.hutool.core.thread.ThreadUtil;
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import com.fjhx.email.config.TaskPoolConfig;
|
|
|
+import com.fjhx.email.config.base.BaseEntity;
|
|
|
+import com.fjhx.email.entity.*;
|
|
|
+import com.fjhx.email.entity.dto.MailFolderInfo;
|
|
|
+import com.fjhx.email.entity.dto.MailInfo;
|
|
|
+import com.fjhx.email.entity.dto.MailSyncInfo;
|
|
|
+import com.fjhx.email.entity.dto.MailboxInfo;
|
|
|
+import com.fjhx.email.service.*;
|
|
|
+import com.fjhx.email.utils.EmailUtil;
|
|
|
+import com.sun.mail.imap.IMAPMessage;
|
|
|
+import com.sun.mail.imap.IMAPStore;
|
|
|
+import com.sun.mail.util.MailConnectException;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.boot.ApplicationArguments;
|
|
|
+import org.springframework.boot.ApplicationRunner;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.transaction.PlatformTransactionManager;
|
|
|
+import org.springframework.transaction.TransactionDefinition;
|
|
|
+import org.springframework.transaction.TransactionStatus;
|
|
|
+
|
|
|
+import javax.mail.*;
|
|
|
+import javax.mail.internet.InternetAddress;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.Executor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class CoreServiceImpl implements ApplicationRunner {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private PlatformTransactionManager platformTransactionManager;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private TransactionDefinition transactionDefinition;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ @Qualifier(TaskPoolConfig.emailInfoTaskExecutor)
|
|
|
+ private Executor emailTaskExecutor;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IPersonalMailboxService personalMailboxService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IPersonalFolderService personalFolderService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IPersonalMessageService personalMessageService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IEnterpriseMailboxService enterpriseMailboxService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IEnterpriseFolderService enterpriseFolderService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IEnterpriseMessageService enterpriseMessageService;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 累计同步次数
|
|
|
+ */
|
|
|
+ private static int num = 0;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 单次同步邮件总次数
|
|
|
+ */
|
|
|
+ private static int mailCount;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 缓存错误次数
|
|
|
+ */
|
|
|
+ private static final Map<Long, Integer> map = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 正在同步的邮箱集合
|
|
|
+ */
|
|
|
+ private static final List<Long> syncMailList = Collections.synchronizedList(new ArrayList<>());
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run(ApplicationArguments args) {
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ // 发起邮件同步
|
|
|
+ new Thread(this::monitorMail).start();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 邮件同步
|
|
|
+ */
|
|
|
+ public void monitorMail() {
|
|
|
+ try {
|
|
|
+ doMonitorMail();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("未知异常", e);
|
|
|
+ }
|
|
|
+ new Thread(this::monitorMail).start();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行邮件同步逻辑
|
|
|
+ */
|
|
|
+ private void doMonitorMail() {
|
|
|
+
|
|
|
+ // 所有需要同步的邮件
|
|
|
+ List<MailboxInfo> mailboxInfoList = new ArrayList<>(MailSyncInfo.mailboxInfoList);
|
|
|
+ int mailboxSize = mailboxInfoList.size();
|
|
|
+
|
|
|
+ // 开始处理时间
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 清空抓取邮件数
|
|
|
+ mailCount = 0;
|
|
|
+
|
|
|
+ // 累计同步次数
|
|
|
+ num++;
|
|
|
+
|
|
|
+ // log.info("开始第 {} 伦邮件同步,共同步 {} 个邮箱", num, mailboxSize);
|
|
|
+
|
|
|
+ // 定义一个线程计数器
|
|
|
+ CountDownLatch countDownLatch = new CountDownLatch(mailboxSize);
|
|
|
+
|
|
|
+ // 开启抓取邮件任务
|
|
|
+ for (MailboxInfo mailbox : mailboxInfoList) {
|
|
|
+
|
|
|
+ // 开启异步操作
|
|
|
+ emailTaskExecutor.execute(() -> {
|
|
|
+ try {
|
|
|
+ synchronousMail(mailbox);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("发生未知异常", e);
|
|
|
+ }
|
|
|
+ countDownLatch.countDown();
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ boolean awaitFlag = countDownLatch.await(5, TimeUnit.MINUTES);
|
|
|
+ if (!awaitFlag) {
|
|
|
+ log.error("超时执行");
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("countDownLatch.await() 发生异常", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 结束处理时间
|
|
|
+ long end = System.currentTimeMillis();
|
|
|
+
|
|
|
+ // 处理时间
|
|
|
+ long handleTime = end - start;
|
|
|
+
|
|
|
+ log.info("第 {} 伦邮件同步完成,共同步 {} 个邮箱,{} 封邮件,总耗时 {}", num, mailboxSize, mailCount, DateUtil.formatBetween(handleTime));
|
|
|
+
|
|
|
+ // 处理时间是否小于最小处理时间
|
|
|
+ if (MailSyncInfo.minWaitingTime > handleTime) {
|
|
|
+ ThreadUtil.sleep(MailSyncInfo.minWaitingTime - handleTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步邮箱
|
|
|
+ */
|
|
|
+ private void synchronousMail(MailboxInfo mailboxInfo) {
|
|
|
+
|
|
|
+ Long id = mailboxInfo.getId();
|
|
|
+
|
|
|
+ if (mailboxInfo.getSkip()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (syncMailList.contains(id)) {
|
|
|
+ log.error("邮箱 {} 同步超时:{}", mailboxInfo.getMailUser(), JSON.toJSONString(mailboxInfo));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 开始同步,添加id
|
|
|
+ syncMailList.add(id);
|
|
|
+
|
|
|
+ IMAPStore store = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+
|
|
|
+ // 获取邮件连接
|
|
|
+ store = EmailUtil.getIMAPStore(mailboxInfo);
|
|
|
+
|
|
|
+ List<MailFolderInfo> mailFolderInfoList = mailboxInfo.getMailFolderInfoList();
|
|
|
+
|
|
|
+ // 分别拉取每个文件夹邮件
|
|
|
+ for (MailFolderInfo mailFolder : mailFolderInfoList) {
|
|
|
+
|
|
|
+ // 是否跳过文件夹抓取
|
|
|
+ if (mailFolder.getSkip()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Folder folder = store.getFolder(mailFolder.getName());
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 只读方式打开收件箱
|
|
|
+ folder.open(Folder.READ_ONLY);
|
|
|
+ } catch (FolderNotFoundException folderNotFoundException) {
|
|
|
+ mailFolder.setSkip(true);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 获取文件夹邮件
|
|
|
+ Message[] messages = folder.getMessages();
|
|
|
+ if (messages.length == 0) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果邮件最后一封和最后一次同步id相同,则代表没有新邮件
|
|
|
+ IMAPMessage message = (IMAPMessage) messages[messages.length - 1];
|
|
|
+ int messageNumber = message.getMessageNumber();
|
|
|
+ Date receivedDate = message.getReceivedDate();
|
|
|
+
|
|
|
+ if (Objects.equals(messageNumber, mailFolder.getLastMessageNumber())) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 拉取邮件
|
|
|
+ List<MailInfo> mailInfoList = getMailInfoList(mailFolder, messages);
|
|
|
+
|
|
|
+ Integer lastMessageNumber = mailFolder.getLastMessageNumber();
|
|
|
+ Date lastReceivedDate = mailFolder.getLastReceivedDate();
|
|
|
+
|
|
|
+ // 保存最后一份邮件id与收件时间
|
|
|
+ mailFolder.setLastMessageNumber(messageNumber);
|
|
|
+ mailFolder.setLastReceivedDate(receivedDate);
|
|
|
+
|
|
|
+ try {
|
|
|
+ saveMailInfo(mailboxInfo, mailFolder, mailInfoList);
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 回滚
|
|
|
+ mailFolder.setLastMessageNumber(lastMessageNumber);
|
|
|
+ mailFolder.setLastReceivedDate(lastReceivedDate);
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ if (folder.isOpen()) {
|
|
|
+ folder.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 删除错误次数
|
|
|
+ map.remove(id);
|
|
|
+
|
|
|
+ } catch (AuthenticationFailedException | MailConnectException exception) {
|
|
|
+
|
|
|
+ // 添加异常次数
|
|
|
+ map.merge(id, 1, Integer::sum);
|
|
|
+
|
|
|
+ // 异常次数连续超过3次,不再同步邮件
|
|
|
+ if (map.get(id) >= MailSyncInfo.errorNumber) {
|
|
|
+ mailboxInfo.setSkip(true);
|
|
|
+ map.remove(id);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+
|
|
|
+ log.error("邮件同步出错,邮箱信息:{}", JSON.toJSONString(mailboxInfo));
|
|
|
+ e.printStackTrace();
|
|
|
+
|
|
|
+ } finally {
|
|
|
+
|
|
|
+ // 同步完成,删除id
|
|
|
+ syncMailList.remove(id);
|
|
|
+
|
|
|
+ if (store != null) {
|
|
|
+ try {
|
|
|
+ if (store.isConnected()) {
|
|
|
+ store.close();
|
|
|
+ }
|
|
|
+ } catch (MessagingException e) {
|
|
|
+ log.error("store关闭失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 拉取邮件
|
|
|
+ */
|
|
|
+ private List<MailInfo> getMailInfoList(MailFolderInfo mailFolder, Message[] messages) throws MessagingException {
|
|
|
+
|
|
|
+ // 获取最后一次发件id
|
|
|
+ Integer lastMessageNumber = mailFolder.getLastMessageNumber();
|
|
|
+ // 获取最后一次收件的发送时间
|
|
|
+ Date lastReceivedDate = mailFolder.getLastReceivedDate();
|
|
|
+ // 同步时间
|
|
|
+ DateTime date = DateUtil.offsetDay(new Date(), -MailSyncInfo.initDay);
|
|
|
+ // 最多只同步 initDay 天数据
|
|
|
+ if (lastReceivedDate == null || lastReceivedDate.before(date)) {
|
|
|
+ lastReceivedDate = date;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 收件列表
|
|
|
+ List<MailInfo> mailInfoList = new ArrayList<>();
|
|
|
+
|
|
|
+ int length = messages.length;
|
|
|
+ for (int i = length - 1; i >= 0; i--) {
|
|
|
+ IMAPMessage message = (IMAPMessage) messages[i];
|
|
|
+ // 邮件发送时间
|
|
|
+ Date receivedDate = message.getReceivedDate();
|
|
|
+ int messageNumber = message.getMessageNumber();
|
|
|
+ // 邮件id
|
|
|
+ String messageID = message.getMessageID();
|
|
|
+
|
|
|
+ // 最后一封邮件id相同,代表上次同步邮件到这,不继续找之前的邮件了
|
|
|
+ if (ObjectUtil.isNotEmpty(lastMessageNumber)) {
|
|
|
+ if (lastMessageNumber.equals(messageNumber)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 收件时间在上次收件时间之前,不继续找之前的邮件了
|
|
|
+ if (receivedDate.before(lastReceivedDate) || receivedDate.equals(lastReceivedDate)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 邮件信息
|
|
|
+ MailInfo mailInfo = new MailInfo();
|
|
|
+ mailInfo.setMessageId(messageID);
|
|
|
+ mailInfo.setMessageNumber(messageNumber);
|
|
|
+ mailInfo.setSubject(message.getSubject());
|
|
|
+ mailInfo.setFlags(EmailUtil.getFlags(message.getFlags()));
|
|
|
+ mailInfo.setSendDate(receivedDate);
|
|
|
+
|
|
|
+ // 保存发件人信息
|
|
|
+ Address[] addresses = message.getFrom();
|
|
|
+ if (addresses != null && addresses.length > 0) {
|
|
|
+ InternetAddress internetAddress = (InternetAddress) addresses[0];
|
|
|
+ mailInfo.setFromEmail(internetAddress.getAddress());
|
|
|
+ mailInfo.setFromPersonalName(internetAddress.getPersonal());
|
|
|
+ mailInfo.setFromType("from");
|
|
|
+ } else {
|
|
|
+ InternetAddress sender = (InternetAddress) message.getSender();
|
|
|
+ if (sender != null) {
|
|
|
+ mailInfo.setFromEmail(sender.getAddress());
|
|
|
+ mailInfo.setFromPersonalName(sender.getPersonal());
|
|
|
+ mailInfo.setFromType("sender");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 添加到邮件列表
|
|
|
+ mailInfoList.add(0, mailInfo);
|
|
|
+
|
|
|
+ // 添加同步邮件总次数
|
|
|
+ addMailCount();
|
|
|
+ }
|
|
|
+
|
|
|
+ return mailInfoList;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 保存邮件数据,更新文件夹最后同步邮件时间
|
|
|
+ */
|
|
|
+ private void saveMailInfo(MailboxInfo mailboxInfo, MailFolderInfo mailFolder, List<MailInfo> mailInfoList) {
|
|
|
+ TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
|
|
|
+ try {
|
|
|
+
|
|
|
+ // 个人邮箱
|
|
|
+ if (mailboxInfo.getType() == 1) {
|
|
|
+
|
|
|
+ // 邮箱是否同步
|
|
|
+ if (mailboxInfo.getSkip()) {
|
|
|
+ personalMailboxService.update(q -> q.eq(BaseEntity::getId, mailboxInfo.getId()).set(PersonalMailbox::getSyncStatus, mailboxInfo.getSkip() ? 0 : 1).set(BaseEntity::getUpdateTime, new Date()));
|
|
|
+ }
|
|
|
+
|
|
|
+ PersonalFolder personalFolder = new PersonalFolder();
|
|
|
+ personalFolder.setId(mailFolder.getId());
|
|
|
+ personalFolder.setLastMessageNumber(mailFolder.getLastMessageNumber());
|
|
|
+ personalFolder.setLastReceivedDate(mailFolder.getLastReceivedDate());
|
|
|
+ personalFolder.setSyncStatus(mailFolder.getSkip() ? 0 : 1);
|
|
|
+ personalFolderService.updateById(personalFolder);
|
|
|
+
|
|
|
+ if (mailInfoList.size() > 0) {
|
|
|
+ List<PersonalMessage> personalMessageList = mailInfoList.stream().map(mailInfo -> {
|
|
|
+ PersonalMessage personalMessage = new PersonalMessage();
|
|
|
+ personalMessage.setMailboxId(mailboxInfo.getId());
|
|
|
+ personalMessage.setFolderId(mailFolder.getId());
|
|
|
+ personalMessage.setFolderName(mailFolder.getName());
|
|
|
+ personalMessage.setMessageId(mailInfo.getMessageId());
|
|
|
+ personalMessage.setMessageNumber(mailInfo.getMessageNumber());
|
|
|
+ personalMessage.setSubject(mailInfo.getSubject());
|
|
|
+ personalMessage.setFlags(mailInfo.getFlags());
|
|
|
+ personalMessage.setFromEmail(mailInfo.getFromEmail());
|
|
|
+ personalMessage.setFromPersonalName(mailInfo.getFromPersonalName());
|
|
|
+ personalMessage.setFromType(mailInfo.getFromType());
|
|
|
+ personalMessage.setSendDate(mailInfo.getSendDate());
|
|
|
+ personalMessage.setContentSync(0);
|
|
|
+ personalMessage.setAddressSync(0);
|
|
|
+ personalMessage.setAddressSync(0);
|
|
|
+ return personalMessage;
|
|
|
+ }).collect(Collectors.toList());
|
|
|
+ personalMessageService.saveBatch(personalMessageList);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ // 企业邮箱
|
|
|
+ else {
|
|
|
+
|
|
|
+ // 邮箱是否同步
|
|
|
+ if (mailboxInfo.getSkip()) {
|
|
|
+ enterpriseMailboxService.update(q -> q.eq(BaseEntity::getId, mailboxInfo.getId()).set(EnterpriseMailbox::getSyncStatus, mailboxInfo.getSkip() ? 0 : 1).set(BaseEntity::getUpdateTime, new Date()));
|
|
|
+ }
|
|
|
+
|
|
|
+ EnterpriseFolder enterpriseFolder = new EnterpriseFolder();
|
|
|
+ enterpriseFolder.setId(mailFolder.getId());
|
|
|
+ enterpriseFolder.setLastMessageNumber(mailFolder.getLastMessageNumber());
|
|
|
+ enterpriseFolder.setLastReceivedDate(mailFolder.getLastReceivedDate());
|
|
|
+ enterpriseFolder.setSyncStatus(mailFolder.getSkip() ? 0 : 1);
|
|
|
+ enterpriseFolderService.updateById(enterpriseFolder);
|
|
|
+
|
|
|
+ if (mailInfoList.size() > 0) {
|
|
|
+ List<EnterpriseMessage> enterpriseMessageList = mailInfoList.stream().map(mailInfo -> {
|
|
|
+ EnterpriseMessage enterpriseMessage = new EnterpriseMessage();
|
|
|
+ enterpriseMessage.setMailboxId(mailboxInfo.getId());
|
|
|
+ enterpriseMessage.setFolderId(mailFolder.getId());
|
|
|
+ enterpriseMessage.setFolderName(mailFolder.getName());
|
|
|
+ enterpriseMessage.setMessageId(mailInfo.getMessageId());
|
|
|
+ enterpriseMessage.setMessageNumber(mailInfo.getMessageNumber());
|
|
|
+ enterpriseMessage.setSubject(mailInfo.getSubject());
|
|
|
+ enterpriseMessage.setFlags(mailInfo.getFlags());
|
|
|
+ enterpriseMessage.setFromEmail(mailInfo.getFromEmail());
|
|
|
+ enterpriseMessage.setFromPersonalName(mailInfo.getFromPersonalName());
|
|
|
+ enterpriseMessage.setFromType(mailInfo.getFromType());
|
|
|
+ enterpriseMessage.setSendDate(mailInfo.getSendDate());
|
|
|
+ enterpriseMessage.setContentSync(0);
|
|
|
+ enterpriseMessage.setAddressSync(0);
|
|
|
+ enterpriseMessage.setAddressSync(0);
|
|
|
+ return enterpriseMessage;
|
|
|
+ }).collect(Collectors.toList());
|
|
|
+ enterpriseMessageService.saveBatch(enterpriseMessageList);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ // 提交事务
|
|
|
+ platformTransactionManager.commit(transactionStatus);
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 回滚事务
|
|
|
+ platformTransactionManager.rollback(transactionStatus);
|
|
|
+ throw new RuntimeException("保存数据失败");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 添加同步邮件数量
|
|
|
+ */
|
|
|
+ private static synchronized void addMailCount() {
|
|
|
+ mailCount++;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|