24282 10 hónapja
szülő
commit
3266236114

+ 4 - 0
jy-business/pom.xml

@@ -26,6 +26,10 @@
             <groupId>com.jy</groupId>
             <artifactId>jy-flow</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

+ 36 - 0
jy-business/src/main/java/com/jy/business/config/RabbitConfig.java

@@ -0,0 +1,36 @@
+package com.jy.business.config;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.annotation.EnableRabbit;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@EnableRabbit
+@Configuration
+public class RabbitConfig {
+
+    public static final String JY_DIRECT_EXCHANGE = "jyDirectExchange";
+    public static final String TRANSACTIONS_QUEUE = "transactionsQueue";
+
+    //创建一个队列
+    @Bean
+    public Queue transactionsQueue() {
+        return new Queue(TRANSACTIONS_QUEUE, true);
+    }
+
+    //创建一个Direct类型的交换机
+    @Bean
+    public DirectExchange jyDirectExchange() {
+        return new DirectExchange(JY_DIRECT_EXCHANGE, true, false);
+    }
+
+    //绑定交换机和队列
+    @Bean
+    public Binding binding() {
+        return BindingBuilder.bind(transactionsQueue()).to(jyDirectExchange()).withQueueName();
+    }
+
+}

+ 49 - 0
jy-business/src/main/java/com/jy/business/listener/JyRabbitListener.java

@@ -0,0 +1,49 @@
+package com.jy.business.listener;
+
+import com.alibaba.fastjson2.JSON;
+import com.jy.business.capital.model.dto.CapitalTransactionsDto;
+import com.jy.business.capital.service.CapitalTransactionsService;
+import com.jy.business.config.RabbitConfig;
+import com.rabbitmq.client.Channel;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.Exchange;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.QueueBinding;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+@Slf4j
+@Component
+public class JyRabbitListener {
+
+    @Resource
+    private CapitalTransactionsService capitalTransactionsService;
+
+    @RabbitListener(bindings = {
+            @QueueBinding(
+                    value = @Queue(RabbitConfig.TRANSACTIONS_QUEUE),
+                    exchange = @Exchange(RabbitConfig.JY_DIRECT_EXCHANGE)
+            )
+    })
+    public void msgSend(String msg, Channel channel, Message message) throws IOException {
+        log.info("消费者收到消息:【{}】", msg);
+        long deliveryTag = message.getMessageProperties().getDeliveryTag();
+
+        try {
+            CapitalTransactionsDto dto = JSON.parseObject(msg).to(CapitalTransactionsDto.class);
+            dto.setTargetType(80);
+            capitalTransactionsService.add(dto);
+
+            channel.basicAck(deliveryTag, false);
+        } catch (Exception e) {
+            channel.basicNack(deliveryTag, false, true);
+            log.error("消息消费失败:【{}】", e.getMessage(), e);
+        }
+
+    }
+
+}

+ 14 - 0
jy-starter/src/main/resources/application-dev.yml

@@ -12,6 +12,20 @@ spring:
           url: jdbc:mysql://sh-cdb-22oyn58i.sql.tencentcdb.com:63963/jy_erp_test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=Asia/Shanghai
           username: root
           password: PdKKy%vZ)2mGykg
+  rabbitmq:
+    port: 5672
+    host: 111.229.165.209
+    username: JyByteSailingAdmin
+    password: ds7us8nqt@777s
+    #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
+    publisher-confirm-type: correlated
+    #保证交换机能把消息推送到队列中
+    publisher-returns: true
+    virtual-host: JyByteSailingTest
+    #这个配置是保证消费者会消费消息,手动确认
+    listener:
+      simple:
+        acknowledge-mode: manual
 
   data:
     redis: