mqttbroker获取数据,转发数据到kafka——dp处理数据——保存到influxdb,整体框架搭建完成
master
xs 9 months ago
parent c50bf72060
commit 6944bbfe3e

@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-modules</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hwmom-dp</artifactId>
<description>
hwmom-dp data process数据处理模块
</description>
<dependencies>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-nacos</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-security</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-api-resource</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.51</version>
<scope>compile</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.mysql</groupId>-->
<!-- <artifactId>mysql-connector-j</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.microsoft.sqlserver</groupId>-->
<!-- <artifactId>mssql-jdbc</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.github.jsqlparser</groupId>-->
<!-- <artifactId>jsqlparser</artifactId>-->
<!-- <version>1.4</version>-->
<!-- </dependency>-->
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version> <!-- 确保使用兼容的插件版本 -->
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,20 @@
package org.dromara.dp;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class HwDataProcessApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(HwDataProcessApplication.class);
application.setApplicationStartup(new BufferingApplicationStartup(2048));
application.run(args);
System.out.println("(♥◠‿◠)ノ゙ data process数据处理模块启动成功 ლ(´ڡ`ლ)゙ ");
}
}

@ -0,0 +1,13 @@
package org.dromara.dp.kafka.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "test", groupId = "my-consumer-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}

@ -0,0 +1,685 @@
//package org.dromara.service;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONArray;
//import com.alibaba.fastjson.JSONObject;
//import com.aliyun.auth.credentials.Credential;
//import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
//import com.aliyun.sdk.service.dysmsapi20170525.AsyncClient;
//import com.aliyun.sdk.service.dysmsapi20170525.models.SendSmsRequest;
//import com.aliyun.sdk.service.dysmsapi20170525.models.SendSmsResponse;
//import com.ruoyi.common.core.constant.HwDictConstants;
//import com.ruoyi.common.core.constant.SecurityConstants;
//import com.ruoyi.common.core.constant.TdEngineConstants;
//import com.ruoyi.common.core.domain.R;
//import com.ruoyi.common.core.enums.ResultEnums;
//import com.ruoyi.common.core.utils.RegexUtils;
//import com.ruoyi.common.core.utils.StringUtils;
//import com.ruoyi.dataprocess.amap.LocationVo;
//import com.ruoyi.dataprocess.amap.PositionUtils;
//import com.ruoyi.dataprocess.amap.PositionVo;
//import com.ruoyi.dataprocess.common.ImageUtils;
//import com.ruoyi.dataprocess.domain.*;
//import com.ruoyi.dataprocess.mapper.*;
//import com.ruoyi.dataprocess.service.CommanHandleService;
//import com.ruoyi.dataprocess.service.IDataProcessService;
//import com.ruoyi.tdengine.api.RemoteTdEngineService;
//import com.ruoyi.tdengine.api.domain.TdField;
//import com.ruoyi.tdengine.api.domain.TdHistorySelectDto;
//import com.ruoyi.tdengine.api.domain.TdReturnDataVo;
//import com.ruoyi.tdengine.api.domain.TdTableVo;
//import darabonba.core.client.ClientOverrideConfiguration;
//import org.apache.commons.codec.binary.Base64;
//import org.apache.poi.openxml4j.exceptions.InvalidOperationException;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Service;
//
//import javax.annotation.Resource;
//import javax.script.ScriptEngine;
//import javax.script.ScriptEngineManager;
//import java.io.IOException;
//import java.math.BigDecimal;
//import java.text.SimpleDateFormat;
//import java.util.*;
//import java.util.concurrent.CompletableFuture;
//import java.util.concurrent.ExecutionException;
//import java.util.concurrent.atomic.AtomicBoolean;
//import java.util.stream.Collectors;
//
///**
// * @Description: 数据处理业务类
// * @ClassName: DataProcessServiceImpl
// * @Author : xins
// * @Date :2023-09-04 8:58
// * @Version :1.0
// */
//@Service
//public class DataProcessServiceImpl extends CommanHandleService implements IDataProcessService {
//
// private static final Logger logger = LoggerFactory.getLogger(DataProcessServiceImpl.class);
//
// private final String AccessKeyId = "LTAI5tPoTQHh8HHst2toxtGa";
// private final String AccessKeySecret = "K8OIuSNgsSnpGMJ2PdqIJFYyUqL38m";
//
// @Resource
// private RemoteTdEngineService remoteTdEngineService;
//
// @Autowired
// private HwDeviceMapper hwDeviceMapper;
//
// @Autowired
// private HwElectronicFenceMapper hwElectronicFenceMapper;
//
// @Autowired
// private HwFenceAreaMapper hwFenceAreaMapper;
//
// @Autowired
// private HwAlarmInfoMapper hwAlarmInfoMapper;
//
// @Autowired
// private HwAlarmRuleMapper hwAlarmRuleMapper;
//
// /**
// * @param: jsonData
// * @param: imagePath ruoyifile的上传地址
// * @param: imagePatterns
// * @param: imageDomain ruoyifile的domain
// * @param: imagePrefix ruoyifile的prefix
// * @param: topic 发布主题用来获取网关设备devicecode
// * @description
// * @author xins
// * @date 2023-08-31 16:16
// */
// @Override
// public int processBusinessData(String jsonData, String imagePath,
// String imagePatterns, String imageDomain, String imagePrefix, String topic) {
// JSONObject json = JSON.parseObject(jsonData);
// Long ts = json.getLong(TdEngineConstants.PAYLOAD_TS);
// String tsStr = String.valueOf(ts);
// if (tsStr.length() == 10) {
// ts = ts * 1000;
// }
// JSONArray paramArr = json.getJSONArray(TdEngineConstants.PAYLOAD_PARAM);
//// System.out.println(this.hwDeviceMapper);
//// System.out.println(this.remoteTdEngineService);
// String deviceCode = "";
// for (int i = 0; i < paramArr.size(); i++) {
// try {
// JSONObject paramJson = paramArr.getJSONObject(i);
// String dataType = paramJson.getString(TdEngineConstants.PAYLOAD_DATATYPE);
// JSONObject dataValueJson = paramJson.getJSONObject(TdEngineConstants.PAYLOAD_DATAVALUE);
// deviceCode = dataValueJson.getString(TdEngineConstants.PAYLOAD_DEVICE_CODE).toLowerCase();
// //deviceCode = getUidByParam(json);
// HwDevice hwDevice = hwDeviceMapper.selectHwDeviceByDeviceCode(deviceCode);
// if (hwDevice == null) {
// logger.error("此设备【deviceCode:{}】不存在", deviceCode);
// continue;
// }
// if (hwDevice.getDeviceStatus().equals(HwDictConstants.DEVICE_STATUS_DELETE)) {
// logger.error("此设备【deviceCode:{}】已删除", deviceCode);
// continue;
// }
//
// Long sceneId = hwDevice.getSceneId();
// Long deviceId = hwDevice.getDeviceId();
// Long monitorUnitId = hwDevice.getMonitorUnitId();
// Long tenantId = hwDevice.getTenantId();
// String databaseName = TdEngineConstants.getDatabaseName();
// String tableName = TdEngineConstants.DEFAULT_TABLE_NAME_PREFIX + deviceId;
//
// dataValueJson.remove(TdEngineConstants.PAYLOAD_DEVICE_CODE);
// TdTableVo tdTableVo = new TdTableVo();
// List<TdField> schemaFields = new ArrayList<>();
// List<TdField> alarmSchemaFields = new ArrayList<>();
//
// //添加timestamp字段默认字段名称是ts协议上传的key是timestamp
// TdField firstTdField = new TdField();
// firstTdField.setFieldName(TdEngineConstants.DEFAULT_FIRST_FIELD_NAME);
// long currentTimeMillis = System.currentTimeMillis();
// firstTdField.setFieldValue(currentTimeMillis);
//// firstTdField.setFieldValue(ts);
// schemaFields.add(firstTdField);
//
// Object longitude = null;
// Object latitude = null;
//
// for (Map.Entry<String, Object> entry : dataValueJson.entrySet()) {
// String originalKey = entry.getKey();//原来的key
// Object value = entry.getValue();
//
// /**
// * 先对key进行转换例如对于key是value的需要转换成value1
// */
// String key = TdEngineConstants.TDENGINE_KEY_TRANSFER_MAP.get(originalKey) == null ? originalKey
// : TdEngineConstants.TDENGINE_KEY_TRANSFER_MAP.get(originalKey);
//
// if (key.equalsIgnoreCase(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_TYPE)) {
// continue;
// }
//
//
// if (value instanceof String) {
// String valueStr = (String) value;
// if (StringUtils.isNotBlank(valueStr)) {
// /**
// * 先判读是否是图片,并获取图片名称, 获取到证明是图片
// */
// String[] imagePatternArr = imagePatterns.split(",");
// String extension = ImageUtils.getImageType(valueStr, imagePatternArr);
// if (StringUtils.isNotBlank(extension) && valueStr.length() > 300) {
// //保存图片,并返回图片详细地址进行赋值保存
// value = getImageFileName(imagePath, imageDomain, imagePrefix, valueStr, deviceId, extension);
// if (value == null) continue;
// } else if (dataType.equalsIgnoreCase(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_IMAGE)) {
// if (ImageUtils.checkIsBase64(valueStr) && valueStr.length() > 300) {//判断没有加前缀的情况
// String type = dataValueJson.getString(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_TYPE);
// if (type == null) {
// type = dataValueJson.getString(TdEngineConstants.PAYLOAD_DEVICE_DATA_TYPE_TYPE_FIRSET_UPPER);
// }
// extension = "jpg";
// if (type != null) {
// extension = StringUtils.isNotBlank(ImageUtils.getImageType(type, imagePatternArr))
// ? ImageUtils.getImageType(type, imagePatternArr) : extension;
// }
// value = getImageFileName(imagePath, imageDomain, imagePrefix, valueStr, deviceId, extension);
// if (value == null) continue;
// }
// }
//
// TdField tdField = new TdField();
// tdField.setFieldName(key);
// tdField.setFieldValue(value);
// schemaFields.add(tdField);
// }
// } else {
// TdField tdField = new TdField();
// tdField.setFieldName(key);
// tdField.setFieldValue(value);
// schemaFields.add(tdField);
//
// //经纬度判断
// if (key.equalsIgnoreCase(HwDictConstants.DEFAULT_FUNCTION_LONGITUDE_IDENTIFIER)) {
// longitude = value;
// } else if (key.equalsIgnoreCase(HwDictConstants.DEFAULT_FUNCTION_LATITUDE_IDENTIFIER)) {
// latitude = value;
// } else {
// TdField alarmTdField = new TdField();
// alarmTdField.setFieldName(originalKey);
// alarmTdField.setFieldValue(value);
// alarmSchemaFields.add(alarmTdField);
// }
// }
// }
//
// tdTableVo.setDatabaseName(databaseName);
// tdTableVo.setTableName(tableName);
// tdTableVo.setSchemaFields(schemaFields);
//
// final R<?> insertResult = this.remoteTdEngineService.insertTable(tdTableVo , SecurityConstants.INNER);
// if (insertResult.getCode() == ResultEnums.SUCCESS.getCode()) {
// logger.info("Insert data result: {}", insertResult.getCode());
// } else {
// logger.error("Insert data Exception: {},data:{}", insertResult.getMsg(), jsonData);
// }
//
// //校验电子围栏
// if (longitude != null && latitude != null) {
// checkElectronicFence(deviceId, tenantId, monitorUnitId, sceneId, longitude, latitude, ts);
// }
//
// //校验设备报警信息
// checkAlarm(deviceId, alarmSchemaFields, tenantId, sceneId, monitorUnitId, topic, deviceCode);
//
// } catch (Exception e) {
// e.printStackTrace();
// logger.error("deviceCode:{},errorMsg:{},data:{}", deviceCode, e.getMessage(), jsonData);
// }
// }
//
// return paramArr.size();
// }
//
// /**
// * 获取设备UID
// * @param jsonStr
// * @return
// */
// private String getUidByParam(JSONObject jsonStr){
//
// JSONObject jsonObject = new JSONObject(jsonStr);
//
// JSONArray paramArray = jsonObject.getJSONArray("param");
//
// JSONObject firstParamObject = paramArray.getJSONObject(0);
//
// JSONObject datavalueObject = firstParamObject.getJSONObject("datavalue");
//
// String uid = datavalueObject.getString("uid");
//
// return uid;
// }
//
//
// private static String getImageFileName(String imagePath, String imageDomain, String imagePrefix, String valueStr, Long deviceId, String extension) {
// try {
// String imageFileName = ImageUtils.convertBase64ToImage(imagePath,
// valueStr, "device" + deviceId, extension);
// return imageDomain + imagePrefix + imageFileName;
//
// } catch (IOException e) {
// logger.error("转换图片错误:" + e.getMessage(), e);
// }
// return null;
// }
//
//
// /**
// * @param: deviceId
// * @param: monitorUnitId
// * @param: sceneId
// * @param: longitude
// * @param: latitude
// * @param: ts
// * @description 校验此设备电子围栏
// * @author xins
// * @date 2023-09-04 14:04
// */
// private void checkElectronicFence(Long deviceId, Long tenantId, Long monitorUnitId, Long sceneId, Object longitude, Object latitude, Long ts) {
// //根据设备ID、监控单元ID和场景ID获取所有的电子围栏配置。目前先只支持场景
//// List<HwElectronicFence> hwElectronicFences = hwElectronicFenceMapper.selectElectronicFencesByDeviceId(deviceId);
//// hwElectronicFences = hwElectronicFenceMapper.selectElectronicFencesByMonitorUnitId(monitorUnitId);
// List<HwElectronicFence> hwElectronicFences = hwElectronicFenceMapper.selectElectronicFencesBySceneId(sceneId);
//
// if (StringUtils.isNotEmpty(hwElectronicFences)) {
// for (HwElectronicFence hwElectronicFence : hwElectronicFences) {
// HwFenceArea queryFenceArea = new HwFenceArea();
// queryFenceArea.setElectronicFenceId(hwElectronicFence.getElectronicFenceId());
// //获取电子围栏下配置的区域列表
// List<HwFenceArea> fenceAreas = hwFenceAreaMapper.selectHwFenceAreaList(queryFenceArea);
// String effectiveTimeFlag = hwElectronicFence.getEffectiveTimeFlag();
// String triggerStatus = hwElectronicFence.getTriggerStatus();
// if (fenceAreas != null && !fenceAreas.isEmpty()) {
// fenceAreas.forEach(fenceArea -> {
// boolean isAlarmed = false;//是否报警
// if (effectiveTimeFlag.equals(HwDictConstants.EFFECTIVE_TIME_FLAG_LONG)) {
// String areaShapeFlag = fenceArea.getAreaShapeFlag();
// String areaRange = fenceArea.getAreaRange();
// //多边形处理
// if (areaShapeFlag.equals(HwDictConstants.AREA_SHAPE_FLAG_POLYGN)) {
// LocationVo polygonVo = new LocationVo();
// polygonVo.setMarkerType(LocationVo.MARKER_TYPE_POLYGON);
// List<List<PositionVo>> polygonList = new ArrayList<>();
// List<PositionVo> postionList = new ArrayList<>();
// String[] areaRangeArr = areaRange.split("_");//多个点每个点的经纬度信息经度_纬度
// for (String areaRange1 : areaRangeArr) {
// String[] areaRange1Arr = areaRange1.split(",");
// String longitudeStr = areaRange1Arr[0];
// String latitudeStr = areaRange1Arr[1];
// postionList.add(new PositionVo(new BigDecimal(longitudeStr), new BigDecimal(latitudeStr)));
// polygonList.add(postionList);
// }
//
// polygonVo.setPolygonList(polygonList);
//
// /**
// * 传入的longitude和latitude是object类型本身有可能是BigDecimalDoubleLong和Integer先转成String再转成Double
// */
// boolean isWithin = PositionUtils.checkAddressInLocation(polygonVo, Double.valueOf(String.valueOf(longitude)), Double.valueOf(String.valueOf(latitude)));
// if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_EXIT) && !isWithin) {//如果电子围栏配置是出界,而此设备出界则报警
// isAlarmed = true;
// } else if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_ENTRY) && isWithin) {//如果电子围栏配置是入界,而此设备入界则报警
// isAlarmed = true;
// }
//
// } else if (areaShapeFlag.equals(HwDictConstants.AREA_SHAPE_FLAG_CIRCULA)) {//圆形处理
// String[] areaRangeArr = areaRange.split(",");
// String longitudeStr = areaRangeArr[0];
// String latitudeStr = areaRangeArr[1];
// String radiusStr = areaRangeArr[2];
// LocationVo circulrVo = new LocationVo();
// circulrVo.setRadius(Double.valueOf(radiusStr));
// circulrVo.setLongitude(Double.valueOf(longitudeStr));
// circulrVo.setLatitude(Double.valueOf(latitudeStr));
// circulrVo.setMarkerType(LocationVo.MARKER_TYPE_CIRCULAR);
// boolean isWithin = PositionUtils.checkAddressInLocation(circulrVo, Double.valueOf(String.valueOf(longitude)), Double.valueOf(String.valueOf(latitude)));
// if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_EXIT) && !isWithin) {//如果电子围栏配置是出界,而此设备出界则报警
// isAlarmed = true;
// } else if (triggerStatus.equals(HwDictConstants.ELECTRONIC_FENCE_TRIGGER_STATUS_ENTRY) && isWithin) {//如果电子围栏配置是入界,而此设备入界则报警
// isAlarmed = true;
// }
// }
// }
//
// if (isAlarmed) {
// HwAlarmInfo hwAralmInfo = new HwAlarmInfo();
// hwAralmInfo.setAlarmInfoType(HwDictConstants.ALARM_INFO_TYPE_ELECTRONIC_FENCE);
// hwAralmInfo.setAlarmReleatedId(hwElectronicFence.getElectronicFenceId());
// hwAralmInfo.setDeviceId(deviceId);
// hwAralmInfo.setMonitorUnitId(monitorUnitId);
// hwAralmInfo.setTenantId(tenantId);
// hwAralmInfo.setSceneId(sceneId);
// hwAralmInfo.setFunctionValue(longitude + "_" + latitude);
// hwAralmInfo.setTriggerStatus(triggerStatus);
// hwAralmInfo.setAlarmTime(new Date(ts));
// hwAralmInfo.setCreateTime(new Date());
// hwAralmInfo.setFenceAreaId(fenceArea.getFenceAreaId());
// hwAlarmInfoMapper.insertHwAlarmInfo(hwAralmInfo);
// }
// });
// }
// }
// }
// }
//
//
// /**
// * @param: deviceId
// * @param: alarmFields
// * @description 校验报警
// * @author xins
// * @date 2023-11-07 10:15
// */
// private void checkAlarm(Long deviceId, List<TdField> alarmFields,
// Long tenantId, Long sceneId, Long monitorUnitId, String topic, String subDeviceCode) {
//
// try {
// HwAlarmRule queryAlarmRule = new HwAlarmRule();
// queryAlarmRule.setRuleDeviceId(deviceId);
// queryAlarmRule.setRuleType(HwDictConstants.ALARM_RULE_RULE_TYPE_DEVICE);
// queryAlarmRule.setAlarmRuleStatus(HwDictConstants.ALARM_RULE_STATUS_ENABLE);
// List<HwAlarmRule> alarmRules = hwAlarmRuleMapper.selectHwAlarmRulesWithLink(queryAlarmRule);
//
// ScriptEngineManager manager = new ScriptEngineManager();
// ScriptEngine engine = manager.getEngineByName("js");
//
// if (alarmRules != null) {
// Date currentDate = new Date();
// for (HwAlarmRule alarmRule : alarmRules) {
// String triggerExpression = alarmRule.getTriggerExpression()
// .replaceAll("and", "&&")
// .replaceAll("or", "||");
// AtomicBoolean isAlarmed = new AtomicBoolean(false);
// List<HwAlarmDetail> alarmDetails = new ArrayList<HwAlarmDetail>();
//
// for (TdField alarmField : alarmFields) {
// String fieldName = alarmField.getFieldName();
// Object filedValue = alarmField.getFieldValue();
// if (triggerExpression.contains("{" + fieldName + "}")) {
// isAlarmed.set(true);
//
// /**
// * Add By 获取五分钟内的数据 WenJY 2024-07-03 14:13:21
// */
// if(alarmRule.getTriggerTimeFrame() != null && alarmRule.getTriggerTimeFrame() > 0){
// double difValue = FilterDeviceValue(deviceId, fieldName, alarmRule.getTriggerTimeFrame());
//// if(difValue > 0){
//// filedValue = difValue;
//// }
// filedValue = difValue;
// }
// /** End **/
//
// triggerExpression = triggerExpression.replaceAll("\\{" + fieldName + "\\}", String.valueOf(filedValue));
// HwAlarmDetail alarmDetail = new HwAlarmDetail();
// alarmDetail.setDeviceId(deviceId);
// alarmDetail.setFunctionIdentifier(fieldName);
// alarmDetail.setFunctionValue(String.valueOf(filedValue));
// alarmDetail.setMonitorTime(currentDate);
// alarmDetails.add(alarmDetail);
//
// }
// }
//
// if (isAlarmed.get() && !triggerExpression.contains("{")
// && RegexUtils.findSymbolInText(triggerExpression).size() > 0
// && RegexUtils.findNumberInText(triggerExpression).size() > 0) {
//
// Boolean triggerExpressionBool = (Boolean) engine.eval(triggerExpression);
// if (triggerExpressionBool) {
// HwAlarmInfo alarmInfo = new HwAlarmInfo();
// alarmInfo.setAlarmInfoType(HwDictConstants.ALARM_INFO_TYPE_DEVICE);
// alarmInfo.setAlarmReleatedId(alarmRule.getAlarmRuleId());
// alarmInfo.setDeviceId(deviceId);
// alarmInfo.setMonitorUnitId(monitorUnitId);
// alarmInfo.setTenantId(tenantId);
// alarmInfo.setSceneId(sceneId);
// alarmInfo.setAlarmLevelId(alarmRule.getAlarmLevelId());
// alarmInfo.setAlarmTypeId(alarmRule.getAlarmTypeId());
// alarmInfo.setHwAlarmDetailList(alarmDetails);
// alarmInfo.setHandleStatus(HwDictConstants.ALARM_HANDLE_STATUS_NO);
// alarmInfo.setCreateTime(currentDate);
// alarmInfo.setAlarmTime(currentDate);
// hwAlarmInfoMapper.insertHwAlarmInfo(alarmInfo);
// this.insertHwAlarmDetail(alarmInfo);
// this.handleAlarmLink(alarmRule, topic, subDeviceCode);
// /**
// * Add By WenJY 下发报警信息
// */
// if(StringUtils.isNotEmpty(alarmRule.getPhoneNumbers())){
// SendAlarmInfoBySms(deviceId,alarmRule);
// }else {
// logger.warn("设备:{};报警信息发送失败,联系方式为空",deviceId);
// }
// break;
// }
// }
// }
// }
// }catch (Exception e){
// logger.error("设备:{};报警逻辑处理异常:{}",deviceId,e.getMessage());
// }
// }
//
// /**
// * 过滤指定时间段内的数据返回最大最小的差值过滤0
// * @param deviceId
// * @param fieldName
// * @param triggerTimeFrame
// * @return 差值
// */
// private double FilterDeviceValue(Long deviceId,String fieldName,long triggerTimeFrame){
// double differenceValue = 0.00;
// String databaseName = TdEngineConstants.getDatabaseName();
// String tableName = TdEngineConstants.DEFAULT_TABLE_NAME_PREFIX + deviceId;
// long currentTimeMillis = System.currentTimeMillis();
// long fiveMinutesInMillis = triggerTimeFrame * 60 * 1000;
// long beforeTimeMillis = currentTimeMillis - fiveMinutesInMillis;
//
// TdHistorySelectDto tdHistorySelectDto = new TdHistorySelectDto();
// tdHistorySelectDto.setDatabaseName(databaseName);
// tdHistorySelectDto.setTableName(tableName);
// tdHistorySelectDto.setStartTime(beforeTimeMillis);
// tdHistorySelectDto.setEndTime(currentTimeMillis);
// tdHistorySelectDto.setFirstFieldName("ts");
// R<?> result = this.remoteTdEngineService.getHistoryData(tdHistorySelectDto , SecurityConstants.INNER);
//
// if (result.getCode() == ResultEnums.SUCCESS.getCode()) {
// logger.info("Get history data result: {}", result.getCode());
//
// TdReturnDataVo data = (TdReturnDataVo) result.getData();
//
// List<Map<String, Object>> dataList = data.getDataList();
// List<Double> value = dataList.stream().map(map -> (Double)map.get("value1")).distinct().filter(x -> x > 0).collect(Collectors.toList());
//
// System.out.println(triggerTimeFrame +"分钟内数据:"+JSONArray.toJSON(value) );
//
// OptionalDouble maxOptional = value.stream().mapToDouble(Double::doubleValue).max();
// OptionalDouble minOptional = value.stream().mapToDouble(Double::doubleValue).min();
//
// if (maxOptional.isPresent() && minOptional.isPresent()) {
// double maxValue = maxOptional.getAsDouble();
// double minValue = minOptional.getAsDouble();
// differenceValue = maxValue - minValue;
// System.out.println("Difference between max and min values: " + differenceValue);
// } else {
// System.out.println("No values found to calculate difference.");
// }
//
// } else {
// logger.error("Get history data Exception: {}", result.getMsg());
// }
//
// return differenceValue;
//
// }
//
// /**
// * 新增报警详情信息信息
// *
// * @param hwAlarmInfo 报警信息对象
// */
// private void insertHwAlarmDetail(HwAlarmInfo hwAlarmInfo) {
// List<HwAlarmDetail> hwAlarmDetailList = hwAlarmInfo.getHwAlarmDetailList();
// Long alarmInfoId = hwAlarmInfo.getAlarmInfoId();
// if (StringUtils.isNotNull(hwAlarmDetailList)) {
// List<HwAlarmDetail> list = new ArrayList<HwAlarmDetail>();
// for (HwAlarmDetail hwAlarmDetail : hwAlarmDetailList) {
// hwAlarmDetail.setAlarmInfoId(alarmInfoId);
// list.add(hwAlarmDetail);
// }
// if (list.size() > 0) {
// hwAlarmInfoMapper.batchHwAlarmDetail(list);
// }
// }
// }
//
// /**
// * @param: alarmRule
// * @param: topic
// * @param: subDeviceCode
// * @description 处理报警规则联动设备
// * @author xins
// * @date 2023-11-07 16:39
// */
// private void handleAlarmLink(HwAlarmRule alarmRule, String topic, String subDeviceCode) {
// String controlCommandTopic = topic.replace(HwDictConstants.TOPIC_TYPE_DATA_POSTFIX, HwDictConstants.TOPIC_TYPE_COMMAND_POSTFIX);
// if (alarmRule.getLinkFlag().equals(HwDictConstants.ALARM_RULE_LINK_FLAG_YES)) {
// List<HwAlarmRuleLink> alarmRuleLinks = alarmRule.getHwAlarmRuleLinkList();
// alarmRuleLinks.forEach(alarmRuleLink -> {
// this.publishControlCommand(controlCommandTopic, subDeviceCode, alarmRuleLink.getLinkDeviceFunctionIdentifier());
// });
// }
// }
//
// @Override
// public void testBase64(String jsonData, String imagePath, String imagePatterns, String imageDomain, String imagePrefix) {
// JSONObject json = JSON.parseObject(jsonData);
// Object value = json.get("base64");
// if (value instanceof String) {
//
// String valueStr = (String) value;
// System.out.println(Base64.isBase64(valueStr));
// if (StringUtils.isNotBlank(valueStr)) {
// /**
// * 先判读是否是图片,并获取图片名称, 获取到证明是图片
// */
// String[] imagePatternArr = imagePatterns.split(",");
// String extension = ImageUtils.getImageType(valueStr, imagePatternArr);
// if (StringUtils.isNotBlank(extension)) {
// //保存图片,并返回图片详细地址进行赋值保存
// Long deviceId = 100L;
// String imageFileName = null;
// try {
// imageFileName = ImageUtils.convertBase64ToImage(imagePath,
// valueStr, "device" + deviceId, extension);
// if (StringUtils.isNotBlank(imageFileName)) {
// String url = imageDomain + imagePrefix + imageFileName;
// System.out.println(url);
// }
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
//
// }
// }
// }
// }
//
// /**
// * 发送报警短信
// * @param deviceId
// * @param alarmRule
// */
// private void SendAlarmInfoBySms(Long deviceId,HwAlarmRule alarmRule){
// try {
// HwDevice hwDevice = hwDeviceMapper.GetDeviceById(deviceId);
// if(hwDevice != null){
// AlarmMsgTemplateParam alarmMsgTemplateParam = new AlarmMsgTemplateParam();
// alarmMsgTemplateParam.setWarning(alarmRule.getAlarmRuleName());
// alarmMsgTemplateParam.setParentname(hwDevice.getMonitorUnitName());
// alarmMsgTemplateParam.setSensorID(deviceId.toString());
// AliSmsHandle(alarmRule.getPhoneNumbers(),alarmMsgTemplateParam);
// }
// } catch (ExecutionException | InterruptedException e) {
// throw new InvalidOperationException(String.format("报警信息发送异常:%e",e.getMessage()));
// }
// }
//
// /**
// * 报警短信推送
// * @param phoneNumbers
// * @param alarmMsgTemplateParam
// * @throws ExecutionException
// * @throws InterruptedException
// */
// private void AliSmsHandle(String phoneNumbers,AlarmMsgTemplateParam alarmMsgTemplateParam ) throws ExecutionException, InterruptedException {
// StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder()
// .accessKeyId(AccessKeyId)
// .accessKeySecret(AccessKeySecret)
// .build());
//
// AsyncClient client = AsyncClient.builder()
// .region("cn-shenzhen") // Region ID
// .credentialsProvider(provider)
// .overrideConfiguration(
// ClientOverrideConfiguration.create()
// .setEndpointOverride("dysmsapi.aliyuncs.com")
// )
// .build();
//
// String jsonString = com.alibaba.fastjson2.JSONArray.toJSONString(alarmMsgTemplateParam);
//
// SendSmsRequest sendSmsRequest = SendSmsRequest.builder()
// .phoneNumbers(phoneNumbers)
// .signName("深圳市金瑞铭科技")
// .templateCode("SMS_468740027")
// .templateParam(jsonString)
// .build();
// logger.info("向:{};发送报警信息:{}",phoneNumbers,jsonString);
// CompletableFuture<SendSmsResponse> response = client.sendSms(sendSmsRequest);
// SendSmsResponse resp = response.get();
//
// client.close();
// }
//
//
// public static void main(String[] args) {
// System.out.println(System.currentTimeMillis());
// SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
// Date date = new Date();
// System.out.println(format.format(date));
// Object jsonData = "{\n" +
// "\n" +
// " \"timestamp\": 1689814424,\"type\": \"CMD_REPORTDATA\",\n" +
// " \"param\": [\n{\n" +
// " \"datatype\": \"cttemperature\",\n" +
// "\"datavalue\": {\n" +
// " \"uid\": \"0009340109040126\", \"alias\": \"\",\n" +
// " \"value\": 25.6,\n" +
// " \"voltage\": 3.76,\n" +
// " \"rssi\": -80\n" +
// " } },\n" +
// " {\"datatype\": \"cttemperature\",\n" +
// " \"datavalue\": {\n" +
// " \"uid1\": \"00093440109040924\",\n" +
// " \"alias\": \"\",\n" +
// " \"value\": 25.6,\n" +
// " \"voltage\": 3.64,\n" +
// " \"rssi\": -87\n" +
// " }\n" +
// " }]}";
//
// }
//}

