浏览代码

基础模块

caozj 2 年之前
父节点
当前提交
0b11e4d9df

+ 130 - 0
bladex-tool/blade-core-tool/src/main/java/org/springblade/core/tool/utils/SpringUtils.java

@@ -0,0 +1,130 @@
+package org.springblade.core.tool.utils;
+
+import org.springframework.aop.framework.AopContext;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
+import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+/**
+ * spring工具类 方便在非spring管理环境中获取bean
+ */
+@Component
+public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware {
+
+    /**
+     * Spring应用上下文环境
+     */
+    private static ConfigurableListableBeanFactory beanFactory;
+
+    private static ApplicationContext applicationContext;
+
+    @Override
+    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
+        SpringUtils.beanFactory = beanFactory;
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        SpringUtils.applicationContext = applicationContext;
+    }
+
+    /**
+     * 获取对象
+     *
+     * @param name
+     * @return Object 一个以所给名字注册的bean的实例
+     * @throws BeansException
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T getBean(String name) throws BeansException {
+        return (T) beanFactory.getBean(name);
+    }
+
+    /**
+     * 获取类型为requiredType的对象
+     *
+     * @param clz
+     * @return
+     * @throws BeansException
+     */
+    public static <T> T getBean(Class<T> clz) throws BeansException {
+        T result = (T) beanFactory.getBean(clz);
+        return result;
+    }
+
+    /**
+     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
+     *
+     * @param name
+     * @return boolean
+     */
+    public static boolean containsBean(String name) {
+        return beanFactory.containsBean(name);
+    }
+
+    /**
+     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
+     *
+     * @param name
+     * @return boolean
+     * @throws NoSuchBeanDefinitionException
+     */
+    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
+        return beanFactory.isSingleton(name);
+    }
+
+    /**
+     * @param name
+     * @return Class 注册对象的类型
+     * @throws NoSuchBeanDefinitionException
+     */
+    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
+        return beanFactory.getType(name);
+    }
+
+    /**
+     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
+     *
+     * @param name
+     * @return
+     * @throws NoSuchBeanDefinitionException
+     */
+    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
+        return beanFactory.getAliases(name);
+    }
+
+    /**
+     * 获取aop代理对象
+     *
+     * @param invoker
+     * @return
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T getAopProxy(T invoker) {
+        return (T) AopContext.currentProxy();
+    }
+
+    /**
+     * 获取当前的环境配置,无配置返回null
+     *
+     * @return 当前的环境配置
+     */
+    public static String[] getActiveProfiles() {
+        return applicationContext.getEnvironment().getActiveProfiles();
+    }
+
+//    /**
+//     * 获取当前的环境配置,当有多个环境配置时,只获取第一个
+//     *
+//     * @return 当前的环境配置
+//     */
+//    public static String getActiveProfile() {
+//        final String[] activeProfiles = getActiveProfiles();
+//        return StringUtils.isNotEmpty(activeProfiles) ? activeProfiles[0] : null;
+//    }
+
+}

+ 5 - 1
hx-saas-project/pom.xml

@@ -102,7 +102,11 @@
             <artifactId>blade-system-api</artifactId>
             <version>${bladex.project.version}</version>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <version>2.2.0</version>
+        </dependency>
         <!-- 工具 -->
         <dependency>
             <groupId>org.projectlombok</groupId>

+ 17 - 0
hx-saas-project/saas-entity/src/main/java/com/fjhx/rocketmq/Message.java

@@ -0,0 +1,17 @@
+package com.fjhx.rocketmq;
+
+import lombok.Data;
+
+/**
+ * @Author:caozj
+ * @DATE:2022/7/15 23:33
+ */
+@Data
+public class Message {
+    private String topic;
+    private String tag;
+    private Object content;
+    private String key;
+    private String
+
+}

+ 3 - 3
hx-saas-project/saas-rocketmq/pom.xml

@@ -24,9 +24,9 @@
             <artifactId>blade-starter-excel</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-client</artifactId>
-            <version>${rocketmq.version}</version>
+            <groupId>com.fjhx</groupId>
+            <artifactId>saas-feign-api</artifactId>
+            <version>${bladex.project.version}</version>
         </dependency>
     </dependencies>
 

+ 0 - 86
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/config/RocketMQConfiguration.java

