caozj před 2 roky
rodič
revize
9e07bc9a93

+ 5 - 0
hx-saas-project/saas-rocketmq/pom.xml

@@ -24,6 +24,11 @@
             <groupId>org.springblade</groupId>
             <artifactId>blade-starter-excel</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>4.9.0</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 85 - 0
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/config/RocketMQConfiguration.java

@@ -0,0 +1,85 @@
+package com.fjhx.rocketmq.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author caozhoujun
+ * @version 1.2
+ * @date 2022/7/15
+ */
+@Configuration
+@EnableConfigurationProperties(RocketMQProperties.class)
+@Slf4j
+public class RocketMQConfiguration {
+
+    @Autowired
+    private RocketMQProperties rocketMQProperties;
+
+    //事件监听
+    @Autowired
+    private ApplicationEventPublisher publisher = null;
+
+    /**
+     * 容器初始化的时候 打印参数
+     */
+    @PostConstruct
+    public void init() {
+        System.out.println("--------Rocketmq初始化---------");
+        System.err.println(rocketMQProperties.getNamesrvAddr());
+        System.err.println(rocketMQProperties.getProducerGroupName());
+        System.err.println(rocketMQProperties.getProducerInstanceName());
+        System.err.println(rocketMQProperties.getProducerTranInstanceName());
+    }
+
+    /**
+     * 创建普通消息发送者实例
+     * @return
+     * @throws MQClientException
+     */
+    @Bean
+    public DefaultMQProducer defaultProducer() throws MQClientException {
+        DefaultMQProducer producer = new DefaultMQProducer(
+                rocketMQProperties.getProducerGroupName());
+        producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
+        producer.setInstanceName(rocketMQProperties.getProducerInstanceName());
+        producer.setVipChannelEnabled(false);
+        producer.setRetryTimesWhenSendAsyncFailed(10);
+        producer.start();
+        log.info("rocketmq producer server is starting....");
+        return producer;
+    }
+
+    /**
+     * 创建支持消息事务发送的实例
+     * @return
+     * @throws MQClientException
+     */
+    @Bean
+    public TransactionMQProducer transactionProducer() throws MQClientException {
+        TransactionMQProducer producer = new TransactionMQProducer(
+                rocketMQProperties.getTransactionProducerGroupName());
+        producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
+        producer.setInstanceName(rocketMQProperties
+                .getProducerTranInstanceName());
+        producer.setRetryTimesWhenSendAsyncFailed(10);
+        // 事务回查最小并发数
+        producer.setCheckThreadPoolMinSize(2);
+        // 事务回查最大并发数
+        producer.setCheckThreadPoolMaxSize(2);
+        // 队列数
+        producer.setCheckRequestHoldMax(2000);
+        producer.start();
+        log.info("rocketmq transaction producer server is starting....");
+        return producer;
+    }
+}

+ 33 - 0
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/config/RocketMQProperties.java

@@ -0,0 +1,33 @@
+package com.fjhx.rocketmq.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author caozhoujun
+ * @version 1.2
+ * @date 2020/5/19
+ */
+@PropertySource("classpath:config/rocketmq.properties")
+@ConfigurationProperties(prefix = "hx.rocketmq")
+@Configuration
+@Setter
+@Getter
+@ToString
+@Accessors(chain = true)
+public class RocketMQProperties {
+    private String namesrvAddr;
+    private String producerGroupName;
+    private String transactionProducerGroupName;
+    private String producerInstanceName;
+    private String producerTranInstanceName;
+    private List<String> subscribe = new ArrayList<String>();
+}

+ 63 - 0
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/config/TransactionListener.java