@ -0,0 +1,178 @@
//package org.dromara.service;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject;
//import com.ruoyi.common.core.constant.HwDictConstants;
//import com.ruoyi.common.core.constant.SecurityConstants;
//import com.ruoyi.common.core.constant.TdEngineConstants;
//import com.ruoyi.common.core.domain.R;
//import com.ruoyi.common.core.utils.StringUtils;
//import com.ruoyi.dataprocess.domain.*;
//import com.ruoyi.dataprocess.mapper.HwAlarmInfoMapper;
//import com.ruoyi.dataprocess.mapper.HwDeviceMapper;
//import com.ruoyi.dataprocess.mapper.HwOfflineRuleMapper;
//import com.ruoyi.dataprocess.service.CommanHandleService;
//import com.ruoyi.dataprocess.service.IDeviceStatusService;
//import com.ruoyi.tdengine.api.RemoteTdEngineService;
//import com.ruoyi.tdengine.api.domain.DeviceStatus;
//import com.ruoyi.tdengine.api.domain.TdField;
//import com.ruoyi.tdengine.api.domain.TdTableVo;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Service;
//
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.Date;
//import java.util.List;
//import java.util.Map;
//
///**
// * @Description: 设备状态处理服务
// * @ClassName: DeviceStatusServiceImpl
// * @Author : xins
// * @Date :2023-09-12 15:31
// * @Version :1.0
// */
//@Service
//public class DeviceStatusServiceImpl extends CommanHandleService implements IDeviceStatusService {
//
// @Autowired
// private HwDeviceMapper hwDeviceMapper;
// @Autowired
// private HwOfflineRuleMapper hwOfflineRuleMapper;
// @Autowired
// private HwAlarmInfoMapper hwAlarmInfoMapper;
// @Resource
// private RemoteTdEngineService remoteTdEngineService;
//
// @Override
// public void handleDeviceStatus(String payloadString, String clientId) {
//// ddd:{"msg":"设备设备连接状态信息","deviceType":"edge","connectStatus":1,
//// "statusTime":1694506127199,"deviceCode":"hw-data-process-1"}
// JSONObject json = JSON.parseObject(payloadString);
// String deviceCode = json.getString("deviceCode").toLowerCase();
// if (clientId.equals(deviceCode)) { //校验是不是自己,如果是自己则不记录状态,返回即可。
// return;
// }
// HwDevice hwDevice = hwDeviceMapper.selectHwDeviceByDeviceCode(deviceCode);
// if (hwDevice != null) {
// Long deviceId = hwDevice.getDeviceId();
// TdTableVo tdTableVo = new TdTableVo();
// tdTableVo.setDatabaseName(TdEngineConstants.PLATFORM_DB_NAME);
// tdTableVo.setTableName(TdEngineConstants.DEFAULT_DEVICE_STATUS_TABLE_NAME_PREFIX + deviceId);
//
// List<TdField> schemaFields = new ArrayList<>();
// TdField onlineStatusField = new TdField();
// onlineStatusField.setFieldName(TdEngineConstants.ST_TAG_ONLINESTATUS);
// onlineStatusField.setFieldValue(json.getInteger("connectStatus"));
// schemaFields.add(onlineStatusField);
//
// TdField deviceTypeField = new TdField();
// deviceTypeField.setFieldName(TdEngineConstants.ST_TAG_DEVICETYPE);
// deviceTypeField.setFieldValue(json.getString("deviceType"));
// schemaFields.add(deviceTypeField);
//
// TdField tsField = new TdField();
// tsField.setFieldName(TdEngineConstants.DEFAULT_FIRST_FIELD_NAME);
// tsField.setFieldValue(json.getLong("statusTime"));
// schemaFields.add(tsField);
//
// tdTableVo.setSchemaFields(schemaFields);
//
// this.remoteTdEngineService.insertTable(tdTableVo, SecurityConstants.INNER);
//
// //更新设备当前状态信息
// String connectStatus = String.valueOf(json.getInteger("connectStatus"));
// hwDevice.setOnlineStatus(connectStatus);
// //判断如果是否是第一次连接,更新激活状态和激活时间。
// if (hwDevice.getActiveStatus().equals(HwDictConstants.DEVICE_ACTIVE_STATUS_INACTIVE)) {
// hwDevice.setActiveStatus(HwDictConstants.DEVICE_ACTIVE_STATUS_ACTIVE);
// hwDevice.setActiveTime(new Date());
// }
//
// hwDeviceMapper.updateHwDevice(hwDevice);
//
// if (connectStatus.equals(HwDictConstants.DEVICE_ONLINE_STATUS_OFFLINE)) {
// this.checkOfflineAlarm(hwDevice);
// }
//
// }
// }
//
//
// /**
// * @param: device
// * @description 离线报警处理
// * @author xins
// * @date 2023-11-14 16:01
// */
// private void checkOfflineAlarm(HwDevice device) {
// Long deviceId = device.getDeviceId();
// Long sceneId = device.getSceneId();
//
// HwOfflineRule offlineRule = hwOfflineRuleMapper.selectHwOfflineRuleByDeviceId(deviceId);
// if (offlineRule == null) {
// offlineRule = hwOfflineRuleMapper.selectHwOfflineRuleBySceneId(sceneId);
// }
//
// if (offlineRule != null) {
// Long offlineNumberTime = offlineRule.getOfflineNumberTime();//分钟
// Long offlineNumber = offlineRule.getOfflineNumber();
//
// if (offlineNumber > 1) {
// Long currentTime = System.currentTimeMillis();
// Long startTime = currentTime - offlineNumberTime * 60 * 1000;
// DeviceStatus queryDeviceStatus = new DeviceStatus();
// queryDeviceStatus.setDeviceId(deviceId);
// queryDeviceStatus.setStartTime(startTime);
// queryDeviceStatus.setEndTime(currentTime);
// queryDeviceStatus.setOnlineStatus(0);
//
// R<List<Map<String, Object>>> deviceStatusListR = this.remoteTdEngineService.getDeviceStatusList(queryDeviceStatus, SecurityConstants.INNER);
// List<Map<String, Object>> deviceStatusList = deviceStatusListR.getData();
// //在几分钟时间内离线几次的报警信息保存
// if (deviceStatusList != null && deviceStatusList.size() > offlineNumber) {
// insertHwAlarmInfo(device, offlineRule, deviceId);
// }
// } else {//只要离线就报警
// insertHwAlarmInfo(device, offlineRule, deviceId);
// }
// }
// }
//
// private void insertHwAlarmInfo(HwDevice device, HwOfflineRule offlineRule, Long deviceId) {
// Date currentDate = new Date();
//
// HwAlarmInfo alarmInfo = new HwAlarmInfo();
// alarmInfo.setAlarmInfoType(HwDictConstants.ALARM_INFO_TYPE_OFFLINE);
// alarmInfo.setAlarmReleatedId(offlineRule.getOfflineRuleId());
// alarmInfo.setDeviceId(deviceId);
// alarmInfo.setTenantId(device.getTenantId());
// alarmInfo.setSceneId(device.getSceneId());
// alarmInfo.setMonitorUnitId(device.getMonitorUnitId());
// alarmInfo.setAlarmLevelId(offlineRule.getAlarmLevelId());
// alarmInfo.setHandleStatus(HwDictConstants.ALARM_HANDLE_STATUS_NO);
// alarmInfo.setAlarmTime(currentDate);
// alarmInfo.setCreateTime(currentDate);
// hwAlarmInfoMapper.insertHwAlarmInfo(alarmInfo);
// this.handleAlarmLink(offlineRule, device.getDeviceCode());
// }
//
// /**
// * @param: offlineRule
// * @param: deviceCode
// * @description 处理报警规则联动设备
// * @author xins
// * @date 2023-11-07 16:39
// */
// private void handleAlarmLink(HwOfflineRule offlineRule, String deviceCode) {
// String controlCommandTopic = StringUtils
// .format(HwDictConstants.CONTROL_COMMAND_TOPIC_VALUE, deviceCode);
// if (offlineRule.getLinkFlag().equals(HwDictConstants.ALARM_RULE_LINK_FLAG_YES)) {
// List<HwAlarmRuleLink> offlineRuleLinks = offlineRule.getHwAlarmRuleLinkList();
// offlineRuleLinks.forEach(offlineRuleLink -> {
// this.publishControlCommand(controlCommandTopic, deviceCode, offlineRuleLink.getLinkDeviceFunctionIdentifier());
// });
// }
// }
//}