@@ -1,86 +0,0 @@
-package com.fjhx.rocketmq.config;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.TransactionMQProducer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ApplicationEventPublisher;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import javax.annotation.PostConstruct;
-
-/**
- * 事务消息
- * @author caozhoujun
- * @version 1.2
- * @date 2022/7/15
- */
-@Configuration
-@EnableConfigurationProperties(RocketMQProperties.class)
-@Slf4j
-public class RocketMQConfiguration {
-
-    @Autowired
-    private RocketMQProperties rocketMQProperties;
-
-    //事件监听
-    @Autowired
-    private ApplicationEventPublisher publisher = null;
-
-    /**
-     * 容器初始化的时候 打印参数
-     */
-    @PostConstruct
-    public void init() {
-        System.out.println("--------Rocketmq初始化---------");
-        System.err.println(rocketMQProperties.getNamesrvAddr());
-        System.err.println(rocketMQProperties.getProducerGroupName());
-        System.err.println(rocketMQProperties.getProducerInstanceName());
-        System.err.println(rocketMQProperties.getProducerTranInstanceName());
-    }
-
-    /**
-     * 创建普通消息发送者实例
-     * @return
-     * @throws MQClientException
-     */
-    @Bean
-    public DefaultMQProducer defaultProducer() throws MQClientException {
-        DefaultMQProducer producer = new DefaultMQProducer(
-                rocketMQProperties.getProducerGroupName());
-        producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
-        producer.setInstanceName(rocketMQProperties.getProducerInstanceName());
-        producer.setVipChannelEnabled(false);
-        producer.setRetryTimesWhenSendAsyncFailed(10);
-        producer.start();
-        log.info("rocketmq producer server is starting....");
-        return producer;
-    }
-
-    /**
-     * 创建支持消息事务发送的实例
-     * @return
-     * @throws MQClientException
-     */
-    @Bean
-    public TransactionMQProducer transactionProducer() throws MQClientException {
-        TransactionMQProducer producer = new TransactionMQProducer(
-                rocketMQProperties.getTransactionProducerGroupName());
-        producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
-        producer.setInstanceName(rocketMQProperties
-                .getProducerTranInstanceName());
-        producer.setRetryTimesWhenSendAsyncFailed(10);
-        // 事务回查最小并发数
-        producer.setCheckThreadPoolMinSize(2);
-        // 事务回查最大并发数
-        producer.setCheckThreadPoolMaxSize(2);
-        // 队列数
-        producer.setCheckRequestHoldMax(2000);
-        producer.start();
-        log.info("rocketmq transaction producer server is starting....");
-        return producer;
-    }
-}

+ 0 - 33
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/config/RocketMQProperties.java

@@ -1,33 +0,0 @@
-package com.fjhx.rocketmq.config;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.experimental.Accessors;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.PropertySource;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author caozhoujun
- * @version 1.2
- * @date 2022/7/15
- */
-@PropertySource("classpath:config/rocketmq.properties")
-@ConfigurationProperties(prefix = "hx.rocketmq")
-@Configuration
-@Setter
-@Getter
-@ToString
-@Accessors(chain = true)
-public class RocketMQProperties {
-    private String namesrvAddr;
-    private String producerGroupName;
-    private String transactionProducerGroupName;
-    private String producerInstanceName;
-    private String producerTranInstanceName;
-    private List<String> subscribe = new ArrayList<String>();
-}

+ 0 - 63
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/config/TransactionListener.java

@@ -1,63 +0,0 @@
-package com.fjhx.rocketmq.config;
-
-import org.apache.rocketmq.client.producer.LocalTransactionState;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageExt;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author caozhoujun
- * @version 1.2
- * @date 2022/7/15
- */
-public class TransactionListener implements org.apache.rocketmq.client.producer.TransactionListener {
-
-    private ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<>();
-    /**
-     * 执行本地事务
-     * @param message
-     * @param o
-     * @return
-     */
-    @Override
-    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
-        String id = message.getTransactionId();
-        //0:未知状态 1:执行成功  2:执行失败
-        map.put(id,0);
-        try {
-            System.out.println("正在执行本地事务");
-            Thread.sleep(5000);
-            map.put(id,1);
-            System.out.println("执行本地事务成功");
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-            map.put(id,2);
-            System.out.println("执行本地事务失败");
-            return LocalTransactionState.ROLLBACK_MESSAGE;
-        }
-        System.out.println(id);
-        return LocalTransactionState.COMMIT_MESSAGE;
-    }
-
-    /**
-     * 执行消息回查
-     * @param messageExt
-     * @return
-     */
-    @Override
-    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
-        System.out.println("执行消息回查");
-        String id = messageExt.getTransactionId();
-        Integer result = map.get(id);
-        switch (result){
-            case 0:
-                return LocalTransactionState.UNKNOW;
-            case 1:
-                return LocalTransactionState.COMMIT_MESSAGE;
-            case 2:
-                return LocalTransactionState.ROLLBACK_MESSAGE;
-        }
-        return LocalTransactionState.UNKNOW;
-    }
-}

+ 25 - 0
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/consumer/ConsumerService.java

