From 42d8d624ca78821a58e29353f1866ab415607de8 Mon Sep 17 00:00:00 2001 From: Wen JY Date: Thu, 7 Sep 2023 09:58:43 +0800 Subject: [PATCH] =?UTF-8?q?change=20-=20broker=20=E5=90=91=E6=8C=87?= =?UTF-8?q?=E5=AE=9A=E4=B8=BB=E9=A2=98=E6=8E=A8=E9=80=81=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E6=8E=A5=E5=85=A5=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/MqttConnectStatusListener.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java index 4951089..461c4d2 100644 --- a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java @@ -16,9 +16,12 @@ package com.hw.mqtt.listener; +import com.alibaba.fastjson2.JSONArray; +import com.hw.mqtt.domain.AjaxResult; import com.hw.mqtt.enums.RedisKeys; import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener; +import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate; import net.dreamlu.mica.redis.cache.MicaRedisCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +32,11 @@ import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import org.tio.core.ChannelContext; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + /** * 状态监听器 * 性能损失小 @@ -44,6 +52,7 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm private final ApplicationContext context; private final MicaRedisCache redisCache; private MqttServerCreator serverCreator; + private MqttServerTemplate mqttServerTemplate; public MqttConnectStatusListener(ApplicationContext context, MicaRedisCache redisCache) { this.context = context; @@ -54,12 +63,14 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm public void online(ChannelContext context, String clientId, String username) { logger.info("Mqtt clientId:{} username:{} online.", clientId, username); redisCache.sAdd(getRedisKey(), clientId); + pushConnectStatus(clientId,1); } @Override public void offline(ChannelContext context, String clientId, String username, String reason) { logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason); redisCache.sRem(getRedisKey(), clientId); + pushConnectStatus(clientId,2); } /** @@ -74,6 +85,7 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm @Override public void afterSingletonsInstantiated() { this.serverCreator = context.getBean(MqttServerCreator.class); + this.mqttServerTemplate = context.getBean(MqttServerTemplate.class); } @Override @@ -81,4 +93,25 @@ public class MqttConnectStatusListener implements IMqttConnectStatusListener, Sm // 停机时删除集合 redisCache.del(getRedisKey()); } + + /** + * 推送设备状态到指定主题 + * @param clientId + * @param connectStatus + */ + public void pushConnectStatus(String clientId,Integer connectStatus){ + Map entityMap = new HashMap<>(); + entityMap.put("msg","设备设备连接状态信息"); + entityMap.put("deviceType","edge"); + entityMap.put("deviceCode",clientId); + entityMap.put("connectStatus",connectStatus); + entityMap.put("statusTime",System.currentTimeMillis()); + String jsonString = JSONArray.toJSONString(entityMap); + boolean result = mqttServerTemplate.publishAll("/device/status/v1", jsonString.getBytes(StandardCharsets.UTF_8)); + if(result){ + logger.info("客户端:"+clientId+";"+ (connectStatus == 1 ? "连接" :"断开") +"状态推送成功"); + }else { + logger.info("客户端:"+clientId+";"+ (connectStatus == 1 ? "连接" :"断开") +"状态推送失败"); + } + } }