diff --git a/os-ems/src/main/java/com/os/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java b/os-ems/src/main/java/com/os/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java index 46335b5..2a446f0 100644 --- a/os-ems/src/main/java/com/os/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java +++ b/os-ems/src/main/java/com/os/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java @@ -173,6 +173,8 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService @Scheduled(cron = "0 */1 * * * ?") // 每分钟执行一次 @Override public void checkIotenvThresholdAlarms() { + System.out.println("==== 开始执行阈值检查定时任务 ===="); + long startTime = System.currentTimeMillis(); try { // 1. 获取当天的分表名 SimpleDateFormat tableFormat = new SimpleDateFormat("yyyyMMdd"); @@ -184,16 +186,24 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService return; } - // 2. 查询所有启用的告警规则(trigger_rule=0 超过阈值) + // 2. 查询所有启用的告警规则(trigger_rule=0 超过阈值) EmsRecordAlarmRule queryRule = new EmsRecordAlarmRule(); queryRule.setTriggerRule(0L); // 0表示超过阈值 List alarmRules = emsRecordAlarmRuleMapper.selectEmsRecordAlarmRuleList(queryRule); + System.out.println("查询到的阈值告警规则数量: " + alarmRules.size()); + for (EmsRecordAlarmRule rule : alarmRules) { + System.out.println("规则详情 - 规则ID:" + rule.getObjId() + + ", 设备ID:" + rule.getMonitorId() + + ", 监测字段:" + rule.getMonitorField() + + ", 阈值:" + rule.getTriggerValue()); + } + if (alarmRules.isEmpty()) { System.out.println("没有启用的阈值告警规则"); return; } - + // 3. 按设备编码分组告警规则,每个设备可能有多个字段的规则 // 注意:monitor_id字段存储的实际是monitor_code值,与采集数据分表的monitorId保持一致 Map> rulesByMonitor = alarmRules.stream() @@ -204,6 +214,12 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService Date twoMinutesAgo = new Date(System.currentTimeMillis() - 2 * 60 * 1000); List recentRecords = recordIotenvInstantMapper.selectRecentRecordsFromTable(tableName, twoMinutesAgo); + System.out.println("从分表 " + tableName + " 查询到最近2分钟的数据条数: " + recentRecords.size()); + if (!recentRecords.isEmpty()) { + System.out.println("数据时间范围: " + recentRecords.get(recentRecords.size()-1).getRecodeTime() + + " 到 " + recentRecords.get(0).getRecodeTime()); + } + if (recentRecords.isEmpty()) { System.out.println("当天分表最近2分钟无新数据: " + tableName); return; @@ -223,43 +239,88 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService List rules = rulesByMonitor.get(monitorId); if (rules == null || rules.isEmpty()) { + // 输出更详细的调试信息 + if (rulesByMonitor.containsKey(monitorId)) { + System.out.println("设备 " + monitorId + " 的规则列表为空"); + } else { + System.out.println("设备 " + monitorId + " 未找到对应的告警规则"); + } continue; } + System.out.println("处理设备 " + monitorId + " 的数据,找到 " + rules.size() + " 条规则"); + EmsBaseMonitorInfo monitorInfo = monitorInfoMap.get(monitorId); if (monitorInfo == null) { + System.out.println("警告:设备 " + monitorId + " 未找到设备信息"); continue; } - // 7. 遍历该设备的所有规则,每个规则对应一个具体的监测字段 + // 7. 遍历该设备的所有规则,支持新版本(有monitor_field)和历史版本(monitor_field为NULL) for (EmsRecordAlarmRule rule : rules) { - // 检查规则的基本有效性 - if (rule.getTriggerValue() == null || rule.getMonitorField() == null) { + // 检查规则的基本有效性 - 只要有阈值就可以处理 + if (rule.getTriggerValue() == null) { + System.out.println("跳过无阈值的规则 - 设备:" + monitorId + ", 规则ID:" + rule.getObjId()); continue; } - // 验证监测字段是否符合设备类型(可选的业务校验) - if (!isValidMonitorFieldForDeviceType(rule.getMonitorField(), monitorInfo.getMonitorType())) { - System.out.println("警告:设备类型与监测字段不匹配 - 设备:" + monitorId + - ", 设备类型:" + monitorInfo.getMonitorType() + - ", 监测字段:" + rule.getMonitorField()); - continue; - } - - // 8. 根据规则的monitor_field精确检查对应的数据字段 BigDecimal threshold = rule.getTriggerValue(); - BigDecimal actualValue = getFieldValue(record, rule.getMonitorField()); + List valuesToCheck = new ArrayList<>(); + List fieldDescriptions = new ArrayList<>(); - // 9. 阈值比较:实际值 > 阈值时触发异常 - if (actualValue != null && actualValue.compareTo(threshold) > 0) { - EmsRecordAlarmData alarmData = createAlarmData(record, rule, actualValue.toString()); - candidateAlarms.add(alarmData); + // 8. 根据monitor_field字段处理不同情况 + if (rule.getMonitorField() != null) { + // 新版本:有明确的monitor_field,精确检查指定字段 - System.out.println("检测到阈值超标 - 设备:" + monitorId + - ", 字段:" + MONITOR_FIELD_TO_DESC.get(rule.getMonitorField()) + - ", 实际值:" + actualValue + - ", 阈值:" + threshold + - ", 采集时间:" + record.getCollectTime()); + // 验证监测字段是否符合设备类型(可选的业务校验) + if (!isValidMonitorFieldForDeviceType(rule.getMonitorField(), monitorInfo.getMonitorType())) { + System.out.println("警告:设备类型与监测字段不匹配 - 设备:" + monitorId + + ", 设备类型:" + monitorInfo.getMonitorType() + + ", 监测字段:" + rule.getMonitorField()); + continue; + } + + BigDecimal actualValue = getFieldValue(record, rule.getMonitorField()); + if (actualValue != null) { + valuesToCheck.add(actualValue); + String fieldDesc = MONITOR_FIELD_TO_DESC.get(rule.getMonitorField()); + fieldDescriptions.add(fieldDesc != null ? fieldDesc : "未知字段"); + } + + } else { + // 历史版本:monitor_field为NULL,根据设备类型检查相应的默认字段 + List defaultValues = getDefaultFieldValues(record, monitorInfo.getMonitorType()); + List defaultDescriptions = getDefaultFieldDescriptions(monitorInfo.getMonitorType()); + + if (!defaultValues.isEmpty()) { + valuesToCheck.addAll(defaultValues); + fieldDescriptions.addAll(defaultDescriptions); + System.out.println("历史规则兼容处理 - 设备:" + monitorId + + ", 设备类型:" + monitorInfo.getMonitorType() + + ", 检查字段数:" + defaultValues.size()); + } else { + System.out.println("警告:无法确定历史规则的检查字段 - 设备:" + monitorId + + ", 设备类型:" + monitorInfo.getMonitorType()); + continue; + } + } + + // 9. 阈值比较:检查所有相关字段 + for (int i = 0; i < valuesToCheck.size(); i++) { + BigDecimal actualValue = valuesToCheck.get(i); + String fieldDesc = fieldDescriptions.get(i); + + if (actualValue != null && actualValue.compareTo(threshold) > 0) { + EmsRecordAlarmData alarmData = createAlarmData(record, rule, actualValue.toString(), fieldDesc); + candidateAlarms.add(alarmData); + + System.out.println("检测到阈值超标 - 设备:" + monitorId + + ", 字段:" + fieldDesc + + ", 实际值:" + actualValue + + ", 阈值:" + threshold + + ", 设备采集时间:" + record.getCollectTime() + + ", 系统记录时间:" + record.getRecodeTime()); + } } } } @@ -275,6 +336,9 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService } catch (Exception e) { System.err.println("阈值检查定时任务执行异常: " + e.getMessage()); e.printStackTrace(); + } finally { + long executionTime = System.currentTimeMillis() - startTime; + System.out.println("==== 阈值检查定时任务执行完成,耗时: " + executionTime + "ms ===="); } } @@ -306,6 +370,111 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService } } + /** + * 根据设备类型获取默认的检查字段值(用于历史规则兼容) + * + * @param record 物联网数据记录 + * @param monitorType 设备类型 + * @return 需要检查的字段值列表 + */ + private List getDefaultFieldValues(RecordIotenvInstant record, Long monitorType) { + List values = new ArrayList<>(); + + if (monitorType == null) { + return values; + } + + switch (monitorType.intValue()) { + case 5: // 纯温度设备 + if (record.getTemperature() != null) { + values.add(record.getTemperature()); + } + break; + case 6: // 温湿度设备 + if (record.getTemperature() != null) { + values.add(record.getTemperature()); + } + if (record.getHumidity() != null) { + values.add(record.getHumidity()); + } + break; + case 10: // 振动设备 + if (record.getVibrationSpeed() != null) { + values.add(record.getVibrationSpeed()); + } + if (record.getVibrationDisplacement() != null) { + values.add(record.getVibrationDisplacement()); + } + if (record.getVibrationAcceleration() != null) { + values.add(record.getVibrationAcceleration()); + } + if (record.getVibrationTemp() != null) { + values.add(record.getVibrationTemp()); + } + break; + default: + // 其他设备类型,尝试检查所有非空字段 + if (record.getTemperature() != null) { + values.add(record.getTemperature()); + } + if (record.getHumidity() != null) { + values.add(record.getHumidity()); + } + if (record.getIlluminance() != null) { + values.add(record.getIlluminance()); + } + if (record.getNoise() != null) { + values.add(record.getNoise()); + } + if (record.getConcentration() != null) { + values.add(record.getConcentration()); + } + break; + } + + return values; + } + + /** + * 根据设备类型获取默认的字段描述(用于历史规则兼容) + * + * @param monitorType 设备类型 + * @return 字段描述列表 + */ + private List getDefaultFieldDescriptions(Long monitorType) { + List descriptions = new ArrayList<>(); + + if (monitorType == null) { + return descriptions; + } + + switch (monitorType.intValue()) { + case 5: // 纯温度设备 + descriptions.add("温度"); + break; + case 6: // 温湿度设备 + descriptions.add("温度"); + descriptions.add("湿度"); + break; + case 10: // 振动设备 + descriptions.add("振动-速度(mm/s)"); + descriptions.add("振动-位移(um)"); + descriptions.add("振动-加速度(g)"); + descriptions.add("振动-温度(℃)"); + break; + default: + // 其他设备类型的通用字段 + descriptions.add("温度"); + descriptions.add("湿度"); + descriptions.add("照度"); + descriptions.add("噪声"); + descriptions.add("浓度"); + break; + } + + return descriptions; + } + /** * 验证监测字段是否符合设备类型 * 业务规则校验,确保规则配置的合理性 @@ -342,6 +511,9 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService * 创建异常数据对象 * 使用字典描述作为cause,提高可读性 * + * 重要修改:collectTime字段存储物联网数据的recordTime而非collectTime + * 原因:设备的collectTime可能不准确,recordTime是系统记录时间,更可靠 + * * @param record 物联网数据记录 * @param rule 告警规则 * @param actualValue 实际数值 @@ -350,7 +522,8 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService private EmsRecordAlarmData createAlarmData(RecordIotenvInstant record, EmsRecordAlarmRule rule, String actualValue) { EmsRecordAlarmData alarmData = new EmsRecordAlarmData(); alarmData.setMonitorId(record.getMonitorId()); - alarmData.setCollectTime(record.getCollectTime()); + // 关键修改:存储recordTime而非collectTime,确保时间准确性 + alarmData.setCollectTime(record.getRecodeTime()); // 存储系统记录时间 alarmData.setAlarmType(0L); // 0表示超过阈值 alarmData.setAlarmStatus(1L); // 1表示未处理 alarmData.setAlarmData(actualValue); // 记录实际超标数值 @@ -362,32 +535,70 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService alarmData.setCreateTime(new Date()); return alarmData; } + + /** + * 创建异常数据对象(带自定义字段描述) + * 用于历史规则兼容,支持自定义字段描述 + * + * @param record 物联网数据记录 + * @param rule 告警规则 + * @param actualValue 实际数值 + * @param fieldDescription 字段描述 + * @return 异常数据对象 + */ + private EmsRecordAlarmData createAlarmData(RecordIotenvInstant record, EmsRecordAlarmRule rule, String actualValue, String fieldDescription) { + EmsRecordAlarmData alarmData = new EmsRecordAlarmData(); + alarmData.setMonitorId(record.getMonitorId()); + // 关键修改:存储recordTime而非collectTime,确保时间准确性 + alarmData.setCollectTime(record.getRecodeTime()); // 存储系统记录时间 + alarmData.setAlarmType(0L); // 0表示超过阈值 + alarmData.setAlarmStatus(1L); // 1表示未处理 + alarmData.setAlarmData(actualValue); // 记录实际超标数值 + + // 使用传入的字段描述 + alarmData.setCause(StringUtils.isNotEmpty(fieldDescription) ? fieldDescription : "未知字段"); + + alarmData.setCreateTime(new Date()); + return alarmData; + } /** - * 使用严格的去重逻辑插入异常数据 - * 防重复策略:同一设备的同一字段在30分钟内只记录一次异常 - * 每条数据插入前都会检查数据库是否已存在相同记录 + * 使用严格的两层去重逻辑插入异常数据 + * + * 重构说明: + * 1. 第一层:批次内去重 - 对同一批候选数据进行去重,避免同批次内的重复 + * 2. 第二层:数据库去重 - 与数据库已有记录进行重复检查 + * 3. 时间基准修正:基于recordTime(存储在collectTime字段中)进行时间窗口计算 + * 4. 防重复策略:同一设备的同一字段在10分钟内只记录一次异常 * * @param alarmDataList 待插入的异常数据列表 * @return 实际插入的异常数据数量 */ private int insertAlarmDataWithStrictDuplicateCheck(List alarmDataList) { + if (alarmDataList == null || alarmDataList.isEmpty()) { + return 0; + } + + // 第一层:批次内去重 + List deduplicatedList = performBatchDeduplication(alarmDataList); + System.out.println("批次内去重:原始 " + alarmDataList.size() + " 条,去重后 " + deduplicatedList.size() + " 条"); + + // 第二层:数据库去重并插入 int insertedCount = 0; + long fiveMinutesInMs = 5 * 60 * 1000; // 10分钟去重窗口(前后各5分钟),避免短时间内重复告警 - // 30分钟去重窗口(前后各15分钟) - long fifteenMinutesInMs = 15 * 60 * 1000; - - for (EmsRecordAlarmData alarmData : alarmDataList) { + for (EmsRecordAlarmData alarmData : deduplicatedList) { try { - // 为每条异常数据检查是否已存在重复记录 - Date collectTime = alarmData.getCollectTime(); - if (collectTime == null) { - continue; // 跳过没有采集时间的数据 + // 获取记录时间(现在存储的是recordTime) + Date recordTime = alarmData.getCollectTime(); + if (recordTime == null) { + System.out.println("跳过无记录时间的异常数据"); + continue; } - // 构建30分钟的时间窗口 - Date startTime = new Date(collectTime.getTime() - fifteenMinutesInMs); - Date endTime = new Date(collectTime.getTime() + fifteenMinutesInMs); + // 构建10分钟的时间窗口(基于recordTime) + Date startTime = new Date(recordTime.getTime() - fiveMinutesInMs); + Date endTime = new Date(recordTime.getTime() + fiveMinutesInMs); // 查询数据库是否已存在相同的异常记录 Integer existingCount = emsRecordAlarmDataMapper.checkDuplicateAlarmData( @@ -404,14 +615,14 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService insertedCount++; System.out.println("成功插入异常数据 - 设备:" + alarmData.getMonitorId() + ", 原因:" + alarmData.getCause() + - ", 采集时间:" + alarmData.getCollectTime() + + ", 记录时间:" + alarmData.getCollectTime() + ", 数值:" + alarmData.getAlarmData()); } } else { System.out.println("跳过重复异常数据 - 设备:" + alarmData.getMonitorId() + ", 原因:" + alarmData.getCause() + - ", 采集时间:" + alarmData.getCollectTime() + - ", 30分钟内已存在 " + existingCount + " 条相同记录"); + ", 记录时间:" + alarmData.getCollectTime() + + ", 10分钟内已存在 " + existingCount + " 条相同记录"); } } catch (Exception e) { @@ -423,6 +634,59 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService return insertedCount; } + /** + * 执行批次内去重 + * 去重规则:相同设备 + 相同字段类型 + 相同记录时间(精确到秒)的记录视为重复 + * 保留第一条记录,丢弃后续重复记录 + * + * @param alarmDataList 原始异常数据列表 + * @return 去重后的异常数据列表 + */ + private List performBatchDeduplication(List alarmDataList) { + if (alarmDataList == null || alarmDataList.isEmpty()) { + return new ArrayList<>(); + } + + // 使用LinkedHashMap保持插入顺序,同时去重 + Map uniqueRecords = new LinkedHashMap<>(); + + for (EmsRecordAlarmData alarmData : alarmDataList) { + // 构建唯一标识:设备ID + 字段类型 + 记录时间(精确到秒) + String uniqueKey = buildUniqueKey(alarmData); + + if (uniqueKey != null && !uniqueRecords.containsKey(uniqueKey)) { + uniqueRecords.put(uniqueKey, alarmData); + } else if (uniqueKey != null) { + System.out.println("批次内发现重复记录,已跳过 - " + uniqueKey); + } + } + + return new ArrayList<>(uniqueRecords.values()); + } + + /** + * 构建异常数据的唯一标识 + * 格式:设备ID_字段类型_记录时间戳(秒) + * + * @param alarmData 异常数据 + * @return 唯一标识字符串,如果数据不完整返回null + */ + private String buildUniqueKey(EmsRecordAlarmData alarmData) { + if (alarmData == null || + StringUtils.isEmpty(alarmData.getMonitorId()) || + StringUtils.isEmpty(alarmData.getCause()) || + alarmData.getCollectTime() == null) { + return null; + } + + // 将时间精确到秒,避免毫秒级的微小差异 + long timeInSeconds = alarmData.getCollectTime().getTime() / 1000; + + return alarmData.getMonitorId() + "_" + + alarmData.getCause() + "_" + + timeInSeconds; + } + /** * 检查数据库表是否存在 * diff --git a/os-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmDataMapper.xml b/os-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmDataMapper.xml index 6e7ada2..f222af6 100644 --- a/os-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmDataMapper.xml +++ b/os-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmDataMapper.xml @@ -164,6 +164,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" +