@ -0,0 +1,62 @@
# Tomcat
server:
port: 9603
mqtt:
client:
ip: 127.0.0.1 # mqtt server ip地址
port: 1883 # mqtt server 端口
clientId: hw-data-process-1 # 客户端ID保证唯一
username: mica # mqtt server 认证用户名
password: mica # mqtt server 认证密码
timeout: 10
keepalive: 20
qos : 1 # qos服务质量等级0最多交付一次1至少交付一次2只交付一次
dataTopicFilter: /v1/# # 订阅的处理设备上报数据的topic
deviceStatusTopic: /device/status/v1 # 订阅的处理设备状态的topic
#domain: http://127.0.0.1:9665 # 文件服务地址
#path: D:/ruoyi/uploadPath # base64图片转换保存的路径
domain: http://175.27.215.92:9665 # 文件服务地址
path: /home/ruoyi/uploadPath # base64图片转换保存的路径
prefix: /statics
imagePatterns: jpg,jpeg,png #支持的图片格式
# Spring
spring:
application:
# 应用名称
name: hwmom-dp
profiles:
# 环境配置
active: @profiles.active@
--- # nacos 配置
spring:
application:
# 应用名称
name: hwmom-dp
cloud:
nacos:
# nacos 服务地址
server-addr: @nacos.server@
username: @nacos.username@
password: @nacos.password@
discovery:
# 注册组
group: @nacos.discovery.group@
namespace: ${spring.profiles.active}
config:
# 配置组
group: @nacos.config.group@
namespace: ${spring.profiles.active}
config:
import:
- optional:nacos:application-common.yml
- optional:nacos:datasource.yml
- optional:nacos:${spring.application.name}.yml
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-modules</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hwmom-mqtt</artifactId>
<description>
hwmom-mqtt broker模块
</description>
<dependencies>
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-server-spring-boot-starter</artifactId>
<version>2.3.5</version>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-nacos</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-security</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-api-resource</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.51</version>
<scope>compile</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.mysql</groupId>-->
<!-- <artifactId>mysql-connector-j</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.microsoft.sqlserver</groupId>-->
<!-- <artifactId>mssql-jdbc</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.github.jsqlparser</groupId>-->
<!-- <artifactId>jsqlparser</artifactId>-->
<!-- <version>1.4</version>-->
<!-- </dependency>-->
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version> <!-- 确保使用兼容的插件版本 -->
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,20 @@
package com.hw.mqtt;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class HwMqttBrokerApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(HwMqttBrokerApplication.class);
application.setApplicationStartup(new BufferingApplicationStartup(2048));
application.run(args);
System.out.println("(♥◠‿◠)ノ゙ mqtt模块启动成功 ლ(´ڡ`ლ)゙ ");
}
}

