diff --git a/ruoyi-modules/hw-mqtt-broker/pom.xml b/ruoyi-modules/hw-mqtt-broker/pom.xml new file mode 100644 index 0000000..e89901b --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/pom.xml @@ -0,0 +1,136 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.7.9 + + + + hw-mqtt-broker + + MQTT Broker + + + 2.1.0 + 1.8 + UTF-8 + 2.7.9 + 2.7.9 + 1.2.83 + 2.6.0 + 5.9.2 + + + + + net.dreamlu + mica-mqtt-server-spring-boot-starter + + + org.springframework.boot + spring-boot-starter-web + + + net.dreamlu + mica-lite + + + net.dreamlu + mica-logging + + + net.dreamlu + mica-redis + + + net.dreamlu + mica-openapi + + + + org.springframework.boot + spring-boot-starter-actuator + + + io.micrometer + micrometer-registry-prometheus + + + org.projectlombok + lombok + provided + + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring.boot.version} + pom + import + + + net.dreamlu + mica-bom + ${mica.version} + pom + import + + + com.alibaba + fastjson + ${fastjson.version} + + + net.dreamlu + mica-mqtt-server-spring-boot-starter + ${revision} + + + + org.tinylog + slf4j-tinylog + ${tinylog.version} + + + org.tinylog + tinylog-impl + ${tinylog.version} + + + org.junit.jupiter + junit-jupiter-engine + ${junit-jupiter.version} + test + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + + diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/HwMqttBrokerApplication.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/HwMqttBrokerApplication.java new file mode 100644 index 0000000..f02f687 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/HwMqttBrokerApplication.java @@ -0,0 +1,13 @@ +package com.hw.mqtt; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class HwMqttBrokerApplication { + + public static void main(String[] args) { + SpringApplication.run(HwMqttBrokerApplication.class, args); + } + +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttAuthHandler.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttAuthHandler.java new file mode 100644 index 0000000..c69d843 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttAuthHandler.java @@ -0,0 +1,22 @@ +package com.hw.mqtt.auth; + +import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler; +import org.springframework.context.annotation.Configuration; +import org.tio.core.ChannelContext; + +/** + * mqtt tcp、websocket 认证 + * @author WenJY + * @date 2023-03-14 12:19 + * @return null + */ +@Configuration(proxyBeanMethods = false) +public class MqttAuthHandler implements IMqttServerAuthHandler { + + @Override + public boolean authenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) { + // 客户端认证逻辑实现 + return true; + } + +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttHttpAuthFilter.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttHttpAuthFilter.java new file mode 100644 index 0000000..2c4a374 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttHttpAuthFilter.java @@ -0,0 +1,37 @@ +package com.hw.mqtt.auth; + +import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; +import net.dreamlu.iot.mqtt.core.server.http.api.result.Result; +import net.dreamlu.iot.mqtt.core.server.http.handler.HttpFilter; +import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.annotation.Configuration; +import org.tio.http.common.HttpRequest; +import org.tio.http.common.HttpResponse; + +/** + * mqtt http 接口认证 + * @author WenJY + * @date 2023-03-14 12:20 + * @return null + */ +@Configuration(proxyBeanMethods = false) +public class MqttHttpAuthFilter implements HttpFilter, InitializingBean { + + @Override + public boolean filter(HttpRequest request) throws Exception { + // 自行实现逻辑 + return true; + } + + @Override + public HttpResponse response(HttpRequest request) { + // 认证不通过时的响应 + return Result.fail(request, ResultCode.E103); + } + + @Override + public void afterPropertiesSet() throws Exception { + MqttHttpRoutes.addFilter(this); + } +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttSubscribeValidator.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttSubscribeValidator.java new file mode 100644 index 0000000..733b153 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttSubscribeValidator.java @@ -0,0 +1,23 @@ +package com.hw.mqtt.auth; + +import net.dreamlu.iot.mqtt.codec.MqttQoS; +import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator; +import org.springframework.context.annotation.Configuration; +import org.tio.core.ChannelContext; + +/** + * 自定义订阅校验 + * @author WenJY + * @date 2023-03-14 12:20 + * @return null + */ +@Configuration(proxyBeanMethods = false) +public class MqttSubscribeValidator implements IMqttServerSubscribeValidator { + + @Override + public boolean isValid(ChannelContext context, String clientId, String topicFilter, MqttQoS qoS) { + // 校验客户端订阅的 topic,校验成功返回 true,失败返回 false + return true; + } + +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttUniqueIdService.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttUniqueIdService.java new file mode 100644 index 0000000..84f5a2b --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/auth/MqttUniqueIdService.java @@ -0,0 +1,22 @@ +package com.hw.mqtt.auth; + +import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService; +import org.springframework.context.annotation.Configuration; +import org.tio.core.ChannelContext; + +/** + * 自定义 clientId + * @author WenJY + * @date 2023-03-14 12:20 + * @return null + */ +@Configuration(proxyBeanMethods = false) +public class MqttUniqueIdService implements IMqttServerUniqueIdService { + + @Override + public String getUniqueId(ChannelContext context, String clientId, String userName, String password) { + // 返回的 uniqueId 会替代 mqtt client 传过来的 clientId,请保证返回的 uniqueId 唯一。 + return clientId; + } + +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/controller/ServerController.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/controller/ServerController.java new file mode 100644 index 0000000..1f75b87 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/controller/ServerController.java @@ -0,0 +1,81 @@ +package com.hw.mqtt.controller; + +import com.hw.mqtt.domain.AjaxResult; +import com.hw.mqtt.service.IMqttBrokerService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; + +/** + * Broker Http 接口 + */ +@Tag(name = "Mqtt::服务端") +@RequestMapping("/mqtt/server") +@RestController +public class ServerController { + @Autowired + private IMqttBrokerService service; + + @Operation(summary = "Broker发布消息") + @PostMapping("/publish") + public AjaxResult publish(String topic,String payload) { + AjaxResult ajaxResult; + boolean result = false; + try{ + result = service.publish(topic,payload); + ajaxResult = AjaxResult.success(result); + }catch (Exception ex){ + ajaxResult = AjaxResult.error("Broker发布消息异常:"+ex.getMessage(),result); + } + return ajaxResult; + } + + @Operation(summary = "获取在线客户端数量") + @GetMapping("/getOnlineClientSize") + public AjaxResult getOnlineClientSize() { + AjaxResult ajaxResult; + long onlineClientSize = 0; + try{ + onlineClientSize = service.getOnlineClientSize(); + ajaxResult = AjaxResult.success(onlineClientSize); + }catch (Exception ex){ + ajaxResult = AjaxResult.error("获取在线客户端数量异常:"+ex.getMessage(),onlineClientSize); + } + return ajaxResult; + } + + @Operation(summary = "获取在线客户端ID") + @GetMapping("/getOnlineClients") + public AjaxResult getOnlineClients() { + AjaxResult ajaxResult; + List nodes = new ArrayList<>(); + try{ + nodes = service.getOnlineClients(); + ajaxResult = AjaxResult.success(nodes); + }catch (Exception ex){ + ajaxResult = AjaxResult.error("获取在线客户端ID异常:"+ex.getMessage(),nodes); + } + return ajaxResult; + } + + @Operation(summary = "主动关闭指定客户端连接") + @PostMapping("/closeClientById") + public AjaxResult closeClientById(String clientId) { + AjaxResult ajaxResult; + boolean result = false; + try{ + result = service.closeClientById(clientId); + ajaxResult = AjaxResult.success(result); + }catch (Exception ex){ + ajaxResult = AjaxResult.error("主动关闭指定客户端异常:"+ex.getMessage(),result); + } + return ajaxResult; + } +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/AjaxResult.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/AjaxResult.java new file mode 100644 index 0000000..763eb0e --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/AjaxResult.java @@ -0,0 +1,225 @@ +package com.hw.mqtt.domain; +import java.util.HashMap; +import java.util.Objects; +/** + * @author Wen JY + * @description: TODO 操作消息提醒 + * @date 2023-08-17 13:33:31 + * @version: 1.0 + */ +public class AjaxResult extends HashMap +{ + private static final long serialVersionUID = 1L; + + /** 状态码 */ + public static final String CODE_TAG = "code"; + + /** 返回内容 */ + public static final String MSG_TAG = "msg"; + + /** 数据对象 */ + public static final String DATA_TAG = "data"; + + /** + * 状态类型 + */ + public enum Type + { + /** 成功 */ + SUCCESS(0), + /** 警告 */ + WARN(301), + /** 错误 */ + ERROR(500); + private final int value; + + Type(int value) + { + this.value = value; + } + + public int value() + { + return this.value; + } + } + + /** + * 初始化一个新创建的 AjaxResult 对象,使其表示一个空消息。 + */ + public AjaxResult() + { + } + + /** + * 初始化一个新创建的 AjaxResult 对象 + * + * @param type 状态类型 + * @param msg 返回内容 + */ + public AjaxResult(Type type, String msg) + { + super.put(CODE_TAG, type.value); + super.put(MSG_TAG, msg); + } + + /** + * 初始化一个新创建的 AjaxResult 对象 + * + * @param type 状态类型 + * @param msg 返回内容 + * @param data 数据对象 + */ + public AjaxResult(Type type, String msg, Object data) + { + super.put(CODE_TAG, type.value); + super.put(MSG_TAG, msg); + if (isNotNull(data)) + { + super.put(DATA_TAG, data); + } + } + + /** + * 返回成功消息 + * + * @return 成功消息 + */ + public static AjaxResult success() + { + return AjaxResult.success("操作成功"); + } + + /** + * 返回成功数据 + * + * @return 成功消息 + */ + public static AjaxResult success(Object data) + { + return AjaxResult.success("操作成功", data); + } + + /** + * 返回成功消息 + * + * @param msg 返回内容 + * @return 成功消息 + */ + public static AjaxResult success(String msg) + { + return AjaxResult.success(msg, null); + } + + /** + * 返回成功消息 + * + * @param msg 返回内容 + * @param data 数据对象 + * @return 成功消息 + */ + public static AjaxResult success(String msg, Object data) + { + return new AjaxResult(Type.SUCCESS, msg, data); + } + + /** + * 返回警告消息 + * + * @param msg 返回内容 + * @return 警告消息 + */ + public static AjaxResult warn(String msg) + { + return AjaxResult.warn(msg, null); + } + + /** + * 返回警告消息 + * + * @param msg 返回内容 + * @param data 数据对象 + * @return 警告消息 + */ + public static AjaxResult warn(String msg, Object data) + { + return new AjaxResult(Type.WARN, msg, data); + } + + /** + * 返回错误消息 + * + * @return + */ + public static AjaxResult error() + { + return AjaxResult.error("操作失败"); + } + + /** + * 返回错误消息 + * + * @param msg 返回内容 + * @return 警告消息 + */ + public static AjaxResult error(String msg) + { + return AjaxResult.error(msg, null); + } + + /** + * 返回错误消息 + * + * @param msg 返回内容 + * @param data 数据对象 + * @return 警告消息 + */ + public static AjaxResult error(String msg, Object data) + { + return new AjaxResult(Type.ERROR, msg, data); + } + + /** + * 是否为成功消息 + * + * @return 结果 + */ + public boolean isSuccess() + { + return !isError(); + } + + /** + * 是否为错误消息 + * + * @return 结果 + */ + public boolean isError() + { + return Objects.equals(Type.ERROR.value, this.get(CODE_TAG)); + } + + /** + * 方便链式调用 + * + * @param key 键 + * @param value 值 + * @return 数据对象 + */ + @Override + public AjaxResult put(String key, Object value) + { + super.put(key, value); + return this; + } + + public static boolean isNotNull(Object object) + { + return !isNull(object); + } + + public static boolean isNull(Object object) + { + return object == null; + } +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/ServerNode.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/ServerNode.java new file mode 100644 index 0000000..767ed5a --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/domain/ServerNode.java @@ -0,0 +1,25 @@ +package com.hw.mqtt.domain; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author WenJY + * @date 2023年03月14日 13:46 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ServerNode { + + /** + * 节点名称 + */ + private String name; + /** + * ip:port + */ + private String peerHost; + +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/RedisKeys.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/RedisKeys.java new file mode 100644 index 0000000..e4fdb0b --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/enums/RedisKeys.java @@ -0,0 +1,58 @@ +package com.hw.mqtt.enums; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * @author WenJY + * @date 2023年03月14日 13:36 + */ +@Getter +@RequiredArgsConstructor +public enum RedisKeys { + + /** + * mqtt 服务端节点 + */ + SERVER_NODES("mqtt:server:nodes:"), + /** + * mqtt <-> redis pug/sub 集群内消息交互 + */ + REDIS_CHANNEL_EXCHANGE("mqtt:channel:exchange"), + /** + * 设备 -> 云端 redis pug/sub 上行消息通道,适合 mq 的集群模式消费 + */ + REDIS_CHANNEL_UP("mqtt:channel:up"), + /** + * 云端 -> 设备 redis pug/sub 下行数据通道,广播到 mqtt 集群 + */ + REDIS_CHANNEL_DOWN("mqtt:channel:down"), + /** + * 连接状态存储 + */ + CONNECT_STATUS("mqtt:connect:status:"), + /** + * 遗嘱消息存储 + */ + MESSAGE_STORE_WILL("mqtt:messages:will:"), + /** + * 保留消息存储 + */ + MESSAGE_STORE_RETAIN("mqtt:messages:retain:"), + ; + + + private final String key; + + /** + * 用于拼接后缀 + * + * @param suffix 后缀 + * @return 完整的 redis key + */ + public String getKey(String suffix) { + return this.key.concat(suffix); + } + +} + diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java new file mode 100644 index 0000000..951e25e --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hw.mqtt.listener; + +import com.hw.mqtt.enums.RedisKeys; +import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; +import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener; +import net.dreamlu.mica.redis.cache.MicaRedisCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; +import org.tio.core.ChannelContext; + +/** + * 状态监听器 + * 性能损失小 + * @author WenJY + * @date 2023-03-14 12:17 + * @param null + * @return null + */ +@Service +public class MqttConnectStatusListener implements IMqttConnectStatusListener, SmartInitializingSingleton, DisposableBean { + private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class); + + private final ApplicationContext context; + private final MicaRedisCache redisCache; + private MqttServerCreator serverCreator; + + public MqttConnectStatusListener(ApplicationContext context, MicaRedisCache redisCache) { + this.context = context; + this.redisCache = redisCache; + } + + @Override + public void online(ChannelContext context, String clientId, String username) { + logger.info("Mqtt clientId:{} username:{} online.", clientId, username); + redisCache.sAdd(getRedisKey(), clientId); + } + + @Override + public void offline(ChannelContext context, String clientId, String username, String reason) { + logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason); + redisCache.sRem(getRedisKey(), clientId); + } + + /** + * 设备上下线存储,key 的值为 前缀:nodeName + * + * @return redis key + */ + private String getRedisKey() { + return RedisKeys.CONNECT_STATUS.getKey(serverCreator.getNodeName()); + } + + @Override + public void afterSingletonsInstantiated() { + this.serverCreator = context.getBean(MqttServerCreator.class); + } + + @Override + public void destroy() throws Exception { + // 停机时删除集合 + redisCache.del(getRedisKey()); + } +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttServerMessageListener.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttServerMessageListener.java new file mode 100644 index 0000000..f194479 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/listener/MqttServerMessageListener.java @@ -0,0 +1,40 @@ +package com.hw.mqtt.listener; + +import net.dreamlu.iot.mqtt.codec.ByteBufferUtil; +import net.dreamlu.iot.mqtt.codec.MqttPublishMessage; +import net.dreamlu.iot.mqtt.codec.MqttQoS; +import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener; +import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; +import org.tio.core.ChannelContext; + +/** + * 消息监听器 + * @author WenJY + * @date 2023-03-14 12:18 + * @param null + * @return null + */ +@Service +public class MqttServerMessageListener implements IMqttMessageListener, SmartInitializingSingleton { + private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class); + @Autowired + private ApplicationContext applicationContext; + private MqttServerTemplate mqttServerTemplate; + + @Override + public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qos, MqttPublishMessage message) { + logger.info("context:{} clientId:{} message:{} payload:{}", context, clientId, message, ByteBufferUtil.toString(message.getPayload())); + } + + @Override + public void afterSingletonsInstantiated() { + // 单利 bean 初始化完成之后从 ApplicationContext 中获取 bean + mqttServerTemplate = applicationContext.getBean(MqttServerTemplate.class); + } +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/IMqttBrokerService.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/IMqttBrokerService.java new file mode 100644 index 0000000..fb089d6 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/IMqttBrokerService.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hw.mqtt.service; + + +import java.util.List; + +/** + * mqtt broker 服务 + * + * @author L.cm + */ +public interface IMqttBrokerService { + + /** + * 获取在线客户端数量 + * @author WenJY + * @date 2023-03-14 13:51 + * @return long + */ + long getOnlineClientSize(); + + /** + * 获取所有在线的客户端 + * @author WenJY + * @date 2023-03-14 13:51 + * @return java.util.List + */ + List getOnlineClients(); + + /** + * 向指定主题发送消息 + * @author WenJY + * @date 2023-03-14 13:50 + * @param topic + * @param payload + * @return boolean + */ + boolean publish(String topic,String payload); + + /** + * 主动关闭指定客户端连接 + * @author WenJY + * @date 2023-03-18 9:23 + * @param clientId + */ + boolean closeClientById(String clientId); +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/ServerService.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/ServerService.java new file mode 100644 index 0000000..3b8173b --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/ServerService.java @@ -0,0 +1,29 @@ +package com.hw.mqtt.service; + +import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.nio.charset.StandardCharsets; + +/** + * + * @author WenJY + * @date 2023-03-14 12:19 + * @param null + * @return null + */ +@Service +public class ServerService { + private static final Logger logger = LoggerFactory.getLogger(ServerService.class); + @Autowired + private MqttServerTemplate server; + + public boolean publish(String body) { + boolean result = server.publishAll("/test/message", body.getBytes(StandardCharsets.UTF_8)); + logger.info("Mqtt publishAll result:{};payload:{}", result,body); + return result; + } +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/impl/MqttBrokerServiceImpl.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/impl/MqttBrokerServiceImpl.java new file mode 100644 index 0000000..b516fc5 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/service/impl/MqttBrokerServiceImpl.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hw.mqtt.service.impl; + +import com.hw.mqtt.enums.RedisKeys; +import com.hw.mqtt.service.IMqttBrokerService; +import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate; +import net.dreamlu.mica.core.utils.StringPool; +import net.dreamlu.mica.redis.cache.MicaRedisCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * mqtt broker 服务 + * + * @author L.cm + */ +@Service +public class MqttBrokerServiceImpl implements IMqttBrokerService { + private static final Logger logger = LoggerFactory.getLogger(MqttBrokerServiceImpl.class); + @Autowired private MicaRedisCache redisCache; + @Autowired private MqttServerTemplate server; + + @Override + public long getOnlineClientSize() { + Set keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR)); + if (keySet.isEmpty()) { + return 0L; + } + long result = 0; + for (String redisKey : keySet) { + Long count = redisCache.getSetOps().size(redisKey); + if (count != null) { + result += count; + } + } + return result; + } + + @Override + public List getOnlineClients() { + Set keySet = redisCache.scan(RedisKeys.CONNECT_STATUS.getKey(StringPool.STAR)); + if (keySet.isEmpty()) { + return Collections.emptyList(); + } + List clientList = new ArrayList<>(); + for (String redisKey : keySet) { + Set members = redisCache.sMembers(redisKey); + if (members != null && !members.isEmpty()) { + clientList.addAll(members); + } + } + return clientList; + } + + @Override + public boolean publish(String topic, String payload) { + boolean result = server.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8)); + logger.info("Mqtt publishAll result:{};topic:{};payload:{}", result, topic, payload); + return result; + } + + @Override + public boolean closeClientById(String clientId) { + try{ + server.close(clientId); + }catch (Exception ex){ + return false; + } + return true; + } + + +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/task/PublishAllTask.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/task/PublishAllTask.java new file mode 100644 index 0000000..ac33ad8 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/task/PublishAllTask.java @@ -0,0 +1,27 @@ +package com.hw.mqtt.task; + +import net.dreamlu.iot.mqtt.core.server.MqttServer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.nio.charset.StandardCharsets; + +/** + * 自定义任务:心跳下发 + * @author WenJY + * @date 2023-03-14 12:16 + * @param null + * @return null + */ +@Service +public class PublishAllTask { + @Autowired + private MqttServer mqttServer; + + @Scheduled(fixedDelay = 1000) + public void run() { + mqttServer.publishAll("/test/heartbeat", "心跳指令无需处理".getBytes(StandardCharsets.UTF_8)); + } + +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/util/RedisUtil.java b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/util/RedisUtil.java new file mode 100644 index 0000000..2404422 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/java/com/hw/mqtt/util/RedisUtil.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hw.mqtt.util; + +import net.dreamlu.mica.core.utils.CharPool; + +/** + * redis 工具 + * + * @author L.cm + */ +public class RedisUtil { + + /** + * 转换成 redis 的 pattern 规则 + * + * @return pattern + */ + public static String getTopicPattern(String topicFilter) { + // mqtt 分享主题 $share/{ShareName}/{filter} + return topicFilter + .replace(CharPool.PLUS, CharPool.STAR) + .replace(CharPool.HASH, CharPool.STAR); + } + +} diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/resources/application-server.yml b/ruoyi-modules/hw-mqtt-broker/src/main/resources/application-server.yml new file mode 100644 index 0000000..d73d428 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/resources/application-server.yml @@ -0,0 +1,54 @@ + +server: + port: 30013 +spring: + # redis 配置 + redis: + # 地址 + host: huawei-redis + # 端口,默认为6379 + port: 6379 + # 密码 + password: admin123 + # 连接超时时间 + timeout: 10s + lettuce: + pool: + # 连接池中的最小空闲连接 + min-idle: 0 + # 连接池中的最大空闲连接 + max-idle: 8 + # 连接池的最大数据库连接数 + max-active: 8 + # 连接池最大阻塞等待时间(使用负值表示没有限制) + max-wait: -1ms +mqtt: + server: + enabled: true # 是否开启服务端,默认:true + # ip: 0.0.0.0 # 服务端 ip 默认为空,0.0.0.0,建议不要设置 + port: 1883 # 端口,默认:1883 + name: Mqtt-Broker # 名称,默认:Mica-Mqtt-Server + buffer-allocator: HEAP # 堆内存和堆外内存,默认:堆内存 + heartbeat-timeout: 120000 # 心跳超时,单位毫秒,默认: 1000 * 120 + read-buffer-size: 8KB # 接收数据的 buffer size,默认:8k + max-bytes-in-message: 10MB # 消息解析最大 bytes 长度,默认:10M + auth: + enable: false # 是否开启 mqtt 认证 + username: mica # mqtt 认证用户名 + password: mica # mqtt 认证密码 + debug: true # 如果开启 prometheus 指标收集建议关闭 + stat-enable: true # 开启指标收集,debug 和 prometheus 开启时需要打开,默认开启,关闭节省内存 + web-port: 8083 # http、websocket 端口,默认:8083 + websocket-enable: true # 是否开启 websocket,默认: true + http-enable: false # 是否开启 http api,默认: false + http-basic-auth: + enable: false # 是否开启 http basic auth,默认: false + username: mica # http basic auth 用户名 + password: mica # http basic auth 密码 + ssl: # mqtt tcp ssl 认证 + enabled: false # 是否开启 ssl 认证,2.1.0 开始支持双向认证 + keystore-path: # 必须参数:ssl keystore 目录,支持 classpath:/ 路径。 + keystore-pass: # 必选参数:ssl keystore 密码 + truststore-path: # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。 + truststore-pass: # 可选参数:ssl 双向认证 truststore 密码 + client-auth: none # 是否需要客户端认证(双向认证),默认:NONE(不需要) diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/resources/application.yml b/ruoyi-modules/hw-mqtt-broker/src/main/resources/application.yml new file mode 100644 index 0000000..eebb355 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/resources/application.yml @@ -0,0 +1,33 @@ + +spring: + application: + name: Mqtt-Broker + profiles: + active: server +springdoc: + swagger-ui: + urls: + - name: swagger + url: /v3/api-docs +# actuator management +management: + info: + defaults: + enabled: true + metrics: + tags: + application: ${spring.application.name} + endpoint: + health: + show-details: ALWAYS + prometheus: + enabled: true + endpoints: + web: + exposure: + include: '*' +logging: + level: + root: info + server: info # t-io ??????? + org.tio: info # t-io ??????? diff --git a/ruoyi-modules/hw-mqtt-broker/src/main/resources/banner.txt b/ruoyi-modules/hw-mqtt-broker/src/main/resources/banner.txt new file mode 100644 index 0000000..89e25a4 --- /dev/null +++ b/ruoyi-modules/hw-mqtt-broker/src/main/resources/banner.txt @@ -0,0 +1,10 @@ + +${AnsiColor.RED} ## ## ####### ######## ######## +${AnsiColor.RED} ### ### ## ## ## ## +${AnsiColor.RED} #### #### ## ## ## ## +${AnsiColor.RED} ## ### ## ## ## ## ## +${AnsiColor.RED} ## ## ## ## ## ## ## +${AnsiColor.RED} ## ## ## ## ## ## +${AnsiColor.RED} ## ## ##### ## ## ## + +${AnsiColor.BRIGHT_BLUE}:: ${spring.application.name} :: Running Spring Boot ${spring-boot.version} 🏃🏃🏃 ${AnsiColor.DEFAULT} diff --git a/ruoyi-modules/pom.xml b/ruoyi-modules/pom.xml index eea1f8e..72f2ef6 100644 --- a/ruoyi-modules/pom.xml +++ b/ruoyi-modules/pom.xml @@ -13,6 +13,7 @@ ruoyi-gen ruoyi-job ruoyi-file + hw-mqtt-broker ruoyi-modules