|
@@ -1,12 +1,12 @@
|
|
|
package com.fjhx.rocketmq.service.Impl;
|
|
|
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
-import com.fjhx.message.MessageNotice;
|
|
|
-import com.fjhx.myapp.application.entity.enums.MsgSourceEnum;
|
|
|
-import com.fjhx.rocketmq.Message;
|
|
|
+import com.fjhx.myapp.application.entity.Message;
|
|
|
+import com.fjhx.myapp.application.entity.MessageNotice;
|
|
|
+import com.fjhx.myapp.application.enums.MsgSourceEnum;
|
|
|
import com.fjhx.rocketmq.message.service.StockMessageService;
|
|
|
import com.fjhx.rocketmq.service.RocketMqService;
|
|
|
-import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.rocketmq.client.producer.SendCallback;
|
|
|
import org.apache.rocketmq.client.producer.SendResult;
|
|
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
|
@@ -35,44 +35,49 @@ public class RocketMqServiceImpl implements RocketMqService {
|
|
|
|
|
|
@Autowired
|
|
|
private StockMessageService stockMessageService;
|
|
|
+
|
|
|
/**
|
|
|
* 普通消息
|
|
|
+ *
|
|
|
* @param msg
|
|
|
*/
|
|
|
@Override
|
|
|
public void send(Message msg) {
|
|
|
- System.err.println("send发送消息:"+msg);
|
|
|
+ System.err.println("send发送消息:" + msg);
|
|
|
rocketMQTemplate.send(msg.getTopic() + ":" + msg.getTag(),
|
|
|
MessageBuilder.withPayload(msg.getContent()).build());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 异步消息
|
|
|
+ *
|
|
|
* @param msg
|
|
|
*/
|
|
|
@Override
|
|
|
public void asyncSend(Message msg) {
|
|
|
- System.err.println("asyncSend发送消息:"+msg);
|
|
|
+ System.err.println("asyncSend发送消息:" + msg);
|
|
|
rocketMQTemplate.asyncSend(msg.getTopic() + ":" + msg.getTag(), msg.getContent(),
|
|
|
new SendCallback() {
|
|
|
@Override
|
|
|
public void onSuccess(SendResult sendResult) {
|
|
|
- System.err.println("事物消息发送成功:"+sendResult.getTransactionId());
|
|
|
+ System.err.println("事物消息发送成功:" + sendResult.getTransactionId());
|
|
|
}
|
|
|
+
|
|
|
@Override
|
|
|
public void onException(Throwable throwable) {
|
|
|
- System.err.println("mqMsg={"+msg+"}消息发送失败:");
|
|
|
+ System.err.println("mqMsg={" + msg + "}消息发送失败:");
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 单项消息
|
|
|
+ *
|
|
|
* @param msg
|
|
|
*/
|
|
|
@Override
|
|
|
public void syncSendOrderly(Message msg) {
|
|
|
- System.err.println("syncSendOrderly发送消息:"+msg);
|
|
|
+ System.err.println("syncSendOrderly发送消息:" + msg);
|
|
|
rocketMQTemplate.sendOneWay(msg.getTopic() + ":" + msg.getTag(), msg.getContent());
|
|
|
}
|
|
|
|
|
@@ -80,22 +85,23 @@ public class RocketMqServiceImpl implements RocketMqService {
|
|
|
* 同步消息
|
|
|
*/
|
|
|
public void syncSend(Message msg) {
|
|
|
- System.err.println("同步消息:"+msg);
|
|
|
- SendResult sendMessage = rocketMQTemplate.syncSend(msg.getTopic() + ":" + msg.getTag(), msg.getContent());
|
|
|
+ System.err.println("同步消息:" + msg);
|
|
|
+ SendResult sendMessage = rocketMQTemplate.syncSend(msg.getTopic() + ":" + msg.getTag(), msg.getContent());
|
|
|
System.out.println(sendMessage);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 通用发送消息
|
|
|
+ *
|
|
|
* @param busiId 业务id
|
|
|
* @param userIds 接受用户id集合
|
|
|
* @param content 内容
|
|
|
* @param msgType 消息类型
|
|
|
*/
|
|
|
@Override
|
|
|
- public void send(String topic,String tag,String key,String busiId, List<String> userIds, String content, Integer msgType) {
|
|
|
+ public void send(String topic, String tag, String key, String busiId, List<String> userIds, String content, Integer msgType) {
|
|
|
List<MessageNotice> notices = new ArrayList<>();
|
|
|
- if (CollectionUtils.isEmpty(userIds)) {
|
|
|
+ if (ObjectUtil.isEmpty(userIds)) {
|
|
|
return;
|
|
|
}
|
|
|
for (String userId : userIds) {
|