@ -0,0 +1,147 @@
package com.hw.mqtt.auth;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.hw.mqtt.domain.DeviceInfoDto;
import com.hw.mqtt.enums.RedisKeys;
//import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
import org.dromara.common.core.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.tio.core.ChannelContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* mqtt tcpwebsocket
* @author WenJY
* @date 2023-03-14 12:19
* @return null
*/
@Configuration(proxyBeanMethods = false)
public class MqttAuthHandler implements IMqttServerAuthHandler {
private final Logger logger = LoggerFactory.getLogger(MqttAuthHandler.class);
/**
*
*/
private List<DeviceInfoDto> deviceInfoDtos;
@Value("${broker.whitelist}")
private String clientIdWhitelist;
private final StringRedisTemplate redisTemplate;
public MqttAuthHandler(List<DeviceInfoDto> deviceInfoDtos, StringRedisTemplate redisTemplate) {
this.deviceInfoDtos = deviceInfoDtos;
this.redisTemplate = redisTemplate;
this.InitDeviceInfoListByRedis();
}
/**
* Redis
*/
private void InitDeviceInfoListByRedis(){
try{
String jsonStr = redisTemplate.opsForValue().get(RedisKeys.CLIENT_DEVICE_INFO.getKey());
if(StringUtils.isEmpty(jsonStr)){
logger.warn("通过Redis获取设备信息为空");
return;
}
this.deviceInfoDtos = JSONArray.parseArray(jsonStr, DeviceInfoDto.class);
}catch (Exception ex){
logger.error("通过Redis获取设备信息方法处理异常"+ex.getMessage());
}
}
/**
*
* @param context ChannelContext
* @param uniqueId mqtt id clientId
* @param clientId ID
* @param userName
* @param password
* @return
*/
@Override
public boolean authenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) {
//
try{
//白名单过滤
if(clientIdWhitelist.contains(clientId)){
return true;
}else if (deviceInfoDtos!=null){
Optional<DeviceInfoDto> optionalDeviceInfoDto = deviceInfoDtos.stream().distinct().filter(x -> x.getDeviceCode().equals(clientId)).findFirst();
//判断本地集合中是否包含该设备信息如果不包含再次读取Redis并初始化本地集合
if (optionalDeviceInfoDto.isPresent()) {
DeviceInfoDto deviceInfo = optionalDeviceInfoDto.get();
return checkDeviceInfo(deviceInfo,clientId,userName,password);
}
}
return NoDeviceInfoEvent(clientId,userName,password);
}catch (Exception ex){
logger.error("客户端认证逻辑处理异常:"+ex.getMessage());
}
return false;
}
/**
*
* @param clientId
* @param userName
* @param password
* @return
*/
private boolean NoDeviceInfoEvent(String clientId,String userName,String password){
DeviceInfoDto deviceInfo = GetDeviceInfoByRedis(clientId);
if (deviceInfo != null) {
return checkDeviceInfo(deviceInfo,clientId,userName,password);
}else {
logger.warn("未获取到接入客户端的设备信息,禁止该设备接入");
return false;
}
}
/**
*
* @param deviceInfo
* @param clientId
* @param userName
* @param passwd
* @return
*/
private boolean checkDeviceInfo(DeviceInfoDto deviceInfo,String clientId,String userName,String passwd){
if(Objects.equals(userName, deviceInfo.getUserName()) && Objects.equals(passwd, deviceInfo.getPassword())){
return true;
}else {
logger.warn("接入设备:"+clientId+";账号或密码错误,禁止该设备接入");
return false;
}
}
/**
* RedisclientId
* @param clientId
* @return
*/
private DeviceInfoDto GetDeviceInfoByRedis(String clientId){
this.InitDeviceInfoListByRedis();
if(deviceInfoDtos!=null){
Optional<DeviceInfoDto> optionalDeviceInfoDto = deviceInfoDtos.stream().distinct().filter(x -> x.getDeviceCode().equals(clientId)).findFirst();
return optionalDeviceInfoDto.orElse(null);
}else {
return null;
}
}
}