@@ -0,0 +1,63 @@
+package com.fjhx.rocketmq.config;
+
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author caozhoujun
+ * @version 1.2
+ * @date 2020/6/1
+ */
+public class TransactionListener implements org.apache.rocketmq.client.producer.TransactionListener {
+
+    private ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>();
+    /**
+     * 执行本地事务
+     * @param message
+     * @param o
+     * @return
+     */
+    @Override
+    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
+        String id = message.getTransactionId();
+        //0:未知状态 1:执行成功  2:执行失败
+        map.put(id,0);
+        try {
+            System.out.println("正在执行本地事务");
+            Thread.sleep(5000);
+            map.put(id,1);
+            System.out.println("执行本地事务成功");
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            map.put(id,2);
+            System.out.println("执行本地事务失败");
+            return LocalTransactionState.ROLLBACK_MESSAGE;
+        }
+        System.out.println(id);
+        return LocalTransactionState.COMMIT_MESSAGE;
+    }
+
+    /**
+     * 执行消息回查
+     * @param messageExt
+     * @return
+     */
+    @Override
+    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
+        System.out.println("执行消息回查");
+        String id = messageExt.getTransactionId();
+        Integer result = map.get(id);
+        switch (result){
+            case 0:
+                return LocalTransactionState.UNKNOW;
+            case 1:
+                return LocalTransactionState.COMMIT_MESSAGE;
+            case 2:
+                return LocalTransactionState.ROLLBACK_MESSAGE;
+        }
+        return LocalTransactionState.UNKNOW;
+    }
+}

+ 139 - 0
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/consumer/ConsumerServiceERP.java

