package com.fjhx.rocketmq.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Push consumer for ERP messages.
 *
 * <p>Constructing this component creates a {@link DefaultMQPushConsumer}, subscribes it to the
 * {@code rocketmq_erp} topic filtered by the {@code erp} tag, registers a concurrent listener and
 * starts the consumer immediately.
 *
 * <p>NOTE(review): nothing in this class ever shuts the consumer down; consider wiring a
 * bean-destroy callback that calls {@code consumer.shutdown()}.
 *
 * @author caozhoujun
 * @version 1.2
 * @date 2020/6/22
 */
@Component
public class ConsumerServiceERP {

    /** Consumer group this consumer registers under. */
    private static final String CONSUMER_GROUP = "rocketmq_test_group_official_erp";

    /** Semicolon-separated list of name server addresses. */
    private static final String NAMESRV_ADDR =
            "124.70.202.81:9876;124.70.177.208:9876;124.70.207.0:9876;124.70.188.19:9876";

    /** Topic to subscribe to. */
    private static final String TOPIC = "rocketmq_erp";

    /** Tag expression used to filter messages of {@link #TOPIC}. */
    private static final String TAG = "erp";

    /** JSON field every ERP payload is expected to carry. */
    private static final String FIELD_WHODOESP = "whodoesp";

    private final DefaultMQPushConsumer consumer;

    /**
     * Creates, configures and starts the ERP consumer.
     *
     * @throws MQClientException if subscribing to the topic or starting the consumer fails
     */
    public ConsumerServiceERP() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        // Consumption strategy: start consuming from the last offset.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Subscribe to the topic, restricted to the given tag.
        consumer.subscribe(TOPIC, TAG);
        // Register the message listener.
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // The listener is handed a list; handle every entry rather than only the first,
            // otherwise the remaining messages of a batch would be acknowledged unprocessed.
            for (Message msg : msgs) {
                try {
                    String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.out.println("消费erp数据" + body);

                    JSONObject payload = JSONObject.parseObject(body);
                    Object whodoesp = payload == null ? null : payload.get(FIELD_WHODOESP);
                    if (whodoesp == null) {
                        // Report the missing field explicitly instead of relying on a
                        // NullPointerException from calling toString() on null.
                        System.err.println(
                                "ERP message has no '" + FIELD_WHODOESP + "' field: " + body);
                    }
                } catch (Exception e) {
                    // This handler only decodes and parses the payload, so a failure here
                    // would repeat on a retry of the same bytes. Log it and move on.
                    e.printStackTrace();
                }
            }
            // Always acknowledge: malformed payloads are logged above and not redelivered.
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}