@ -0,0 +1,37 @@
package com.hw.mqtt.auth;
import net.dreamlu.iot.mqtt.core.server.http.api.code.ResultCode;
import net.dreamlu.iot.mqtt.core.server.http.api.result.Result;
import net.dreamlu.iot.mqtt.core.server.http.handler.HttpFilter;
import net.dreamlu.iot.mqtt.core.server.http.handler.MqttHttpRoutes;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Configuration;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
/**
* mqtt http
* @author WenJY
* @date 2023-03-14 12:20
* @return null
*/
@Configuration(proxyBeanMethods = false)
public class MqttHttpAuthFilter implements HttpFilter, InitializingBean {
@Override
public boolean filter(HttpRequest request) throws Exception {
// 自行实现逻辑
return true;
}
@Override
public HttpResponse response(HttpRequest request) {
// 认证不通过时的响应
return Result.fail(request, ResultCode.E103);
}
@Override
public void afterPropertiesSet() throws Exception {
MqttHttpRoutes.addFilter(this);
}
}

@ -0,0 +1,23 @@
package com.hw.mqtt.auth;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;
/**
*
* @author WenJY
* @date 2023-03-14 12:20
* @return null
*/
@Configuration(proxyBeanMethods = false)
public class MqttSubscribeValidator implements IMqttServerSubscribeValidator {
@Override
public boolean isValid(ChannelContext context, String clientId, String topicFilter, MqttQoS qoS) {
// 校验客户端订阅的 topic校验成功返回 true失败返回 false
return true;
}
}