@@ -0,0 +1,139 @@
+package com.fjhx.rocketmq.consumer;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.quake.quake_qinxin.core.util.*;
+import com.quake.quake_qinxin.model.ESPContent;
+import com.quake.quake_qinxin.model.ESPList;
+import com.quake.quake_qinxin.model.SqlServerParam;
+import com.quake.quake_qinxin.service.IRocketmqMessageService;
+import net.sf.json.JSONArray;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * @author caozhoujun
+ * @version 1.2
+ * @date 2020/6/22
+ */
+@Component
+public class ConsumerServiceERP {
+
+    @Autowired
+    private IRocketmqMessageService service;
+
+    private DefaultMQPushConsumer consumer;
+
+    private String consumerGroup = "rocketmq_test_group_official_erp";
+
+
+
+    public ConsumerServiceERP() throws MQClientException {
+        consumer = new DefaultMQPushConsumer(consumerGroup);
+        consumer.setNamesrvAddr("124.70.202.81:9876;124.70.177.208:9876;124.70.207.0:9876;124.70.188.19:9876");
+        // 设置消费地点,从最后一个进行消费(其实就是消费策略)
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        // 订阅主题的哪些标签
+        consumer.subscribe("rocketmq_erp", "erp");
+        // 注册监听器
+        consumer.registerMessageListener((MessageListenerConcurrently)
+                (msgs, context) -> {
+                    try {
+                        // 获取Message
+                        Message msg = msgs.get(0);
+                        // 标签
+                        String str = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
+                        System.out.println("消费erp数据" + str);
+                        JSONObject jsonObject = JSONObject.parseObject(str);
+                        String whodoesp = jsonObject.get("whodoesp").toString();
+                        String receiveGroup = jsonObject.get("receiveGroup").toString();
+                        JSONArray contentArray = JSONArray.fromObject(jsonObject.get("content"));
+                        JSONArray listArray = JSONArray.fromObject(jsonObject.get("doList"));
+                        ESPContent content = new ESPContent();
+                        String uuid = UUIDUtil.getNum19();
+                        for (int i = 0; i < listArray.size(); i++) {
+                            List<List<ESPContent>> lists = StringUtil.groupList(contentArray);
+                            System.out.println(lists);
+                            ESPList espList = JSON.parseObject(listArray.get(i).toString(), ESPList.class);
+                            for(List<ESPContent> espContents:lists){
+                                System.out.println(getType(espContents));
+                                try {
+                                    WebsocketUtil.webSocketSendMessageByUsername(espList.getEspId(),StringUtil.toErsStr(whodoesp+"&"+uuid,espList, content, "start+"+receiveGroup));
+                                    //WebsocketUtil.webSocketSendMessage(jsonObject.get("group").toString(), StringUtil.toErsStr(whodoesp+"&"+uuid,espList, content, "start+"+receiveGroup));
+                                }catch (Exception e){
+                                    System.out.println("websocket服务出现错误");
+                                    EmailUtil.sendEmail("1164636026@qq.com","websocket服务出现错误,异常信息为:"+e);
+                                    EmailUtil.sendEmail("504178754@qq.com","websocket服务出现错误,异常信息为:"+e);
+                                }
+                                for (int j = 0; j < espContents.size(); j++) {
+                                    net.sf.json.JSONObject sfObject =  net.sf.json.JSONObject.fromObject(espContents.get(j));
+                                    ESPContent espContent = (ESPContent) net.sf.json.JSONObject.toBean(sfObject, ESPContent.class);
+                                    String param = StringUtil.toErsStr(whodoesp+"&"+uuid,espList, espContent, "");
+                                    String result = WebsocketUtil.webSocketSendMessageByUsername(espList.getEspId(),param);
+                                    //String result =  WebsocketUtil.webSocketSendMessage(jsonObject.get("group").toString(), param);
+                                    System.out.println("websocket监听发送成功与否:"+result+param);
+                                    SqlServerParam p = SqlServerUtil.sqlServerParamStart(espContent,whodoesp,espList,uuid);
+                                    try {
+                                        SqlServerUtil.sqlServerSaveLog(JSON.toJSONString(p));
+                                    }catch (Exception e){
+                                        System.out.println("sqlServer服务出现错误");
+                                        EmailUtil.sendEmail("1164636026@qq.com","sqlServer服务出现错误,异常信息为:"+e);
+                                        EmailUtil.sendEmail("504178754@qq.com","sqlServer服务出现错误,异常信息为:"+e);
+                                    }
+                                    Thread.sleep(150);
+                                }
+                                WebsocketUtil.webSocketSendMessageByUsername(espList.getEspId(), StringUtil.toErsStr(whodoesp+"&"+uuid,espList, content, "end"));
+                                //WebsocketUtil.webSocketSendMessage(jsonObject.get("group").toString(), StringUtil.toErsStr(whodoesp+"&"+uuid,espList, content, "end"));
+                                Thread.sleep(200);
+                            }
+//                            for (int j = 0; j < contentArray.size(); j++) {
+//                                ESPContent espContent = JSON.parseObject(contentArray.get(j).toString(), ESPContent.class);
+//                                String param = StringUtil.toErsStr(whodoesp+"&"+uuid,espList, espContent, "");
+//                                String result =  WebsocketUtil.webSocketSendMessage(jsonObject.get("group").toString(), param);
+//                                System.out.println("websocket监听发送成功与否:"+result);
+//                                SqlServerParam p = SqlServerUtil.sqlServerParamStart(espContent,whodoesp,espList,uuid);
+//                                try {
+//                                    SqlServerUtil.sqlServerSaveLog(JSON.toJSONString(p));
+//                                }catch (Exception e){
+//                                    System.out.println("sqlServer服务出现错误");
+//                                    EmailUtil.sendEmail("1164636026@qq.com","sqlServer服务出现错误,异常信息为:"+e);
+//                                    EmailUtil.sendEmail("504178754@qq.com","sqlServer服务出现错误,异常信息为:"+e);
+//                                }
+//                                Thread.sleep(150);
+//                            }
+//                            WebsocketUtil.webSocketSendMessage(jsonObject.get("group").toString(), StringUtil.toErsStr(whodoesp+"&"+uuid,espList, content, "end"));
+                        }
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    }
+
+                });
+        consumer.start();
+    }
+
+    /**
+     * 获取数据类型
+     * @param object
+     * @return
+     */
+    public static String getType(Object object){
+        String typeName=object.getClass().getName();
+        int length= typeName.lastIndexOf(".");
+        String type =typeName.substring(length+1);
+        return type;
+    }
+    public static void main(String[] args) {
+        int a = 0;
+        System.out.println(getType(a));
+    }
+}

