123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- package com.fjhx.rocketmq.consumer;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import java.util.List;
- /**
- * @author caozhoujun
- * @version 1.2
- * @date 2020/6/22
- */
- @Component
- public class ConsumerServiceERP {
- private DefaultMQPushConsumer consumer;
- private String consumerGroup = "rocketmq_test_group_official_erp";
- public ConsumerServiceERP() throws MQClientException {
- consumer = new DefaultMQPushConsumer(consumerGroup);
- consumer.setNamesrvAddr("124.70.202.81:9876;124.70.177.208:9876;124.70.207.0:9876;124.70.188.19:9876");
- // 设置消费地点,从最后一个进行消费(其实就是消费策略)
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- // 订阅主题的哪些标签
- consumer.subscribe("rocketmq_erp", "erp");
- // 注册监听器
- consumer.registerMessageListener((MessageListenerConcurrently)
- (msgs, context) -> {
- try {
- // 获取Message
- Message msg = msgs.get(0);
- // 标签
- String str = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
- System.out.println("消费erp数据" + str);
- JSONObject jsonObject = JSONObject.parseObject(str);
- String whodoesp = jsonObject.get("whodoesp").toString();
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- } catch (Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- }
- }
|