@ -0,0 +1,22 @@
package com.hw.mqtt.auth;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
import org.springframework.context.annotation.Configuration;
import org.tio.core.ChannelContext;
/**
* clientId
* @author WenJY
* @date 2023-03-14 12:20
* @return null
*/
@Configuration(proxyBeanMethods = false)
public class MqttUniqueIdService implements IMqttServerUniqueIdService {
@Override
public String getUniqueId(ChannelContext context, String clientId, String userName, String password) {
// 返回的 uniqueId 会替代 mqtt client 传过来的 clientId请保证返回的 uniqueId 唯一。
return clientId;
}
}

@ -0,0 +1,35 @@
package com.hw.mqtt.config;
import com.hw.mqtt.enums.RedisKeys;
import com.hw.mqtt.listener.RedisMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* @author Wen JY
* @description: TODO
* @date 2023-09-11 15:35:20
* @version: 1.0
*/
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter msgIngoListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 可以添加多个 messageListener配置不同的交换机
container.addMessageListener(msgIngoListenerAdapter, new PatternTopic(RedisKeys.REDIS_CHANNEL_DOWN.getKey()));
return container;
}
@Bean
MessageListenerAdapter msgIngoListenerAdapter(RedisMessageListener receiver) {
return new MessageListenerAdapter(receiver, "deviceCommand");
}
}

