24282 2 жил өмнө
parent
commit
3ce83400f2

+ 25 - 3
bladex-saas-project/new-mail/src/main/java/com/fjhx/config/TaskPoolConfig.java

@@ -18,10 +18,14 @@ import java.util.concurrent.TimeUnit;
 @Configuration
 @Configuration
 public class TaskPoolConfig {
 public class TaskPoolConfig {
 
 
-    public static final String emailTaskExecutor = "emailTaskExecutor";
+    public static final String emailInfoTaskExecutor = "emailInfoTaskExecutor";
+    public static final String emailContentTaskExecutor = "emailContentTaskExecutor";
 
 
-    @Bean(name = emailTaskExecutor)
-    public ThreadPoolExecutor emailTaskExecutor() {
+    /**
+     * 获取邮件基本信息线程池
+     */
+    @Bean(name = emailInfoTaskExecutor)
+    public ThreadPoolExecutor emailInfoTaskExecutor() {
         return new ThreadPoolExecutor(
         return new ThreadPoolExecutor(
                 10,
                 10,
                 20,
                 20,
@@ -36,4 +40,22 @@ public class TaskPoolConfig {
         );
         );
     }
     }
 
 
+    /**
+     * 获取邮件正文、附件、地址线程池
+     */
+    @Bean(name = emailContentTaskExecutor)
+    public ThreadPoolExecutor emailContentTaskExecutor() {
+        return new ThreadPoolExecutor(
+                10,
+                20,
+                60,
+                TimeUnit.SECONDS,
+                new ArrayBlockingQueue<>(20),
+                (runnable, executor) -> {
+                    // 本线程执行
+                    runnable.run();
+                }
+        );
+    }
+
 }
 }

+ 20 - 3
bladex-saas-project/new-mail/src/main/java/com/fjhx/service/impl/CoreServiceImpl.java

@@ -23,6 +23,9 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
 
 
 import javax.mail.*;
 import javax.mail.*;
 import javax.mail.internet.InternetAddress;
 import javax.mail.internet.InternetAddress;
@@ -45,7 +48,13 @@ public class CoreServiceImpl implements ApplicationRunner {
     private MailInfoService mailInfoService;
     private MailInfoService mailInfoService;
 
 
     @Autowired
     @Autowired
-    @Qualifier(TaskPoolConfig.emailTaskExecutor)
+    private PlatformTransactionManager platformTransactionManager;
+
+    @Autowired
+    private TransactionDefinition transactionDefinition;
+
+    @Autowired
+    @Qualifier(TaskPoolConfig.emailInfoTaskExecutor)
     private Executor emailTaskExecutor;
     private Executor emailTaskExecutor;
 
 
     // 累计同步次数
     // 累计同步次数
@@ -311,8 +320,16 @@ public class CoreServiceImpl implements ApplicationRunner {
      * 保存邮件数据,更新文件夹最后同步邮件时间
      * 保存邮件数据,更新文件夹最后同步邮件时间
      */
      */
     private void saveMailInfo(MailFolder mailFolder, List<MailInfo> mailInfoList) {
     private void saveMailInfo(MailFolder mailFolder, List<MailInfo> mailInfoList) {
-        mailFolderService.updateById(mailFolder);
-        mailInfoService.saveBatch(mailInfoList);
+        TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
+        try {
+            mailFolderService.updateById(mailFolder);
+            mailInfoService.saveBatch(mailInfoList);
+            // 提交事务
+            platformTransactionManager.commit(transactionStatus);
+        } catch (Exception e) {
+            // 回滚事务
+            platformTransactionManager.rollback(transactionStatus);
+        }
     }
     }
 
 
     /**
     /**

+ 66 - 19
bladex-saas-project/new-mail/src/main/java/com/fjhx/service/impl/MailInfoServiceImpl.java

@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fjhx.config.TaskPoolConfig;
 import com.fjhx.dto.SendDto;
 import com.fjhx.dto.SendDto;
 import com.fjhx.entity.*;
 import com.fjhx.entity.*;
 import com.fjhx.mapper.MailInfoMapper;
 import com.fjhx.mapper.MailInfoMapper;
@@ -15,7 +16,6 @@ import com.fjhx.vo.MailDetailsVo;
 import com.sun.mail.imap.IMAPFolder;
 import com.sun.mail.imap.IMAPFolder;
 import com.sun.mail.imap.IMAPMessage;
 import com.sun.mail.imap.IMAPMessage;
 import com.sun.mail.imap.IMAPStore;
 import com.sun.mail.imap.IMAPStore;
-import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springblade.core.log.exception.ServiceException;
 import org.springblade.core.log.exception.ServiceException;
 import org.springblade.core.tool.api.R;
 import org.springblade.core.tool.api.R;
@@ -23,7 +23,11 @@ import org.springblade.core.tool.utils.StringPool;
 import org.springblade.resource.entity.ObsUpload;
 import org.springblade.resource.entity.ObsUpload;
 import org.springblade.resource.feign.IObsClient;
 import org.springblade.resource.feign.IObsClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
 
 
 import javax.mail.*;
 import javax.mail.*;
 import javax.mail.internet.MimeUtility;
 import javax.mail.internet.MimeUtility;
@@ -31,6 +35,7 @@ import java.io.*;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Objects;
 import java.util.Objects;
+import java.util.concurrent.Executor;
 
 
 /**
 /**
  * <p>
  * <p>
@@ -42,7 +47,6 @@ import java.util.Objects;
  */
  */
 @Slf4j
 @Slf4j
 @Service
 @Service
-@AllArgsConstructor
 public class MailInfoServiceImpl extends ServiceImpl<MailInfoMapper, MailInfo> implements MailInfoService {
 public class MailInfoServiceImpl extends ServiceImpl<MailInfoMapper, MailInfo> implements MailInfoService {
 
 
     @Autowired
     @Autowired
@@ -58,6 +62,16 @@ public class MailInfoServiceImpl extends ServiceImpl<MailInfoMapper, MailInfo> i
     private MailAttachmentService mailAttachmentService;
     private MailAttachmentService mailAttachmentService;
 
 
     @Autowired
     @Autowired
+    private PlatformTransactionManager platformTransactionManager;
+
+    @Autowired
+    private TransactionDefinition transactionDefinition;
+
+    @Autowired
+    @Qualifier(TaskPoolConfig.emailInfoTaskExecutor)
+    private Executor emailTaskExecutor;
+
+    @Autowired
     private IObsClient obsClient;
     private IObsClient obsClient;
 
 
     @Override
     @Override
@@ -89,11 +103,7 @@ public class MailInfoServiceImpl extends ServiceImpl<MailInfoMapper, MailInfo> i
                 if (ObjectUtil.notEqual(addressSync, 1)) {
                 if (ObjectUtil.notEqual(addressSync, 1)) {
                     List<MailAddress> mailAddressList = EmailUtil.mailAddressList(message, mailId);
                     List<MailAddress> mailAddressList = EmailUtil.mailAddressList(message, mailId);
                     mailDetailsVo.setMailAddressList(mailAddressList);
                     mailDetailsVo.setMailAddressList(mailAddressList);
-                    new Thread(() -> {
-                        mailAddressService.saveBatch(mailAddressList);
-                        this.update(Wrappers.<MailInfo>lambdaUpdate()
-                                .eq(MailInfo::getId, mailId).set(MailInfo::getAddressSync, 1));
-                    }).start();
+                    saveAddress(mailAddressList, mailId);
                 }
                 }
 
 
                 // 附件正文
                 // 附件正文
@@ -106,15 +116,9 @@ public class MailInfoServiceImpl extends ServiceImpl<MailInfoMapper, MailInfo> i
                     mailDetailsVo.setMessage(mailMessage);
                     mailDetailsVo.setMessage(mailMessage);
                     mailDetailsVo.setMailAttachmentList(mailAttachmentList);
                     mailDetailsVo.setMailAttachmentList(mailAttachmentList);
 
 
-                    saveMessageAndAttachment(mailDetailsVo, message);
-
-                    new Thread(() -> {
-                        mailMessageService.save(mailMessage);
-                        mailAttachmentService.saveBatch(mailAttachmentList);
+                    getMessageAndAttachment(mailDetailsVo, message);
 
 
-                        this.update(Wrappers.<MailInfo>lambdaUpdate().eq(MailInfo::getId, mailId)
-                                .set(MailInfo::getMessageSync, 1).set(MailInfo::getAttachmentSync, 1));
-                    }).start();
+                    saveMessageAndAttachment(mailMessage, mailAttachmentList, mailId);
 
 
                 }
                 }
 
 
@@ -143,11 +147,14 @@ public class MailInfoServiceImpl extends ServiceImpl<MailInfoMapper, MailInfo> i
             List<MailAddress> mailAddressList = mailAddressService.list(Wrappers.<MailAddress>lambdaQuery().eq(MailAddress::getMailInfoId, mailId));
             List<MailAddress> mailAddressList = mailAddressService.list(Wrappers.<MailAddress>lambdaQuery().eq(MailAddress::getMailInfoId, mailId));
             mailDetailsVo.setMailAddressList(mailAddressList);
             mailDetailsVo.setMailAddressList(mailAddressList);
         }
         }
+
         if (Objects.equals(messageSync, 1)) {
         if (Objects.equals(messageSync, 1)) {
             List<MailMessage> list = mailMessageService.list(Wrappers.<MailMessage>lambdaQuery().eq(MailMessage::getMailInfoId, mailId));
             List<MailMessage> list = mailMessageService.list(Wrappers.<MailMessage>lambdaQuery().eq(MailMessage::getMailInfoId, mailId));
-            if (list.size() > 0)
+            if (list.size() > 0) {
                 mailDetailsVo.setMessage(list.get(0));
                 mailDetailsVo.setMessage(list.get(0));
+            }
         }
         }
+
         if (Objects.equals(attachmentSync, 1)) {
         if (Objects.equals(attachmentSync, 1)) {
             List<MailAttachment> list = mailAttachmentService.list(Wrappers.<MailAttachment>lambdaQuery().eq(MailAttachment::getMailInfoId, mailId));
             List<MailAttachment> list = mailAttachmentService.list(Wrappers.<MailAttachment>lambdaQuery().eq(MailAttachment::getMailInfoId, mailId));
             mailDetailsVo.setMailAttachmentList(list);
             mailDetailsVo.setMailAttachmentList(list);
@@ -173,7 +180,7 @@ public class MailInfoServiceImpl extends ServiceImpl<MailInfoMapper, MailInfo> i
     /**
     /**
      * 保存附件和正文
      * 保存附件和正文
      */
      */
-    private void saveMessageAndAttachment(MailDetailsVo mailDetailsVo, Part part) throws MessagingException, IOException {
+    private void getMessageAndAttachment(MailDetailsVo mailDetailsVo, Part part) throws MessagingException, IOException {
         MailMessage message = mailDetailsVo.getMessage();
         MailMessage message = mailDetailsVo.getMessage();
         List<MailAttachment> mailAttachmentList = mailDetailsVo.getMailAttachmentList();
         List<MailAttachment> mailAttachmentList = mailDetailsVo.getMailAttachmentList();
 
 
@@ -221,7 +228,7 @@ public class MailInfoServiceImpl extends ServiceImpl<MailInfoMapper, MailInfo> i
 
 
                 // 大对象
                 // 大对象
                 else if (bodyPart.isMimeType("multipart/*")) {
                 else if (bodyPart.isMimeType("multipart/*")) {
-                    saveMessageAndAttachment(mailDetailsVo, bodyPart);
+                    getMessageAndAttachment(mailDetailsVo, bodyPart);
                 }
                 }
 
 
                 // 其他类型
                 // 其他类型
@@ -236,10 +243,12 @@ public class MailInfoServiceImpl extends ServiceImpl<MailInfoMapper, MailInfo> i
 
 
             }
             }
         }
         }
+
         // rfc822
         // rfc822
         else if (part.isMimeType("message/rfc822")) {
         else if (part.isMimeType("message/rfc822")) {
-            saveMessageAndAttachment(mailDetailsVo, (Part) part.getContent());
+            getMessageAndAttachment(mailDetailsVo, (Part) part.getContent());
         }
         }
+
         // 未知格式
         // 未知格式
         else {
         else {
             log.error("未知文件格式:{} ,邮件id:{},待解析", part.getContentType(), message.getMailInfoId());
             log.error("未知文件格式:{} ,邮件id:{},待解析", part.getContentType(), message.getMailInfoId());
@@ -330,4 +339,42 @@ public class MailInfoServiceImpl extends ServiceImpl<MailInfoMapper, MailInfo> i
         return null;
         return null;
     }
     }
 
 
+    /**
+     * 保存地址
+     */
+    private void saveAddress(List<MailAddress> mailAddressList, Long mailId) {
+        emailTaskExecutor.execute(() -> {
+            TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
+            try {
+                mailAddressService.saveBatch(mailAddressList);
+                update(Wrappers.<MailInfo>lambdaUpdate().eq(MailInfo::getId, mailId).set(MailInfo::getAddressSync, 1));
+                // 提交事务
+                platformTransactionManager.commit(transactionStatus);
+            } catch (Exception e) {
+                // 回滚事务
+                platformTransactionManager.rollback(transactionStatus);
+            }
+        });
+    }
+
+    /**
+     * 保存正文和附件
+     */
+    private void saveMessageAndAttachment(MailMessage mailMessage, List<MailAttachment> mailAttachmentList, Long mailId) {
+        emailTaskExecutor.execute(() -> {
+            TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
+            try {
+                mailMessageService.save(mailMessage);
+                mailAttachmentService.saveBatch(mailAttachmentList);
+                update(Wrappers.<MailInfo>lambdaUpdate().eq(MailInfo::getId, mailId).set(MailInfo::getMessageSync, 1).set(MailInfo::getAttachmentSync, 1));
+
+                // 提交事务
+                platformTransactionManager.commit(transactionStatus);
+            } catch (Exception e) {
+                // 回滚事务
+                platformTransactionManager.rollback(transactionStatus);
+            }
+        });
+    }
+
 }
 }