@@ -0,0 +1,25 @@
+package com.fjhx.rocketmq.consumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+
+/**
+ * @author caozhoujun
+ * @version 1.2
+ * @date 2020/6/22
+ */
+@Service
+@RocketMQMessageListener(topic = "topicOne",
+        consumerGroup = "consumerOne",
+        selectorExpression = "tagOne")
+public class ConsumerService implements RocketMQListener<String> {
+
+    private DefaultMQPushConsumer consumer;
+
+    @Override
+    public void onMessage(String s) {
+        System.err.println("消费者成功消费消息:"+s);
+    }
+}

+ 0 - 57
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/consumer/ConsumerServiceERP.java

@@ -1,57 +0,0 @@
-package com.fjhx.rocketmq.consumer;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-
-/**
- * @author caozhoujun
- * @version 1.2
- * @date 2020/6/22
- */
-@Component
-public class ConsumerServiceERP {
-
-    private DefaultMQPushConsumer consumer;
-
-    private String consumerGroup = "rocketmq_test_group_official_erp";
-
-
-
-    public ConsumerServiceERP() throws MQClientException {
-        consumer = new DefaultMQPushConsumer(consumerGroup);
-        consumer.setNamesrvAddr("127.0.0.1:9876");
-        // 设置消费地点,从最后一个进行消费(其实就是消费策略)
-        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-        // 订阅主题的哪些标签
-        consumer.subscribe("testone", "erp");
-        // 注册监听器
-        consumer.registerMessageListener((MessageListenerConcurrently)
-                (msgs, context) -> {
-                    try {
-                        // 获取Message
-                        Message msg = msgs.get(0);
-                        // 标签
-                        String str = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
-                        System.out.println("消费erp数据" + str);
-                        JSONObject jsonObject = JSONObject.parseObject(str);
-                        String whodoesp = jsonObject.get("whodoesp").toString();
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    }
-
-                });
-        consumer.start();
-    }
-}

+ 0 - 83
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/producer/ProducerController.java

@@ -1,13 +1,5 @@
 package com.fjhx.rocketmq.producer;
 
-import com.alibaba.fastjson.JSONObject;
-import com.fjhx.rocketmq.config.RocketMQProperties;
-import org.apache.rocketmq.client.producer.*;
-import org.apache.rocketmq.common.message.Message;
-import org.springblade.core.tool.api.R;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.UUID;
@@ -19,80 +11,5 @@ import java.util.UUID;
  */
 @RestController
 public class ProducerController {
-    @Autowired
-    private DefaultMQProducer defaultProducer;
-
-    @Autowired
-    private RocketMQProperties rocketMQProperties;
-
-    private static final String TOPIC="rocketmq_test";
-    private static final String TAG="rocketmq_official";
-
-    /**
-     * 同步发送消息
-     */
-    @PostMapping("/sendSyncMessage")
-    private R sync(@RequestBody JSONObject map) throws Exception {
-        Message msg = new Message();
-        msg.setTopic(map.get("topic").toString());
-        msg.setTags(map.get("tag").toString());
-        msg.setKeys(String.valueOf(UUID.randomUUID()));
-        msg.setBody(JSONObject.toJSONBytes(map.get("msg")));
-        //同步发送消息
-        SendResult result = defaultProducer.send(msg);
-        if(result.toString().contains("SEND_OK")){
-            return R.success();
-        }
-        return R.fail("发送失败");
-    }
-    /**
-     * 异步发送消息
-     */
-    @PostMapping("/sendAsyncMessage")
-    private  R async(@RequestBody JSONObject map) throws Exception {
-        if(!map.get("topic").toString().contains("rocketmq_erp")){
-            if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
-                return R.fail("消息格式有误");
-            }
-        }
-        //创建消息
-        Message message = new Message(map.get("topic").toString(),map.get("tag").toString(),String.valueOf(UUID.randomUUID()),JSONObject.toJSONBytes(map.get("msg")));
-        //异步发送消息
-        defaultProducer.send(message, new SendCallback() {
-            @Override
-            public void onSuccess(SendResult sendResult) {
-                System.out.println("异步发送消息:"+sendResult);
-            }
-            @Override
-            public void onException(Throwable e) {
-                e.printStackTrace();
-                //补偿机制,根据业务情况进行使用,看是否进行重试
-                try {
-                    defaultProducer.setRetryTimesWhenSendFailed(5);
-                    defaultProducer.send(message,5000L);
-                } catch (Exception ex) {
-                    ex.printStackTrace();
-                }
-            }
-        });
-        return R.success();
-    }
-    /**
-     * 单项发送消息
-     */
-
-    @PostMapping("/sendOneWayMessage")
-    private  R oneWay(@RequestBody JSONObject map) throws Exception {
-        if(!map.get("topic").toString().contains("rocketmq_erp")){
-            if(!map.get("msg").toString().contains("table")||map.get("msg").toString()==null||"".equals(map.get("msg").toString())){
-                return R.fail("消息格式有误");
-            }
-        }
-        //创建消息
-        Message message = new Message(map.get("topic").toString(),map.get("tag").toString(),String.valueOf(UUID.randomUUID()),JSONObject.toJSONBytes(map.get("msg")));
-        //同步发送消息
-        defaultProducer.sendOneway(message);
-        return R.success();
-    }
 
 }

