diff --git a/ruoyi-modules/hwmom-dp/pom.xml b/ruoyi-modules/hwmom-dp/pom.xml new file mode 100644 index 00000000..1db1493c --- /dev/null +++ b/ruoyi-modules/hwmom-dp/pom.xml @@ -0,0 +1,93 @@ + + + 4.0.0 + + org.dromara + ruoyi-modules + ${revision} + + + hwmom-dp + + + hwmom-dp data process数据处理模块 + + + + + + org.springframework.kafka + spring-kafka + + + + org.dromara + ruoyi-common-nacos + + + + + org.dromara + ruoyi-common-security + + + + + org.dromara + ruoyi-api-resource + + + com.alibaba.fastjson2 + fastjson2 + 2.0.51 + compile + + + + + + + + + + + + + + + + + + + + + ${project.artifactId} + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 17 + 17 + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + + + + diff --git a/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/dp/HwDataProcessApplication.java b/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/dp/HwDataProcessApplication.java new file mode 100644 index 00000000..033dab5e --- /dev/null +++ b/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/dp/HwDataProcessApplication.java @@ -0,0 +1,20 @@ +package org.dromara.dp; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; +import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; + +@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) +public class HwDataProcessApplication { + + public static void main(String[] args) { + SpringApplication application = new SpringApplication(HwDataProcessApplication.class); + application.setApplicationStartup(new BufferingApplicationStartup(2048)); + application.run(args); + System.out.println("(♥◠‿◠)ノ゙ data process数据处理模块启动成功 ლ(´ڡ`ლ)゙ "); + + + } + +} diff --git a/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/dp/kafka/service/KafkaConsumerService.java b/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/dp/kafka/service/KafkaConsumerService.java new file mode 100644 index 00000000..782fc349 --- /dev/null +++ b/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/dp/kafka/service/KafkaConsumerService.java @@ -0,0 +1,13 @@ +package org.dromara.dp.kafka.service; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +@Service +public class KafkaConsumerService { + + @KafkaListener(topics = "test", groupId = "my-consumer-group") + public void listen(String message) { + System.out.println("Received message: " + message); + } +} diff --git a/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/service/DataProcessServiceImpl.java b/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/service/DataProcessServiceImpl.java new file mode 100644 index 00000000..6e0219f2 --- /dev/null +++ b/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/service/DataProcessServiceImpl.java @@ -0,0 +1,685 @@ +//package org.dromara.service; +// +//import com.alibaba.fastjson.JSON; +//import com.alibaba.fastjson.JSONArray; +//import com.alibaba.fastjson.JSONObject; +//import com.aliyun.auth.credentials.Credential; +//import com.aliyun.auth.credentials.provider.StaticCredentialProvider; +//import com.aliyun.sdk.service.dysmsapi20170525.AsyncClient; +//import com.aliyun.sdk.service.dysmsapi20170525.models.SendSmsRequest; +//import com.aliyun.sdk.service.dysmsapi20170525.models.SendSmsResponse; +//import com.ruoyi.common.core.constant.HwDictConstants; +//import com.ruoyi.common.core.constant.SecurityConstants; +//import com.ruoyi.common.core.constant.TdEngineConstants; +//import com.ruoyi.common.core.domain.R; +//import com.ruoyi.common.core.enums.ResultEnums; +//import com.ruoyi.common.core.utils.RegexUtils; +//import com.ruoyi.common.core.utils.StringUtils; +//import com.ruoyi.dataprocess.amap.LocationVo; +//import com.ruoyi.dataprocess.amap.PositionUtils; +//import com.ruoyi.dataprocess.amap.PositionVo; +//import com.ruoyi.dataprocess.common.ImageUtils; +//import com.ruoyi.dataprocess.domain.*; +//import com.ruoyi.dataprocess.mapper.*; +//import com.ruoyi.dataprocess.service.CommanHandleService; +//import com.ruoyi.dataprocess.service.IDataProcessService; +//import com.ruoyi.tdengine.api.RemoteTdEngineService; +//import com.ruoyi.tdengine.api.domain.TdField; +//import com.ruoyi.tdengine.api.domain.TdHistorySelectDto; +//import com.ruoyi.tdengine.api.domain.TdReturnDataVo; +//import com.ruoyi.tdengine.api.domain.TdTableVo; +//import darabonba.core.client.ClientOverrideConfiguration; +//import org.apache.commons.codec.binary.Base64; +//import org.apache.poi.openxml4j.exceptions.InvalidOperationException; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.stereotype.Service; +// +//import javax.annotation.Resource; +//import javax.script.ScriptEngine; +//import javax.script.ScriptEngineManager; +//import java.io.IOException; +//import java.math.BigDecimal; +//import java.text.SimpleDateFormat; +//import java.util.*; +//import java.util.concurrent.CompletableFuture; +//import java.util.concurrent.ExecutionException; +//import java.util.concurrent.atomic.AtomicBoolean; +//import java.util.stream.Collectors; +// +///** +// * @Description: 数据处理业务类 +// * @ClassName: DataProcessServiceImpl +// * @Author : xins +// * @Date :2023-09-04 8:58 +// * @Version :1.0 +// */ +//@Service +//public class DataProcessServiceImpl extends CommanHandleService implements IDataProcessService { +// +// private static final Logger logger = LoggerFactory.getLogger(DataProcessServiceImpl.class); +// +// private final String AccessKeyId = "LTAI5tPoTQHh8HHst2toxtGa"; +// private final String AccessKeySecret = "K8OIuSNgsSnpGMJ2PdqIJFYyUqL38m"; +// +// @Resource +// private RemoteTdEngineService remoteTdEngineService; +// +// @Autowired +// private HwDeviceMapper hwDeviceMapper; +// +// @Autowired +// private HwElectronicFenceMapper hwElectronicFenceMapper; +// +// @Autowired +// private HwFenceAreaMapper hwFenceAreaMapper; +// +// @Autowired +// private HwAlarmInfoMapper hwAlarmInfoMapper; +// +// @Autowired +// private HwAlarmRuleMapper hwAlarmRuleMapper; +// +// /** +// * @param: jsonData +// * @param: imagePath ruoyifile的上传地址 +// * @param: imagePatterns +// * @param: imageDomain ruoyifile的domain +// * @param: imagePrefix ruoyifile的prefix +// * @param: topic 发布主题,用来获取网关设备devicecode +// * @description +// * @author xins +// * @date 2023-08-31 16:16 +// */ +// @Override +// public int processBusinessData(String jsonData, String imagePath, +// String imagePatterns, String imageDomain, String imagePrefix, String topic) { +// JSONObject json = JSON.parseObject(jsonData); +// Long ts = json.getLong(TdEngineConstants.PAYLOAD_TS); +// String tsStr = String.valueOf(ts); +// if (tsStr.length() == 10) { +// ts = ts * 1000; +// } +// JSONArray paramArr = json.getJSONArray(TdEngineConstants.PAYLOAD_PARAM); +//// System.out.println(this.hwDeviceMapper); +//// System.out.println(this.remoteTdEngineService); +// String deviceCode = ""; +// for (int i = 0; i < paramArr.size(); i++) { +// try { +// JSONObject paramJson = paramArr.getJSONObject(i); +// String dataType = paramJson.getString(TdEngineConstants.PAYLOAD_DATATYPE); +// JSONObject dataValueJson = paramJson.getJSONObject(TdEngineConstants.PAYLOAD_DATAVALUE); +// deviceCode = dataValueJson.getString(TdEngineConstants.PAYLOAD_DEVICE_CODE).toLowerCase(); +// //deviceCode = getUidByParam(json); +// HwDevice hwDevice = hwDeviceMapper.selectHwDeviceByDeviceCode(deviceCode); +// if (hwDevice == null) { +// logger.error("此设备【deviceCode:{}】不存在", deviceCode); +// continue; +// } +// if (hwDevice.getDeviceStatus().equals(HwDictConstants.DEVICE_STATUS_DELETE)) { +// logger.error("此设备【deviceCode:{}】已删除", deviceCode); +// continue; +// } +// +// Long sceneId = hwDevice.getSceneId(); +// Long deviceId = hwDevice.getDeviceId(); +// Long monitorUnitId = hwDevice.getMonitorUnitId(); +// Long tenantId = hwDevice.getTenantId(); +// String databaseName = TdEngineConstants.getDatabaseName(); +// String tableName = TdEngineConstants.DEFAULT_TABLE_NAME_PREFIX + deviceId; +// +// dataValueJson.remove(TdEngineConstants.PAYLOAD_DEVICE_CODE); +// TdTableVo tdTableVo = new TdTableVo(); +// List schemaFields = new ArrayList<>(); +// List alarmSchemaFields = new ArrayList<>(); +// +// //添加timestamp字段,默认字段名称是ts,协议上传的key是timestamp +// TdField firstTdField = new TdField(); +// firstTdField.setFieldName(TdEngineConstants.DEFAULT_FIRST_FIELD_NAME); +// long currentTimeMillis = System.currentTimeMillis(); +// firstTdField.setFieldValue(currentTimeMillis); +//// firstTdField.setFieldValue(ts); +// schemaFields.add(firstTdField); +// +// Object longitude = null; +// Object latitude = null; +// +// for (Map.Entry entry : dataValueJson.entrySet()) { +// String originalKey = entry.getKey();//原来的key +// Object value = entry.getValue(); +// +// /** +// * 先对key进行转换,例如对于key是value的需要转换成value1; +// */ +// String key = TdEngineConstants.TDENGINE_KEY_TRANSFER_MAP.get(originalKey) == null ? originalKey +// : TdEngineConstants.TDENGINE_KEY_TRANSFER_MAP.get(originalKey); +// +// if (key.equalsIgnoreCase(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_TYPE)) { +// continue; +// } +// +// +// if (value instanceof String) { +// String valueStr = (String) value; +// if (StringUtils.isNotBlank(valueStr)) { +// /** +// * 先判读是否是图片,并获取图片名称, 获取到证明是图片 +// */ +// String[] imagePatternArr = imagePatterns.split(","); +// String extension = ImageUtils.getImageType(valueStr, imagePatternArr); +// if (StringUtils.isNotBlank(extension) && valueStr.length() > 300) { +// //保存图片,并返回图片详细地址进行赋值保存 +// value = getImageFileName(imagePath, imageDomain, imagePrefix, valueStr, deviceId, extension); +// if (value == null) continue; +// } else if (dataType.equalsIgnoreCase(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_IMAGE)) { +// if (ImageUtils.checkIsBase64(valueStr) && valueStr.length() > 300) {//判断没有加前缀的情况 +// String type = dataValueJson.getString(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_TYPE); +// if (type == null) { +// type = dataValueJson.getString(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_TYPE_FIRSET_UPPER); +// } +// extension = "jpg"; +// if (type != null) { +// extension = StringUtils.isNotBlank(ImageUtils.getImageType(type, imagePatternArr)) +// ? ImageUtils.getImageType(type, imagePatternArr) : extension; +// } +// value = getImageFileName(imagePath, imageDomain, imagePrefix, valueStr, deviceId, extension); +// if (value == null) continue; +// } +// } +// +// TdField tdField = new TdField(); +// tdField.setFieldName(key); +// tdField.setFieldValue(value); +// schemaFields.add(tdField); +// } +// } else { +// TdField tdField = new TdField(); +// tdField.setFieldName(key); +// tdField.setFieldValue(value); +// schemaFields.add(tdField); +// +// //经纬度判断 +// if (key.equalsIgnoreCase(HwDictConstants.DEFAULT_FUNCTION_LONGITUDE_IDENTIFIER)) { +// longitude = value; +// } else if (key.equalsIgnoreCase(HwDictConstants.DEFAULT_FUNCTION_LATITUDE_IDENTIFIER)) { +// latitude = value; +// } else { +// TdField alarmTdField = new TdField(); +// alarmTdField.setFieldName(originalKey); +// alarmTdField.setFieldValue(value); +// alarmSchemaFields.add(alarmTdField); +// } +// } +// } +// +// tdTableVo.setDatabaseName(databaseName); +// tdTableVo.setTableName(tableName); +// tdTableVo.setSchemaFields(schemaFields); +// +// final R insertResult = this.remoteTdEngineService.insertTable(tdTableVo , SecurityConstants.INNER); +// if (insertResult.getCode() == ResultEnums.SUCCESS.getCode()) { +// logger.info("Insert data result: {}", insertResult.getCode()); +// } else { +// logger.error("Insert data Exception: {},data:{}", insertResult.getMsg(), jsonData); +// } +// +// //校验电子围栏 +// if (longitude != null && latitude != null) { +// checkElectronicFence(deviceId, tenantId, monitorUnitId, sceneId, longitude, latitude, ts); +// } +// +// //校验设备报警信息 +// checkAlarm(deviceId, alarmSchemaFields, tenantId, sceneId, monitorUnitId, topic, deviceCode); +// +// } catch (Exception e) { +// e.printStackTrace(); +// logger.error("deviceCode:{},errorMsg:{},data:{}", deviceCode, e.getMessage(), jsonData); +// } +// } +// +// return paramArr.size(); +// } +// +// /** +// * 获取设备UID +// * @param jsonStr +// * @return +// */ +// private String getUidByParam(JSONObject jsonStr){ +// +// JSONObject jsonObject = new JSONObject(jsonStr); +// +// JSONArray paramArray = jsonObject.getJSONArray("param"); +// +// JSONObject firstParamObject = paramArray.getJSONObject(0); +// +// JSONObject datavalueObject = firstParamObject.getJSONObject("datavalue"); +// +// String uid = datavalueObject.getString("uid"); +// +// return uid; +// } +// +// +// private static String getImageFileName(String imagePath, String imageDomain, String imagePrefix, String valueStr, Long deviceId, String extension) { +// try { +// String imageFileName = ImageUtils.convertBase64ToImage(imagePath, +// valueStr, "device" + deviceId, extension); +// return imageDomain + imagePrefix + imageFileName; +// +// } catch (IOException e) { +// logger.error("转换图片错误:" + e.getMessage(), e); +// } +// return null; +// } +// +// +// /** +// * @param: deviceId +// * @param: monitorUnitId +// * @param: sceneId +// * @param: longitude +// * @param: latitude +// * @param: ts +// * @description 校验此设备电子围栏 +// * @author xins +// * @date 2023-09-04 14:04 +// */ +// private void checkElectronicFence(Long deviceId, Long tenantId, Long monitorUnitId, Long sceneId, Object longitude, Object latitude, Long ts) { +// //根据设备ID、监控单元ID和场景ID获取所有的电子围栏配置。(目前先只支持场景) +//// List hwElectronicFences = hwElectronicFenceMapper.selectElectronicFencesByDeviceId(deviceId); +//// hwElectronicFences = hwElectronicFenceMapper.selectElectronicFencesByMonitorUnitId(monitorUnitId); +// List hwElectronicFences = hwElectronicFenceMapper.selectElectronicFencesBySceneId(sceneId); +// +// if (StringUtils.isNotEmpty(hwElectronicFences)) { +// for (HwElectronicFence hwElectronicFence : hwElectronicFences) { +// HwFenceArea queryFenceArea = new HwFenceArea(); +// queryFenceArea.setElectronicFenceId(hwElectronicFence.getElectronicFenceId()); +// //获取电子围栏下配置的区域列表 +// List fenceAreas = hwFenceAreaMapper.selectHwFenceAreaList(queryFenceArea); +// String effectiveTimeFlag = hwElectronicFence.getEffectiveTimeFlag(); +// String triggerStatus = hwElectronicFence.getTriggerStatus(); +// if (fenceAreas != null && !fenceAreas.isEmpty()) { +// fenceAreas.forEach(fenceArea -> { +// boolean isAlarmed = false;//是否报警 +// if (effectiveTimeFlag.equals(HwDictConstants.EFFECTIVE_TIME_FLAG_LONG)) { +// String areaShapeFlag = fenceArea.getAreaShapeFlag(); +// String areaRange = fenceArea.getAreaRange(); +// //多边形处理 +// if (areaShapeFlag.equals(HwDictConstants.AREA_SHAPE_FLAG_POLYGN)) { +// LocationVo polygonVo = new LocationVo(); +// polygonVo.setMarkerType(LocationVo.MARKER_TYPE_POLYGON); +// List> polygonList = new ArrayList<>(); +// List postionList = new ArrayList<>(); +// String[] areaRangeArr = areaRange.split("_");//多个点,每个点的经纬度信息(经度_纬度) +// for (String areaRange1 : areaRangeArr) { +// String[] areaRange1Arr = areaRange1.split(","); +// String longitudeStr = areaRange1Arr[0]; +// String latitudeStr = areaRange1Arr[1]; +// postionList.add(new PositionVo(new BigDecimal(longitudeStr), new BigDecimal(latitudeStr))); +// polygonList.add(postionList); +// } +// +// polygonVo.setPolygonList(polygonList); +// +// /** +// * 传入的longitude和latitude是object类型,本身有可能是BigDecimal,Double,Long和Integer,先转成String再转成Double +// */ +// boolean isWithin = PositionUtils.checkAddressInLocation(polygonVo, Double.valueOf(String.valueOf(longitude)), Double.valueOf(String.valueOf(latitude))); +// if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_EXIT) && !isWithin) {//如果电子围栏配置是出界,而此设备出界则报警 +// isAlarmed = true; +// } else if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_ENTRY) && isWithin) {//如果电子围栏配置是入界,而此设备入界则报警 +// isAlarmed = true; +// } +// +// } else if (areaShapeFlag.equals(HwDictConstants.AREA_SHAPE_FLAG_CIRCULA)) {//圆形处理 +// String[] areaRangeArr = areaRange.split(","); +// String longitudeStr = areaRangeArr[0]; +// String latitudeStr = areaRangeArr[1]; +// String radiusStr = areaRangeArr[2]; +// LocationVo circulrVo = new LocationVo(); +// circulrVo.setRadius(Double.valueOf(radiusStr)); +// circulrVo.setLongitude(Double.valueOf(longitudeStr)); +// circulrVo.setLatitude(Double.valueOf(latitudeStr)); +// circulrVo.setMarkerType(LocationVo.MARKER_TYPE_CIRCULAR); +// boolean isWithin = PositionUtils.checkAddressInLocation(circulrVo, Double.valueOf(String.valueOf(longitude)), Double.valueOf(String.valueOf(latitude))); +// if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_EXIT) && !isWithin) {//如果电子围栏配置是出界,而此设备出界则报警 +// isAlarmed = true; +// } else if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_ENTRY) && isWithin) {//如果电子围栏配置是入界,而此设备入界则报警 +// isAlarmed = true; +// } +// } +// } +// +// if (isAlarmed) { +// HwAlarmInfo hwAralmInfo = new HwAlarmInfo(); +// hwAralmInfo.setAlarmInfoType(HwDictConstants.ALARM_INFO_TYPE_ELECTRONIC_FENCE); +// hwAralmInfo.setAlarmReleatedId(hwElectronicFence.getElectronicFenceId()); +// hwAralmInfo.setDeviceId(deviceId); +// hwAralmInfo.setMonitorUnitId(monitorUnitId); +// hwAralmInfo.setTenantId(tenantId); +// hwAralmInfo.setSceneId(sceneId); +// hwAralmInfo.setFunctionValue(longitude + "_" + latitude); +// hwAralmInfo.setTriggerStatus(triggerStatus); +// hwAralmInfo.setAlarmTime(new Date(ts)); +// hwAralmInfo.setCreateTime(new Date()); +// hwAralmInfo.setFenceAreaId(fenceArea.getFenceAreaId()); +// hwAlarmInfoMapper.insertHwAlarmInfo(hwAralmInfo); +// } +// }); +// } +// } +// } +// } +// +// +// /** +// * @param: deviceId +// * @param: alarmFields +// * @description 校验报警 +// * @author xins +// * @date 2023-11-07 10:15 +// */ +// private void checkAlarm(Long deviceId, List alarmFields, +// Long tenantId, Long sceneId, Long monitorUnitId, String topic, String subDeviceCode) { +// +// try { +// HwAlarmRule queryAlarmRule = new HwAlarmRule(); +// queryAlarmRule.setRuleDeviceId(deviceId); +// queryAlarmRule.setRuleType(HwDictConstants.ALARM_RULE_RULE_TYPE_DEVICE); +// queryAlarmRule.setAlarmRuleStatus(HwDictConstants.ALARM_RULE_STATUS_ENABLE); +// List alarmRules = hwAlarmRuleMapper.selectHwAlarmRulesWithLink(queryAlarmRule); +// +// ScriptEngineManager manager = new ScriptEngineManager(); +// ScriptEngine engine = manager.getEngineByName("js"); +// +// if (alarmRules != null) { +// Date currentDate = new Date(); +// for (HwAlarmRule alarmRule : alarmRules) { +// String triggerExpression = alarmRule.getTriggerExpression() +// .replaceAll("and", "&&") +// .replaceAll("or", "||"); +// AtomicBoolean isAlarmed = new AtomicBoolean(false); +// List alarmDetails = new ArrayList(); +// +// for (TdField alarmField : alarmFields) { +// String fieldName = alarmField.getFieldName(); +// Object filedValue = alarmField.getFieldValue(); +// if (triggerExpression.contains("{" + fieldName + "}")) { +// isAlarmed.set(true); +// +// /** +// * Add By 获取五分钟内的数据 WenJY 2024-07-03 14:13:21 +// */ +// if(alarmRule.getTriggerTimeFrame() != null && alarmRule.getTriggerTimeFrame() > 0){ +// double difValue = FilterDeviceValue(deviceId, fieldName, alarmRule.getTriggerTimeFrame()); +//// if(difValue > 0){ +//// filedValue = difValue; +//// } +// filedValue = difValue; +// } +// /** End **/ +// +// triggerExpression = triggerExpression.replaceAll("\\{" + fieldName + "\\}", String.valueOf(filedValue)); +// HwAlarmDetail alarmDetail = new HwAlarmDetail(); +// alarmDetail.setDeviceId(deviceId); +// alarmDetail.setFunctionIdentifier(fieldName); +// alarmDetail.setFunctionValue(String.valueOf(filedValue)); +// alarmDetail.setMonitorTime(currentDate); +// alarmDetails.add(alarmDetail); +// +// } +// } +// +// if (isAlarmed.get() && !triggerExpression.contains("{") +// && RegexUtils.findSymbolInText(triggerExpression).size() > 0 +// && RegexUtils.findNumberInText(triggerExpression).size() > 0) { +// +// Boolean triggerExpressionBool = (Boolean) engine.eval(triggerExpression); +// if (triggerExpressionBool) { +// HwAlarmInfo alarmInfo = new HwAlarmInfo(); +// alarmInfo.setAlarmInfoType(HwDictConstants.ALARM_INFO_TYPE_DEVICE); +// alarmInfo.setAlarmReleatedId(alarmRule.getAlarmRuleId()); +// alarmInfo.setDeviceId(deviceId); +// alarmInfo.setMonitorUnitId(monitorUnitId); +// alarmInfo.setTenantId(tenantId); +// alarmInfo.setSceneId(sceneId); +// alarmInfo.setAlarmLevelId(alarmRule.getAlarmLevelId()); +// alarmInfo.setAlarmTypeId(alarmRule.getAlarmTypeId()); +// alarmInfo.setHwAlarmDetailList(alarmDetails); +// alarmInfo.setHandleStatus(HwDictConstants.ALARM_HANDLE_STATUS_NO); +// alarmInfo.setCreateTime(currentDate); +// alarmInfo.setAlarmTime(currentDate); +// hwAlarmInfoMapper.insertHwAlarmInfo(alarmInfo); +// this.insertHwAlarmDetail(alarmInfo); +// this.handleAlarmLink(alarmRule, topic, subDeviceCode); +// /** +// * Add By WenJY 下发报警信息 +// */ +// if(StringUtils.isNotEmpty(alarmRule.getPhoneNumbers())){ +// SendAlarmInfoBySms(deviceId,alarmRule); +// }else { +// logger.warn("设备:{};报警信息发送失败,联系方式为空",deviceId); +// } +// break; +// } +// } +// } +// } +// }catch (Exception e){ +// logger.error("设备:{};报警逻辑处理异常:{}",deviceId,e.getMessage()); +// } +// } +// +// /** +// * 过滤指定时间段内的数据,返回最大最小的差值,过滤0 +// * @param deviceId +// * @param fieldName +// * @param triggerTimeFrame +// * @return 差值 +// */ +// private double FilterDeviceValue(Long deviceId,String fieldName,long triggerTimeFrame){ +// double differenceValue = 0.00; +// String databaseName = TdEngineConstants.getDatabaseName(); +// String tableName = TdEngineConstants.DEFAULT_TABLE_NAME_PREFIX + deviceId; +// long currentTimeMillis = System.currentTimeMillis(); +// long fiveMinutesInMillis = triggerTimeFrame * 60 * 1000; +// long beforeTimeMillis = currentTimeMillis - fiveMinutesInMillis; +// +// TdHistorySelectDto tdHistorySelectDto = new TdHistorySelectDto(); +// tdHistorySelectDto.setDatabaseName(databaseName); +// tdHistorySelectDto.setTableName(tableName); +// tdHistorySelectDto.setStartTime(beforeTimeMillis); +// tdHistorySelectDto.setEndTime(currentTimeMillis); +// tdHistorySelectDto.setFirstFieldName("ts"); +// R result = this.remoteTdEngineService.getHistoryData(tdHistorySelectDto , SecurityConstants.INNER); +// +// if (result.getCode() == ResultEnums.SUCCESS.getCode()) { +// logger.info("Get history data result: {}", result.getCode()); +// +// TdReturnDataVo data = (TdReturnDataVo) result.getData(); +// +// List> dataList = data.getDataList(); +// List value = dataList.stream().map(map -> (Double)map.get("value1")).distinct().filter(x -> x > 0).collect(Collectors.toList()); +// +// System.out.println(triggerTimeFrame +"分钟内数据:"+JSONArray.toJSON(value) ); +// +// OptionalDouble maxOptional = value.stream().mapToDouble(Double::doubleValue).max(); +// OptionalDouble minOptional = value.stream().mapToDouble(Double::doubleValue).min(); +// +// if (maxOptional.isPresent() && minOptional.isPresent()) { +// double maxValue = maxOptional.getAsDouble(); +// double minValue = minOptional.getAsDouble(); +// differenceValue = maxValue - minValue; +// System.out.println("Difference between max and min values: " + differenceValue); +// } else { +// System.out.println("No values found to calculate difference."); +// } +// +// } else { +// logger.error("Get history data Exception: {}", result.getMsg()); +// } +// +// return differenceValue; +// +// } +// +// /** +// * 新增报警详情信息信息 +// * +// * @param hwAlarmInfo 报警信息对象 +// */ +// private void insertHwAlarmDetail(HwAlarmInfo hwAlarmInfo) { +// List hwAlarmDetailList = hwAlarmInfo.getHwAlarmDetailList(); +// Long alarmInfoId = hwAlarmInfo.getAlarmInfoId(); +// if (StringUtils.isNotNull(hwAlarmDetailList)) { +// List list = new ArrayList(); +// for (HwAlarmDetail hwAlarmDetail : hwAlarmDetailList) { +// hwAlarmDetail.setAlarmInfoId(alarmInfoId); +// list.add(hwAlarmDetail); +// } +// if (list.size() > 0) { +// hwAlarmInfoMapper.batchHwAlarmDetail(list); +// } +// } +// } +// +// /** +// * @param: alarmRule +// * @param: topic +// * @param: subDeviceCode +// * @description 处理报警规则联动设备 +// * @author xins +// * @date 2023-11-07 16:39 +// */ +// private void handleAlarmLink(HwAlarmRule alarmRule, String topic, String subDeviceCode) { +// String controlCommandTopic = topic.replace(HwDictConstants.TOPIC_TYPE_DATA_POSTFIX, HwDictConstants.TOPIC_TYPE_COMMAND_POSTFIX); +// if (alarmRule.getLinkFlag().equals(HwDictConstants.ALARM_RULE_LINK_FLAG_YES)) { +// List alarmRuleLinks = alarmRule.getHwAlarmRuleLinkList(); +// alarmRuleLinks.forEach(alarmRuleLink -> { +// this.publishControlCommand(controlCommandTopic, subDeviceCode, alarmRuleLink.getLinkDeviceFunctionIdentifier()); +// }); +// } +// } +// +// @Override +// public void testBase64(String jsonData, String imagePath, String imagePatterns, String imageDomain, String imagePrefix) { +// JSONObject json = JSON.parseObject(jsonData); +// Object value = json.get("base64"); +// if (value instanceof String) { +// +// String valueStr = (String) value; +// System.out.println(Base64.isBase64(valueStr)); +// if (StringUtils.isNotBlank(valueStr)) { +// /** +// * 先判读是否是图片,并获取图片名称, 获取到证明是图片 +// */ +// String[] imagePatternArr = imagePatterns.split(","); +// String extension = ImageUtils.getImageType(valueStr, imagePatternArr); +// if (StringUtils.isNotBlank(extension)) { +// //保存图片,并返回图片详细地址进行赋值保存 +// Long deviceId = 100L; +// String imageFileName = null; +// try { +// imageFileName = ImageUtils.convertBase64ToImage(imagePath, +// valueStr, "device" + deviceId, extension); +// if (StringUtils.isNotBlank(imageFileName)) { +// String url = imageDomain + imagePrefix + imageFileName; +// System.out.println(url); +// } +// } catch (IOException e) { +// throw new RuntimeException(e); +// } +// +// } +// } +// } +// } +// +// /** +// * 发送报警短信 +// * @param deviceId +// * @param alarmRule +// */ +// private void SendAlarmInfoBySms(Long deviceId,HwAlarmRule alarmRule){ +// try { +// HwDevice hwDevice = hwDeviceMapper.GetDeviceById(deviceId); +// if(hwDevice != null){ +// AlarmMsgTemplateParam alarmMsgTemplateParam = new AlarmMsgTemplateParam(); +// alarmMsgTemplateParam.setWarning(alarmRule.getAlarmRuleName()); +// alarmMsgTemplateParam.setParentname(hwDevice.getMonitorUnitName()); +// alarmMsgTemplateParam.setSensorID(deviceId.toString()); +// AliSmsHandle(alarmRule.getPhoneNumbers(),alarmMsgTemplateParam); +// } +// } catch (ExecutionException | InterruptedException e) { +// throw new InvalidOperationException(String.format("报警信息发送异常:%e",e.getMessage())); +// } +// } +// +// /** +// * 报警短信推送 +// * @param phoneNumbers +// * @param alarmMsgTemplateParam +// * @throws ExecutionException +// * @throws InterruptedException +// */ +// private void AliSmsHandle(String phoneNumbers,AlarmMsgTemplateParam alarmMsgTemplateParam ) throws ExecutionException, InterruptedException { +// StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder() +// .accessKeyId(AccessKeyId) +// .accessKeySecret(AccessKeySecret) +// .build()); +// +// AsyncClient client = AsyncClient.builder() +// .region("cn-shenzhen") // Region ID +// .credentialsProvider(provider) +// .overrideConfiguration( +// ClientOverrideConfiguration.create() +// .setEndpointOverride("dysmsapi.aliyuncs.com") +// ) +// .build(); +// +// String jsonString = com.alibaba.fastjson2.JSONArray.toJSONString(alarmMsgTemplateParam); +// +// SendSmsRequest sendSmsRequest = SendSmsRequest.builder() +// .phoneNumbers(phoneNumbers) +// .signName("深圳市金瑞铭科技") +// .templateCode("SMS_468740027") +// .templateParam(jsonString) +// .build(); +// logger.info("向:{};发送报警信息:{}",phoneNumbers,jsonString); +// CompletableFuture response = client.sendSms(sendSmsRequest); +// SendSmsResponse resp = response.get(); +// +// client.close(); +// } +// +// +// public static void main(String[] args) { +// System.out.println(System.currentTimeMillis()); +// SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd"); +// Date date = new Date(); +// System.out.println(format.format(date)); +// Object jsonData = "{\n" + +// "\n" + +// " \"timestamp\": 1689814424,\"type\": \"CMD_REPORTDATA\",\n" + +// " \"param\": [\n{\n" + +// " \"datatype\": \"cttemperature\",\n" + +// "\"datavalue\": {\n" + +// " \"uid\": \"0009340109040126\", \"alias\": \"\",\n" + +// " \"value\": 25.6,\n" + +// " \"voltage\": 3.76,\n" + +// " \"rssi\": -80\n" + +// " } },\n" + +// " {\"datatype\": \"cttemperature\",\n" + +// " \"datavalue\": {\n" + +// " \"uid1\": \"00093440109040924\",\n" + +// " \"alias\": \"\",\n" + +// " \"value\": 25.6,\n" + +// " \"voltage\": 3.64,\n" + +// " \"rssi\": -87\n" + +// " }\n" + +// " }]}"; +// +// } +//} diff --git a/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/service/DeviceStatusServiceImpl.java b/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/service/DeviceStatusServiceImpl.java new file mode 100644 index 00000000..eb3b851c --- /dev/null +++ b/ruoyi-modules/hwmom-dp/src/main/java/org/dromara/service/DeviceStatusServiceImpl.java @@ -0,0 +1,178 @@ +//package org.dromara.service; +// +//import com.alibaba.fastjson.JSON; +//import com.alibaba.fastjson.JSONObject; +//import com.ruoyi.common.core.constant.HwDictConstants; +//import com.ruoyi.common.core.constant.SecurityConstants; +//import com.ruoyi.common.core.constant.TdEngineConstants; +//import com.ruoyi.common.core.domain.R; +//import com.ruoyi.common.core.utils.StringUtils; +//import com.ruoyi.dataprocess.domain.*; +//import com.ruoyi.dataprocess.mapper.HwAlarmInfoMapper; +//import com.ruoyi.dataprocess.mapper.HwDeviceMapper; +//import com.ruoyi.dataprocess.mapper.HwOfflineRuleMapper; +//import com.ruoyi.dataprocess.service.CommanHandleService; +//import com.ruoyi.dataprocess.service.IDeviceStatusService; +//import com.ruoyi.tdengine.api.RemoteTdEngineService; +//import com.ruoyi.tdengine.api.domain.DeviceStatus; +//import com.ruoyi.tdengine.api.domain.TdField; +//import com.ruoyi.tdengine.api.domain.TdTableVo; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.stereotype.Service; +// +//import javax.annotation.Resource; +//import java.util.ArrayList; +//import java.util.Date; +//import java.util.List; +//import java.util.Map; +// +///** +// * @Description: 设备状态处理服务 +// * @ClassName: DeviceStatusServiceImpl +// * @Author : xins +// * @Date :2023-09-12 15:31 +// * @Version :1.0 +// */ +//@Service +//public class DeviceStatusServiceImpl extends CommanHandleService implements IDeviceStatusService { +// +// @Autowired +// private HwDeviceMapper hwDeviceMapper; +// @Autowired +// private HwOfflineRuleMapper hwOfflineRuleMapper; +// @Autowired +// private HwAlarmInfoMapper hwAlarmInfoMapper; +// @Resource +// private RemoteTdEngineService remoteTdEngineService; +// +// @Override +// public void handleDeviceStatus(String payloadString, String clientId) { +//// ddd:{"msg":"设备设备连接状态信息","deviceType":"edge","connectStatus":1, +//// "statusTime":1694506127199,"deviceCode":"hw-data-process-1"} +// JSONObject json = JSON.parseObject(payloadString); +// String deviceCode = json.getString("deviceCode").toLowerCase(); +// if (clientId.equals(deviceCode)) { //校验是不是自己,如果是自己则不记录状态,返回即可。 +// return; +// } +// HwDevice hwDevice = hwDeviceMapper.selectHwDeviceByDeviceCode(deviceCode); +// if (hwDevice != null) { +// Long deviceId = hwDevice.getDeviceId(); +// TdTableVo tdTableVo = new TdTableVo(); +// tdTableVo.setDatabaseName(TdEngineConstants.PLATFORM_DB_NAME); +// tdTableVo.setTableName(TdEngineConstants.DEFAULT_DEVICE_STATUS_TABLE_NAME_PREFIX + deviceId); +// +// List schemaFields = new ArrayList<>(); +// TdField onlineStatusField = new TdField(); +// onlineStatusField.setFieldName(TdEngineConstants.ST_TAG_ONLINESTATUS); +// onlineStatusField.setFieldValue(json.getInteger("connectStatus")); +// schemaFields.add(onlineStatusField); +// +// TdField deviceTypeField = new TdField(); +// deviceTypeField.setFieldName(TdEngineConstants.ST_TAG_DEVICETYPE); +// deviceTypeField.setFieldValue(json.getString("deviceType")); +// schemaFields.add(deviceTypeField); +// +// TdField tsField = new TdField(); +// tsField.setFieldName(TdEngineConstants.DEFAULT_FIRST_FIELD_NAME); +// tsField.setFieldValue(json.getLong("statusTime")); +// schemaFields.add(tsField); +// +// tdTableVo.setSchemaFields(schemaFields); +// +// this.remoteTdEngineService.insertTable(tdTableVo, SecurityConstants.INNER); +// +// //更新设备当前状态信息 +// String connectStatus = String.valueOf(json.getInteger("connectStatus")); +// hwDevice.setOnlineStatus(connectStatus); +// //判断如果是否是第一次连接,更新激活状态和激活时间。 +// if (hwDevice.getActiveStatus().equals(HwDictConstants.DEVICE_ACTIVE_STATUS_INACTIVE)) { +// hwDevice.setActiveStatus(HwDictConstants.DEVICE_ACTIVE_STATUS_ACTIVE); +// hwDevice.setActiveTime(new Date()); +// } +// +// hwDeviceMapper.updateHwDevice(hwDevice); +// +// if (connectStatus.equals(HwDictConstants.DEVICE_ONLINE_STATUS_OFFLINE)) { +// this.checkOfflineAlarm(hwDevice); +// } +// +// } +// } +// +// +// /** +// * @param: device +// * @description 离线报警处理 +// * @author xins +// * @date 2023-11-14 16:01 +// */ +// private void checkOfflineAlarm(HwDevice device) { +// Long deviceId = device.getDeviceId(); +// Long sceneId = device.getSceneId(); +// +// HwOfflineRule offlineRule = hwOfflineRuleMapper.selectHwOfflineRuleByDeviceId(deviceId); +// if (offlineRule == null) { +// offlineRule = hwOfflineRuleMapper.selectHwOfflineRuleBySceneId(sceneId); +// } +// +// if (offlineRule != null) { +// Long offlineNumberTime = offlineRule.getOfflineNumberTime();//分钟 +// Long offlineNumber = offlineRule.getOfflineNumber(); +// +// if (offlineNumber > 1) { +// Long currentTime = System.currentTimeMillis(); +// Long startTime = currentTime - offlineNumberTime * 60 * 1000; +// DeviceStatus queryDeviceStatus = new DeviceStatus(); +// queryDeviceStatus.setDeviceId(deviceId); +// queryDeviceStatus.setStartTime(startTime); +// queryDeviceStatus.setEndTime(currentTime); +// queryDeviceStatus.setOnlineStatus(0); +// +// R>> deviceStatusListR = this.remoteTdEngineService.getDeviceStatusList(queryDeviceStatus, SecurityConstants.INNER); +// List> deviceStatusList = deviceStatusListR.getData(); +// //在几分钟时间内离线几次的报警信息保存 +// if (deviceStatusList != null && deviceStatusList.size() > offlineNumber) { +// insertHwAlarmInfo(device, offlineRule, deviceId); +// } +// } else {//只要离线就报警 +// insertHwAlarmInfo(device, offlineRule, deviceId); +// } +// } +// } +// +// private void insertHwAlarmInfo(HwDevice device, HwOfflineRule offlineRule, Long deviceId) { +// Date currentDate = new Date(); +// +// HwAlarmInfo alarmInfo = new HwAlarmInfo(); +// alarmInfo.setAlarmInfoType(HwDictConstants.ALARM_INFO_TYPE_OFFLINE); +// alarmInfo.setAlarmReleatedId(offlineRule.getOfflineRuleId()); +// alarmInfo.setDeviceId(deviceId); +// alarmInfo.setTenantId(device.getTenantId()); +// alarmInfo.setSceneId(device.getSceneId()); +// alarmInfo.setMonitorUnitId(device.getMonitorUnitId()); +// alarmInfo.setAlarmLevelId(offlineRule.getAlarmLevelId()); +// alarmInfo.setHandleStatus(HwDictConstants.ALARM_HANDLE_STATUS_NO); +// alarmInfo.setAlarmTime(currentDate); +// alarmInfo.setCreateTime(currentDate); +// hwAlarmInfoMapper.insertHwAlarmInfo(alarmInfo); +// this.handleAlarmLink(offlineRule, device.getDeviceCode()); +// } +// +// /** +// * @param: offlineRule +// * @param: deviceCode +// * @description 处理报警规则联动设备 +// * @author xins +// * @date 2023-11-07 16:39 +// */ +// private void handleAlarmLink(HwOfflineRule offlineRule, String deviceCode) { +// String controlCommandTopic = StringUtils +// .format(HwDictConstants.CONTROL_COMMAND_TOPIC_VALUE, deviceCode); +// if (offlineRule.getLinkFlag().equals(HwDictConstants.ALARM_RULE_LINK_FLAG_YES)) { +// List offlineRuleLinks = offlineRule.getHwAlarmRuleLinkList(); +// offlineRuleLinks.forEach(offlineRuleLink -> { +// this.publishControlCommand(controlCommandTopic, deviceCode, offlineRuleLink.getLinkDeviceFunctionIdentifier()); +// }); +// } +// } +//} diff --git a/ruoyi-modules/hwmom-dp/src/main/resources/application.yml b/ruoyi-modules/hwmom-dp/src/main/resources/application.yml new file mode 100644 index 00000000..6c724a18 --- /dev/null +++ b/ruoyi-modules/hwmom-dp/src/main/resources/application.yml @@ -0,0 +1,62 @@ +# Tomcat +server: + port: 9603 + +mqtt: + client: + ip: 127.0.0.1 # mqtt server ip地址 + port: 1883 # mqtt server 端口 + clientId: hw-data-process-1 # 客户端ID,保证唯一 + username: mica # mqtt server 认证用户名 + password: mica # mqtt server 认证密码 + timeout: 10 + keepalive: 20 + qos : 1 # qos服务质量等级,0最多交付一次,1至少交付一次,2只交付一次 + dataTopicFilter: /v1/# # 订阅的处理设备上报数据的topic + deviceStatusTopic: /device/status/v1 # 订阅的处理设备状态的topic + #domain: http://127.0.0.1:9665 # 文件服务地址 + #path: D:/ruoyi/uploadPath # base64图片转换保存的路径 + domain: http://175.27.215.92:9665 # 文件服务地址 + path: /home/ruoyi/uploadPath # base64图片转换保存的路径 + prefix: /statics + imagePatterns: jpg,jpeg,png #支持的图片格式 + +# Spring +spring: + application: + # 应用名称 + name: hwmom-dp + profiles: + # 环境配置 + active: @profiles.active@ + +--- # nacos 配置 +spring: + application: + # 应用名称 + name: hwmom-dp + cloud: + nacos: + # nacos 服务地址 + server-addr: @nacos.server@ + username: @nacos.username@ + password: @nacos.password@ + discovery: + # 注册组 + group: @nacos.discovery.group@ + namespace: ${spring.profiles.active} + config: + # 配置组 + group: @nacos.config.group@ + namespace: ${spring.profiles.active} + config: + import: + - optional:nacos:application-common.yml + - optional:nacos:datasource.yml + - optional:nacos:${spring.application.name}.yml + + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer diff --git a/ruoyi-modules/hwmom-mqtt/pom.xml b/ruoyi-modules/hwmom-mqtt/pom.xml new file mode 100644 index 00000000..86546a56 --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/pom.xml @@ -0,0 +1,99 @@ + + + + org.dromara + ruoyi-modules + ${revision} + + 4.0.0 + + hwmom-mqtt + + + hwmom-mqtt broker模块 + + + + + + net.dreamlu + mica-mqtt-server-spring-boot-starter + 2.3.5 + + + + + org.springframework.kafka + spring-kafka + + + + org.dromara + ruoyi-common-nacos + + + + + org.dromara + ruoyi-common-security + + + + + org.dromara + ruoyi-api-resource + + + com.alibaba.fastjson2 + fastjson2 + 2.0.51 + compile + + + + + + + + + + + + + + + + + + + + + ${project.artifactId} + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 17 + 17 + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + + + diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/HwMqttBrokerApplication.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/HwMqttBrokerApplication.java new file mode 100644 index 00000000..1061e561 --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/HwMqttBrokerApplication.java @@ -0,0 +1,20 @@ +package com.hw.mqtt; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; +import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; + +@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) +public class HwMqttBrokerApplication { + + public static void main(String[] args) { + SpringApplication application = new SpringApplication(HwMqttBrokerApplication.class); + application.setApplicationStartup(new BufferingApplicationStartup(2048)); + application.run(args); + System.out.println("(♥◠‿◠)ノ゙ mqtt模块启动成功 ლ(´ڡ`ლ)゙ "); + + + } + +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttAuthHandler.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttAuthHandler.java new file mode 100644 index 00000000..6765b7df --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttAuthHandler.java @@ -0,0 +1,147 @@ +package com.hw.mqtt.auth; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.hw.mqtt.domain.DeviceInfoDto; +import com.hw.mqtt.enums.RedisKeys; +//import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler; + +import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler; +import org.dromara.common.core.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.tio.core.ChannelContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * mqtt tcp、websocket 认证 + * @author WenJY + * @date 2023-03-14 12:19 + * @return null + */ +@Configuration(proxyBeanMethods = false) +public class MqttAuthHandler implements IMqttServerAuthHandler { + + private final Logger logger = LoggerFactory.getLogger(MqttAuthHandler.class); + + /** + * 存储设备信息,设备接入校验 + */ + private List deviceInfoDtos; + + @Value("${broker.whitelist}") + private String clientIdWhitelist; + + private final StringRedisTemplate redisTemplate; + + public MqttAuthHandler(List deviceInfoDtos, StringRedisTemplate redisTemplate) { + this.deviceInfoDtos = deviceInfoDtos; + this.redisTemplate = redisTemplate; + this.InitDeviceInfoListByRedis(); + } + + /** + * 通过Redis初始化本地设备信息集合,只在该类初始化时执行 + */ + private void InitDeviceInfoListByRedis(){ + try{ + String jsonStr = redisTemplate.opsForValue().get(RedisKeys.CLIENT_DEVICE_INFO.getKey()); + if(StringUtils.isEmpty(jsonStr)){ + logger.warn("通过Redis获取设备信息为空"); + return; + } + this.deviceInfoDtos = JSONArray.parseArray(jsonStr, DeviceInfoDto.class); + }catch (Exception ex){ + logger.error("通过Redis获取设备信息方法处理异常:"+ex.getMessage()); + } + } + + /** + * 客户端认证逻辑实现 + * @param context ChannelContext + * @param uniqueId mqtt 内唯一id,默认和 clientId 相同 + * @param clientId 客户端 ID + * @param userName 用户名 + * @param password 密码 + * @return + */ + @Override + public boolean authenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) { + // + try{ + //白名单过滤 + if(clientIdWhitelist.contains(clientId)){ + return true; + }else if (deviceInfoDtos!=null){ + Optional optionalDeviceInfoDto = deviceInfoDtos.stream().distinct().filter(x -> x.getDeviceCode().equals(clientId)).findFirst(); + //判断本地集合中是否包含该设备信息,如果不包含再次读取Redis并初始化本地集合 + if (optionalDeviceInfoDto.isPresent()) { + DeviceInfoDto deviceInfo = optionalDeviceInfoDto.get(); + return checkDeviceInfo(deviceInfo,clientId,userName,password); + } + } + return NoDeviceInfoEvent(clientId,userName,password); + }catch (Exception ex){ + logger.error("客户端认证逻辑处理异常:"+ex.getMessage()); + } + return false; + } + + /** + * 通过本地集合未获取到设备信息时重新获取进行校验 + * @param clientId + * @param userName + * @param password + * @return + */ + private boolean NoDeviceInfoEvent(String clientId,String userName,String password){ + DeviceInfoDto deviceInfo = GetDeviceInfoByRedis(clientId); + if (deviceInfo != null) { + return checkDeviceInfo(deviceInfo,clientId,userName,password); + }else { + logger.warn("未获取到接入客户端的设备信息,禁止该设备接入"); + return false; + } + } + + /** + * 校验设备接入信息 + * @param deviceInfo + * @param clientId + * @param userName + * @param passwd + * @return + */ + private boolean checkDeviceInfo(DeviceInfoDto deviceInfo,String clientId,String userName,String passwd){ + if(Objects.equals(userName, deviceInfo.getUserName()) && Objects.equals(passwd, deviceInfo.getPassword())){ + return true; + }else { + logger.warn("接入设备:"+clientId+";账号或密码错误,禁止该设备接入"); + return false; + } + } + + /** + * 再次读取Redis重新加载本地设备信息集合内容获取设备信息,过滤出指定clientId的设备信息 + * @param clientId + * @return + */ + private DeviceInfoDto GetDeviceInfoByRedis(String clientId){ + this.InitDeviceInfoListByRedis(); + if(deviceInfoDtos!=null){ + Optional optionalDeviceInfoDto = deviceInfoDtos.stream().distinct().filter(x -> x.getDeviceCode().equals(clientId)).findFirst(); + return optionalDeviceInfoDto.orElse(null); + }else { + return null; + } + } + +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttHttpAuthFilter.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttHttpAuthFilter.java new file mode 100644 index 00000000..2c4a3743 --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttHttpAuthFilter.java @@ -0,0 +1,37 @@ +package com.hw.mqtt.auth; + +import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode; +import net.dreamlu.iot.mqtt.core.server.http.api.result.Result; +import net.dreamlu.iot.mqtt.core.server.http.handler.HttpFilter; +import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.annotation.Configuration; +import org.tio.http.common.HttpRequest; +import org.tio.http.common.HttpResponse; + +/** + * mqtt http 接口认证 + * @author WenJY + * @date 2023-03-14 12:20 + * @return null + */ +@Configuration(proxyBeanMethods = false) +public class MqttHttpAuthFilter implements HttpFilter, InitializingBean { + + @Override + public boolean filter(HttpRequest request) throws Exception { + // 自行实现逻辑 + return true; + } + + @Override + public HttpResponse response(HttpRequest request) { + // 认证不通过时的响应 + return Result.fail(request, ResultCode.E103); + } + + @Override + public void afterPropertiesSet() throws Exception { + MqttHttpRoutes.addFilter(this); + } +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttSubscribeValidator.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttSubscribeValidator.java new file mode 100644 index 00000000..733b1538 --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttSubscribeValidator.java @@ -0,0 +1,23 @@ +package com.hw.mqtt.auth; + +import net.dreamlu.iot.mqtt.codec.MqttQoS; +import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator; +import org.springframework.context.annotation.Configuration; +import org.tio.core.ChannelContext; + +/** + * 自定义订阅校验 + * @author WenJY + * @date 2023-03-14 12:20 + * @return null + */ +@Configuration(proxyBeanMethods = false) +public class MqttSubscribeValidator implements IMqttServerSubscribeValidator { + + @Override + public boolean isValid(ChannelContext context, String clientId, String topicFilter, MqttQoS qoS) { + // 校验客户端订阅的 topic,校验成功返回 true,失败返回 false + return true; + } + +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttUniqueIdService.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttUniqueIdService.java new file mode 100644 index 00000000..84f5a2b8 --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/auth/MqttUniqueIdService.java @@ -0,0 +1,22 @@ +package com.hw.mqtt.auth; + +import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService; +import org.springframework.context.annotation.Configuration; +import org.tio.core.ChannelContext; + +/** + * 自定义 clientId + * @author WenJY + * @date 2023-03-14 12:20 + * @return null + */ +@Configuration(proxyBeanMethods = false) +public class MqttUniqueIdService implements IMqttServerUniqueIdService { + + @Override + public String getUniqueId(ChannelContext context, String clientId, String userName, String password) { + // 返回的 uniqueId 会替代 mqtt client 传过来的 clientId,请保证返回的 uniqueId 唯一。 + return clientId; + } + +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/config/RedisListenerConfig.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/config/RedisListenerConfig.java new file mode 100644 index 00000000..f9884a23 --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/config/RedisListenerConfig.java @@ -0,0 +1,35 @@ +package com.hw.mqtt.config; + +import com.hw.mqtt.enums.RedisKeys; +import com.hw.mqtt.listener.RedisMessageListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; + +/** + * @author Wen JY + * @description: TODO + * @date 2023-09-11 15:35:20 + * @version: 1.0 + */ +@Configuration +public class RedisListenerConfig { + @Bean + RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, + MessageListenerAdapter msgIngoListenerAdapter) { + + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + // 可以添加多个 messageListener,配置不同的交换机 + container.addMessageListener(msgIngoListenerAdapter, new PatternTopic(RedisKeys.REDIS_CHANNEL_DOWN.getKey())); + return container; + } + + @Bean + MessageListenerAdapter msgIngoListenerAdapter(RedisMessageListener receiver) { + return new MessageListenerAdapter(receiver, "deviceCommand"); + } +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/domain/DeviceInfoDto.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/domain/DeviceInfoDto.java new file mode 100644 index 00000000..c03e5e62 --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/domain/DeviceInfoDto.java @@ -0,0 +1,29 @@ +package com.hw.mqtt.domain; + +import lombok.Data; + +/** + * 设备基础信息 + * @author Wen JY + * @description: TODO + * @date 2023-09-13 14:52:18 + * @version: 1.0 + */ +@Data +public class DeviceInfoDto { + + /** + * 设备编号,ClientId + */ + private String deviceCode; + + /** + * 用户名 + */ + private String userName; + + /** + * 密码 + */ + private String password; +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/enums/ConnectStatus.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/enums/ConnectStatus.java new file mode 100644 index 00000000..84d68a9b --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/enums/ConnectStatus.java @@ -0,0 +1,32 @@ +package com.hw.mqtt.enums; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * 设备连接状态枚举 + * @author Wen JY + * @description: TODO + * @date 2023-09-13 13:42:37 + * @version: 1.0 + */ +@Getter +@RequiredArgsConstructor +public enum ConnectStatus { + + /** + * 设备连接 + */ + connct(1), + + /** + * 设备断开 + */ + disconnect(0); + + private final int key; + + public int getKey(){ + return this.key; + } +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/enums/DeviceType.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/enums/DeviceType.java new file mode 100644 index 00000000..399fc127 --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/enums/DeviceType.java @@ -0,0 +1,37 @@ +package com.hw.mqtt.enums; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * 设备类型枚举 + * @author Wen JY + * @description: TODO + * @date 2023-09-13 13:30:38 + * @version: 1.0 + */ +@Getter +@RequiredArgsConstructor +public enum DeviceType { + + /** + * 网关/基站 + */ + edge(1), + + /** + * 网关子设备/传感器 + */ + sensor(2), + + /** + * 直连设备 + */ + device(3); + + private final int key; + + public int getKey(){ + return this.key; + } +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/enums/RedisKeys.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/enums/RedisKeys.java new file mode 100644 index 00000000..f24bc8f0 --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/enums/RedisKeys.java @@ -0,0 +1,63 @@ +package com.hw.mqtt.enums; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * @author WenJY + * @date 2023年03月14日 13:36 + */ +@Getter +@RequiredArgsConstructor +public enum RedisKeys { + + /** + * mqtt 服务端节点 + */ + SERVER_NODES("mqtt:server:nodes:"), + /** + * mqtt <-> redis pug/sub 集群内消息交互 + */ + REDIS_CHANNEL_EXCHANGE("mqtt:channel:exchange"), + /** + * 设备 -> 云端 redis pug/sub 上行消息通道,适合 mq 的集群模式消费 + */ + REDIS_CHANNEL_UP("mqtt:channel:up"), + /** + * 云端 -> 设备 redis pug/sub 下行数据通道,广播到 mqtt 集群 + */ + REDIS_CHANNEL_DOWN("mqtt:channel:down"), + /** + * 连接状态存储 + */ + CONNECT_STATUS("mqtt:connect:status:"), + /** + * 遗嘱消息存储 + */ + MESSAGE_STORE_WILL("mqtt:messages:will:"), + /** + * 保留消息存储 + */ + MESSAGE_STORE_RETAIN("mqtt:messages:retain:"), + + /** + * 设备信息 + */ + CLIENT_DEVICE_INFO("hw_device_info"), + ; + + + private final String key; + + /** + * 用于拼接后缀 + * + * @param suffix 后缀 + * @return 完整的 redis key + */ + public String getKey(String suffix) { + return this.key.concat(suffix); + } + +} + diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java new file mode 100644 index 00000000..fa78dfb7 --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/listener/MqttConnectStatusListener.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2019-2029, Dreamlu 卢春梦 (596392912@qq.com & dreamlu.net). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hw.mqtt.listener; + +import com.alibaba.fastjson2.JSONArray; +import com.hw.mqtt.enums.ConnectStatus; +import com.hw.mqtt.enums.DeviceType; +import com.hw.mqtt.enums.RedisKeys; +import net.dreamlu.iot.mqtt.core.server.MqttServerCreator; +import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener; +import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationContext; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; +import org.tio.core.ChannelContext; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +/** + * 状态监听器 + * 性能损失小 + * @author WenJY + * @date 2023-03-14 12:17 + * @return null + */ +@Service +public class MqttConnectStatusListener implements IMqttConnectStatusListener, SmartInitializingSingleton, DisposableBean { + private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class); + + private final ApplicationContext context; + private final StringRedisTemplate redisTemplate; + private MqttServerCreator serverCreator; + private MqttServerTemplate mqttServerTemplate; + + @Value("${broker.connectStatus.enable}") + private boolean publishConnectStatusEnable; + + @Value("${broker.connectStatus.topic}") + private String publishConnectStatusTopic; + + public MqttConnectStatusListener(ApplicationContext context, StringRedisTemplate redisTemplate) { + this.context = context; + this.redisTemplate = redisTemplate; + } + + @Override + public void online(ChannelContext context, String clientId, String username) { + logger.info("Mqtt clientId:{} username:{} online.", clientId, username); + redisTemplate.opsForSet().add(getRedisKey(), clientId); + pushConnectStatus(clientId,ConnectStatus.connct); + } + + @Override + public void offline(ChannelContext context, String clientId, String username, String reason) { + logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason); + redisTemplate.opsForSet().remove(getRedisKey(), clientId); + pushConnectStatus(clientId,ConnectStatus.disconnect); + } + + /** + * 设备上下线存储,key 的值为 前缀:nodeName + * + * @return redis key + */ + private String getRedisKey() { + return RedisKeys.CONNECT_STATUS.getKey(serverCreator.getNodeName()); + } + + @Override + public void afterSingletonsInstantiated() { + this.serverCreator = context.getBean(MqttServerCreator.class); + this.mqttServerTemplate = context.getBean(MqttServerTemplate.class); + } + + @Override + public void destroy() throws Exception { + // 停机时删除集合 + redisTemplate.opsForSet().remove(getRedisKey()); + } + + /** + * 推送设备状态到指定主题 + * @param clientId + * @param connectStatus + */ + public void pushConnectStatus(String clientId, ConnectStatus connectStatus){ + if(publishConnectStatusEnable){ + Map entityMap = new HashMap<>(); + entityMap.put("msg","设备设备连接状态信息"); + entityMap.put("deviceType", DeviceType.edge.getKey()); + entityMap.put("deviceCode",clientId); + entityMap.put("connectStatus",connectStatus.getKey()); + entityMap.put("statusTime",System.currentTimeMillis()); + String jsonString = JSONArray.toJSONString(entityMap); + boolean result = mqttServerTemplate.publishAll(publishConnectStatusTopic, jsonString.getBytes(StandardCharsets.UTF_8)); + if(result){ + logger.info("客户端:"+clientId+";"+ (connectStatus == ConnectStatus.connct ? "连接" :"断开") +"状态推送成功"); + }else { + logger.info("客户端:"+clientId+";"+ (connectStatus == ConnectStatus.connct ? "连接" :"断开") +"状态推送失败"); + } + }else { + logger.info("未开启设备连接状态推送"); + } + + } +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/listener/MqttServerMessageListener.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/listener/MqttServerMessageListener.java new file mode 100644 index 00000000..eb136a6e --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/listener/MqttServerMessageListener.java @@ -0,0 +1,49 @@ +package com.hw.mqtt.listener; + +//import net.dreamlu.iot.mqtt.codec.ByteBufferUtil; + +import net.dreamlu.iot.mqtt.codec.MqttPublishMessage; +import net.dreamlu.iot.mqtt.codec.MqttQoS; +import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener; +import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.SmartInitializingSingleton; +import com.hw.mqtt.service.KafkaProducerService; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; +import org.tio.core.ChannelContext; + +import java.nio.charset.StandardCharsets; + +/** + * 消息监听器 + * + * @author WenJY + * @date 2023-03-14 12:18 + * @return null + */ +@Service +public class MqttServerMessageListener implements IMqttMessageListener, SmartInitializingSingleton { + private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class); + private final ApplicationContext applicationContext; + private MqttServerTemplate mqttServerTemplate; + private final KafkaProducerService kafkaProducerService; + + public MqttServerMessageListener(ApplicationContext applicationContext, KafkaProducerService kafkaProducerService) { + this.applicationContext = applicationContext; + this.kafkaProducerService = kafkaProducerService; + } + + @Override + public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qos, MqttPublishMessage message) { + logger.info("context:{} clientId:{} message:{} payload:{}", context, clientId, message, new String(message.getPayload(), StandardCharsets.UTF_8)); + kafkaProducerService.sendMessage("test", new String(message.getPayload(), StandardCharsets.UTF_8)); + } + + @Override + public void afterSingletonsInstantiated() { + // 单利 bean 初始化完成之后从 ApplicationContext 中获取 bean + mqttServerTemplate = applicationContext.getBean(MqttServerTemplate.class); + } +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/listener/RedisMessageListener.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/listener/RedisMessageListener.java new file mode 100644 index 00000000..5d0444bc --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/listener/RedisMessageListener.java @@ -0,0 +1,58 @@ +package com.hw.mqtt.listener; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate; +import org.dromara.common.core.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +/** + * @author Wen JY + * @description: TODO + * @date 2023-09-11 15:28:18 + * @version: 1.0 + */ +@Component +public class RedisMessageListener { + + private static final Logger logger = LoggerFactory.getLogger(RedisMessageListener.class); + + private final MqttServerTemplate mqttServerTemplate; + + public RedisMessageListener(MqttServerTemplate mqttServerTemplate) { + this.mqttServerTemplate = mqttServerTemplate; + } + + /** + * 订阅设备控制信息、发布设备指令 + * @param message + */ + public void deviceCommand(String message){ + try { + + if(StringUtils.isEmpty(message)){ + logger.warn("Redis订阅内容为空!!!"); + return; + } + logger.info("Redis订阅内容:"+message); + JSONObject jsonObject = JSONObject.parseObject(message); + String topic = jsonObject.get("Topic").toString(); + String payload = jsonObject.get("Payload").toString(); + boolean publishResult = mqttServerTemplate.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8)); + if(publishResult){ + logger.info("Topic:"+topic+";Payload:"+payload+";消息发布成功"); + }else { + logger.info("Topic:"+topic+";Payload:"+payload+";消息发布失败!!!"); + } + }catch (Exception e){ + logger.error("设备控制指令下发异常:"+e.getMessage()); + } + } +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/service/KafkaProducerService.java b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/service/KafkaProducerService.java new file mode 100644 index 00000000..eff9e3fb --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/com/hw/mqtt/service/KafkaProducerService.java @@ -0,0 +1,19 @@ +package com.hw.mqtt.service; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +public class KafkaProducerService { + + private final KafkaTemplate kafkaTemplate; + + public KafkaProducerService(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + public void sendMessage(String topic, String message) { + kafkaTemplate.send(topic, message); + System.out.println("Forwarded message to Kafka: " + message); + } +} diff --git a/ruoyi-modules/hwmom-mqtt/src/main/java/org/dromara/Main.java b/ruoyi-modules/hwmom-mqtt/src/main/java/org/dromara/Main.java new file mode 100644 index 00000000..7a72da2c --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/java/org/dromara/Main.java @@ -0,0 +1,17 @@ +package org.dromara; + +//TIP To Run code, press or +// click the icon in the gutter. +public class Main { + public static void main(String[] args) { + //TIP Press with your caret at the highlighted text + // to see how IntelliJ IDEA suggests fixing it. + System.out.printf("Hello and welcome!"); + + for (int i = 1; i <= 5; i++) { + //TIP Press to start debugging your code. We have set one breakpoint + // for you, but you can always add more by pressing . + System.out.println("i = " + i); + } + } +} \ No newline at end of file diff --git a/ruoyi-modules/hwmom-mqtt/src/main/resources/application.yml b/ruoyi-modules/hwmom-mqtt/src/main/resources/application.yml new file mode 100644 index 00000000..b2e33bac --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/resources/application.yml @@ -0,0 +1,60 @@ +# Tomcat +server: + port: 9605 + +# broker监听端口 +mqtt: + server: + port: 1883 # MQTT端口,默认:1883 + web-port: 8083 # http、websocket 端口,默认:8083 + +#broker相关配置 +broker: + #客户端ClientId白名单,不进行接入校验,多个ClientId用逗号隔开 + whitelist: hw-data-process-1 + #设备接入状态推送 + connectStatus: + enable: true #是否启用设备连接状态推送 + topic: /device/status/v1 #设备连接状态推送主题 + +# Spring +spring: + application: + # 应用名称 + name: hwmom-mqtt + profiles: + # 环境配置 + active: @profiles.active@ + +--- # nacos 配置 +spring: + application: + # 应用名称 + name: hw-mqtt-broker + cloud: + nacos: + # nacos 服务地址 + server-addr: @nacos.server@ + username: @nacos.username@ + password: @nacos.password@ + discovery: + # 注册组 + group: @nacos.discovery.group@ + namespace: ${spring.profiles.active} + config: + # 配置组 + group: @nacos.config.group@ + namespace: ${spring.profiles.active} + config: + import: + - optional:nacos:application-common.yml + - optional:nacos:datasource.yml + - optional:nacos:${spring.application.name}.yml + + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + + diff --git a/ruoyi-modules/hwmom-mqtt/src/main/resources/banner.txt b/ruoyi-modules/hwmom-mqtt/src/main/resources/banner.txt new file mode 100644 index 00000000..89e25a4f --- /dev/null +++ b/ruoyi-modules/hwmom-mqtt/src/main/resources/banner.txt @@ -0,0 +1,10 @@ + +${AnsiColor.RED} ## ## ####### ######## ######## +${AnsiColor.RED} ### ### ## ## ## ## +${AnsiColor.RED} #### #### ## ## ## ## +${AnsiColor.RED} ## ### ## ## ## ## ## +${AnsiColor.RED} ## ## ## ## ## ## ## +${AnsiColor.RED} ## ## ## ## ## ## +${AnsiColor.RED} ## ## ##### ## ## ## + +${AnsiColor.BRIGHT_BLUE}:: ${spring.application.name} :: Running Spring Boot ${spring-boot.version} 🏃🏃🏃 ${AnsiColor.DEFAULT} diff --git a/ruoyi-modules/hwmom-tsdb/pom.xml b/ruoyi-modules/hwmom-tsdb/pom.xml new file mode 100644 index 00000000..fd706974 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/pom.xml @@ -0,0 +1,100 @@ + + + 4.0.0 + + org.dromara + ruoyi-modules + ${revision} + + + hwmom-tsdb + + + hwmom-tsdb 时序数据库处理模块 + + + + + org.dromara + ruoyi-common-nacos + + + + org.dromara + ruoyi-common-security + + + + org.dromara + ruoyi-common-sentinel + + + + + org.influxdb + influxdb-java + 2.23 + + + + + + org.dromara + ruoyi-common-log + + + + org.dromara + ruoyi-common-doc + + + + org.dromara + ruoyi-common-web + + + + org.dromara + ruoyi-common-dubbo + + + + org.dromara + ruoyi-common-tenant + + + + + + + + ${project.artifactId} + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 17 + 17 + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + + + + diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/HwTsDbApplication.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/HwTsDbApplication.java new file mode 100644 index 00000000..28e97ec7 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/HwTsDbApplication.java @@ -0,0 +1,20 @@ +package org.dromara.tsdb; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; +import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; + +@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) +public class HwTsDbApplication { + + public static void main(String[] args) { + SpringApplication application = new SpringApplication(HwTsDbApplication.class); + application.setApplicationStartup(new BufferingApplicationStartup(2048)); + application.run(args); + System.out.println("(♥◠‿◠)ノ゙ TSDB 时序数据库处理模块启动成功 ლ(´ڡ`ლ)゙ "); + + + } + +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java new file mode 100644 index 00000000..2e5e7f27 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/config/InfluxDbConfig.java @@ -0,0 +1,30 @@ +package org.dromara.tsdb.config; + +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class InfluxDbConfig { + + @Value("${influxdb.url}") + private String influxDBUrl; + + @Value("${influxdb.username}") + private String username; + + @Value("${influxdb.password}") + private String password; + + @Value("${influxdb.database}") + private String database; + + @Bean + public InfluxDB influxDB() { + InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, username, password); + influxDB.setDatabase(database); + return influxDB; + } +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java new file mode 100644 index 00000000..9c74366b --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/controller/InfluxDbController.java @@ -0,0 +1,35 @@ +package org.dromara.tsdb.controller; + +import org.dromara.tsdb.service.IInfluxDbService; +import org.influxdb.dto.QueryResult; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +@RestController +@RequestMapping("/influx") +public class InfluxDbController { + + @Autowired + private IInfluxDbService influxDbService; + + /** + * 写入数据到InfluxDB + */ + @PostMapping("/write") + public String writeData(@RequestParam String measurement, + @RequestParam String tagKey, + @RequestParam String tagValue, + @RequestParam String fieldKey, + @RequestParam String fieldValue) { + influxDbService.writeData(measurement, tagKey, tagValue, fieldKey, fieldValue); + return "Data written to InfluxDB successfully!"; + } + + /** + * 查询数据 + */ + @GetMapping("/query") + public QueryResult queryData(@RequestParam String query) { + return influxDbService.queryData(query); + } +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java new file mode 100644 index 00000000..f4f065d8 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/IInfluxDbService.java @@ -0,0 +1,25 @@ +package org.dromara.tsdb.service; + +import org.influxdb.dto.QueryResult; + +public interface IInfluxDbService { + /** + * 写入数据到InfluxDB + * + * @param measurement 表名 + * @param tagKey 标签键 + * @param tagValue 标签值 + * @param fieldKey 字段键 + * @param fieldValue 字段值 + */ + public void writeData(String measurement, String tagKey, String tagValue, String fieldKey, String fieldValue); + + /** + * 查询数据 + * + * @param query 查询语句 + * @return 查询结果 + */ + public QueryResult queryData(String query); + +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java new file mode 100644 index 00000000..6a2dc170 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/java/org/dromara/tsdb/service/impl/InfluxDbServiceImpl.java @@ -0,0 +1,50 @@ +package org.dromara.tsdb.service.impl; + +import org.dromara.tsdb.service.IInfluxDbService; +import org.influxdb.InfluxDB; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.concurrent.TimeUnit; + +@Service +public class InfluxDbServiceImpl implements IInfluxDbService { + + @Autowired + private InfluxDB influxDB; + + /** + * 写入数据到InfluxDB + * + * @param measurement 表名 + * @param tagKey 标签键 + * @param tagValue 标签值 + * @param fieldKey 字段键 + * @param fieldValue 字段值 + */ + @Override + public void writeData(String measurement, String tagKey, String tagValue, String fieldKey, String fieldValue) { + Point point = Point.measurement(measurement) + .tag(tagKey, tagValue) + .addField(fieldKey, fieldValue) + .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) + .build(); + influxDB.write(point); + } + + /** + * 查询数据 + * + * @param query 查询语句 + * @return 查询结果 + */ + @Override + public QueryResult queryData(String query) { + QueryResult queryResult = influxDB.query(new Query(query)); + System.out.println(queryResult.getResults()); + return queryResult; + } +} diff --git a/ruoyi-modules/hwmom-tsdb/src/main/resources/application.yml b/ruoyi-modules/hwmom-tsdb/src/main/resources/application.yml new file mode 100644 index 00000000..d7731390 --- /dev/null +++ b/ruoyi-modules/hwmom-tsdb/src/main/resources/application.yml @@ -0,0 +1,37 @@ +# Tomcat +server: + port: 9608 + +# Spring +spring: + application: + # 应用名称 + name: hwmom-tsdb + profiles: + # 环境配置 + active: @profiles.active@ + +--- # nacos 配置 +spring: + application: + # 应用名称 + name: hwmom-tsdb + cloud: + nacos: + # nacos 服务地址 + server-addr: @nacos.server@ + username: @nacos.username@ + password: @nacos.password@ + discovery: + # 注册组 + group: @nacos.discovery.group@ + namespace: ${spring.profiles.active} + config: + # 配置组 + group: @nacos.config.group@ + namespace: ${spring.profiles.active} + config: + import: + - optional:nacos:application-common.yml + - optional:nacos:datasource.yml + - optional:nacos:${spring.application.name}.yml