@ -0,0 +1,29 @@
package com.hw.mqtt.domain;
import lombok.Data;
/**
*
* @author Wen JY
* @description: TODO
* @date 2023-09-13 14:52:18
* @version: 1.0
*/
@Data
public class DeviceInfoDto {
/**
* ClientId
*/
private String deviceCode;
/**
*
*/
private String userName;
/**
*
*/
private String password;
}

@ -0,0 +1,32 @@
package com.hw.mqtt.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
*
* @author Wen JY
* @description: TODO
* @date 2023-09-13 13:42:37
* @version: 1.0
*/
@Getter
@RequiredArgsConstructor
public enum ConnectStatus {
/**
*
*/
connct(1),
/**
*
*/
disconnect(0);
private final int key;
public int getKey(){
return this.key;
}
}

@ -0,0 +1,37 @@
package com.hw.mqtt.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
*
* @author Wen JY
* @description: TODO
* @date 2023-09-13 13:30:38
* @version: 1.0
*/
@Getter
@RequiredArgsConstructor
public enum DeviceType {
/**
* /
*/
edge(1),
/**
* /
*/
sensor(2),
/**
*
*/
device(3);
private final int key;
public int getKey(){
return this.key;
}
}

@ -0,0 +1,63 @@
package com.hw.mqtt.enums;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author WenJY
* @date 20230314 13:36
*/
@Getter
@RequiredArgsConstructor
public enum RedisKeys {
/**
* mqtt
*/
SERVER_NODES("mqtt:server:nodes:"),
/**
* mqtt <-> redis pug/sub
*/
REDIS_CHANNEL_EXCHANGE("mqtt:channel:exchange"),
/**
* -> redis pug/sub mq
*/
REDIS_CHANNEL_UP("mqtt:channel:up"),
/**
* -> redis pug/sub 广 mqtt
*/
REDIS_CHANNEL_DOWN("mqtt:channel:down"),
/**
*
*/
CONNECT_STATUS("mqtt:connect:status:"),
/**
*
*/
MESSAGE_STORE_WILL("mqtt:messages:will:"),
/**
*
*/
MESSAGE_STORE_RETAIN("mqtt:messages:retain:"),
/**
*
*/
CLIENT_DEVICE_INFO("hw_device_info"),
;
private final String key;
/**
*
*
* @param suffix
* @return redis key
*/
public String getKey(String suffix) {
return this.key.concat(suffix);
}
}

@ -0,0 +1,127 @@
/*
* Copyright (c) 2019-2029, Dreamlu (596392912@qq.com & dreamlu.net).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hw.mqtt.listener;
import com.alibaba.fastjson2.JSONArray;
import com.hw.mqtt.enums.ConnectStatus;
import com.hw.mqtt.enums.DeviceType;
import com.hw.mqtt.enums.RedisKeys;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
*
*
* @author WenJY
* @date 2023-03-14 12:17
* @return null
*/
@Service
public class MqttConnectStatusListener implements IMqttConnectStatusListener, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class);
private final ApplicationContext context;
private final StringRedisTemplate redisTemplate;
private MqttServerCreator serverCreator;
private MqttServerTemplate mqttServerTemplate;
@Value("${broker.connectStatus.enable}")
private boolean publishConnectStatusEnable;
@Value("${broker.connectStatus.topic}")
private String publishConnectStatusTopic;
public MqttConnectStatusListener(ApplicationContext context, StringRedisTemplate redisTemplate) {
this.context = context;
this.redisTemplate = redisTemplate;
}
@Override
public void online(ChannelContext context, String clientId, String username) {
logger.info("Mqtt clientId:{} username:{} online.", clientId, username);
redisTemplate.opsForSet().add(getRedisKey(), clientId);
pushConnectStatus(clientId,ConnectStatus.connct);
}
@Override
public void offline(ChannelContext context, String clientId, String username, String reason) {
logger.info("Mqtt clientId:{} username:{} offline reason:{}.", clientId, username, reason);
redisTemplate.opsForSet().remove(getRedisKey(), clientId);
pushConnectStatus(clientId,ConnectStatus.disconnect);
}
/**
* 线key :nodeName
*
* @return redis key
*/
private String getRedisKey() {
return RedisKeys.CONNECT_STATUS.getKey(serverCreator.getNodeName());
}
@Override
public void afterSingletonsInstantiated() {
this.serverCreator = context.getBean(MqttServerCreator.class);
this.mqttServerTemplate = context.getBean(MqttServerTemplate.class);
}
@Override
public void destroy() throws Exception {
// 停机时删除集合
redisTemplate.opsForSet().remove(getRedisKey());
}
/**
*
* @param clientId
* @param connectStatus
*/
public void pushConnectStatus(String clientId, ConnectStatus connectStatus){
if(publishConnectStatusEnable){
Map<String,Object> entityMap = new HashMap<>();
entityMap.put("msg","设备设备连接状态信息");
entityMap.put("deviceType", DeviceType.edge.getKey());
entityMap.put("deviceCode",clientId);
entityMap.put("connectStatus",connectStatus.getKey());
entityMap.put("statusTime",System.currentTimeMillis());
String jsonString = JSONArray.toJSONString(entityMap);
boolean result = mqttServerTemplate.publishAll(publishConnectStatusTopic, jsonString.getBytes(StandardCharsets.UTF_8));
if(result){
logger.info("客户端:"+clientId+""+ (connectStatus == ConnectStatus.connct ? "连接" :"断开") +"状态推送成功");
}else {
logger.info("客户端:"+clientId+""+ (connectStatus == ConnectStatus.connct ? "连接" :"断开") +"状态推送失败");
}
}else {
logger.info("未开启设备连接状态推送");
}
}
}

@ -0,0 +1,49 @@
package com.hw.mqtt.listener;
//import net.dreamlu.iot.mqtt.codec.ByteBufferUtil;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import com.hw.mqtt.service.KafkaProducerService;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import java.nio.charset.StandardCharsets;
/**
*
*
* @author WenJY
* @date 2023-03-14 12:18
* @return null
*/
@Service
public class MqttServerMessageListener implements IMqttMessageListener, SmartInitializingSingleton {
private static final Logger logger = LoggerFactory.getLogger(MqttServerMessageListener.class);
private final ApplicationContext applicationContext;
private MqttServerTemplate mqttServerTemplate;
private final KafkaProducerService kafkaProducerService;
public MqttServerMessageListener(ApplicationContext applicationContext, KafkaProducerService kafkaProducerService) {
this.applicationContext = applicationContext;
this.kafkaProducerService = kafkaProducerService;
}
@Override
public void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qos, MqttPublishMessage message) {
logger.info("context:{} clientId:{} message:{} payload:{}", context, clientId, message, new String(message.getPayload(), StandardCharsets.UTF_8));
kafkaProducerService.sendMessage("test", new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Override
public void afterSingletonsInstantiated() {
// 单利 bean 初始化完成之后从 ApplicationContext 中获取 bean
mqttServerTemplate = applicationContext.getBean(MqttServerTemplate.class);
}
}

@ -0,0 +1,58 @@
package com.hw.mqtt.listener;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.dromara.common.core.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* @author Wen JY
* @description: TODO
* @date 2023-09-11 15:28:18
* @version: 1.0
*/
@Component
public class RedisMessageListener {
private static final Logger logger = LoggerFactory.getLogger(RedisMessageListener.class);
private final MqttServerTemplate mqttServerTemplate;
public RedisMessageListener(MqttServerTemplate mqttServerTemplate) {
this.mqttServerTemplate = mqttServerTemplate;
}
/**
*
* @param message
*/
public void deviceCommand(String message){
try {
if(StringUtils.isEmpty(message)){
logger.warn("Redis订阅内容为空");
return;
}
logger.info("Redis订阅内容"+message);
JSONObject jsonObject = JSONObject.parseObject(message);
String topic = jsonObject.get("Topic").toString();
String payload = jsonObject.get("Payload").toString();
boolean publishResult = mqttServerTemplate.publishAll(topic, payload.getBytes(StandardCharsets.UTF_8));
if(publishResult){
logger.info("Topic:"+topic+";Payload:"+payload+";消息发布成功");
}else {
logger.info("Topic:"+topic+";Payload:"+payload+";消息发布失败!!!");
}
}catch (Exception e){
logger.error("设备控制指令下发异常:"+e.getMessage());
}
}
}

@ -0,0 +1,19 @@
package com.hw.mqtt.service;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Forwarded message to Kafka: " + message);
}
}