+ 62 - 0
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/service/Impl/RocketMqServiceImpl.java

@@ -0,0 +1,62 @@
+package com.fjhx.rocketmq.service.Impl;
+
+import com.fjhx.rocketmq.Message;
+import com.fjhx.rocketmq.service.RocketMqService;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Service;
+
+/**
+ * @Author:caozj
+ * @DATE:2022/7/15 23:36
+ */
+@Service
+public class RocketMqServiceImpl implements RocketMqService {
+
+    @Autowired
+    private RocketMQTemplate rocketMQTemplate;
+
+    /**
+     * 普通消息
+     * @param msg
+     */
+    @Override
+    public void send(Message msg) {
+        System.err.println("send发送消息:"+msg);
+        rocketMQTemplate.send(msg.getTopic() + ":" + msg.getTag(),
+                MessageBuilder.withPayload(msg.getContent()).build());
+    }
+
+    /**
+     * 异步消息
+     * @param msg
+     */
+    @Override
+    public void asyncSend(Message msg) {
+        System.err.println("asyncSend发送消息:"+msg);
+        rocketMQTemplate.asyncSend(msg.getTopic() + ":" + msg.getTag(), msg.getContent(),
+                new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        System.err.println("事物消息发送成功:"+sendResult.getTransactionId());
+                    }
+                    @Override
+                    public void onException(Throwable throwable) {
+                        System.err.println("mqMsg={"+msg+"}消息发送失败:");
+                    }
+                });
+    }
+
+    /**
+     * 单项消息
+     * @param msg
+     */
+    @Override
+    public void syncSendOrderly(Message msg) {
+        System.err.println("syncSendOrderly发送消息:"+msg);
+        rocketMQTemplate.sendOneWay(msg.getTopic() + ":" + msg.getTag(), msg.getContent());
+    }
+}

+ 22 - 0
hx-saas-project/saas-rocketmq/src/main/java/com/fjhx/rocketmq/service/RocketMqService.java

@@ -0,0 +1,22 @@
+package com.fjhx.rocketmq.service;
+
+import com.fjhx.rocketmq.Message;
+
+/**
+ * @Author:caozj
+ * @DATE:2022/7/15 23:34
+ */
+public interface RocketMqService {
+    /**
+     * 同步发送消息
+     */
+    void send(Message mqMsg);
+    /**
+     * 异步发送消息,异步返回消息结果
+     */
+    void asyncSend(Message mqMsg);
+    /**
+     * 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;
+     */
+    void syncSendOrderly(Message mqMsg);
+}

+ 6 - 0
hx-saas-project/saas-rocketmq/src/main/resources/application-dev.yml

@@ -11,3 +11,9 @@ spring:
     url: ${blade.datasource.storage.dev.url}
     username: ${blade.datasource.storage.dev.username}
     password: ${blade.datasource.storage.dev.password}
+
+#rocketmq 配置
+rocketmq:
+  name-server: 127.0.0.1:9876
+  producer:
+    group: java

+ 0 - 10
hx-saas-project/saas-rocketmq/src/main/resources/config/rocketmq.properties

@@ -1,10 +0,0 @@
-
-hx.rocketmq.namesrvAddr=127.0.0.1:9876
-
-hx.rocketmq.producerGroupName=user_group_p
-
-hx.rocketmq.transactionProducerGroupName=order_transaction
-
-hx.rocketmq.producerInstanceName=user_producer_instance
-
-hx.rocketmq.producerTranInstanceName=user_producer_transacition

+ 0 - 6
hx-saas-project/saas-storage/src/main/java/com/fjhx/attachment/controller/StockAttachmentController.java

@@ -25,12 +25,6 @@ public class StockAttachmentController {
     @Autowired
     private StockAttachmentService stockAttachmentService;
 
-    @PostMapping("/page")
-    public R page(@RequestBody Map<String, String> condition){
-        Page<StockAttachment> result = stockAttachmentService.getPage(condition);
-        return R.success(result);
-    }
-
     @PostMapping("/add")
     public R add(@RequestBody StockAttachmentVo stockAttachmentVo){
         stockAttachmentService.add(stockAttachmentVo);