From fcfa6dc976cc53a7b8dc8deb797661730f7fbec7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9E=A0=E8=81=AA?= <1353683470@qq.com> Date: Thu, 10 Apr 2025 10:04:53 +0000 Subject: [PATCH] =?UTF-8?q?!223=20=E6=9B=B4=E6=96=B0=20RabbitMQ=20?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8F=91=E9=80=81=E5=92=8C=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E6=89=8B=E5=8A=A8=E7=A1=AE=E8=AE=A4=E6=9C=BA=E5=88=B6=20*=20?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=20RabbitMQ=20=E4=BC=98=E5=8C=96=E5=8F=91?= =?UTF-8?q?=E9=80=81=E5=92=8C=E6=B6=88=E8=B4=B9=E6=89=8B=E5=8A=A8=E7=A1=AE?= =?UTF-8?q?=E8=AE=A4=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../stream/callback/RabbitCallback.java | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/callback/RabbitCallback.java diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/callback/RabbitCallback.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/callback/RabbitCallback.java new file mode 100644 index 00000000..481228c6 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/callback/RabbitCallback.java @@ -0,0 +1,72 @@ +package org.dromara.stream.callback; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.stream.config.RabbitConfig; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * Rabbit回调 + * @author JC + */ + +@Slf4j +@Component +public class RabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { + + private static final int MAX_RETRY_COUNT = 3; + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + if (ack) { + log.info("消息发送成功: {}", correlationData); + } else { + log.error("消息发送失败: {}, 原因: {}", correlationData, cause); + handleFailedMessage(correlationData); + } + } + + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + log.error("消息返回: ReplyCode: {}, ReplyText: {}, Exchange: {}, RoutingKey: {}, Message: {}", + returnedMessage.getReplyCode(), + returnedMessage.getReplyText(), + returnedMessage.getExchange(), + returnedMessage.getRoutingKey(), + returnedMessage.getMessage()); + retrySendMessage(returnedMessage); + } + + private void handleFailedMessage(CorrelationData correlationData) { + int retryCount = getRetryCount(correlationData); + if (retryCount < MAX_RETRY_COUNT) { + retryCount++; + log.info("正在重试发送消息: {}, 当前重试次数: {}", correlationData, retryCount); + retrySend(correlationData); + } else { + log.error("消息发送失败超过最大重试次数: {}", correlationData); + } + } + + private int getRetryCount(CorrelationData correlationData) { + // 这里可以实现获取重试次数的逻辑,比如从数据库或缓存中获取 + // 为了简单起见,这里返回0 + return 0; + } + + private void retrySend(CorrelationData correlationData) { + String messageContent = correlationData.getId(); + rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, messageContent, correlationData); + } + + private void retrySendMessage(ReturnedMessage returnedMessage) { + log.info("正在重试发送返回的消息: {}", returnedMessage.getMessage()); + rabbitTemplate.convertAndSend(returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage()); + } +}