+ 183 - 0
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/producer/ProducerController.java

@@ -0,0 +1,183 @@
+package com.fjhx.rocketmq.producer;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fjhx.rocketmq.config.RocketMQProperties;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @author caozhoujun
+ * @version 1.2
+ * @date 2020/5/19
+ */
+@RestController
+public class ProducerController {
+    @Autowired
+    private DefaultMQProducer defaultProducer;
+
+    @Autowired
+    private RocketMQProperties rocketMQProperties;
+
+    private static final String TOPIC="rocketmq_test";
+    private static final String TAG="rocketmq_official";
+
+//    /**
+//     * 同步发送消息
+//     */
+//    @PostMapping("/sendSyncMessage")
+//    private  HttpResult sync(@RequestBody JSONObject map) throws Exception {
+//        if(!map.get("topic").toString().contains("rocketmq_erp")){
+//            if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
+//                return HttpResult.ok(1401,"消息格式有误","fail");
+//            }
+//        }
+//        try {
+//            //创建消息
+//            Message msg = new Message();
+//            msg.setTopic(map.get("topic").toString());
+//            msg.setTags(map.get("tag").toString());
+//            msg.setKeys(String.valueOf(UUIDUtil.getUUID()));
+//            msg.setBody(JSONObject.toJSONBytes(map.get("msg")));
+//            if(map.containsKey("level")){
+//                msg.setDelayTimeLevel(Integer.parseInt(map.get("level").toString()));
+//            }
+//            //同步发送消息
+//            SendResult result = defaultProducer.send(msg);
+//            if(result.toString().contains("SEND_OK")){
+//                return HttpResult.ok(1500,"success","发送成功",result);
+//            }
+//        }catch (Exception e){
+//            EmailUtil.sendEmail("1164636026@qq.com","rq服务出现错误,异常信息为:"+e);
+//            EmailUtil.sendEmail("504178754@qq.com","rq服务出现错误,异常信息为:"+e);
+//            e.printStackTrace();
+//            Thread.sleep(1000);
+//        }
+//      return HttpResult.error(1401,"fail","发送失败");
+//    }
+//    /**
+//     * 异步发送消息
+//     */
+//
+//    @PostMapping("/sendAsyncMessage")
+//    private  HttpResult async(@RequestBody JSONObject map) throws Exception {
+//        if(!map.get("topic").toString().contains("rocketmq_erp")){
+//            if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
+//                return HttpResult.ok(1401,"消息格式有误","fail");
+//            }
+//        }
+//        //创建消息
+//        Message message = new Message(map.get("topic").toString(),map.get("tag").toString(),String.valueOf(UUIDUtil.getUUID()),JSONObject.toJSONBytes(map.get("msg")));
+//        //异步发送消息
+//        defaultProducer.send(message, new SendCallback() {
+//            @Override
+//            public void onSuccess(SendResult sendResult) {
+//                System.out.println("异步发送消息:"+sendResult);
+//            }
+//            @Override
+//            public void onException(Throwable e) {
+//                e.printStackTrace();
+//                //补偿机制,根据业务情况进行使用,看是否进行重试
+//                try {
+//                    defaultProducer.setRetryTimesWhenSendFailed(5);
+//                    defaultProducer.send(message,5000L);
+//                } catch (Exception ex) {
+//                    ex.printStackTrace();
+//                }
+//            }
+//        });
+//        return HttpResult.ok(1500,"success","成功");
+//    }
+//    /**
+//     * 单项发送消息
+//     */
+//
+//    @PostMapping("/sendOneWayMessage")
+//    private  HttpResult oneWay(@RequestBody JSONObject map) throws Exception {
+//        if(!map.get("topic").toString().contains("rocketmq_erp")){
+//            if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
+//                return HttpResult.ok(1401,"消息格式有误","fail");
+//            }
+//        }
+//        //创建消息
+//        Message message = new Message(map.get("topic").toString(),map.get("tag").toString(),String.valueOf(UUIDUtil.getUUID()),JSONObject.toJSONBytes(map.get("msg")));
+//        //同步发送消息
+//        defaultProducer.sendOneway(message);
+//        return HttpResult.ok(1500,"success","成功");
+//    }
+//    /**
+//     * 发送事务消息,暂时没有优化
+//     * @return
+//     */
+//
+//    @PostMapping("/sendTransactionMess")
+//    public HttpResult sendTransactionMsg(@RequestBody JSONObject map) throws MQClientException {
+//        if(!map.get("topic").toString().contains("rocketmq_erp")){
+//            if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
+//                return HttpResult.ok(1401,"消息格式有误","fail");
+//            }
+//        }
+//        TransactionMQProducer producer = new TransactionMQProducer(rocketMQProperties.getTransactionProducerGroupName());
+//        producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
+//        TransactionListener transactionListener = new com.quake.quake_qinxin.rocketmq.config.TransactionListener();
+//        producer.setTransactionListener(transactionListener);
+//        producer.start();
+//        ExecutorService executorService = new ThreadPoolExecutor(
+//                2,
+//                5,
+//                100,
+//                TimeUnit.SECONDS,
+//                new ArrayBlockingQueue<Runnable>(
+//                        2000),
+//                        new ThreadFactory(){
+//                            @Override
+//                            public Thread newThread(Runnable runnable){
+//                                Thread thread = new Thread();
+//                                thread.setName("cliend-transaction-msg-check-thread");
+//                                return thread;
+//                            }
+//                        }
+//                );
+//        producer.setExecutorService(executorService);
+//        Message message = new Message(map.get("tag").toString(),map.get("topic").toString(),String.valueOf(UUIDUtil.getUUID()),JSONObject.toJSONBytes(map.get("msg")));
+//        SendResult result = producer.sendMessageInTransaction(message,"hello!");
+//        producer.shutdown();
+//        return HttpResult.ok(1500,"success","成功",result);
+//    }
+//
+//    /**
+//     * 支持顺序发送消息
+//     */
+//
+//    @PostMapping("/sendMessOrder")
+//    public HttpResult sendMsgOrder(@RequestBody JSONObject map) {
+//        SendResult result=null;
+//        if(!map.get("topic").toString().contains("rocketmq_erp")){
+//            if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
+//                return HttpResult.ok(1401,"消息格式有误","fail");
+//            }
+//        }
+//        Message message = new Message( map.get("topic").toString(),map.get("tag").toString(),String.valueOf(UUIDUtil.getUUID()), JSONObject.toJSONBytes(map.get("msg")));
+//        try{
+//             result = defaultProducer.send(message, new MessageQueueSelector() {
+//                @Override
+//                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+//                    int index = ((Integer) arg) % mqs.size();
+//                    return mqs.get(index);
+//                }
+//            },1);
+//        }
+//        catch (Exception e){
+//            e.printStackTrace();
+//        }
+//        return HttpResult.ok(1500,"success","成功",result);
+//    }
+}

+ 10 - 0
hx-saas-project/saas-rocketmq/src/main/resources/config/rocketmq.properties

@@ -0,0 +1,10 @@
+
+hx.rocketmq.namesrvAddr=124.70.202.81:9876;124.70.177.208:9876;124.70.207.0:9876;124.70.188.19:9876
+
+hx.rocketmq.producerGroupName=user_group_p
+
+hx.rocketmq.transactionProducerGroupName=order_transaction
+
+hx.rocketmq.producerInstanceName=user_producer_instance
+
+hx.rocketmq.producerTranInstanceName=user_producer_transacition