|
@@ -2,17 +2,15 @@ package com.fjhx.rocketmq.producer;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.fjhx.rocketmq.config.RocketMQProperties;
|
|
import com.fjhx.rocketmq.config.RocketMQProperties;
|
|
-import org.apache.rocketmq.client.exception.MQClientException;
|
|
|
|
import org.apache.rocketmq.client.producer.*;
|
|
import org.apache.rocketmq.client.producer.*;
|
|
import org.apache.rocketmq.common.message.Message;
|
|
import org.apache.rocketmq.common.message.Message;
|
|
-import org.apache.rocketmq.common.message.MessageQueue;
|
|
|
|
|
|
+import org.springblade.core.tool.api.R;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.web.bind.annotation.PostMapping;
|
|
import org.springframework.web.bind.annotation.PostMapping;
|
|
import org.springframework.web.bind.annotation.RequestBody;
|
|
import org.springframework.web.bind.annotation.RequestBody;
|
|
import org.springframework.web.bind.annotation.RestController;
|
|
import org.springframework.web.bind.annotation.RestController;
|
|
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.concurrent.*;
|
|
|
|
|
|
+import java.util.UUID;
|
|
|
|
|
|
/**
|
|
/**
|
|
* @author caozhoujun
|
|
* @author caozhoujun
|
|
@@ -30,154 +28,71 @@ public class ProducerController {
|
|
private static final String TOPIC="rocketmq_test";
|
|
private static final String TOPIC="rocketmq_test";
|
|
private static final String TAG="rocketmq_official";
|
|
private static final String TAG="rocketmq_official";
|
|
|
|
|
|
-// /**
|
|
|
|
-// * 同步发送消息
|
|
|
|
-// */
|
|
|
|
-// @PostMapping("/sendSyncMessage")
|
|
|
|
-// private HttpResult sync(@RequestBody JSONObject map) throws Exception {
|
|
|
|
-// if(!map.get("topic").toString().contains("rocketmq_erp")){
|
|
|
|
-// if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
|
|
|
|
-// return HttpResult.ok(1401,"消息格式有误","fail");
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// try {
|
|
|
|
-// //创建消息
|
|
|
|
-// Message msg = new Message();
|
|
|
|
-// msg.setTopic(map.get("topic").toString());
|
|
|
|
-// msg.setTags(map.get("tag").toString());
|
|
|
|
-// msg.setKeys(String.valueOf(UUIDUtil.getUUID()));
|
|
|
|
-// msg.setBody(JSONObject.toJSONBytes(map.get("msg")));
|
|
|
|
-// if(map.containsKey("level")){
|
|
|
|
-// msg.setDelayTimeLevel(Integer.parseInt(map.get("level").toString()));
|
|
|
|
-// }
|
|
|
|
-// //同步发送消息
|
|
|
|
-// SendResult result = defaultProducer.send(msg);
|
|
|
|
-// if(result.toString().contains("SEND_OK")){
|
|
|
|
-// return HttpResult.ok(1500,"success","发送成功",result);
|
|
|
|
-// }
|
|
|
|
-// }catch (Exception e){
|
|
|
|
-// EmailUtil.sendEmail("1164636026@qq.com","rq服务出现错误,异常信息为:"+e);
|
|
|
|
-// EmailUtil.sendEmail("504178754@qq.com","rq服务出现错误,异常信息为:"+e);
|
|
|
|
-// e.printStackTrace();
|
|
|
|
-// Thread.sleep(1000);
|
|
|
|
-// }
|
|
|
|
-// return HttpResult.error(1401,"fail","发送失败");
|
|
|
|
-// }
|
|
|
|
-// /**
|
|
|
|
-// * 异步发送消息
|
|
|
|
-// */
|
|
|
|
-//
|
|
|
|
-// @PostMapping("/sendAsyncMessage")
|
|
|
|
-// private HttpResult async(@RequestBody JSONObject map) throws Exception {
|
|
|
|
-// if(!map.get("topic").toString().contains("rocketmq_erp")){
|
|
|
|
-// if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
|
|
|
|
-// return HttpResult.ok(1401,"消息格式有误","fail");
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// //创建消息
|
|
|
|
-// Message message = new Message(map.get("topic").toString(),map.get("tag").toString(),String.valueOf(UUIDUtil.getUUID()),JSONObject.toJSONBytes(map.get("msg")));
|
|
|
|
-// //异步发送消息
|
|
|
|
-// defaultProducer.send(message, new SendCallback() {
|
|
|
|
-// @Override
|
|
|
|
-// public void onSuccess(SendResult sendResult) {
|
|
|
|
-// System.out.println("异步发送消息:"+sendResult);
|
|
|
|
-// }
|
|
|
|
-// @Override
|
|
|
|
-// public void onException(Throwable e) {
|
|
|
|
-// e.printStackTrace();
|
|
|
|
-// //补偿机制,根据业务情况进行使用,看是否进行重试
|
|
|
|
-// try {
|
|
|
|
-// defaultProducer.setRetryTimesWhenSendFailed(5);
|
|
|
|
-// defaultProducer.send(message,5000L);
|
|
|
|
-// } catch (Exception ex) {
|
|
|
|
-// ex.printStackTrace();
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// });
|
|
|
|
-// return HttpResult.ok(1500,"success","成功");
|
|
|
|
-// }
|
|
|
|
-// /**
|
|
|
|
-// * 单项发送消息
|
|
|
|
-// */
|
|
|
|
-//
|
|
|
|
-// @PostMapping("/sendOneWayMessage")
|
|
|
|
-// private HttpResult oneWay(@RequestBody JSONObject map) throws Exception {
|
|
|
|
-// if(!map.get("topic").toString().contains("rocketmq_erp")){
|
|
|
|
-// if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
|
|
|
|
-// return HttpResult.ok(1401,"消息格式有误","fail");
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// //创建消息
|
|
|
|
-// Message message = new Message(map.get("topic").toString(),map.get("tag").toString(),String.valueOf(UUIDUtil.getUUID()),JSONObject.toJSONBytes(map.get("msg")));
|
|
|
|
-// //同步发送消息
|
|
|
|
-// defaultProducer.sendOneway(message);
|
|
|
|
-// return HttpResult.ok(1500,"success","成功");
|
|
|
|
-// }
|
|
|
|
-// /**
|
|
|
|
-// * 发送事务消息,暂时没有优化
|
|
|
|
-// * @return
|
|
|
|
-// */
|
|
|
|
-//
|
|
|
|
-// @PostMapping("/sendTransactionMess")
|
|
|
|
-// public HttpResult sendTransactionMsg(@RequestBody JSONObject map) throws MQClientException {
|
|
|
|
-// if(!map.get("topic").toString().contains("rocketmq_erp")){
|
|
|
|
-// if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
|
|
|
|
-// return HttpResult.ok(1401,"消息格式有误","fail");
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// TransactionMQProducer producer = new TransactionMQProducer(rocketMQProperties.getTransactionProducerGroupName());
|
|
|
|
-// producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
|
|
|
|
-// TransactionListener transactionListener = new com.quake.quake_qinxin.rocketmq.config.TransactionListener();
|
|
|
|
-// producer.setTransactionListener(transactionListener);
|
|
|
|
-// producer.start();
|
|
|
|
-// ExecutorService executorService = new ThreadPoolExecutor(
|
|
|
|
-// 2,
|
|
|
|
-// 5,
|
|
|
|
-// 100,
|
|
|
|
-// TimeUnit.SECONDS,
|
|
|
|
-// new ArrayBlockingQueue<Runnable>(
|
|
|
|
-// 2000),
|
|
|
|
-// new ThreadFactory(){
|
|
|
|
-// @Override
|
|
|
|
-// public Thread newThread(Runnable runnable){
|
|
|
|
-// Thread thread = new Thread();
|
|
|
|
-// thread.setName("cliend-transaction-msg-check-thread");
|
|
|
|
-// return thread;
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// );
|
|
|
|
-// producer.setExecutorService(executorService);
|
|
|
|
-// Message message = new Message(map.get("tag").toString(),map.get("topic").toString(),String.valueOf(UUIDUtil.getUUID()),JSONObject.toJSONBytes(map.get("msg")));
|
|
|
|
-// SendResult result = producer.sendMessageInTransaction(message,"hello!");
|
|
|
|
-// producer.shutdown();
|
|
|
|
-// return HttpResult.ok(1500,"success","成功",result);
|
|
|
|
-// }
|
|
|
|
-//
|
|
|
|
-// /**
|
|
|
|
-// * 支持顺序发送消息
|
|
|
|
-// */
|
|
|
|
-//
|
|
|
|
-// @PostMapping("/sendMessOrder")
|
|
|
|
-// public HttpResult sendMsgOrder(@RequestBody JSONObject map) {
|
|
|
|
-// SendResult result=null;
|
|
|
|
-// if(!map.get("topic").toString().contains("rocketmq_erp")){
|
|
|
|
-// if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
|
|
|
|
-// return HttpResult.ok(1401,"消息格式有误","fail");
|
|
|
|
-// }
|
|
|
|
-// }
|
|
|
|
-// Message message = new Message( map.get("topic").toString(),map.get("tag").toString(),String.valueOf(UUIDUtil.getUUID()), JSONObject.toJSONBytes(map.get("msg")));
|
|
|
|
-// try{
|
|
|
|
-// result = defaultProducer.send(message, new MessageQueueSelector() {
|
|
|
|
-// @Override
|
|
|
|
-// public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
|
|
|
|
-// int index = ((Integer) arg) % mqs.size();
|
|
|
|
-// return mqs.get(index);
|
|
|
|
-// }
|
|
|
|
-// },1);
|
|
|
|
-// }
|
|
|
|
-// catch (Exception e){
|
|
|
|
-// e.printStackTrace();
|
|
|
|
-// }
|
|
|
|
-// return HttpResult.ok(1500,"success","成功",result);
|
|
|
|
-// }
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 同步发送消息
|
|
|
|
+ */
|
|
|
|
+ @PostMapping("/sendSyncMessage")
|
|
|
|
+ private R sync(@RequestBody JSONObject map) throws Exception {
|
|
|
|
+ Message msg = new Message();
|
|
|
|
+ msg.setTopic(map.get("topic").toString());
|
|
|
|
+ msg.setTags(map.get("tag").toString());
|
|
|
|
+ msg.setKeys(String.valueOf(UUID.randomUUID()));
|
|
|
|
+ msg.setBody(JSONObject.toJSONBytes(map.get("msg")));
|
|
|
|
+ //同步发送消息
|
|
|
|
+ SendResult result = defaultProducer.send(msg);
|
|
|
|
+ if(result.toString().contains("SEND_OK")){
|
|
|
|
+ return R.success();
|
|
|
|
+ }
|
|
|
|
+ return R.fail("发送失败");
|
|
|
|
+ }
|
|
|
|
+ /**
|
|
|
|
+ * 异步发送消息
|
|
|
|
+ */
|
|
|
|
+ @PostMapping("/sendAsyncMessage")
|
|
|
|
+ private R async(@RequestBody JSONObject map) throws Exception {
|
|
|
|
+ if(!map.get("topic").toString().contains("rocketmq_erp")){
|
|
|
|
+ if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
|
|
|
|
+ return R.fail("消息格式有误");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ //创建消息
|
|
|
|
+ Message message = new Message(map.get("topic").toString(),map.get("tag").toString(),String.valueOf(UUID.randomUUID()),JSONObject.toJSONBytes(map.get("msg")));
|
|
|
|
+ //异步发送消息
|
|
|
|
+ defaultProducer.send(message, new SendCallback() {
|
|
|
|
+ @Override
|
|
|
|
+ public void onSuccess(SendResult sendResult) {
|
|
|
|
+ System.out.println("异步发送消息:"+sendResult);
|
|
|
|
+ }
|
|
|
|
+ @Override
|
|
|
|
+ public void onException(Throwable e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ //补偿机制,根据业务情况进行使用,看是否进行重试
|
|
|
|
+ try {
|
|
|
|
+ defaultProducer.setRetryTimesWhenSendFailed(5);
|
|
|
|
+ defaultProducer.send(message,5000L);
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ ex.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ return R.success();
|
|
|
|
+ }
|
|
|
|
+ /**
|
|
|
|
+ * 单项发送消息
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+ @PostMapping("/sendOneWayMessage")
|
|
|
|
+ private R oneWay(@RequestBody JSONObject map) throws Exception {
|
|
|
|
+ if(!map.get("topic").toString().contains("rocketmq_erp")){
|
|
|
|
+ if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
|
|
|
|
+ return R.fail("消息格式有误");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ //创建消息
|
|
|
|
+ Message message = new Message(map.get("topic").toString(),map.get("tag").toString(),String.valueOf(UUID.randomUUID()),JSONObject.toJSONBytes(map.get("msg")));
|
|
|
|
+ //同步发送消息
|
|
|
|
+ defaultProducer.sendOneway(message);
|
|
|
|
+ return R.success();
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|