refactor(record): 重构阈值告警功能并优化数据去重逻辑

- 改进数据插入前的去重逻辑,增加批次内去重步骤
-调整时间基准,使用recordTime替代collectTime
- 优化数据库去重策略,将时间窗口从30分钟缩短到10分钟
- 增加对历史规则的兼容处理,支持无monitor_field的情况
- 改进日志输出,增加更多调试信息和统计
boardTest
zch 3 months ago
parent ca32ab3816
commit 8c4d541123

@ -173,6 +173,8 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
@Scheduled(cron = "0 */1 * * * ?") // 每分钟执行一次
@Override
public void checkIotenvThresholdAlarms() {
System.out.println("==== 开始执行阈值检查定时任务 ====");
long startTime = System.currentTimeMillis();
try {
// 1. 获取当天的分表名
SimpleDateFormat tableFormat = new SimpleDateFormat("yyyyMMdd");
@ -184,16 +186,24 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
return;
}
// 2. 查询所有启用的告警规则trigger_rule=0 超过阈值)
// 2. 查询所有启用的告警规则trigger_rule=0 超过阈值)
EmsRecordAlarmRule queryRule = new EmsRecordAlarmRule();
queryRule.setTriggerRule(0L); // 0表示超过阈值
List<EmsRecordAlarmRule> alarmRules = emsRecordAlarmRuleMapper.selectEmsRecordAlarmRuleList(queryRule);
System.out.println("查询到的阈值告警规则数量: " + alarmRules.size());
for (EmsRecordAlarmRule rule : alarmRules) {
System.out.println("规则详情 - 规则ID:" + rule.getObjId() +
", 设备ID:" + rule.getMonitorId() +
", 监测字段:" + rule.getMonitorField() +
", 阈值:" + rule.getTriggerValue());
}
if (alarmRules.isEmpty()) {
System.out.println("没有启用的阈值告警规则");
return;
}
// 3. 按设备编码分组告警规则,每个设备可能有多个字段的规则
// 注意monitor_id字段存储的实际是monitor_code值与采集数据分表的monitorId保持一致
Map<String, List<EmsRecordAlarmRule>> rulesByMonitor = alarmRules.stream()
@ -204,6 +214,12 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
Date twoMinutesAgo = new Date(System.currentTimeMillis() - 2 * 60 * 1000);
List<RecordIotenvInstant> recentRecords = recordIotenvInstantMapper.selectRecentRecordsFromTable(tableName, twoMinutesAgo);
System.out.println("从分表 " + tableName + " 查询到最近2分钟的数据条数: " + recentRecords.size());
if (!recentRecords.isEmpty()) {
System.out.println("数据时间范围: " + recentRecords.get(recentRecords.size()-1).getRecodeTime() +
" 到 " + recentRecords.get(0).getRecodeTime());
}
if (recentRecords.isEmpty()) {
System.out.println("当天分表最近2分钟无新数据: " + tableName);
return;
@ -223,43 +239,88 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
List<EmsRecordAlarmRule> rules = rulesByMonitor.get(monitorId);
if (rules == null || rules.isEmpty()) {
// 输出更详细的调试信息
if (rulesByMonitor.containsKey(monitorId)) {
System.out.println("设备 " + monitorId + " 的规则列表为空");
} else {
System.out.println("设备 " + monitorId + " 未找到对应的告警规则");
}
continue;
}
System.out.println("处理设备 " + monitorId + " 的数据,找到 " + rules.size() + " 条规则");
EmsBaseMonitorInfo monitorInfo = monitorInfoMap.get(monitorId);
if (monitorInfo == null) {
System.out.println("警告:设备 " + monitorId + " 未找到设备信息");
continue;
}
// 7. 遍历该设备的所有规则,每个规则对应一个具体的监测字段
// 7. 遍历该设备的所有规则,支持新版本有monitor_field和历史版本monitor_field为NULL
for (EmsRecordAlarmRule rule : rules) {
// 检查规则的基本有效性
if (rule.getTriggerValue() == null || rule.getMonitorField() == null) {
// 检查规则的基本有效性 - 只要有阈值就可以处理
if (rule.getTriggerValue() == null) {
System.out.println("跳过无阈值的规则 - 设备:" + monitorId + ", 规则ID:" + rule.getObjId());
continue;
}
// 验证监测字段是否符合设备类型(可选的业务校验)
if (!isValidMonitorFieldForDeviceType(rule.getMonitorField(), monitorInfo.getMonitorType())) {
System.out.println("警告:设备类型与监测字段不匹配 - 设备:" + monitorId +
", 设备类型:" + monitorInfo.getMonitorType() +
", 监测字段:" + rule.getMonitorField());
continue;
}
// 8. 根据规则的monitor_field精确检查对应的数据字段
BigDecimal threshold = rule.getTriggerValue();
BigDecimal actualValue = getFieldValue(record, rule.getMonitorField());
List<BigDecimal> valuesToCheck = new ArrayList<>();
List<String> fieldDescriptions = new ArrayList<>();
// 9. 阈值比较:实际值 > 阈值时触发异常
if (actualValue != null && actualValue.compareTo(threshold) > 0) {
EmsRecordAlarmData alarmData = createAlarmData(record, rule, actualValue.toString());
candidateAlarms.add(alarmData);
// 8. 根据monitor_field字段处理不同情况
if (rule.getMonitorField() != null) {
// 新版本有明确的monitor_field精确检查指定字段
System.out.println("检测到阈值超标 - 设备:" + monitorId +
", 字段:" + MONITOR_FIELD_TO_DESC.get(rule.getMonitorField()) +
", 实际值:" + actualValue +
", 阈值:" + threshold +
", 采集时间:" + record.getCollectTime());
// 验证监测字段是否符合设备类型(可选的业务校验)
if (!isValidMonitorFieldForDeviceType(rule.getMonitorField(), monitorInfo.getMonitorType())) {
System.out.println("警告:设备类型与监测字段不匹配 - 设备:" + monitorId +
", 设备类型:" + monitorInfo.getMonitorType() +
", 监测字段:" + rule.getMonitorField());
continue;
}
BigDecimal actualValue = getFieldValue(record, rule.getMonitorField());
if (actualValue != null) {
valuesToCheck.add(actualValue);
String fieldDesc = MONITOR_FIELD_TO_DESC.get(rule.getMonitorField());
fieldDescriptions.add(fieldDesc != null ? fieldDesc : "未知字段");
}
} else {
// 历史版本monitor_field为NULL根据设备类型检查相应的默认字段
List<BigDecimal> defaultValues = getDefaultFieldValues(record, monitorInfo.getMonitorType());
List<String> defaultDescriptions = getDefaultFieldDescriptions(monitorInfo.getMonitorType());
if (!defaultValues.isEmpty()) {
valuesToCheck.addAll(defaultValues);
fieldDescriptions.addAll(defaultDescriptions);
System.out.println("历史规则兼容处理 - 设备:" + monitorId +
", 设备类型:" + monitorInfo.getMonitorType() +
", 检查字段数:" + defaultValues.size());
} else {
System.out.println("警告:无法确定历史规则的检查字段 - 设备:" + monitorId +
", 设备类型:" + monitorInfo.getMonitorType());
continue;
}
}
// 9. 阈值比较:检查所有相关字段
for (int i = 0; i < valuesToCheck.size(); i++) {
BigDecimal actualValue = valuesToCheck.get(i);
String fieldDesc = fieldDescriptions.get(i);
if (actualValue != null && actualValue.compareTo(threshold) > 0) {
EmsRecordAlarmData alarmData = createAlarmData(record, rule, actualValue.toString(), fieldDesc);
candidateAlarms.add(alarmData);
System.out.println("检测到阈值超标 - 设备:" + monitorId +
", 字段:" + fieldDesc +
", 实际值:" + actualValue +
", 阈值:" + threshold +
", 设备采集时间:" + record.getCollectTime() +
", 系统记录时间:" + record.getRecodeTime());
}
}
}
}
@ -275,6 +336,9 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
} catch (Exception e) {
System.err.println("阈值检查定时任务执行异常: " + e.getMessage());
e.printStackTrace();
} finally {
long executionTime = System.currentTimeMillis() - startTime;
System.out.println("==== 阈值检查定时任务执行完成,耗时: " + executionTime + "ms ====");
}
}
@ -306,6 +370,111 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
}
}
/**
*
*
* @param record
* @param monitorType
* @return
*/
private List<BigDecimal> getDefaultFieldValues(RecordIotenvInstant record, Long monitorType) {
List<BigDecimal> values = new ArrayList<>();
if (monitorType == null) {
return values;
}
switch (monitorType.intValue()) {
case 5: // 纯温度设备
if (record.getTemperature() != null) {
values.add(record.getTemperature());
}
break;
case 6: // 温湿度设备
if (record.getTemperature() != null) {
values.add(record.getTemperature());
}
if (record.getHumidity() != null) {
values.add(record.getHumidity());
}
break;
case 10: // 振动设备
if (record.getVibrationSpeed() != null) {
values.add(record.getVibrationSpeed());
}
if (record.getVibrationDisplacement() != null) {
values.add(record.getVibrationDisplacement());
}
if (record.getVibrationAcceleration() != null) {
values.add(record.getVibrationAcceleration());
}
if (record.getVibrationTemp() != null) {
values.add(record.getVibrationTemp());
}
break;
default:
// 其他设备类型,尝试检查所有非空字段
if (record.getTemperature() != null) {
values.add(record.getTemperature());
}
if (record.getHumidity() != null) {
values.add(record.getHumidity());
}
if (record.getIlluminance() != null) {
values.add(record.getIlluminance());
}
if (record.getNoise() != null) {
values.add(record.getNoise());
}
if (record.getConcentration() != null) {
values.add(record.getConcentration());
}
break;
}
return values;
}
/**
*
*
* @param monitorType
* @return
*/
private List<String> getDefaultFieldDescriptions(Long monitorType) {
List<String> descriptions = new ArrayList<>();
if (monitorType == null) {
return descriptions;
}
switch (monitorType.intValue()) {
case 5: // 纯温度设备
descriptions.add("温度");
break;
case 6: // 温湿度设备
descriptions.add("温度");
descriptions.add("湿度");
break;
case 10: // 振动设备
descriptions.add("振动-速度(mm/s)");
descriptions.add("振动-位移(um)");
descriptions.add("振动-加速度(g)");
descriptions.add("振动-温度(℃)");
break;
default:
// 其他设备类型的通用字段
descriptions.add("温度");
descriptions.add("湿度");
descriptions.add("照度");
descriptions.add("噪声");
descriptions.add("浓度");
break;
}
return descriptions;
}
/**
*
*
@ -342,6 +511,9 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
*
* 使cause
*
* collectTimerecordTimecollectTime
* collectTimerecordTime
*
* @param record
* @param rule
* @param actualValue
@ -350,7 +522,8 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
private EmsRecordAlarmData createAlarmData(RecordIotenvInstant record, EmsRecordAlarmRule rule, String actualValue) {
EmsRecordAlarmData alarmData = new EmsRecordAlarmData();
alarmData.setMonitorId(record.getMonitorId());
alarmData.setCollectTime(record.getCollectTime());
// 关键修改存储recordTime而非collectTime确保时间准确性
alarmData.setCollectTime(record.getRecodeTime()); // 存储系统记录时间
alarmData.setAlarmType(0L); // 0表示超过阈值
alarmData.setAlarmStatus(1L); // 1表示未处理
alarmData.setAlarmData(actualValue); // 记录实际超标数值
@ -362,32 +535,70 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
alarmData.setCreateTime(new Date());
return alarmData;
}
/**
*
*
*
* @param record
* @param rule
* @param actualValue
* @param fieldDescription
* @return
*/
private EmsRecordAlarmData createAlarmData(RecordIotenvInstant record, EmsRecordAlarmRule rule, String actualValue, String fieldDescription) {
EmsRecordAlarmData alarmData = new EmsRecordAlarmData();
alarmData.setMonitorId(record.getMonitorId());
// 关键修改存储recordTime而非collectTime确保时间准确性
alarmData.setCollectTime(record.getRecodeTime()); // 存储系统记录时间
alarmData.setAlarmType(0L); // 0表示超过阈值
alarmData.setAlarmStatus(1L); // 1表示未处理
alarmData.setAlarmData(actualValue); // 记录实际超标数值
// 使用传入的字段描述
alarmData.setCause(StringUtils.isNotEmpty(fieldDescription) ? fieldDescription : "未知字段");
alarmData.setCreateTime(new Date());
return alarmData;
}
/**
* 使
* 30
*
* 使
*
*
* 1. -
* 2. -
* 3. recordTimecollectTime
* 4. 10
*
* @param alarmDataList
* @return
*/
private int insertAlarmDataWithStrictDuplicateCheck(List<EmsRecordAlarmData> alarmDataList) {
if (alarmDataList == null || alarmDataList.isEmpty()) {
return 0;
}
// 第一层:批次内去重
List<EmsRecordAlarmData> deduplicatedList = performBatchDeduplication(alarmDataList);
System.out.println("批次内去重:原始 " + alarmDataList.size() + " 条,去重后 " + deduplicatedList.size() + " 条");
// 第二层:数据库去重并插入
int insertedCount = 0;
long fiveMinutesInMs = 5 * 60 * 1000; // 10分钟去重窗口前后各5分钟避免短时间内重复告警
// 30分钟去重窗口前后各15分钟
long fifteenMinutesInMs = 15 * 60 * 1000;
for (EmsRecordAlarmData alarmData : alarmDataList) {
for (EmsRecordAlarmData alarmData : deduplicatedList) {
try {
// 为每条异常数据检查是否已存在重复记录
Date collectTime = alarmData.getCollectTime();
if (collectTime == null) {
continue; // 跳过没有采集时间的数据
// 获取记录时间现在存储的是recordTime
Date recordTime = alarmData.getCollectTime();
if (recordTime == null) {
System.out.println("跳过无记录时间的异常数据");
continue;
}
// 构建30分钟的时间窗口
Date startTime = new Date(collectTime.getTime() - fifteenMinutesInMs);
Date endTime = new Date(collectTime.getTime() + fifteenMinutesInMs);
// 构建10分钟的时间窗口基于recordTime
Date startTime = new Date(recordTime.getTime() - fiveMinutesInMs);
Date endTime = new Date(recordTime.getTime() + fiveMinutesInMs);
// 查询数据库是否已存在相同的异常记录
Integer existingCount = emsRecordAlarmDataMapper.checkDuplicateAlarmData(
@ -404,14 +615,14 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
insertedCount++;
System.out.println("成功插入异常数据 - 设备:" + alarmData.getMonitorId() +
", 原因:" + alarmData.getCause() +
", 采集时间:" + alarmData.getCollectTime() +
", 记录时间:" + alarmData.getCollectTime() +
", 数值:" + alarmData.getAlarmData());
}
} else {
System.out.println("跳过重复异常数据 - 设备:" + alarmData.getMonitorId() +
", 原因:" + alarmData.getCause() +
", 采集时间:" + alarmData.getCollectTime() +
", 30分钟内已存在 " + existingCount + " 条相同记录");
", 记录时间:" + alarmData.getCollectTime() +
", 10分钟内已存在 " + existingCount + " 条相同记录");
}
} catch (Exception e) {
@ -423,6 +634,59 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
return insertedCount;
}
/**
*
* + +
*
*
* @param alarmDataList
* @return
*/
private List<EmsRecordAlarmData> performBatchDeduplication(List<EmsRecordAlarmData> alarmDataList) {
if (alarmDataList == null || alarmDataList.isEmpty()) {
return new ArrayList<>();
}
// 使用LinkedHashMap保持插入顺序同时去重
Map<String, EmsRecordAlarmData> uniqueRecords = new LinkedHashMap<>();
for (EmsRecordAlarmData alarmData : alarmDataList) {
// 构建唯一标识设备ID + 字段类型 + 记录时间(精确到秒)
String uniqueKey = buildUniqueKey(alarmData);
if (uniqueKey != null && !uniqueRecords.containsKey(uniqueKey)) {
uniqueRecords.put(uniqueKey, alarmData);
} else if (uniqueKey != null) {
System.out.println("批次内发现重复记录,已跳过 - " + uniqueKey);
}
}
return new ArrayList<>(uniqueRecords.values());
}
/**
*
* ID__()
*
* @param alarmData
* @return null
*/
private String buildUniqueKey(EmsRecordAlarmData alarmData) {
if (alarmData == null ||
StringUtils.isEmpty(alarmData.getMonitorId()) ||
StringUtils.isEmpty(alarmData.getCause()) ||
alarmData.getCollectTime() == null) {
return null;
}
// 将时间精确到秒,避免毫秒级的微小差异
long timeInSeconds = alarmData.getCollectTime().getTime() / 1000;
return alarmData.getMonitorId() + "_" +
alarmData.getCause() + "_" +
timeInSeconds;
}
/**
*
*

@ -164,6 +164,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</select>
<!-- 检查特定条件的异常数据是否已存在(精确去重) -->
<!-- 注意collect_time字段实际存储的是物联网数据的recordTime系统记录时间 -->
<select id="checkDuplicateAlarmData" resultType="java.lang.Integer">
SELECT COUNT(*)
FROM ems_record_alarm_data

Loading…
Cancel
Save