@ -0,0 +1,17 @@
package org.dromara;
//TIP To <b>Run</b> code, press <shortcut actionId="Run"/> or
// click the <icon src="AllIcons.Actions.Execute"/> icon in the gutter.
public class Main {
public static void main(String[] args) {
//TIP Press <shortcut actionId="ShowIntentionActions"/> with your caret at the highlighted text
// to see how IntelliJ IDEA suggests fixing it.
System.out.printf("Hello and welcome!");
for (int i = 1; i <= 5; i++) {
//TIP Press <shortcut actionId="Debug"/> to start debugging your code. We have set one <icon src="AllIcons.Debugger.Db_set_breakpoint"/> breakpoint
// for you, but you can always add more by pressing <shortcut actionId="ToggleLineBreakpoint"/>.
System.out.println("i = " + i);
}
}
}

@ -0,0 +1,60 @@
# Tomcat
server:
port: 9605
# broker监听端口
mqtt:
server:
port: 1883 # MQTT端口默认1883
web-port: 8083 # http、websocket 端口默认8083
#broker相关配置
broker:
#客户端ClientId白名单不进行接入校验,多个ClientId用逗号隔开
whitelist: hw-data-process-1
#设备接入状态推送
connectStatus:
enable: true #是否启用设备连接状态推送
topic: /device/status/v1 #设备连接状态推送主题
# Spring
spring:
application:
# 应用名称
name: hwmom-mqtt
profiles:
# 环境配置
active: @profiles.active@
--- # nacos 配置
spring:
application:
# 应用名称
name: hw-mqtt-broker
cloud:
nacos:
# nacos 服务地址
server-addr: @nacos.server@
username: @nacos.username@
password: @nacos.password@
discovery:
# 注册组
group: @nacos.discovery.group@
namespace: ${spring.profiles.active}
config:
# 配置组
group: @nacos.config.group@
namespace: ${spring.profiles.active}
config:
import:
- optional:nacos:application-common.yml
- optional:nacos:datasource.yml
- optional:nacos:${spring.application.name}.yml
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

@ -0,0 +1,10 @@
${AnsiColor.RED} ## ## ####### ######## ########
${AnsiColor.RED} ### ### ## ## ## ##
${AnsiColor.RED} #### #### ## ## ## ##
${AnsiColor.RED} ## ### ## ## ## ## ##
${AnsiColor.RED} ## ## ## ## ## ## ##
${AnsiColor.RED} ## ## ## ## ## ##
${AnsiColor.RED} ## ## ##### ## ## ##
${AnsiColor.BRIGHT_BLUE}:: ${spring.application.name} :: Running Spring Boot ${spring-boot.version} 🏃🏃🏃 ${AnsiColor.DEFAULT}

@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-modules</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hwmom-tsdb</artifactId>
<description>
hwmom-tsdb 时序数据库处理模块
</description>
<dependencies>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-nacos</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-security</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-sentinel</artifactId>
</dependency>
<!-- InfluxDB Java Client -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.23</version>
</dependency>
<!-- RuoYi Common Log -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-log</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-doc</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-web</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-tenant</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version> <!-- 确保使用兼容的插件版本 -->
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,20 @@
package org.dromara.tsdb;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class HwTsDbApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(HwTsDbApplication.class);
application.setApplicationStartup(new BufferingApplicationStartup(2048));
application.run(args);
System.out.println("(♥◠‿◠)ノ゙ TSDB 时序数据库处理模块启动成功 ლ(´ڡ`ლ)゙ ");
}
}

@ -0,0 +1,30 @@
package org.dromara.tsdb.config;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class InfluxDbConfig {
@Value("${influxdb.url}")
private String influxDBUrl;
@Value("${influxdb.username}")
private String username;
@Value("${influxdb.password}")
private String password;
@Value("${influxdb.database}")
private String database;
@Bean
public InfluxDB influxDB() {
InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, username, password);
influxDB.setDatabase(database);
return influxDB;
}
}

@ -0,0 +1,35 @@
package org.dromara.tsdb.controller;
import org.dromara.tsdb.service.IInfluxDbService;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/influx")
public class InfluxDbController {
@Autowired
private IInfluxDbService influxDbService;
/**
* InfluxDB
*/
@PostMapping("/write")
public String writeData(@RequestParam String measurement,
@RequestParam String tagKey,
@RequestParam String tagValue,
@RequestParam String fieldKey,
@RequestParam String fieldValue) {
influxDbService.writeData(measurement, tagKey, tagValue, fieldKey, fieldValue);
return "Data written to InfluxDB successfully!";
}
/**
*
*/
@GetMapping("/query")
public QueryResult queryData(@RequestParam String query) {
return influxDbService.queryData(query);
}
}

@ -0,0 +1,25 @@
package org.dromara.tsdb.service;
import org.influxdb.dto.QueryResult;
public interface IInfluxDbService {
/**
* InfluxDB
*
* @param measurement
* @param tagKey
* @param tagValue
* @param fieldKey
* @param fieldValue
*/
public void writeData(String measurement, String tagKey, String tagValue, String fieldKey, String fieldValue);
/**
*
*
* @param query
* @return
*/
public QueryResult queryData(String query);
}

@ -0,0 +1,50 @@
package org.dromara.tsdb.service.impl;
import org.dromara.tsdb.service.IInfluxDbService;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class InfluxDbServiceImpl implements IInfluxDbService {
@Autowired
private InfluxDB influxDB;
/**
* InfluxDB
*
* @param measurement
* @param tagKey
* @param tagValue
* @param fieldKey
* @param fieldValue
*/
@Override
public void writeData(String measurement, String tagKey, String tagValue, String fieldKey, String fieldValue) {
Point point = Point.measurement(measurement)
.tag(tagKey, tagValue)
.addField(fieldKey, fieldValue)
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.build();
influxDB.write(point);
}
/**
*
*
* @param query
* @return
*/
@Override
public QueryResult queryData(String query) {
QueryResult queryResult = influxDB.query(new Query(query));
System.out.println(queryResult.getResults());
return queryResult;
}
}

@ -0,0 +1,37 @@
# Tomcat
server:
port: 9608
# Spring
spring:
application:
# 应用名称
name: hwmom-tsdb
profiles:
# 环境配置
active: @profiles.active@
--- # nacos 配置
spring:
application:
# 应用名称
name: hwmom-tsdb
cloud:
nacos:
# nacos 服务地址
server-addr: @nacos.server@
username: @nacos.username@
password: @nacos.password@
discovery:
# 注册组
group: @nacos.discovery.group@
namespace: ${spring.profiles.active}
config:
# 配置组
group: @nacos.config.group@
namespace: ${spring.profiles.active}
config:
import:
- optional:nacos:application-common.yml
- optional:nacos:datasource.yml
- optional:nacos:${spring.application.name}.yml
Loading…
Cancel
Save