ConsumerServiceERP.java 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package com.fjhx.rocketmq.consumer;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import org.apache.rocketmq.client.exception.MQClientException;
  8. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  9. import org.apache.rocketmq.common.message.Message;
  10. import org.apache.rocketmq.remoting.common.RemotingHelper;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. import java.util.List;
  14. /**
  15. * @author caozhoujun
  16. * @version 1.2
  17. * @date 2020/6/22
  18. */
  19. @Component
  20. public class ConsumerServiceERP {
  21. private DefaultMQPushConsumer consumer;
  22. private String consumerGroup = "rocketmq_test_group_official_erp";
  23. public ConsumerServiceERP() throws MQClientException {
  24. consumer = new DefaultMQPushConsumer(consumerGroup);
  25. consumer.setNamesrvAddr("124.70.202.81:9876;124.70.177.208:9876;124.70.207.0:9876;124.70.188.19:9876");
  26. // 设置消费地点,从最后一个进行消费(其实就是消费策略)
  27. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  28. // 订阅主题的哪些标签
  29. consumer.subscribe("rocketmq_erp", "erp");
  30. // 注册监听器
  31. consumer.registerMessageListener((MessageListenerConcurrently)
  32. (msgs, context) -> {
  33. try {
  34. // 获取Message
  35. Message msg = msgs.get(0);
  36. // 标签
  37. String str = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
  38. System.out.println("消费erp数据" + str);
  39. JSONObject jsonObject = JSONObject.parseObject(str);
  40. String whodoesp = jsonObject.get("whodoesp").toString();
  41. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  42. } catch (Exception e) {
  43. e.printStackTrace();
  44. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  45. }
  46. });
  47. consumer.start();
  48. }
  49. }