From 0dac5a544f853a32dd5a1e7a84d9bf0751b259ec Mon Sep 17 00:00:00 2001 From: Xbhog <5200621+xbhog@user.noreply.gitee.com> Date: Mon, 3 Jun 2024 14:50:47 +0000 Subject: [PATCH] =?UTF-8?q?!163=20=E5=B0=86spring-cloud-stream=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E6=99=AE=E9=80=9A=E7=9A=84mq=E4=BE=9D=E8=B5=96?= =?UTF-8?q?=E7=94=A8=E6=B3=95=20*=20update:=20sky=E6=97=A5=E5=BF=97=20*=20?= =?UTF-8?q?update:=20RocketMQ=E7=9A=84=E9=9B=86=E6=88=90=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=20*=20feat:1.=20rabbit:=20=E6=99=AE=E9=80=9A=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E3=80=81=E5=BB=B6=E8=BF=9F=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 8 ++ ruoyi-example/pom.xml | 2 +- .../stream/controller/TestMqController.java | 60 ---------- .../org/dromara/stream/mq/TestMessaging.java | 20 ---- .../stream/mq/consumer/DelayConsumer.java | 22 ---- .../stream/mq/consumer/LogStreamConsumer.java | 22 ---- .../mq/consumer/TestStreamConsumer.java | 22 ---- .../stream/mq/producer/DelayProducer.java | 27 ----- .../stream/mq/producer/LogStreamProducer.java | 24 ---- .../mq/producer/TestStreamProducer.java | 24 ---- .../src/main/resources/application.yml | 109 ------------------ .../src/main/resources/banner.txt | 10 -- ruoyi-example/ruoyi-test-mq/README.md | 102 ++++++++++++++++ .../pom.xml | 26 ++--- .../stream/RuoYiTestMqApplication.java} | 7 +- .../dromara/stream/config/RabbitConfig.java | 54 +++++++++ .../stream/config/RabbitTtlQueueConfig.java | 69 +++++++++++ .../controller/PushMessageController.java | 72 ++++++++++++ .../consumer/kafkaMq/KafkaNormalConsumer.java | 24 ++++ .../mq/consumer/rabbit/ConsumerListener.java | 36 ++++++ .../rocketmq/NormalRocketConsumer.java | 20 ++++ .../rocketmq/TransactionRocketConsumer.java | 21 ++++ .../listener/TranscationRocketListener.java | 42 +++++++ .../producer/kafkaMq/KafkaNormalProducer.java | 24 ++++ .../rabbitMq/DelayRabbitProducer.java | 25 ++++ .../rabbitMq/NormalRabbitProducer.java | 24 ++++ .../rocketMq/NormalRocketProducer.java | 25 ++++ .../rocketMq/TransactionRocketProducer.java | 41 +++++++ .../src/main/resources/application.yml | 44 +++++++ .../src/main/resources/logback-plus.xml | 3 +- 30 files changed, 649 insertions(+), 360 deletions(-) delete mode 100644 ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/controller/TestMqController.java delete mode 100644 ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/TestMessaging.java delete mode 100644 ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/DelayConsumer.java delete mode 100644 ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/LogStreamConsumer.java delete mode 100644 ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/TestStreamConsumer.java delete mode 100644 ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/DelayProducer.java delete mode 100644 ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/LogStreamProducer.java delete mode 100644 ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/TestStreamProducer.java delete mode 100644 ruoyi-example/ruoyi-stream-mq/src/main/resources/application.yml delete mode 100644 ruoyi-example/ruoyi-stream-mq/src/main/resources/banner.txt create mode 100644 ruoyi-example/ruoyi-test-mq/README.md rename ruoyi-example/{ruoyi-stream-mq => ruoyi-test-mq}/pom.xml (82%) rename ruoyi-example/{ruoyi-stream-mq/src/main/java/org/dromara/stream/RuoYiStreamMqApplication.java => ruoyi-test-mq/src/main/java/org/dromara/stream/RuoYiTestMqApplication.java} (84%) create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/kafkaMq/KafkaNormalConsumer.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rabbit/ConsumerListener.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/NormalRocketConsumer.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/TransactionRocketConsumer.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/listener/TranscationRocketListener.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/kafkaMq/KafkaNormalProducer.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/DelayRabbitProducer.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/NormalRabbitProducer.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/NormalRocketProducer.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/TransactionRocketProducer.java create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml rename ruoyi-example/{ruoyi-stream-mq => ruoyi-test-mq}/src/main/resources/logback-plus.xml (96%) diff --git a/pom.xml b/pom.xml index 9c965722..9772dea0 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,7 @@ 1.3.6 0.2.0 1.16.6 + 2.3.0 2.7.0 @@ -384,6 +385,13 @@ ${mapstruct-plus.version} + + + org.apache.rocketmq + rocketmq-spring-boot-starter + ${rocketmq-version} + + diff --git a/ruoyi-example/pom.xml b/ruoyi-example/pom.xml index eebbb562..1670137e 100644 --- a/ruoyi-example/pom.xml +++ b/ruoyi-example/pom.xml @@ -10,7 +10,7 @@ ruoyi-demo - ruoyi-stream-mq + ruoyi-test-mq ruoyi-example diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/controller/TestMqController.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/controller/TestMqController.java deleted file mode 100644 index bbfee282..00000000 --- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/controller/TestMqController.java +++ /dev/null @@ -1,60 +0,0 @@ -package org.dromara.stream.controller; - -import org.dromara.common.core.domain.R; -import org.dromara.stream.mq.producer.DelayProducer; -import org.dromara.stream.mq.producer.LogStreamProducer; -import org.dromara.stream.mq.producer.TestStreamProducer; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -/** - * 测试mq - */ -@Slf4j -@RestController -@AllArgsConstructor -@RequestMapping("/test-mq") -public class TestMqController { - - private final DelayProducer delayProducer; - private final TestStreamProducer testStreamProducer; - private final LogStreamProducer logStreamProducer; - - /** - * 发送消息Rabbitmq - * - * @param msg 消息内容 - * @param delay 延时时间 - */ - @GetMapping("/sendRabbitmq") - public R sendRabbitmq(String msg, Long delay) { - delayProducer.sendMsg(msg, delay); - return R.ok(); - } - - /** - * 发送消息Rocketmq - * - * @param msg 消息内容 - */ - @GetMapping("/sendRocketmq") - public R sendRocketmq(String msg) { - testStreamProducer.streamTestMsg(msg); - return R.ok(); - } - - /** - * 发送消息Kafka - * - * @param msg 消息内容 - */ - @GetMapping("/sendKafka") - public R sendKafka(String msg) { - logStreamProducer.streamLogMsg(msg); - return R.ok(); - } - -} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/TestMessaging.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/TestMessaging.java deleted file mode 100644 index 9759beed..00000000 --- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/TestMessaging.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.dromara.stream.mq; - -import lombok.Data; -import lombok.experimental.Accessors; - -/** - * @author Lion Li - */ -@Data -@Accessors(chain = true) -public class TestMessaging { - /** - * 消息id - */ - private String msgId; - /** - * 消息内容 - */ - private String msgText; -} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/DelayConsumer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/DelayConsumer.java deleted file mode 100644 index 60452868..00000000 --- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/DelayConsumer.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.dromara.stream.mq.consumer; - - -import org.dromara.stream.mq.TestMessaging; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Component; - -import java.util.function.Consumer; - -@Slf4j -@Component -public class DelayConsumer { - - @Bean - Consumer delay() { - log.info("初始化订阅"); - return obj -> { - log.info("消息接收成功:" + obj); - }; - } -} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/LogStreamConsumer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/LogStreamConsumer.java deleted file mode 100644 index b21eb551..00000000 --- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/LogStreamConsumer.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.dromara.stream.mq.consumer; - -import org.dromara.stream.mq.TestMessaging; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Component; - -import java.util.function.Consumer; - -@Slf4j -@Component -public class LogStreamConsumer { - - @Bean - Consumer log() { - log.info("初始化订阅"); - return msg -> { - log.info("通过stream消费到消息 => {}", msg.toString()); - }; - } - -} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/TestStreamConsumer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/TestStreamConsumer.java deleted file mode 100644 index 0163c780..00000000 --- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/TestStreamConsumer.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.dromara.stream.mq.consumer; - -import org.dromara.stream.mq.TestMessaging; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Component; - -import java.util.function.Consumer; - -@Slf4j -@Component -public class TestStreamConsumer { - - @Bean - Consumer demo() { - log.info("初始化订阅"); - return msg -> { - log.info("通过stream消费到消息 => {}", msg.toString()); - }; - } - -} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/DelayProducer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/DelayProducer.java deleted file mode 100644 index b879d916..00000000 --- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/DelayProducer.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.dromara.stream.mq.producer; - -import org.dromara.stream.mq.TestMessaging; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.function.StreamBridge; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.stereotype.Component; - -import java.util.UUID; - -@Component -public class DelayProducer { - - @Autowired - private StreamBridge streamBridge; - - public void sendMsg(String msg, Long delay) { - // 构建消息对象 - TestMessaging testMessaging = new TestMessaging() - .setMsgId(UUID.randomUUID().toString()) - .setMsgText(msg); - Message message = MessageBuilder.withPayload(testMessaging) - .setHeader("x-delay", delay).build(); - streamBridge.send("delay-out-0", message); - } -} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/LogStreamProducer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/LogStreamProducer.java deleted file mode 100644 index bff39240..00000000 --- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/LogStreamProducer.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.dromara.stream.mq.producer; - -import org.dromara.stream.mq.TestMessaging; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.function.StreamBridge; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.stereotype.Component; - -import java.util.UUID; - -@Component -public class LogStreamProducer { - - @Autowired - private StreamBridge streamBridge; - - public void streamLogMsg(String msg) { - // 构建消息对象 - TestMessaging testMessaging = new TestMessaging() - .setMsgId(UUID.randomUUID().toString()) - .setMsgText(msg); - streamBridge.send("log-out-0", MessageBuilder.withPayload(testMessaging).build()); - } -} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/TestStreamProducer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/TestStreamProducer.java deleted file mode 100644 index a23c1fc5..00000000 --- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/TestStreamProducer.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.dromara.stream.mq.producer; - -import org.dromara.stream.mq.TestMessaging; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.function.StreamBridge; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.stereotype.Component; - -import java.util.UUID; - -@Component -public class TestStreamProducer { - - @Autowired - private StreamBridge streamBridge; - - public void streamTestMsg(String msg) { - // 构建消息对象 - TestMessaging testMessaging = new TestMessaging() - .setMsgId(UUID.randomUUID().toString()) - .setMsgText(msg); - streamBridge.send("demo-out-0", MessageBuilder.withPayload(testMessaging).build()); - } -} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/resources/application.yml b/ruoyi-example/ruoyi-stream-mq/src/main/resources/application.yml deleted file mode 100644 index a3342005..00000000 --- a/ruoyi-example/ruoyi-stream-mq/src/main/resources/application.yml +++ /dev/null @@ -1,109 +0,0 @@ -server: - port: 9402 - -# Spring -spring: - application: - # 应用名称 - name: ruoyi-stream-mq - profiles: - # 环境配置 - active: @profiles.active@ - cloud: - stream: - function: - # 重点配置 与 binding 名与消费者对应 - definition: delay;demo;log - ---- # rabbitmq 配置 -spring: - rabbitmq: - host: localhost - port: 5672 - username: root - password: root - cloud: - stream: - rabbit: - bindings: - delay-in-0: - consumer: - delayedExchange: true - delay-out-0: - producer: - delayedExchange: true - bindings: - delay-in-0: - destination: delay.exchange.cloud - content-type: application/json - group: delay-group - binder: rabbit - delay-out-0: - destination: delay.exchange.cloud - content-type: application/json - group: delay-group - binder: rabbit - ---- # rocketmq 配置 -spring: - cloud: - stream: - rocketmq: - binder: - # rocketmq 地址 - name-server: localhost:9876 - bindings: - demo-out-0: - producer: - # 必须得写 - group: default - bindings: - demo-out-0: - content-type: application/json - destination: stream-test-topic - group: test-group - binder: rocketmq - demo-in-0: - content-type: application/json - destination: stream-test-topic - group: test-group - binder: rocketmq - ---- # kafka 配置 -spring: - cloud: - stream: - kafka: - binder: - brokers: localhost:9092 - bindings: - log-out-0: - destination: stream-log-topic - contentType: application/json - group: log_group - binder: kafka - log-in-0: - destination: stream-log-topic - contentType: application/json - group: log_group - binder: kafka - ---- # nacos 配置 -spring: - cloud: - nacos: - # nacos 服务地址 - server-addr: @nacos.server@ - username: @nacos.username@ - password: @nacos.password@ - discovery: - # 注册组 - group: @nacos.discovery.group@ - namespace: ${spring.profiles.active} - config: - # 配置组 - group: @nacos.config.group@ - namespace: ${spring.profiles.active} - config: - import: - - optional:nacos:application-common.yml diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/resources/banner.txt b/ruoyi-example/ruoyi-stream-mq/src/main/resources/banner.txt deleted file mode 100644 index 8a8054c1..00000000 --- a/ruoyi-example/ruoyi-stream-mq/src/main/resources/banner.txt +++ /dev/null @@ -1,10 +0,0 @@ -Spring Boot Version: ${spring-boot.version} -Spring Application Name: ${spring.application.name} - _ _ - (_) | | - _ __ _ _ ___ _ _ _ ______ ___| |_ _ __ ___ __ _ _ __ ___ ______ _ __ ___ __ _ -| '__| | | |/ _ \| | | | |______/ __| __| '__/ _ \/ _` | '_ ` _ \______| '_ ` _ \ / _` | -| | | |_| | (_) | |_| | | \__ \ |_| | | __/ (_| | | | | | | | | | | | | (_| | -|_| \__,_|\___/ \__, |_| |___/\__|_| \___|\__,_|_| |_| |_| |_| |_| |_|\__, | - __/ | | | - |___/ |_| diff --git a/ruoyi-example/ruoyi-test-mq/README.md b/ruoyi-example/ruoyi-test-mq/README.md new file mode 100644 index 00000000..9fba827c --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/README.md @@ -0,0 +1,102 @@ +# ruoyi-test-mq + +## 模块说明 + +该模块基于需求【[将spring-cloud-stream改为普通spring的mq依赖用法】下修改编写; + +原模块缺点:功能复杂学习成本高 大部分用户用不明白 功能封闭 特性无法使用 项目中基本不会有切换mq的事情发生; + +现模块:集成基础的rabbit、rocketmq、kafka等主流的中间件,功能包含 + +1. rabbit: 普通消息、延迟队列 +2. rocket:普通消息、事务消息 +3. kafka:普通消息 + +后续可实现的: + +1. kafka stream流的使用 +2. rocket 顺序、异步、延时等 + +## 项目目录 + +```xml +├─src +│ └─main +│ ├─java +│ │ └─org +│ │ └─dromara +│ │ └─stream +│ │ │ RuoYiTestMqApplication.java +│ │ │ +│ │ ├─config +│ │ │ RabbitConfig.java 普通消息配置类 +│ │ │ RabbitTtlQueueConfig.java 延迟队列配置类 +│ │ │ +│ │ ├─controller 测试类 +│ │ │ PushMessageController.java +│ │ │ +│ │ └─mq +│ │ ├─consumer +│ │ │ ├─kafkaMq +│ │ │ │ KafkaNormalConsumer.java +│ │ │ ├─rabbit +│ │ │ │ ConsumerListener.java +│ │ │ └─rocketmq +│ │ │ NormalRocketConsumer.java +│ │ │ TransactionRocketConsumer.java +│ │ ├─listener +│ │ │ TranscationRocketListener.java +│ │ └─producer +│ │ ├─kafkaMq +│ │ │ KafkaNormalProducer.java +│ │ ├─rabbitMq +│ │ │ DelayRabbitProducer.java +│ │ │ NormalRabbitProducer.java +│ │ └─rocketMq +│ │ NormalRocketProducer.java +│ │ TransactionRocketProducer.java +│ │ +│ └─resources +│ application.yml IP:Host根据实际情况替换 +│ logback-plus.xml +``` + +## 使用方式 + +rocketmq: + +**注意:需要进入到rockerMQ文件路径中执行** + +创建普通消息的topic + +```sh +sh mqadmin updateTopic -n -t -c -a +message.type=NORMAL +``` + +```shell +bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster +``` + +创建事务消息的topic + +```sh +sh mqadmin updateTopic -n -t -c -a +message.type=TRANSACTION +``` + +```shell +bin/mqadmin updatetopic -n localhost:9876 -t transaction_topic -c DefaultCluster -a +message.type=TRANSACTION +``` + +kafka: + +```shell +kafka-topics.sh --create --topic --bootstrap-server --partitions --replication-factor +``` + +```shell +kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 +``` + +## 验证方式 + +可通过`PushMessageController`实现`Restful`进行测试; \ No newline at end of file diff --git a/ruoyi-example/ruoyi-stream-mq/pom.xml b/ruoyi-example/ruoyi-test-mq/pom.xml similarity index 82% rename from ruoyi-example/ruoyi-stream-mq/pom.xml rename to ruoyi-example/ruoyi-test-mq/pom.xml index 4c5064b8..88cdb8f5 100644 --- a/ruoyi-example/ruoyi-stream-mq/pom.xml +++ b/ruoyi-example/ruoyi-test-mq/pom.xml @@ -9,10 +9,10 @@ 4.0.0 - ruoyi-stream-mq + ruoyi-test-mq - ruoyi-stream-mq SpringCloud-Stream-MQ 案例项目 + ruoyi-test-mq 案例项目 @@ -21,21 +21,19 @@ org.dromara ruoyi-common-nacos - - org.springframework.cloud - spring-cloud-starter-stream-rabbit + org.springframework.boot + spring-boot-starter-amqp + + + org.apache.rocketmq + rocketmq-spring-boot-starter + + + org.springframework.kafka + spring-kafka - - com.alibaba.cloud - spring-cloud-starter-stream-rocketmq - - - - org.springframework.cloud - spring-cloud-starter-stream-kafka - org.dromara diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/RuoYiStreamMqApplication.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/RuoYiTestMqApplication.java similarity index 84% rename from ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/RuoYiStreamMqApplication.java rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/RuoYiTestMqApplication.java index 2c46467a..a705c67b 100644 --- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/RuoYiStreamMqApplication.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/RuoYiTestMqApplication.java @@ -5,15 +5,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; /** - * SpringCloud-Stream-MQ 案例项目 - * + * SpringBoot-MQ 案例项目 * @author Lion Li */ @SpringBootApplication -public class RuoYiStreamMqApplication { +public class RuoYiTestMqApplication { public static void main(String[] args) { - SpringApplication application = new SpringApplication(RuoYiStreamMqApplication.class); + SpringApplication application = new SpringApplication(RuoYiTestMqApplication.class); application.setApplicationStartup(new BufferingApplicationStartup(2048)); application.run(args); System.out.println("(♥◠‿◠)ノ゙ MQ案例模块启动成功 ლ(´ڡ`ლ)゙ "); diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java new file mode 100644 index 00000000..3995599c --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java @@ -0,0 +1,54 @@ +package org.dromara.stream.config; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author xbhog + */ +@Configuration +public class RabbitConfig { + + public static final String EXCHANGE_NAME = "demo-exchange"; + public static final String QUEUE_NAME = "demo-queue"; + public static final String ROUTING_KEY = "demo.routing.key"; + /** + * 创建交换机 + * ExchangeBuilder有四种交换机模式 + * Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。 + * Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。 + * Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。 + * Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。 + * durable 交换器是否持久化(false 不持久化,true 持久化) + **/ + @Bean + public TopicExchange exchange() { + return new TopicExchange(EXCHANGE_NAME); + } + /** + * 创建队列 + * durable 队列是否持久化 队列调用此方法就是持久化 可查看方法的源码 + * deliveryMode 消息是否持久化(1 不持久化,2 持久化) + **/ + @Bean + public Queue queue() { + return new Queue(QUEUE_NAME, false); + } + /** + * 绑定交换机和队列 + * bing 方法参数可以是队列和交换机 + * to 方法参数必须是交换机 + * with 方法参数是路由Key 这里是以rabbit.开头 + * noargs 就是不要参数的意思 + * 这个方法的意思是把rabbit开头的消息 和 上面的队列 和 上面的交换机绑定 + **/ + @Bean + public Binding binding(Queue queue, TopicExchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); + } + +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java new file mode 100644 index 00000000..9fc9693a --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java @@ -0,0 +1,69 @@ +package org.dromara.stream.config; + +import org.springframework.amqp.core.*; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +/** + * RabbitTTL队列 + * @author xbhog + */ +@Configuration +public class RabbitTtlQueueConfig { + + // 延迟队列名称 + public static final String DELAY_QUEUE_NAME = "delay-queue"; + // 延迟交换机名称 + public static final String DELAY_EXCHANGE_NAME = "delay-exchange"; + // 延迟路由键名称 + public static final String DELAY_ROUTING_KEY = "delay.routing.key"; + + // 死信交换机名称 + public static final String DEAD_LETTER_EXCHANGE = "dlx-exchange"; + // 死信队列名称 + public static final String DEAD_LETTER_QUEUE = "dlx-queue"; + // 死信路由键名称 + public static final String DEAD_LETTER_ROUTING_KEY = "dlx.routing.key"; + // 延迟消息的默认 TTL(毫秒) + @Value("${rabbitmq.delay.ttl:5000}") + private long messageTTL; + + // 声明延迟队列 + @Bean + public Queue delayQueue() { + return QueueBuilder.durable(DELAY_QUEUE_NAME) + .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) + .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY) + .withArgument("x-message-ttl", messageTTL) + .build(); + } + // 声明延迟交换机 + @Bean + public TopicExchange delayExchange() { + return new TopicExchange(DELAY_EXCHANGE_NAME); + } + // 将延迟队列绑定到延迟交换机 + @Bean + public Binding delayBinding(Queue delayQueue, TopicExchange delayExchange) { + return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY); + } + // 声明死信队列 + @Bean + public Queue deadLetterQueue() { + return new Queue(DEAD_LETTER_QUEUE); + } + // 声明死信交换机 + @Bean + public TopicExchange deadLetterExchange() { + return new TopicExchange(DEAD_LETTER_EXCHANGE); + } + + // 将死信队列绑定到死信交换机 + @Bean + public Binding deadLetterBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange) { + return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY); + } +} + diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java new file mode 100644 index 00000000..f815b515 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java @@ -0,0 +1,72 @@ +package org.dromara.stream.controller; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.dromara.stream.mq.producer.kafkaMq.KafkaNormalProducer; +import org.dromara.stream.mq.producer.rabbitMq.DelayRabbitProducer; +import org.dromara.stream.mq.producer.rabbitMq.NormalRabbitProducer; +import org.dromara.stream.mq.producer.rocketMq.NormalRocketProducer; +import org.dromara.stream.mq.producer.rocketMq.TransactionRocketProducer; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author xbhog + */ +@Slf4j +@RestController +@RequestMapping("push/message") +public class PushMessageController { + + @Resource + private NormalRabbitProducer normalRabbitProducer; + + @Resource + private DelayRabbitProducer delayRabbitProducer; + + @Resource + private NormalRocketProducer normalRocketProducer; + + @Resource + private TransactionRocketProducer transactionRocketProducer; + + @Resource + private KafkaNormalProducer normalKafkaProducer; + + /** + * rabbit普通消息的处理 + */ + @GetMapping("/rabbitMsg/sendNormal") + public void sendMq() { + normalRabbitProducer.sendMq("hello normal RabbitMsg"); + } + + /** + * rabbit延迟队列类型:类似生产者 + */ + @GetMapping("/rabbitMsg/sendDelay") + public void sendMessage() { + delayRabbitProducer.sendDelayMessage("Hello ttl RabbitMsg"); + } + + /** + * rockerMQ实例 + * 需要手动创建相关的Topic和group + */ + @GetMapping("/rocketMq/send") + public void sendRockerMq(){ + normalRocketProducer.sendMessage(); + } + @GetMapping("/rocketMq/transactionMsg") + public void sendRockerMqTransactionMsg(){ + transactionRocketProducer.sendTransactionMessage(); + } + /** + * kafkaSpringboot集成 + */ + @GetMapping("/kafkaMsg/send") + public void sendKafkaMsg(){ + normalKafkaProducer.sendKafkaMsg(); + } +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/kafkaMq/KafkaNormalConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/kafkaMq/KafkaNormalConsumer.java new file mode 100644 index 00000000..11721153 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/kafkaMq/KafkaNormalConsumer.java @@ -0,0 +1,24 @@ +package org.dromara.stream.mq.consumer.kafkaMq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * @author xbhog + * @date 2024/05/19 18:04 + **/ +@Slf4j +@Component +public class KafkaNormalConsumer { + + //默认获取最后一条消息 + @KafkaListener(topics = "test-topic",groupId = "demo") + public void timiKafka(ConsumerRecord record){ + Object key = record.key(); + Object value = record.value(); + log.info("【消费者】received the message key {},value:{}",key,value); + } + +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rabbit/ConsumerListener.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rabbit/ConsumerListener.java new file mode 100644 index 00000000..9d05927a --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rabbit/ConsumerListener.java @@ -0,0 +1,36 @@ +package org.dromara.stream.mq.consumer.rabbit; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.stream.config.RabbitConfig; +import org.dromara.stream.config.RabbitTtlQueueConfig; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * @author xbhog + * @date 2024年5月18日 + */ +@Slf4j +@Component +public class ConsumerListener { + + /** + * 设置监听哪一个队列 这个队列是RabbitConfig里面设置好的队列名字 + * 普通消息 + **/ + @RabbitListener(queues = RabbitConfig.QUEUE_NAME) + public void listenQueue(Message message) { + log.info("【消费者】Start consuming data:{}",new String(message.getBody())); + } + + /** + * 处理延迟队列的操作 + * 该部分处理的延迟操作在消费上的时间可能与设置的TTl不同; + * 一般会超长;原因是消息可能并不会按时死亡;可通过延迟队列插件处理 + */ + @RabbitListener(queues = RabbitTtlQueueConfig.DEAD_LETTER_QUEUE) + public void receiveMessage(String message){ + log.info("【消费者】Received delayed message:{}",message); + } +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/NormalRocketConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/NormalRocketConsumer.java new file mode 100644 index 00000000..5a95f9c2 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/NormalRocketConsumer.java @@ -0,0 +1,20 @@ +package org.dromara.stream.mq.consumer.rocketmq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * @author xbhog + * @date 2024/06/01 16:53 + **/ +@Slf4j +@Component +@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "springboot-mq-consumer-1") +public class NormalRocketConsumer implements RocketMQListener { + @Override + public void onMessage(String message) { + log.info("【消费者】接收消息:{}" ,message); + } +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/TransactionRocketConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/TransactionRocketConsumer.java new file mode 100644 index 00000000..75ebc954 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/TransactionRocketConsumer.java @@ -0,0 +1,21 @@ +package org.dromara.stream.mq.consumer.rocketmq; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * @author xbhog + * @date 2024/06/01 16:54 + **/ +@Slf4j +@Component +@RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction_topic") +public class TransactionRocketConsumer implements RocketMQListener { + + @Override + public void onMessage(String message) { + log.info("【消费者】===>接收事务消息:{}",message); + } +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/listener/TranscationRocketListener.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/listener/TranscationRocketListener.java new file mode 100644 index 00000000..1b526f8a --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/listener/TranscationRocketListener.java @@ -0,0 +1,42 @@ +package org.dromara.stream.mq.listener; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +/** + * @author xbhog + * @date 2024/06/01 17:05 + **/ +@Slf4j +@Component +@RocketMQTransactionListener +public class TranscationRocketListener implements RocketMQLocalTransactionListener { + @Override + public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { + log.info("执行本地事务"); + String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS")); + if ("TAG-1".equals(tag)) { + //这里只讲TAGA消息提交,状态为可执行 + log.info("【监听器】这里是校验TAG-1;提交状态:COMMIT"); + return RocketMQLocalTransactionState.COMMIT; + } else if ("TAG-2".equals(tag)) { + log.info("【监听器】这里是校验TAG-2;提交状态:ROLLBACK"); + return RocketMQLocalTransactionState.ROLLBACK; + } else if ("TAG-3".equals(tag)) { + log.info("【监听器】这里是校验TAG-3;提交状态:UNKNOWN"); + return RocketMQLocalTransactionState.UNKNOWN; + } + log.info("=========【监听器】提交状态:UNKNOWN"); + return RocketMQLocalTransactionState.UNKNOWN; + } + + @Override + public RocketMQLocalTransactionState checkLocalTransaction(Message message) { + log.info("【监听器】检查本地交易===>{}", message); + return RocketMQLocalTransactionState.COMMIT; + } +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/kafkaMq/KafkaNormalProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/kafkaMq/KafkaNormalProducer.java new file mode 100644 index 00000000..db3580ea --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/kafkaMq/KafkaNormalProducer.java @@ -0,0 +1,24 @@ +package org.dromara.stream.mq.producer.kafkaMq; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import java.util.concurrent.CompletableFuture; + +/** + * @author xbhog + * @date 2024/05/19 18:02 + **/ +@Slf4j +@Component +public class KafkaNormalProducer { + @Resource + private KafkaTemplate kafkaTemplate; + + public void sendKafkaMsg(){ + CompletableFuture send = kafkaTemplate.send("test-topic","hello", "kafkaTest"); + send.join(); + } +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/DelayRabbitProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/DelayRabbitProducer.java new file mode 100644 index 00000000..4c1aeacb --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/DelayRabbitProducer.java @@ -0,0 +1,25 @@ +package org.dromara.stream.mq.producer.rabbitMq; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.dromara.stream.config.RabbitTtlQueueConfig; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.GetMapping; + +/** + * @author xbhog + * @date 2024/05/25 17:15 + **/ +@Slf4j +@Component +public class DelayRabbitProducer { + @Resource + private RabbitTemplate rabbitTemplate; + + @GetMapping("/sendDelay") + public void sendDelayMessage(String message) { + rabbitTemplate.convertAndSend(RabbitTtlQueueConfig.DELAY_EXCHANGE_NAME, RabbitTtlQueueConfig.DELAY_ROUTING_KEY, message); + log.info("【生产者】Delayed message send: " + message); + } +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/NormalRabbitProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/NormalRabbitProducer.java new file mode 100644 index 00000000..af528ff4 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/NormalRabbitProducer.java @@ -0,0 +1,24 @@ +package org.dromara.stream.mq.producer.rabbitMq; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.dromara.stream.config.RabbitConfig; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; + +/** + * @author xbhog + */ +@Slf4j +@Component +public class NormalRabbitProducer { + + @Resource + RabbitTemplate rabbitTemplate; + + + public void sendMq(String message) { + rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message); + log.info("【生产者】Message send: " + message); + } +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/NormalRocketProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/NormalRocketProducer.java new file mode 100644 index 00000000..ffd9d106 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/NormalRocketProducer.java @@ -0,0 +1,25 @@ +package org.dromara.stream.mq.producer.rocketMq; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +/** + * @author xbhog + * @date 2024/06/01 16:49 + **/ +@Slf4j +@Component +public class NormalRocketProducer { + + @Resource + private RocketMQTemplate rocketMQTemplate; + + public void sendMessage(){ + SendResult sendResult = rocketMQTemplate.syncSend("TestTopic", MessageBuilder.withPayload("hello world test").build()); + log.info("发送普通同步消息-msg,syncSendMessage===>{}", sendResult); + } +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/TransactionRocketProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/TransactionRocketProducer.java new file mode 100644 index 00000000..de6341e3 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/TransactionRocketProducer.java @@ -0,0 +1,41 @@ +package org.dromara.stream.mq.producer.rocketMq; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionSendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * @author xbhog + * @date 2024/06/01 16:54 + **/ +@Slf4j +@Component +public class TransactionRocketProducer { + + @Resource + private RocketMQTemplate rocketMQTemplate; + + public void sendTransactionMessage(){ + List tags = Arrays.asList("TAG-1", "TAG-2", "TAG-3"); + for (int i = 0; i < 3; i++) { + Message message = MessageBuilder.withPayload("===>事务消息-" + i).build(); + //destination formats: `topicName:tags` message – message Message arg – ext arg + TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction("transaction_topic:" + tags.get(i), message, i + 1); + if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) { + log.info("【生产者】事物消息发送成功;成功结果:{}",res); + }else{ + log.info("【生产者】事务发送失败:失败原因:{}",res); + } + } + } + +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml new file mode 100644 index 00000000..f8209bca --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml @@ -0,0 +1,44 @@ +server: + port: 9402 + +# Spring +spring: + application: + # 应用名称 + name: ruoyi-test-mq + profiles: + # 环境配置 + active: @profiles.active@ + #MQ配置 + rabbitmq: + host: 192.168.1.13 + port: 5672 + username: mq + password: mq + publisher-returns: true + publisher-confirm-type: correlated + kafka: + bootstrap-servers: 192.168.1.13:9092 + producer: + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer +rocketmq: + name-server: 192.168.1.13:9876 + producer: + group: dist-test # 生产者组 +--- # nacos 配置 +spring: + cloud: + nacos: + # nacos 服务地址 + server-addr: @nacos.server@ + discovery: + # 注册组 + group: @nacos.discovery.group@ + namespace: ${spring.profiles.active} + config: + # 配置组 + group: @nacos.config.group@ + namespace: ${spring.profiles.active} + config: + import: + - optional:nacos:application-common.yml diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/resources/logback-plus.xml b/ruoyi-example/ruoyi-test-mq/src/main/resources/logback-plus.xml similarity index 96% rename from ruoyi-example/ruoyi-stream-mq/src/main/resources/logback-plus.xml rename to ruoyi-example/ruoyi-test-mq/src/main/resources/logback-plus.xml index 05075604..e96832ee 100644 --- a/ruoyi-example/ruoyi-stream-mq/src/main/resources/logback-plus.xml +++ b/ruoyi-example/ruoyi-test-mq/src/main/resources/logback-plus.xml @@ -19,7 +19,8 @@ - + +