diff --git a/pom.xml b/pom.xml
index 9c965722..9772dea0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,7 @@
1.3.6
0.2.0
1.16.6
+ 2.3.0
2.7.0
@@ -384,6 +385,13 @@
${mapstruct-plus.version}
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ ${rocketmq-version}
+
+
diff --git a/ruoyi-example/pom.xml b/ruoyi-example/pom.xml
index eebbb562..1670137e 100644
--- a/ruoyi-example/pom.xml
+++ b/ruoyi-example/pom.xml
@@ -10,7 +10,7 @@
ruoyi-demo
- ruoyi-stream-mq
+ ruoyi-test-mq
ruoyi-example
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/controller/TestMqController.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/controller/TestMqController.java
deleted file mode 100644
index bbfee282..00000000
--- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/controller/TestMqController.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.dromara.stream.controller;
-
-import org.dromara.common.core.domain.R;
-import org.dromara.stream.mq.producer.DelayProducer;
-import org.dromara.stream.mq.producer.LogStreamProducer;
-import org.dromara.stream.mq.producer.TestStreamProducer;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-/**
- * 测试mq
- */
-@Slf4j
-@RestController
-@AllArgsConstructor
-@RequestMapping("/test-mq")
-public class TestMqController {
-
- private final DelayProducer delayProducer;
- private final TestStreamProducer testStreamProducer;
- private final LogStreamProducer logStreamProducer;
-
- /**
- * 发送消息Rabbitmq
- *
- * @param msg 消息内容
- * @param delay 延时时间
- */
- @GetMapping("/sendRabbitmq")
- public R sendRabbitmq(String msg, Long delay) {
- delayProducer.sendMsg(msg, delay);
- return R.ok();
- }
-
- /**
- * 发送消息Rocketmq
- *
- * @param msg 消息内容
- */
- @GetMapping("/sendRocketmq")
- public R sendRocketmq(String msg) {
- testStreamProducer.streamTestMsg(msg);
- return R.ok();
- }
-
- /**
- * 发送消息Kafka
- *
- * @param msg 消息内容
- */
- @GetMapping("/sendKafka")
- public R sendKafka(String msg) {
- logStreamProducer.streamLogMsg(msg);
- return R.ok();
- }
-
-}
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/TestMessaging.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/TestMessaging.java
deleted file mode 100644
index 9759beed..00000000
--- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/TestMessaging.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.dromara.stream.mq;
-
-import lombok.Data;
-import lombok.experimental.Accessors;
-
-/**
- * @author Lion Li
- */
-@Data
-@Accessors(chain = true)
-public class TestMessaging {
- /**
- * 消息id
- */
- private String msgId;
- /**
- * 消息内容
- */
- private String msgText;
-}
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/DelayConsumer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/DelayConsumer.java
deleted file mode 100644
index 60452868..00000000
--- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/DelayConsumer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.dromara.stream.mq.consumer;
-
-
-import org.dromara.stream.mq.TestMessaging;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.annotation.Bean;
-import org.springframework.stereotype.Component;
-
-import java.util.function.Consumer;
-
-@Slf4j
-@Component
-public class DelayConsumer {
-
- @Bean
- Consumer delay() {
- log.info("初始化订阅");
- return obj -> {
- log.info("消息接收成功:" + obj);
- };
- }
-}
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/LogStreamConsumer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/LogStreamConsumer.java
deleted file mode 100644
index b21eb551..00000000
--- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/LogStreamConsumer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.dromara.stream.mq.consumer;
-
-import org.dromara.stream.mq.TestMessaging;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.annotation.Bean;
-import org.springframework.stereotype.Component;
-
-import java.util.function.Consumer;
-
-@Slf4j
-@Component
-public class LogStreamConsumer {
-
- @Bean
- Consumer log() {
- log.info("初始化订阅");
- return msg -> {
- log.info("通过stream消费到消息 => {}", msg.toString());
- };
- }
-
-}
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/TestStreamConsumer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/TestStreamConsumer.java
deleted file mode 100644
index 0163c780..00000000
--- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/consumer/TestStreamConsumer.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.dromara.stream.mq.consumer;
-
-import org.dromara.stream.mq.TestMessaging;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.annotation.Bean;
-import org.springframework.stereotype.Component;
-
-import java.util.function.Consumer;
-
-@Slf4j
-@Component
-public class TestStreamConsumer {
-
- @Bean
- Consumer demo() {
- log.info("初始化订阅");
- return msg -> {
- log.info("通过stream消费到消息 => {}", msg.toString());
- };
- }
-
-}
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/DelayProducer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/DelayProducer.java
deleted file mode 100644
index b879d916..00000000
--- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/DelayProducer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.dromara.stream.mq.producer;
-
-import org.dromara.stream.mq.TestMessaging;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.stream.function.StreamBridge;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.stereotype.Component;
-
-import java.util.UUID;
-
-@Component
-public class DelayProducer {
-
- @Autowired
- private StreamBridge streamBridge;
-
- public void sendMsg(String msg, Long delay) {
- // 构建消息对象
- TestMessaging testMessaging = new TestMessaging()
- .setMsgId(UUID.randomUUID().toString())
- .setMsgText(msg);
- Message message = MessageBuilder.withPayload(testMessaging)
- .setHeader("x-delay", delay).build();
- streamBridge.send("delay-out-0", message);
- }
-}
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/LogStreamProducer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/LogStreamProducer.java
deleted file mode 100644
index bff39240..00000000
--- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/LogStreamProducer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.dromara.stream.mq.producer;
-
-import org.dromara.stream.mq.TestMessaging;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.stream.function.StreamBridge;
-import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.stereotype.Component;
-
-import java.util.UUID;
-
-@Component
-public class LogStreamProducer {
-
- @Autowired
- private StreamBridge streamBridge;
-
- public void streamLogMsg(String msg) {
- // 构建消息对象
- TestMessaging testMessaging = new TestMessaging()
- .setMsgId(UUID.randomUUID().toString())
- .setMsgText(msg);
- streamBridge.send("log-out-0", MessageBuilder.withPayload(testMessaging).build());
- }
-}
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/TestStreamProducer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/TestStreamProducer.java
deleted file mode 100644
index a23c1fc5..00000000
--- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/mq/producer/TestStreamProducer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.dromara.stream.mq.producer;
-
-import org.dromara.stream.mq.TestMessaging;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.stream.function.StreamBridge;
-import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.stereotype.Component;
-
-import java.util.UUID;
-
-@Component
-public class TestStreamProducer {
-
- @Autowired
- private StreamBridge streamBridge;
-
- public void streamTestMsg(String msg) {
- // 构建消息对象
- TestMessaging testMessaging = new TestMessaging()
- .setMsgId(UUID.randomUUID().toString())
- .setMsgText(msg);
- streamBridge.send("demo-out-0", MessageBuilder.withPayload(testMessaging).build());
- }
-}
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/resources/application.yml b/ruoyi-example/ruoyi-stream-mq/src/main/resources/application.yml
deleted file mode 100644
index a3342005..00000000
--- a/ruoyi-example/ruoyi-stream-mq/src/main/resources/application.yml
+++ /dev/null
@@ -1,109 +0,0 @@
-server:
- port: 9402
-
-# Spring
-spring:
- application:
- # 应用名称
- name: ruoyi-stream-mq
- profiles:
- # 环境配置
- active: @profiles.active@
- cloud:
- stream:
- function:
- # 重点配置 与 binding 名与消费者对应
- definition: delay;demo;log
-
---- # rabbitmq 配置
-spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: root
- password: root
- cloud:
- stream:
- rabbit:
- bindings:
- delay-in-0:
- consumer:
- delayedExchange: true
- delay-out-0:
- producer:
- delayedExchange: true
- bindings:
- delay-in-0:
- destination: delay.exchange.cloud
- content-type: application/json
- group: delay-group
- binder: rabbit
- delay-out-0:
- destination: delay.exchange.cloud
- content-type: application/json
- group: delay-group
- binder: rabbit
-
---- # rocketmq 配置
-spring:
- cloud:
- stream:
- rocketmq:
- binder:
- # rocketmq 地址
- name-server: localhost:9876
- bindings:
- demo-out-0:
- producer:
- # 必须得写
- group: default
- bindings:
- demo-out-0:
- content-type: application/json
- destination: stream-test-topic
- group: test-group
- binder: rocketmq
- demo-in-0:
- content-type: application/json
- destination: stream-test-topic
- group: test-group
- binder: rocketmq
-
---- # kafka 配置
-spring:
- cloud:
- stream:
- kafka:
- binder:
- brokers: localhost:9092
- bindings:
- log-out-0:
- destination: stream-log-topic
- contentType: application/json
- group: log_group
- binder: kafka
- log-in-0:
- destination: stream-log-topic
- contentType: application/json
- group: log_group
- binder: kafka
-
---- # nacos 配置
-spring:
- cloud:
- nacos:
- # nacos 服务地址
- server-addr: @nacos.server@
- username: @nacos.username@
- password: @nacos.password@
- discovery:
- # 注册组
- group: @nacos.discovery.group@
- namespace: ${spring.profiles.active}
- config:
- # 配置组
- group: @nacos.config.group@
- namespace: ${spring.profiles.active}
- config:
- import:
- - optional:nacos:application-common.yml
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/resources/banner.txt b/ruoyi-example/ruoyi-stream-mq/src/main/resources/banner.txt
deleted file mode 100644
index 8a8054c1..00000000
--- a/ruoyi-example/ruoyi-stream-mq/src/main/resources/banner.txt
+++ /dev/null
@@ -1,10 +0,0 @@
-Spring Boot Version: ${spring-boot.version}
-Spring Application Name: ${spring.application.name}
- _ _
- (_) | |
- _ __ _ _ ___ _ _ _ ______ ___| |_ _ __ ___ __ _ _ __ ___ ______ _ __ ___ __ _
-| '__| | | |/ _ \| | | | |______/ __| __| '__/ _ \/ _` | '_ ` _ \______| '_ ` _ \ / _` |
-| | | |_| | (_) | |_| | | \__ \ |_| | | __/ (_| | | | | | | | | | | | | (_| |
-|_| \__,_|\___/ \__, |_| |___/\__|_| \___|\__,_|_| |_| |_| |_| |_| |_|\__, |
- __/ | | |
- |___/ |_|
diff --git a/ruoyi-example/ruoyi-test-mq/README.md b/ruoyi-example/ruoyi-test-mq/README.md
new file mode 100644
index 00000000..9fba827c
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/README.md
@@ -0,0 +1,102 @@
+# ruoyi-test-mq
+
+## 模块说明
+
+该模块基于需求【[将spring-cloud-stream改为普通spring的mq依赖用法】下修改编写;
+
+原模块缺点:功能复杂学习成本高 大部分用户用不明白 功能封闭 特性无法使用 项目中基本不会有切换mq的事情发生;
+
+现模块:集成基础的rabbit、rocketmq、kafka等主流的中间件,功能包含
+
+1. rabbit: 普通消息、延迟队列
+2. rocket:普通消息、事务消息
+3. kafka:普通消息
+
+后续可实现的:
+
+1. kafka stream流的使用
+2. rocket 顺序、异步、延时等
+
+## 项目目录
+
+```xml
+├─src
+│ └─main
+│ ├─java
+│ │ └─org
+│ │ └─dromara
+│ │ └─stream
+│ │ │ RuoYiTestMqApplication.java
+│ │ │
+│ │ ├─config
+│ │ │ RabbitConfig.java 普通消息配置类
+│ │ │ RabbitTtlQueueConfig.java 延迟队列配置类
+│ │ │
+│ │ ├─controller 测试类
+│ │ │ PushMessageController.java
+│ │ │
+│ │ └─mq
+│ │ ├─consumer
+│ │ │ ├─kafkaMq
+│ │ │ │ KafkaNormalConsumer.java
+│ │ │ ├─rabbit
+│ │ │ │ ConsumerListener.java
+│ │ │ └─rocketmq
+│ │ │ NormalRocketConsumer.java
+│ │ │ TransactionRocketConsumer.java
+│ │ ├─listener
+│ │ │ TranscationRocketListener.java
+│ │ └─producer
+│ │ ├─kafkaMq
+│ │ │ KafkaNormalProducer.java
+│ │ ├─rabbitMq
+│ │ │ DelayRabbitProducer.java
+│ │ │ NormalRabbitProducer.java
+│ │ └─rocketMq
+│ │ NormalRocketProducer.java
+│ │ TransactionRocketProducer.java
+│ │
+│ └─resources
+│ application.yml IP:Host根据实际情况替换
+│ logback-plus.xml
+```
+
+## 使用方式
+
+rocketmq:
+
+**注意:需要进入到rockerMQ文件路径中执行**
+
+创建普通消息的topic
+
+```sh
+sh mqadmin updateTopic -n -t -c -a +message.type=NORMAL
+```
+
+```shell
+bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster
+```
+
+创建事务消息的topic
+
+```sh
+sh mqadmin updateTopic -n -t -c -a +message.type=TRANSACTION
+```
+
+```shell
+bin/mqadmin updatetopic -n localhost:9876 -t transaction_topic -c DefaultCluster -a +message.type=TRANSACTION
+```
+
+kafka:
+
+```shell
+kafka-topics.sh --create --topic --bootstrap-server --partitions --replication-factor
+```
+
+```shell
+kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
+```
+
+## 验证方式
+
+可通过`PushMessageController`实现`Restful`进行测试;
\ No newline at end of file
diff --git a/ruoyi-example/ruoyi-stream-mq/pom.xml b/ruoyi-example/ruoyi-test-mq/pom.xml
similarity index 82%
rename from ruoyi-example/ruoyi-stream-mq/pom.xml
rename to ruoyi-example/ruoyi-test-mq/pom.xml
index 4c5064b8..88cdb8f5 100644
--- a/ruoyi-example/ruoyi-stream-mq/pom.xml
+++ b/ruoyi-example/ruoyi-test-mq/pom.xml
@@ -9,10 +9,10 @@
4.0.0
- ruoyi-stream-mq
+ ruoyi-test-mq
- ruoyi-stream-mq SpringCloud-Stream-MQ 案例项目
+ ruoyi-test-mq 案例项目
@@ -21,21 +21,19 @@
org.dromara
ruoyi-common-nacos
-
- org.springframework.cloud
- spring-cloud-starter-stream-rabbit
+ org.springframework.boot
+ spring-boot-starter-amqp
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+
+
+ org.springframework.kafka
+ spring-kafka
-
- com.alibaba.cloud
- spring-cloud-starter-stream-rocketmq
-
-
-
- org.springframework.cloud
- spring-cloud-starter-stream-kafka
-
org.dromara
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/RuoYiStreamMqApplication.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/RuoYiTestMqApplication.java
similarity index 84%
rename from ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/RuoYiStreamMqApplication.java
rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/RuoYiTestMqApplication.java
index 2c46467a..a705c67b 100644
--- a/ruoyi-example/ruoyi-stream-mq/src/main/java/org/dromara/stream/RuoYiStreamMqApplication.java
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/RuoYiTestMqApplication.java
@@ -5,15 +5,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
/**
- * SpringCloud-Stream-MQ 案例项目
- *
+ * SpringBoot-MQ 案例项目
* @author Lion Li
*/
@SpringBootApplication
-public class RuoYiStreamMqApplication {
+public class RuoYiTestMqApplication {
public static void main(String[] args) {
- SpringApplication application = new SpringApplication(RuoYiStreamMqApplication.class);
+ SpringApplication application = new SpringApplication(RuoYiTestMqApplication.class);
application.setApplicationStartup(new BufferingApplicationStartup(2048));
application.run(args);
System.out.println("(♥◠‿◠)ノ゙ MQ案例模块启动成功 ლ(´ڡ`ლ)゙ ");
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java
new file mode 100644
index 00000000..3995599c
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java
@@ -0,0 +1,54 @@
+package org.dromara.stream.config;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author xbhog
+ */
+@Configuration
+public class RabbitConfig {
+
+ public static final String EXCHANGE_NAME = "demo-exchange";
+ public static final String QUEUE_NAME = "demo-queue";
+ public static final String ROUTING_KEY = "demo.routing.key";
+ /**
+ * 创建交换机
+ * ExchangeBuilder有四种交换机模式
+ * Direct Exchange:直连交换机,根据Routing Key(路由键)进行投递到不同队列。
+ * Fanout Exchange:扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。
+ * Topic Exchange:主题交换机,对路由键进行模式匹配后进行投递,符号#表示一个或多个词,*表示一个词。
+ * Header Exchange:头交换机,不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。
+ * durable 交换器是否持久化(false 不持久化,true 持久化)
+ **/
+ @Bean
+ public TopicExchange exchange() {
+ return new TopicExchange(EXCHANGE_NAME);
+ }
+ /**
+ * 创建队列
+ * durable 队列是否持久化 队列调用此方法就是持久化 可查看方法的源码
+ * deliveryMode 消息是否持久化(1 不持久化,2 持久化)
+ **/
+ @Bean
+ public Queue queue() {
+ return new Queue(QUEUE_NAME, false);
+ }
+ /**
+ * 绑定交换机和队列
+ * bing 方法参数可以是队列和交换机
+ * to 方法参数必须是交换机
+ * with 方法参数是路由Key 这里是以rabbit.开头
+ * noargs 就是不要参数的意思
+ * 这个方法的意思是把rabbit开头的消息 和 上面的队列 和 上面的交换机绑定
+ **/
+ @Bean
+ public Binding binding(Queue queue, TopicExchange exchange) {
+ return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
+ }
+
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java
new file mode 100644
index 00000000..9fc9693a
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java
@@ -0,0 +1,69 @@
+package org.dromara.stream.config;
+
+import org.springframework.amqp.core.*;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+/**
+ * RabbitTTL队列
+ * @author xbhog
+ */
+@Configuration
+public class RabbitTtlQueueConfig {
+
+ // 延迟队列名称
+ public static final String DELAY_QUEUE_NAME = "delay-queue";
+ // 延迟交换机名称
+ public static final String DELAY_EXCHANGE_NAME = "delay-exchange";
+ // 延迟路由键名称
+ public static final String DELAY_ROUTING_KEY = "delay.routing.key";
+
+ // 死信交换机名称
+ public static final String DEAD_LETTER_EXCHANGE = "dlx-exchange";
+ // 死信队列名称
+ public static final String DEAD_LETTER_QUEUE = "dlx-queue";
+ // 死信路由键名称
+ public static final String DEAD_LETTER_ROUTING_KEY = "dlx.routing.key";
+ // 延迟消息的默认 TTL(毫秒)
+ @Value("${rabbitmq.delay.ttl:5000}")
+ private long messageTTL;
+
+ // 声明延迟队列
+ @Bean
+ public Queue delayQueue() {
+ return QueueBuilder.durable(DELAY_QUEUE_NAME)
+ .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
+ .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)
+ .withArgument("x-message-ttl", messageTTL)
+ .build();
+ }
+ // 声明延迟交换机
+ @Bean
+ public TopicExchange delayExchange() {
+ return new TopicExchange(DELAY_EXCHANGE_NAME);
+ }
+ // 将延迟队列绑定到延迟交换机
+ @Bean
+ public Binding delayBinding(Queue delayQueue, TopicExchange delayExchange) {
+ return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY);
+ }
+ // 声明死信队列
+ @Bean
+ public Queue deadLetterQueue() {
+ return new Queue(DEAD_LETTER_QUEUE);
+ }
+ // 声明死信交换机
+ @Bean
+ public TopicExchange deadLetterExchange() {
+ return new TopicExchange(DEAD_LETTER_EXCHANGE);
+ }
+
+ // 将死信队列绑定到死信交换机
+ @Bean
+ public Binding deadLetterBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange) {
+ return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY);
+ }
+}
+
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java
new file mode 100644
index 00000000..f815b515
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java
@@ -0,0 +1,72 @@
+package org.dromara.stream.controller;
+
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.stream.mq.producer.kafkaMq.KafkaNormalProducer;
+import org.dromara.stream.mq.producer.rabbitMq.DelayRabbitProducer;
+import org.dromara.stream.mq.producer.rabbitMq.NormalRabbitProducer;
+import org.dromara.stream.mq.producer.rocketMq.NormalRocketProducer;
+import org.dromara.stream.mq.producer.rocketMq.TransactionRocketProducer;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @author xbhog
+ */
+@Slf4j
+@RestController
+@RequestMapping("push/message")
+public class PushMessageController {
+
+ @Resource
+ private NormalRabbitProducer normalRabbitProducer;
+
+ @Resource
+ private DelayRabbitProducer delayRabbitProducer;
+
+ @Resource
+ private NormalRocketProducer normalRocketProducer;
+
+ @Resource
+ private TransactionRocketProducer transactionRocketProducer;
+
+ @Resource
+ private KafkaNormalProducer normalKafkaProducer;
+
+ /**
+ * rabbit普通消息的处理
+ */
+ @GetMapping("/rabbitMsg/sendNormal")
+ public void sendMq() {
+ normalRabbitProducer.sendMq("hello normal RabbitMsg");
+ }
+
+ /**
+ * rabbit延迟队列类型:类似生产者
+ */
+ @GetMapping("/rabbitMsg/sendDelay")
+ public void sendMessage() {
+ delayRabbitProducer.sendDelayMessage("Hello ttl RabbitMsg");
+ }
+
+ /**
+ * rockerMQ实例
+ * 需要手动创建相关的Topic和group
+ */
+ @GetMapping("/rocketMq/send")
+ public void sendRockerMq(){
+ normalRocketProducer.sendMessage();
+ }
+ @GetMapping("/rocketMq/transactionMsg")
+ public void sendRockerMqTransactionMsg(){
+ transactionRocketProducer.sendTransactionMessage();
+ }
+ /**
+ * kafkaSpringboot集成
+ */
+ @GetMapping("/kafkaMsg/send")
+ public void sendKafkaMsg(){
+ normalKafkaProducer.sendKafkaMsg();
+ }
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/kafkaMq/KafkaNormalConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/kafkaMq/KafkaNormalConsumer.java
new file mode 100644
index 00000000..11721153
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/kafkaMq/KafkaNormalConsumer.java
@@ -0,0 +1,24 @@
+package org.dromara.stream.mq.consumer.kafkaMq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author xbhog
+ * @date 2024/05/19 18:04
+ **/
+@Slf4j
+@Component
+public class KafkaNormalConsumer {
+
+ //默认获取最后一条消息
+ @KafkaListener(topics = "test-topic",groupId = "demo")
+ public void timiKafka(ConsumerRecord record){
+ Object key = record.key();
+ Object value = record.value();
+ log.info("【消费者】received the message key {},value:{}",key,value);
+ }
+
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rabbit/ConsumerListener.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rabbit/ConsumerListener.java
new file mode 100644
index 00000000..9d05927a
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rabbit/ConsumerListener.java
@@ -0,0 +1,36 @@
+package org.dromara.stream.mq.consumer.rabbit;
+
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.stream.config.RabbitConfig;
+import org.dromara.stream.config.RabbitTtlQueueConfig;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author xbhog
+ * @date 2024年5月18日
+ */
+@Slf4j
+@Component
+public class ConsumerListener {
+
+ /**
+ * 设置监听哪一个队列 这个队列是RabbitConfig里面设置好的队列名字
+ * 普通消息
+ **/
+ @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
+ public void listenQueue(Message message) {
+ log.info("【消费者】Start consuming data:{}",new String(message.getBody()));
+ }
+
+ /**
+ * 处理延迟队列的操作
+ * 该部分处理的延迟操作在消费上的时间可能与设置的TTl不同;
+ * 一般会超长;原因是消息可能并不会按时死亡;可通过延迟队列插件处理
+ */
+ @RabbitListener(queues = RabbitTtlQueueConfig.DEAD_LETTER_QUEUE)
+ public void receiveMessage(String message){
+ log.info("【消费者】Received delayed message:{}",message);
+ }
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/NormalRocketConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/NormalRocketConsumer.java
new file mode 100644
index 00000000..5a95f9c2
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/NormalRocketConsumer.java
@@ -0,0 +1,20 @@
+package org.dromara.stream.mq.consumer.rocketmq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author xbhog
+ * @date 2024/06/01 16:53
+ **/
+@Slf4j
+@Component
+@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "springboot-mq-consumer-1")
+public class NormalRocketConsumer implements RocketMQListener {
+ @Override
+ public void onMessage(String message) {
+ log.info("【消费者】接收消息:{}" ,message);
+ }
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/TransactionRocketConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/TransactionRocketConsumer.java
new file mode 100644
index 00000000..75ebc954
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/TransactionRocketConsumer.java
@@ -0,0 +1,21 @@
+package org.dromara.stream.mq.consumer.rocketmq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author xbhog
+ * @date 2024/06/01 16:54
+ **/
+@Slf4j
+@Component
+@RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction_topic")
+public class TransactionRocketConsumer implements RocketMQListener {
+
+ @Override
+ public void onMessage(String message) {
+ log.info("【消费者】===>接收事务消息:{}",message);
+ }
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/listener/TranscationRocketListener.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/listener/TranscationRocketListener.java
new file mode 100644
index 00000000..1b526f8a
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/listener/TranscationRocketListener.java
@@ -0,0 +1,42 @@
+package org.dromara.stream.mq.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author xbhog
+ * @date 2024/06/01 17:05
+ **/
+@Slf4j
+@Component
+@RocketMQTransactionListener
+public class TranscationRocketListener implements RocketMQLocalTransactionListener {
+ @Override
+ public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
+ log.info("执行本地事务");
+ String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));
+ if ("TAG-1".equals(tag)) {
+ //这里只讲TAGA消息提交,状态为可执行
+ log.info("【监听器】这里是校验TAG-1;提交状态:COMMIT");
+ return RocketMQLocalTransactionState.COMMIT;
+ } else if ("TAG-2".equals(tag)) {
+ log.info("【监听器】这里是校验TAG-2;提交状态:ROLLBACK");
+ return RocketMQLocalTransactionState.ROLLBACK;
+ } else if ("TAG-3".equals(tag)) {
+ log.info("【监听器】这里是校验TAG-3;提交状态:UNKNOWN");
+ return RocketMQLocalTransactionState.UNKNOWN;
+ }
+ log.info("=========【监听器】提交状态:UNKNOWN");
+ return RocketMQLocalTransactionState.UNKNOWN;
+ }
+
+ @Override
+ public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
+ log.info("【监听器】检查本地交易===>{}", message);
+ return RocketMQLocalTransactionState.COMMIT;
+ }
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/kafkaMq/KafkaNormalProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/kafkaMq/KafkaNormalProducer.java
new file mode 100644
index 00000000..db3580ea
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/kafkaMq/KafkaNormalProducer.java
@@ -0,0 +1,24 @@
+package org.dromara.stream.mq.producer.kafkaMq;
+
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * @author xbhog
+ * @date 2024/05/19 18:02
+ **/
+@Slf4j
+@Component
+public class KafkaNormalProducer {
+ @Resource
+ private KafkaTemplate kafkaTemplate;
+
+ public void sendKafkaMsg(){
+ CompletableFuture send = kafkaTemplate.send("test-topic","hello", "kafkaTest");
+ send.join();
+ }
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/DelayRabbitProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/DelayRabbitProducer.java
new file mode 100644
index 00000000..4c1aeacb
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/DelayRabbitProducer.java
@@ -0,0 +1,25 @@
+package org.dromara.stream.mq.producer.rabbitMq;
+
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.stream.config.RabbitTtlQueueConfig;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.GetMapping;
+
+/**
+ * @author xbhog
+ * @date 2024/05/25 17:15
+ **/
+@Slf4j
+@Component
+public class DelayRabbitProducer {
+ @Resource
+ private RabbitTemplate rabbitTemplate;
+
+ @GetMapping("/sendDelay")
+ public void sendDelayMessage(String message) {
+ rabbitTemplate.convertAndSend(RabbitTtlQueueConfig.DELAY_EXCHANGE_NAME, RabbitTtlQueueConfig.DELAY_ROUTING_KEY, message);
+ log.info("【生产者】Delayed message send: " + message);
+ }
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/NormalRabbitProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/NormalRabbitProducer.java
new file mode 100644
index 00000000..af528ff4
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/NormalRabbitProducer.java
@@ -0,0 +1,24 @@
+package org.dromara.stream.mq.producer.rabbitMq;
+
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.stream.config.RabbitConfig;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author xbhog
+ */
+@Slf4j
+@Component
+public class NormalRabbitProducer {
+
+ @Resource
+ RabbitTemplate rabbitTemplate;
+
+
+ public void sendMq(String message) {
+ rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message);
+ log.info("【生产者】Message send: " + message);
+ }
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/NormalRocketProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/NormalRocketProducer.java
new file mode 100644
index 00000000..ffd9d106
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/NormalRocketProducer.java
@@ -0,0 +1,25 @@
+package org.dromara.stream.mq.producer.rocketMq;
+
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author xbhog
+ * @date 2024/06/01 16:49
+ **/
+@Slf4j
+@Component
+public class NormalRocketProducer {
+
+ @Resource
+ private RocketMQTemplate rocketMQTemplate;
+
+ public void sendMessage(){
+ SendResult sendResult = rocketMQTemplate.syncSend("TestTopic", MessageBuilder.withPayload("hello world test").build());
+ log.info("发送普通同步消息-msg,syncSendMessage===>{}", sendResult);
+ }
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/TransactionRocketProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/TransactionRocketProducer.java
new file mode 100644
index 00000000..de6341e3
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/TransactionRocketProducer.java
@@ -0,0 +1,41 @@
+package org.dromara.stream.mq.producer.rocketMq;
+
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.producer.TransactionSendResult;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Component;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * @author xbhog
+ * @date 2024/06/01 16:54
+ **/
+@Slf4j
+@Component
+public class TransactionRocketProducer {
+
+ @Resource
+ private RocketMQTemplate rocketMQTemplate;
+
+ public void sendTransactionMessage(){
+ List tags = Arrays.asList("TAG-1", "TAG-2", "TAG-3");
+ for (int i = 0; i < 3; i++) {
+ Message message = MessageBuilder.withPayload("===>事务消息-" + i).build();
+ //destination formats: `topicName:tags` message – message Message arg – ext arg
+ TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction("transaction_topic:" + tags.get(i), message, i + 1);
+ if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) {
+ log.info("【生产者】事物消息发送成功;成功结果:{}",res);
+ }else{
+ log.info("【生产者】事务发送失败:失败原因:{}",res);
+ }
+ }
+ }
+
+}
diff --git a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml
new file mode 100644
index 00000000..f8209bca
--- /dev/null
+++ b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml
@@ -0,0 +1,44 @@
+server:
+ port: 9402
+
+# Spring
+spring:
+ application:
+ # 应用名称
+ name: ruoyi-test-mq
+ profiles:
+ # 环境配置
+ active: @profiles.active@
+ #MQ配置
+ rabbitmq:
+ host: 192.168.1.13
+ port: 5672
+ username: mq
+ password: mq
+ publisher-returns: true
+ publisher-confirm-type: correlated
+ kafka:
+ bootstrap-servers: 192.168.1.13:9092
+ producer:
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+rocketmq:
+ name-server: 192.168.1.13:9876
+ producer:
+ group: dist-test # 生产者组
+--- # nacos 配置
+spring:
+ cloud:
+ nacos:
+ # nacos 服务地址
+ server-addr: @nacos.server@
+ discovery:
+ # 注册组
+ group: @nacos.discovery.group@
+ namespace: ${spring.profiles.active}
+ config:
+ # 配置组
+ group: @nacos.config.group@
+ namespace: ${spring.profiles.active}
+ config:
+ import:
+ - optional:nacos:application-common.yml
diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/resources/logback-plus.xml b/ruoyi-example/ruoyi-test-mq/src/main/resources/logback-plus.xml
similarity index 96%
rename from ruoyi-example/ruoyi-stream-mq/src/main/resources/logback-plus.xml
rename to ruoyi-example/ruoyi-test-mq/src/main/resources/logback-plus.xml
index 05075604..e96832ee 100644
--- a/ruoyi-example/ruoyi-stream-mq/src/main/resources/logback-plus.xml
+++ b/ruoyi-example/ruoyi-test-mq/src/main/resources/logback-plus.xml
@@ -19,7 +19,8 @@
-
+
+