Forráskód Böngészése

创建路由器、队列

home 2 éve
szülő
commit
36edaaab2f

+ 3 - 3
src/main/java/com/fjhx/rabbitmq/JsRabbitMQListener.java

@@ -6,6 +6,7 @@ import com.rabbitmq.client.Channel;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.retry.annotation.Backoff;
@@ -28,9 +29,8 @@ public class JsRabbitMQListener implements ChannelAwareMessageListener {
     @RabbitHandler
     @RabbitListener(
             queues = {
-                    "rfid1_to_rfid2_queue"
-            },
-            concurrency = "8"
+                    "instructions_queue"
+            }
     )
     @Retryable(value = {Exception.class}, backoff = @Backoff(delay = 3000, multiplier = 1))
     @Override

+ 4 - 2
src/main/java/com/fjhx/rabbitmq/RabbitMQProducer.java

@@ -58,13 +58,15 @@ public class RabbitMQProducer implements ConfirmCallback {
 
             // 封装消息
             Message message = MessageBuilder.withBody(msg.getBytes())
-                    .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8")
+                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
+                    .setContentEncoding("UTF-8")
                     .setMessageId(messageId).build();
 
             // 发送消息
             this.rabbitTemplate.setMandatory(true);
             this.rabbitTemplate.setConfirmCallback(this);
-            rabbitTemplate.convertAndSend(exchange, routingKey, message);
+            this.rabbitTemplate.convertAndSend(exchange, routingKey, message, new CorrelationData(messageId));
+
         } catch (AmqpException e) {
             e.printStackTrace();
         }

+ 31 - 14
src/main/java/com/fjhx/rabbitmq/config/RabbitMQConfig.java

@@ -18,19 +18,31 @@ import org.springframework.context.annotation.Configuration;
 @Configuration
 public class RabbitMQConfig {
 
-    @Value("${spring.rabbitmq.send.socket.queue}")
-    public String queue;
-
     @Value("${spring.rabbitmq.send.socket.exchange}")
     public String exchange;
 
     @Value("${spring.rabbitmq.send.socket.routingKey}")
     public String routingKey;
 
+    @Value("${spring.rabbitmq.send.socket.queue}")
+    public String queue;
+
+    @Value("${spring.rabbitmq.send.socket.instructionsRoutingKey}")
+    public String instructionsRoutingKey;
+
+    @Value("${spring.rabbitmq.send.socket.instructionsQueue}")
+    public String instructionsQueue;
+
+    /**
+     * rabbitMq里初始化exchange.
+     */
+    @Bean
+    public TopicExchange exchange() {
+        return new TopicExchange(exchange);
+    }
+
     /**
      * rabbitMq里初始化队列queue.
-     *
-     * @return
      */
     @Bean
     public Queue queue() {
@@ -38,22 +50,27 @@ public class RabbitMQConfig {
     }
 
     /**
-     * rabbitMq里初始化exchange.
-     *
-     * @return
+     * 绑定exchange & queue & routekey.
      */
     @Bean
-    public TopicExchange exchange() {
-        return new TopicExchange(exchange);
+    public Binding bindingExchange() {
+        return BindingBuilder.bind(queue()).to(exchange()).with(routingKey);
+    }
+
+    /**
+     * rabbitMq里初始化队列queue.
+     */
+    @Bean
+    public Queue instructionsQueue() {
+        return new Queue(instructionsQueue);
     }
 
     /**
      * 绑定exchange & queue & routekey.
-     *
-     * @return
      */
     @Bean
-    public Binding bindingExchange() {
-        return BindingBuilder.bind(queue()).to(exchange()).with(routingKey);
+    public Binding bindingInstructionsExchange() {
+        return BindingBuilder.bind(instructionsQueue()).to(exchange()).with(instructionsRoutingKey);
     }
+
 }

+ 1 - 0
src/main/java/com/fjhx/rfid/r2000/common/service/IRFIDService.java

@@ -19,4 +19,5 @@ public interface IRFIDService {
      */
     @Async
     void receiveInstructions(JSONObject jsonObject);
+
 }

+ 11 - 5
src/main/resources/application.yml

@@ -54,11 +54,17 @@ spring:
     send:
       socket:
         # 交换机
-        exchange: rfid1_to_rfid2_exchange
-        # 路由器
-        routingKey: rfid1_to_rfid2_routingKey
-        # 队列
-        queue: rfid1_to_rfid2_queue
+        exchange: rfid_exchange
+
+        # rfid数据路由器
+        routingKey: rfid_data_routingKey
+        # rfid数据队列
+        queue: rfid_data_queue
+
+        # 指令路由器
+        instructionsRoutingKey: instructions_routingKey
+        # 指令队列
+        instructionsQueue: instructions_queue
 
 
 #redisson分布式锁配置