// AccountServiceImpl.java (17 KB)
  1. package com.fjhx.service.impl;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.io.FileUtil;
  4. import cn.hutool.core.lang.Assert;
  5. import cn.hutool.core.thread.ThreadUtil;
  6. import cn.hutool.core.util.ObjectUtil;
  7. import com.alibaba.fastjson.JSON;
  8. import com.alibaba.fastjson.JSONObject;
  9. import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
  10. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  11. import com.fjhx.config.TaskPoolConfig;
  12. import com.fjhx.config.exception.EmailEngineException;
  13. import com.fjhx.config.redis.RedisCache;
  14. import com.fjhx.constants.RedisConstant;
  15. import com.fjhx.entity.EmailInfo;
  16. import com.fjhx.entity.EmailMessage;
  17. import com.fjhx.entity.EmailMessageAttachment;
  18. import com.fjhx.entity.EmailMessageContent;
  19. import com.fjhx.enums.SendEventEnum;
  20. import com.fjhx.service.*;
  21. import com.fjhx.utils.EmailEngineUtil;
  22. import com.fjhx.utils.RetryUtil;
  23. import com.fjhx.vo.*;
  24. import lombok.extern.slf4j.Slf4j;
  25. import org.springframework.beans.factory.annotation.Autowired;
  26. import org.springframework.beans.factory.annotation.Qualifier;
  27. import org.springframework.stereotype.Service;
  28. import org.springframework.transaction.annotation.Transactional;
  29. import org.springframework.web.context.ServletContextAware;
  30. import javax.servlet.ServletContext;
  31. import java.util.ArrayList;
  32. import java.util.Collections;
  33. import java.util.List;
  34. import java.util.concurrent.Executor;
  35. import java.util.stream.Collectors;
  36. @Slf4j
  37. @Service
  38. public class AccountServiceImpl implements IAccountService, ServletContextAware {
  39. @Autowired
  40. private IEmailInfoService emailInfoService;
  41. @Autowired
  42. private IEmailMessageService emailMessageService;
  43. @Autowired
  44. private IEmailMessageSendService emailMessageSendService;
  45. @Autowired
  46. private IEmailMessageAttachmentService emailMessageAttachmentService;
  47. @Autowired
  48. private IEmailMessageContentService emailMessageContentService;
  49. @Qualifier(TaskPoolConfig.SYNC_MAILBOX_EXECUTOR)
  50. @Autowired
  51. private Executor syncMailboxExecutor;
  52. @Qualifier(TaskPoolConfig.EXECUTOR)
  53. @Autowired
  54. private Executor syncExecutor;
  55. List<String> syncEmailMessageList = Collections.synchronizedList(new ArrayList<>(100));
  56. List<String> syncMessageAttachmentList = Collections.synchronizedList(new ArrayList<>(100));
  57. /**
  58. * 每页获取多少封邮件
  59. */
  60. private static final int SIZE = 50;
  61. @Override
  62. public void setServletContext(ServletContext servletContext) {
  63. syncExecutor.execute(this::syncMessageContent);
  64. syncExecutor.execute(this::syncMessageAttachment);
  65. }
  66. /**
  67. * 同步正文
  68. */
  69. private void syncMessageContent() {
  70. List<String> collect = emailMessageService.list(Wrappers.<EmailMessage>lambdaQuery()
  71. .select(EmailMessage::getEmail, EmailMessage::getMessageId)
  72. .eq(EmailMessage::getContentSync, false)
  73. .orderByDesc(EmailMessage::getFromDate).last("limit 50"))
  74. .stream()
  75. .map(item -> item.getEmail() + "," + item.getMessageId())
  76. .collect(Collectors.toList());
  77. for (String item : collect) {
  78. if (!syncEmailMessageList.contains(item)) {
  79. syncEmailMessageList.add(item);
  80. }
  81. }
  82. while (syncEmailMessageList.size() > 0) {
  83. String remove = null;
  84. try {
  85. synchronized (this) {
  86. log.info("下载正文");
  87. remove = syncEmailMessageList.remove(0);
  88. if (ObjectUtils.isEmpty(remove)) {
  89. continue;
  90. }
  91. String[] split = remove.split(",");
  92. String email = split[0];
  93. String messageId = split[1];
  94. // 正文内容
  95. MessageDetailVo messageDetail = EmailEngineUtil.getMessageDetail(email, messageId);
  96. MessageDetailVo.TextDTO text = messageDetail.getText();
  97. if (ObjectUtils.isNotEmpty(text)) {
  98. EmailMessageContent emailMessageContent = new EmailMessageContent();
  99. emailMessageContent.setMessageId(messageId);
  100. emailMessageContent.setHtmlContent(text.getHtml());
  101. emailMessageContentService.saveOrUpdate(emailMessageContent);
  102. EmailMessage emailMessage = new EmailMessage();
  103. emailMessage.setMessageId(messageId);
  104. emailMessage.setContentSync(true);
  105. emailMessageService.updateById(emailMessage);
  106. }
  107. log.info("下载正文成功");
  108. }
  109. } catch (Exception e) {
  110. log.error("同步正文发生异常:{}", remove, e);
  111. }
  112. }
  113. ThreadUtil.sleep(60 * 1000);
  114. this.syncMessageContent();
  115. }
  116. private void syncMessageAttachment() {
  117. List<String> collect = emailMessageAttachmentService.list(Wrappers.<EmailMessageAttachment>lambdaQuery()
  118. .select(EmailMessageAttachment::getAttachmentId, EmailMessageAttachment::getEmail, EmailMessageAttachment::getPath)
  119. .eq(EmailMessageAttachment::getIsDownload, false)
  120. .orderByDesc(EmailMessageAttachment::getCreateTime)
  121. .last("limit 50"))
  122. .stream()
  123. .map(item -> item.getAttachmentId() + "," + item.getEmail() + "," + item.getPath())
  124. .collect(Collectors.toList());
  125. for (String item : collect) {
  126. if (!syncMessageAttachmentList.contains(item)) {
  127. syncMessageAttachmentList.add(item);
  128. }
  129. }
  130. while (syncMessageAttachmentList.size() > 0) {
  131. String remove = null;
  132. try {
  133. synchronized (this) {
  134. log.info("下载附件");
  135. remove = syncMessageAttachmentList.remove(0);
  136. if (ObjectUtil.isEmpty(remove)) {
  137. continue;
  138. }
  139. String[] split = remove.split(",");
  140. String attachmentId = split[0];
  141. String email = split[1];
  142. String path = split[2];
  143. // 下载附件
  144. RetryUtil.execute(() -> EmailEngineUtil.downloadAttachment(email, attachmentId, path));
  145. EmailMessageAttachment attachment = new EmailMessageAttachment();
  146. attachment.setAttachmentId(attachmentId);
  147. attachment.setIsDownload(true);
  148. emailMessageAttachmentService.updateById(attachment);
  149. log.info("下载附件成功");
  150. }
  151. } catch (Exception e) {
  152. log.error("下载附件发生异常:{}", remove, e);
  153. }
  154. }
  155. ThreadUtil.sleep(60 * 1000);
  156. this.syncMessageAttachment();
  157. }
  158. @Transactional(rollbackFor = Exception.class)
  159. @Override
  160. public EmailInfo binding(BindingVo bindingVo) {
  161. String email = bindingVo.getEmail();
  162. EmailInfo emailInfo;
  163. try {
  164. emailInfo = emailInfoService.getOne(Wrappers.<EmailInfo>lambdaQuery().eq(EmailInfo::getEmail, email));
  165. // 如果存在,直接返回邮箱信息
  166. if (emailInfo != null) {
  167. return emailInfo;
  168. }
  169. // 保存账号信息到数据库
  170. emailInfo = this.saveEmailInfo(bindingVo);
  171. // 添加账号
  172. EmailEngineUtil.createAccount(bindingVo);
  173. // redis添加同步进度
  174. this.progressInitialization(bindingVo);
  175. } catch (Exception e) {
  176. EmailEngineUtil.deleteAccount(bindingVo.getEmail());
  177. throw e;
  178. }
  179. // 异步遍历文件夹下的所有邮件
  180. this.asyncReadEmail(emailInfo.getId(), bindingVo, 0);
  181. return emailInfo;
  182. }
  183. @Transactional(rollbackFor = Exception.class)
  184. @Override
  185. public void listener(ListenerVo listenerVo) {
  186. log.info("监听到新消息:{}", JSON.toJSONString(listenerVo));
  187. switch (SendEventEnum.get(listenerVo.getEvent())) {
  188. case MESSAGE_NEW:
  189. this.handleMessageNewEvent(listenerVo);
  190. break;
  191. case MESSAGE_DELETED:
  192. this.handleMessageDeletedEvent(listenerVo);
  193. break;
  194. case MESSAGE_UPDATED:
  195. this.handleMessageUpdatedEvent(listenerVo);
  196. break;
  197. default:
  198. log.error("监听到未知事件:{}", JSONObject.toJSONString(listenerVo));
  199. }
  200. }
  201. @Override
  202. public void submit(SubmitVo submitVo) {
  203. EmailEngineUtil.submit(submitVo);
  204. }
  205. /**
  206. * 添加邮件信息
  207. */
  208. private EmailInfo saveEmailInfo(BindingVo bindingVo) {
  209. EmailInfo emailInfo = BeanUtil.copyProperties(bindingVo, EmailInfo.class);
  210. emailInfoService.save(emailInfo);
  211. return emailInfo;
  212. }
  213. /**
  214. * 同步邮件进度初始化
  215. */
  216. private void progressInitialization(BindingVo bindingVo) {
  217. String email = bindingVo.getEmail();
  218. int pages = bindingVo.getPages();
  219. MessageVo messageVo = EmailEngineUtil.getMessageList(email, bindingVo.getPath(), 0, 1);
  220. ProgressVo progressVo = new ProgressVo();
  221. progressVo.setEmail(email);
  222. progressVo.setTotalMessageCount(messageVo.getTotal() > pages * SIZE ? pages * SIZE : messageVo.getTotal());
  223. progressVo.setCompleteMessageCount(0);
  224. progressVo.setPercentage(0);
  225. RedisCache.set(RedisConstant.PROGRESS_KEY + email, progressVo);
  226. }
  227. /**
  228. * 异步读取文件
  229. */
  230. private void asyncReadEmail(Long emailInfoId, BindingVo bindingVo, Integer pageFlag) {
  231. String email = bindingVo.getEmail();
  232. String mailbox = bindingVo.getPath();
  233. int pages = bindingVo.getPages();
  234. syncMailboxExecutor.execute(() -> {
  235. int page = pageFlag;
  236. try {
  237. while (page < pages) {
  238. // 分页获取文件夹邮件
  239. MessageVo result = EmailEngineUtil.getMessageList(email, mailbox, page, SIZE);
  240. // 邮件信息
  241. List<MessageVo.MessagesDTO> messagesDTOList = result.getMessages();
  242. // 批量保存邮件信息
  243. this.saveBatchMessage(emailInfoId, email, messagesDTOList);
  244. // 更新同步邮件进度
  245. this.synchronizationProgress(email, messagesDTOList);
  246. page++;
  247. }
  248. } catch (EmailEngineException e) {
  249. log.error("下载失败,等待10分钟");
  250. ThreadUtil.sleep(1000 * 60 * 10);
  251. asyncReadEmail(emailInfoId, bindingVo, page);
  252. } catch (Exception e) {
  253. log.error("未知原因同步邮件失败:{}", email, e);
  254. }
  255. });
  256. String redisKey = RedisConstant.PROGRESS_KEY + email;
  257. ProgressVo progressVo = RedisCache.get(redisKey);
  258. progressVo.setPercentage(100);
  259. RedisCache.set(redisKey, progressVo);
  260. }
  261. /**
  262. * 批量保存邮件信息
  263. */
  264. private void saveBatchMessage(Long emailInfoId, String email, List<MessageVo.MessagesDTO> messagesDTOList) {
  265. List<EmailMessage> emailMessageList = new ArrayList<>();
  266. List<EmailMessageAttachment> emailMessageAttachmentList = new ArrayList<>();
  267. for (MessageVo.MessagesDTO messagesDTO : messagesDTOList) {
  268. EmailMessage message = this.createMessage(emailInfoId, email, messagesDTO);
  269. emailMessageList.add(message);
  270. List<MessageVo.MessagesDTO.AttachmentsDTO> attachments = messagesDTO.getAttachments();
  271. if (attachments != null && attachments.size() > 0) {
  272. for (MessageVo.MessagesDTO.AttachmentsDTO attachment : attachments) {
  273. emailMessageAttachmentList.add(this.createMessageAttachment(message, attachment));
  274. }
  275. }
  276. }
  277. // 保存邮件
  278. emailMessageService.saveOrUpdateBatch(emailMessageList);
  279. // 保存邮箱附件
  280. emailMessageAttachmentService.saveOrUpdateBatch(emailMessageAttachmentList);
  281. }
  282. /**
  283. * 生成邮件实体
  284. */
  285. private EmailMessage createMessage(Long emailInfoId, String email, MessageVo.MessagesDTO messagesDTO) {
  286. EmailMessage emailMessage = new EmailMessage();
  287. emailMessage.setMessageId(messagesDTO.getId());
  288. emailMessage.setEmailInfoId(emailInfoId);
  289. emailMessage.setEmail(email);
  290. emailMessage.setUnseen(messagesDTO.getUnseen());
  291. emailMessage.setFlagged(messagesDTO.getFlagged());
  292. emailMessage.setSubject(messagesDTO.getSubject());
  293. emailMessage.setFromDate(messagesDTO.getDate());
  294. emailMessage.setContentSync(false);
  295. // 发件人
  296. MessageVo.MessagesDTO.ToDTO from = messagesDTO.getFrom();
  297. if (ObjectUtils.isNotEmpty(from)) {
  298. emailMessage.setFromName(from.getName());
  299. emailMessage.setFromAddress(from.getAddress());
  300. }
  301. return emailMessage;
  302. }
  303. /**
  304. * 生成附件实体
  305. */
  306. private EmailMessageAttachment createMessageAttachment(EmailMessage message, MessageVo.MessagesDTO.AttachmentsDTO attachment) {
  307. String newFileName = attachment.getId() + "." + FileUtil.getSuffix(attachment.getFilename());
  308. EmailMessageAttachment emailMessageAttachment = new EmailMessageAttachment();
  309. emailMessageAttachment.setAttachmentId(attachment.getId());
  310. emailMessageAttachment.setMessageId(message.getMessageId());
  311. emailMessageAttachment.setEmail(message.getEmail());
  312. emailMessageAttachment.setName(attachment.getFilename());
  313. emailMessageAttachment.setSize(attachment.getEncodedSize());
  314. emailMessageAttachment.setPath(message.getEmail() + "\\" + newFileName);
  315. emailMessageAttachment.setIsDownload(false);
  316. return emailMessageAttachment;
  317. }
  318. /**
  319. * 同步邮件进度
  320. */
  321. private void synchronizationProgress(String email, List<MessageVo.MessagesDTO> messagesDTOList) {
  322. String redisKey = RedisConstant.PROGRESS_KEY + email;
  323. ProgressVo progressVo = RedisCache.get(redisKey);
  324. int completeMessageCount = progressVo.getCompleteMessageCount() + messagesDTOList.size();
  325. progressVo.setCompleteMessageCount(completeMessageCount);
  326. progressVo.setPercentage(100 * completeMessageCount / progressVo.getTotalMessageCount());
  327. RedisCache.set(redisKey, progressVo);
  328. }
  329. /**
  330. * 处理收到新邮件事件
  331. */
  332. private void handleMessageNewEvent(ListenerVo listenerVo) {
  333. // 查询邮箱
  334. String account = listenerVo.getAccount();
  335. EmailInfo emailInfo;
  336. try {
  337. emailInfo = emailInfoService.getOne(Wrappers.<EmailInfo>lambdaQuery().eq(EmailInfo::getEmail, account));
  338. Assert.notNull(emailInfo, "未找到邮箱信息");
  339. } catch (Exception e) {
  340. log.error("查找邮箱失败: {}", account, e);
  341. return;
  342. }
  343. // 获取邮件详情
  344. MessageVo.MessagesDTO messagesDTO = listenerVo.getData().toJavaObject(MessageVo.MessagesDTO.class);
  345. // 保存邮件
  346. EmailMessage emailMessage = this.createMessage(emailInfo.getId(), emailInfo.getEmail(), messagesDTO);
  347. emailMessageService.saveOrUpdate(emailMessage);
  348. // 保存邮箱附件
  349. List<EmailMessageAttachment> emailMessageAttachmentList;
  350. List<MessageVo.MessagesDTO.AttachmentsDTO> attachments = messagesDTO.getAttachments();
  351. if (attachments != null && attachments.size() > 0) {
  352. emailMessageAttachmentList = attachments.stream()
  353. .map(attachment -> this.createMessageAttachment(emailMessage, attachment))
  354. .collect(Collectors.toList());
  355. emailMessageAttachmentService.saveOrUpdateBatch(emailMessageAttachmentList);
  356. for (EmailMessageAttachment attachment : emailMessageAttachmentList) {
  357. String value = attachment.getAttachmentId() + "," + attachment.getEmail() + "," + attachment.getPath();
  358. syncMessageAttachmentList.add(0, value);
  359. }
  360. }
  361. syncEmailMessageList.add(0, emailMessage.getEmail() + "," + emailMessage.getMessageId());
  362. }
  363. /**
  364. * 处理删除邮件事件
  365. */
  366. private void handleMessageDeletedEvent(ListenerVo listenerVo) {
  367. MessageDeletedEventVo vo = listenerVo.getData().toJavaObject(MessageDeletedEventVo.class);
  368. emailMessageService.remove(Wrappers.<EmailMessage>lambdaQuery().eq(EmailMessage::getMessageId, vo.getId()));
  369. }
  370. /**
  371. * 邮件flag被改变事件
  372. */
  373. private void handleMessageUpdatedEvent(ListenerVo listenerVo) {
  374. log.error("邮件flag改变:{}", JSON.toJSONString(listenerVo));
  375. }
  376. }