diff --git a/os-ems/src/main/java/com/os/ems/record/controller/EmsRecordAlarmDataController.java b/os-ems/src/main/java/com/os/ems/record/controller/EmsRecordAlarmDataController.java index ef5d3b7..1f18685 100644 --- a/os-ems/src/main/java/com/os/ems/record/controller/EmsRecordAlarmDataController.java +++ b/os-ems/src/main/java/com/os/ems/record/controller/EmsRecordAlarmDataController.java @@ -124,4 +124,38 @@ public class EmsRecordAlarmDataController extends BaseController return emsRecordAlarmDataService.getAlarmDataTotalCount(); } + /** + * 保存WebSocket告警数据(批量保存) + * 前端传入多个EmsRecordAlarmData实体,每个告警规则对应一条记录 + * + * @param alarmDataList 异常数据记录列表 + * @return 结果 + */ + @Log(title = "WebSocket告警数据", businessType = BusinessType.INSERT) + @PostMapping("/saveWebSocketAlarmData") + public AjaxResult saveWebSocketAlarmData(@RequestBody List alarmDataList) + { + if (alarmDataList == null || alarmDataList.isEmpty()) { + return error("告警数据不能为空"); + } + + try { + // 设置创建人信息 + String username = getUsername(); + for (EmsRecordAlarmData alarmData : alarmDataList) { + alarmData.setCreateBy(username); + } + + int result = emsRecordAlarmDataService.saveWebSocketAlarmDataBatch(alarmDataList); + if (result > 0) { + return success("保存成功,共保存 " + result + " 条告警记录"); + } else { + return error("保存失败,没有新增任何记录"); + } + } catch (Exception e) { + logger.error("保存WebSocket告警数据异常", e); + return error("保存失败:" + e.getMessage()); + } + } + } diff --git a/os-ems/src/main/java/com/os/ems/record/service/IEmsRecordAlarmDataService.java b/os-ems/src/main/java/com/os/ems/record/service/IEmsRecordAlarmDataService.java index 51083e4..0f65e64 100644 --- a/os-ems/src/main/java/com/os/ems/record/service/IEmsRecordAlarmDataService.java +++ b/os-ems/src/main/java/com/os/ems/record/service/IEmsRecordAlarmDataService.java @@ -69,10 +69,19 @@ public interface IEmsRecordAlarmDataService /** * 物联网数据阈值检查定时任务 */ - void checkIotenvThresholdAlarms(); +// void checkIotenvThresholdAlarms(); /** * 获取异常数据表总数(包含所有告警类型) */ public int getAlarmDataTotalCount(); + + /** + * 批量保存WebSocket告警数据 + * 处理前端传来的告警数据列表,进行去重和批量插入 + * + * @param alarmDataList 告警数据列表 + * @return 实际插入的记录数量 + */ + public int saveWebSocketAlarmDataBatch(List alarmDataList); } diff --git a/os-ems/src/main/java/com/os/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java b/os-ems/src/main/java/com/os/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java index bcdc0bf..a7f6a58 100644 --- a/os-ems/src/main/java/com/os/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java +++ b/os-ems/src/main/java/com/os/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java @@ -170,206 +170,208 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService * 4. 修复数据关联:monitor_id存储的是monitor_code值,与采集数据分表保持一致 * 5. 完全重构去重逻辑:确保绝对不会产生重复数据 * 6. 支持大于阈值和小于阈值双重检查(trigger_rule: 0=大于阈值, 1=小于阈值) + * + * 注意:此方法已停用,现在完全依赖WebSocket进行实时数据推送和告警处理 */ - @Scheduled(cron = "0 */1 * * * ?") // 每分钟执行一次 - @Override - public void checkIotenvThresholdAlarms() { - System.out.println("==== 开始执行阈值检查定时任务 ===="); - long startTime = System.currentTimeMillis(); - try { - // 1. 获取当天的分表名 - SimpleDateFormat tableFormat = new SimpleDateFormat("yyyyMMdd"); - String tableName = "record_iotenv_instant_" + tableFormat.format(new Date()); - - // 检查表是否存在 - if (!checkTableExists(tableName)) { - System.out.println("当天分表不存在: " + tableName); - return; - } - - // 2. 查询所有启用的告警规则(包括大于阈值和小于阈值) - List alarmRules = emsRecordAlarmRuleMapper.selectEmsRecordAlarmRuleList(new EmsRecordAlarmRule()); - - // 按触发规则分类 - List greaterThanRules = new ArrayList<>(); - List lessThanRules = new ArrayList<>(); - - for (EmsRecordAlarmRule rule : alarmRules) { - if (rule.getTriggerRule() != null) { - if (rule.getTriggerRule() == 0L) { - greaterThanRules.add(rule); // 大于阈值 - } else if (rule.getTriggerRule() == 1L) { - lessThanRules.add(rule); // 小于阈值 - } - } - } - - System.out.println("查询到的告警规则 - 大于阈值规则: " + greaterThanRules.size() + - " 条, 小于阈值规则: " + lessThanRules.size() + " 条"); - - if (greaterThanRules.isEmpty() && lessThanRules.isEmpty()) { - System.out.println("没有启用的阈值告警规则"); - return; - } - - // 3. 按设备编码分组告警规则,每个设备可能有多个字段的规则 - // 注意:monitor_id字段存储的实际是monitor_code值,与采集数据分表的monitorId保持一致 - Map> rulesByMonitor = alarmRules.stream() - .filter(rule -> StringUtils.isNotEmpty(rule.getMonitorId())) - .collect(Collectors.groupingBy(EmsRecordAlarmRule::getMonitorId)); - - // 4. 只查询最近2分钟的数据,确保每分钟执行时不重复处理 - Date twoMinutesAgo = new Date(System.currentTimeMillis() - 2 * 60 * 1000); - List recentRecords = recordIotenvInstantMapper.selectRecentRecordsFromTable(tableName, twoMinutesAgo); - - System.out.println("从分表 " + tableName + " 查询到最近2分钟的数据条数: " + recentRecords.size()); - if (!recentRecords.isEmpty()) { - System.out.println("数据时间范围: " + recentRecords.get(recentRecords.size()-1).getRecodeTime() + - " 到 " + recentRecords.get(0).getRecodeTime()); - } - - if (recentRecords.isEmpty()) { - System.out.println("当天分表最近2分钟无新数据: " + tableName); - return; - } - - // 5. 获取所有相关设备信息(用于验证设备类型,确保规则合理性) - // 这里需要按monitor_code查询,因为rulesByMonitor的key是monitor_code - Set monitorCodes = rulesByMonitor.keySet(); - List monitorInfos = emsBaseMonitorInfoMapper.selectMonitorInfoByCodes(new ArrayList<>(monitorCodes)); - Map monitorInfoMap = monitorInfos.stream() - .collect(Collectors.toMap(EmsBaseMonitorInfo::getMonitorCode, m -> m)); - - // 6. 处理每条记录,基于精确的字段规则进行阈值检查 - List candidateAlarms = new ArrayList<>(); - for (RecordIotenvInstant record : recentRecords) { - String monitorId = record.getMonitorId(); // 采集数据中的monitorId实际是monitor_code - List rules = rulesByMonitor.get(monitorId); - - if (rules == null || rules.isEmpty()) { - // 输出更详细的调试信息 - if (rulesByMonitor.containsKey(monitorId)) { - System.out.println("设备 " + monitorId + " 的规则列表为空"); - } else { - System.out.println("设备 " + monitorId + " 未找到对应的告警规则"); - } - continue; - } - - System.out.println("处理设备 " + monitorId + " 的数据,找到 " + rules.size() + " 条规则"); - - EmsBaseMonitorInfo monitorInfo = monitorInfoMap.get(monitorId); - if (monitorInfo == null) { - System.out.println("警告:设备 " + monitorId + " 未找到设备信息"); - continue; - } - - // 7. 遍历该设备的所有规则,支持新版本(有monitor_field)和历史版本(monitor_field为NULL) - for (EmsRecordAlarmRule rule : rules) { - // 检查规则的基本有效性 - 只要有阈值就可以处理 - if (rule.getTriggerValue() == null) { - System.out.println("跳过无阈值的规则 - 设备:" + monitorId + ", 规则ID:" + rule.getObjId()); - continue; - } - - BigDecimal threshold = rule.getTriggerValue(); - List valuesToCheck = new ArrayList<>(); - List fieldDescriptions = new ArrayList<>(); - - // 8. 根据monitor_field字段处理不同情况 - if (rule.getMonitorField() != null) { - // 新版本:有明确的monitor_field,精确检查指定字段 - - // 验证监测字段是否符合设备类型(可选的业务校验) - if (!isValidMonitorFieldForDeviceType(rule.getMonitorField(), monitorInfo.getMonitorType())) { - System.out.println("警告:设备类型与监测字段不匹配 - 设备:" + monitorId + - ", 设备类型:" + monitorInfo.getMonitorType() + - ", 监测字段:" + rule.getMonitorField()); - continue; - } - - BigDecimal actualValue = getFieldValue(record, rule.getMonitorField()); - if (actualValue != null) { - valuesToCheck.add(actualValue); - String fieldDesc = MONITOR_FIELD_TO_DESC.get(rule.getMonitorField()); - fieldDescriptions.add(fieldDesc != null ? fieldDesc : "未知字段"); - } - - } else { - // 历史版本:monitor_field为NULL,根据设备类型检查相应的默认字段 - List defaultValues = getDefaultFieldValues(record, monitorInfo.getMonitorType()); - List defaultDescriptions = getDefaultFieldDescriptions(monitorInfo.getMonitorType()); - - if (!defaultValues.isEmpty()) { - valuesToCheck.addAll(defaultValues); - fieldDescriptions.addAll(defaultDescriptions); - System.out.println("历史规则兼容处理 - 设备:" + monitorId + - ", 设备类型:" + monitorInfo.getMonitorType() + - ", 检查字段数:" + defaultValues.size()); - } else { - System.out.println("警告:无法确定历史规则的检查字段 - 设备:" + monitorId + - ", 设备类型:" + monitorInfo.getMonitorType()); - continue; - } - } - - // 9. 阈值比较:根据触发规则检查所有相关字段 - for (int i = 0; i < valuesToCheck.size(); i++) { - BigDecimal actualValue = valuesToCheck.get(i); - String fieldDesc = fieldDescriptions.get(i); - - if (actualValue != null) { - boolean isAlarmTriggered = false; - String alarmDescription = ""; - - // 根据触发规则判断是否触发告警 - if (rule.getTriggerRule() == 0L) { - // 大于阈值检查 - if (actualValue.compareTo(threshold) > 0) { - isAlarmTriggered = true; - alarmDescription = "大于阈值"; - } - } else if (rule.getTriggerRule() == 1L) { - // 小于阈值检查 - if (actualValue.compareTo(threshold) < 0) { - isAlarmTriggered = true; - alarmDescription = "小于阈值"; - } - } - - if (isAlarmTriggered) { - EmsRecordAlarmData alarmData = createAlarmData(record, rule, actualValue.toString(), fieldDesc); - candidateAlarms.add(alarmData); - - System.out.println("检测到阈值异常 - 设备:" + monitorId + - ", 字段:" + fieldDesc + - ", 实际值:" + actualValue + - ", 阈值:" + threshold + - ", 异常类型:" + alarmDescription + - ", 设备采集时间:" + record.getCollectTime() + - ", 系统记录时间:" + record.getRecodeTime()); - } - } - } - } - } - - // 10. 使用严格的去重逻辑插入异常数据 - if (!candidateAlarms.isEmpty()) { - int insertedCount = insertAlarmDataWithStrictDuplicateCheck(candidateAlarms); - System.out.println("严格去重后实际插入异常数据: " + insertedCount + " 条"); - } else { - System.out.println("未检测到异常数据"); - } - - } catch (Exception e) { - System.err.println("阈值检查定时任务执行异常: " + e.getMessage()); - e.printStackTrace(); - } finally { - long executionTime = System.currentTimeMillis() - startTime; - System.out.println("==== 阈值检查定时任务执行完成,耗时: " + executionTime + "ms ===="); - } - } +// @Scheduled(cron = "0 */1 * * * ?") // 已停用:完全依赖WebSocket,不再使用定时任务 +// @Override +// public void checkIotenvThresholdAlarms() { +// System.out.println("==== 开始执行阈值检查定时任务 ===="); +// long startTime = System.currentTimeMillis(); +// try { +// // 1. 获取当天的分表名 +// SimpleDateFormat tableFormat = new SimpleDateFormat("yyyyMMdd"); +// String tableName = "record_iotenv_instant_" + tableFormat.format(new Date()); +// +// // 检查表是否存在 +// if (!checkTableExists(tableName)) { +// System.out.println("当天分表不存在: " + tableName); +// return; +// } +// +// // 2. 查询所有启用的告警规则(包括大于阈值和小于阈值) +// List alarmRules = emsRecordAlarmRuleMapper.selectEmsRecordAlarmRuleList(new EmsRecordAlarmRule()); +// +// // 按触发规则分类 +// List greaterThanRules = new ArrayList<>(); +// List lessThanRules = new ArrayList<>(); +// +// for (EmsRecordAlarmRule rule : alarmRules) { +// if (rule.getTriggerRule() != null) { +// if (rule.getTriggerRule() == 0L) { +// greaterThanRules.add(rule); // 大于阈值 +// } else if (rule.getTriggerRule() == 1L) { +// lessThanRules.add(rule); // 小于阈值 +// } +// } +// } +// +// System.out.println("查询到的告警规则 - 大于阈值规则: " + greaterThanRules.size() + +// " 条, 小于阈值规则: " + lessThanRules.size() + " 条"); +// +// if (greaterThanRules.isEmpty() && lessThanRules.isEmpty()) { +// System.out.println("没有启用的阈值告警规则"); +// return; +// } +// +// // 3. 按设备编码分组告警规则,每个设备可能有多个字段的规则 +// // 注意:monitor_id字段存储的实际是monitor_code值,与采集数据分表的monitorId保持一致 +// Map> rulesByMonitor = alarmRules.stream() +// .filter(rule -> StringUtils.isNotEmpty(rule.getMonitorId())) +// .collect(Collectors.groupingBy(EmsRecordAlarmRule::getMonitorId)); +// +// // 4. 只查询最近2分钟的数据,确保每分钟执行时不重复处理 +// Date twoMinutesAgo = new Date(System.currentTimeMillis() - 2 * 60 * 1000); +// List recentRecords = recordIotenvInstantMapper.selectRecentRecordsFromTable(tableName, twoMinutesAgo); +// +// System.out.println("从分表 " + tableName + " 查询到最近2分钟的数据条数: " + recentRecords.size()); +// if (!recentRecords.isEmpty()) { +// System.out.println("数据时间范围: " + recentRecords.get(recentRecords.size()-1).getRecodeTime() + +// " 到 " + recentRecords.get(0).getRecodeTime()); +// } +// +// if (recentRecords.isEmpty()) { +// System.out.println("当天分表最近2分钟无新数据: " + tableName); +// return; +// } +// +// // 5. 获取所有相关设备信息(用于验证设备类型,确保规则合理性) +// // 这里需要按monitor_code查询,因为rulesByMonitor的key是monitor_code +// Set monitorCodes = rulesByMonitor.keySet(); +// List monitorInfos = emsBaseMonitorInfoMapper.selectMonitorInfoByCodes(new ArrayList<>(monitorCodes)); +// Map monitorInfoMap = monitorInfos.stream() +// .collect(Collectors.toMap(EmsBaseMonitorInfo::getMonitorCode, m -> m)); +// +// // 6. 处理每条记录,基于精确的字段规则进行阈值检查 +// List candidateAlarms = new ArrayList<>(); +// for (RecordIotenvInstant record : recentRecords) { +// String monitorId = record.getMonitorId(); // 采集数据中的monitorId实际是monitor_code +// List rules = rulesByMonitor.get(monitorId); +// +// if (rules == null || rules.isEmpty()) { +// // 输出更详细的调试信息 +// if (rulesByMonitor.containsKey(monitorId)) { +// System.out.println("设备 " + monitorId + " 的规则列表为空"); +// } else { +// System.out.println("设备 " + monitorId + " 未找到对应的告警规则"); +// } +// continue; +// } +// +// System.out.println("处理设备 " + monitorId + " 的数据,找到 " + rules.size() + " 条规则"); +// +// EmsBaseMonitorInfo monitorInfo = monitorInfoMap.get(monitorId); +// if (monitorInfo == null) { +// System.out.println("警告:设备 " + monitorId + " 未找到设备信息"); +// continue; +// } +// +// // 7. 遍历该设备的所有规则,支持新版本(有monitor_field)和历史版本(monitor_field为NULL) +// for (EmsRecordAlarmRule rule : rules) { +// // 检查规则的基本有效性 - 只要有阈值就可以处理 +// if (rule.getTriggerValue() == null) { +// System.out.println("跳过无阈值的规则 - 设备:" + monitorId + ", 规则ID:" + rule.getObjId()); +// continue; +// } +// +// BigDecimal threshold = rule.getTriggerValue(); +// List valuesToCheck = new ArrayList<>(); +// List fieldDescriptions = new ArrayList<>(); +// +// // 8. 根据monitor_field字段处理不同情况 +// if (rule.getMonitorField() != null) { +// // 新版本:有明确的monitor_field,精确检查指定字段 +// +// // 验证监测字段是否符合设备类型(可选的业务校验) +// if (!isValidMonitorFieldForDeviceType(rule.getMonitorField(), monitorInfo.getMonitorType())) { +// System.out.println("警告:设备类型与监测字段不匹配 - 设备:" + monitorId + +// ", 设备类型:" + monitorInfo.getMonitorType() + +// ", 监测字段:" + rule.getMonitorField()); +// continue; +// } +// +// BigDecimal actualValue = getFieldValue(record, rule.getMonitorField()); +// if (actualValue != null) { +// valuesToCheck.add(actualValue); +// String fieldDesc = MONITOR_FIELD_TO_DESC.get(rule.getMonitorField()); +// fieldDescriptions.add(fieldDesc != null ? fieldDesc : "未知字段"); +// } +// +// } else { +// // 历史版本:monitor_field为NULL,根据设备类型检查相应的默认字段 +// List defaultValues = getDefaultFieldValues(record, monitorInfo.getMonitorType()); +// List defaultDescriptions = getDefaultFieldDescriptions(monitorInfo.getMonitorType()); +// +// if (!defaultValues.isEmpty()) { +// valuesToCheck.addAll(defaultValues); +// fieldDescriptions.addAll(defaultDescriptions); +// System.out.println("历史规则兼容处理 - 设备:" + monitorId + +// ", 设备类型:" + monitorInfo.getMonitorType() + +// ", 检查字段数:" + defaultValues.size()); +// } else { +// System.out.println("警告:无法确定历史规则的检查字段 - 设备:" + monitorId + +// ", 设备类型:" + monitorInfo.getMonitorType()); +// continue; +// } +// } +// +// // 9. 阈值比较:根据触发规则检查所有相关字段 +// for (int i = 0; i < valuesToCheck.size(); i++) { +// BigDecimal actualValue = valuesToCheck.get(i); +// String fieldDesc = fieldDescriptions.get(i); +// +// if (actualValue != null) { +// boolean isAlarmTriggered = false; +// String alarmDescription = ""; +// +// // 根据触发规则判断是否触发告警 +// if (rule.getTriggerRule() == 0L) { +// // 大于阈值检查 +// if (actualValue.compareTo(threshold) > 0) { +// isAlarmTriggered = true; +// alarmDescription = "大于阈值"; +// } +// } else if (rule.getTriggerRule() == 1L) { +// // 小于阈值检查 +// if (actualValue.compareTo(threshold) < 0) { +// isAlarmTriggered = true; +// alarmDescription = "小于阈值"; +// } +// } +// +// if (isAlarmTriggered) { +// EmsRecordAlarmData alarmData = createAlarmData(record, rule, actualValue.toString(), fieldDesc); +// candidateAlarms.add(alarmData); +// +// System.out.println("检测到阈值异常 - 设备:" + monitorId + +// ", 字段:" + fieldDesc + +// ", 实际值:" + actualValue + +// ", 阈值:" + threshold + +// ", 异常类型:" + alarmDescription + +// ", 设备采集时间:" + record.getCollectTime() + +// ", 系统记录时间:" + record.getRecodeTime()); +// } +// } +// } +// } +// } +// +// // 10. 使用严格的去重逻辑插入异常数据 +// if (!candidateAlarms.isEmpty()) { +// int insertedCount = insertAlarmDataWithStrictDuplicateCheck(candidateAlarms); +// System.out.println("严格去重后实际插入异常数据: " + insertedCount + " 条"); +// } else { +// System.out.println("未检测到异常数据"); +// } +// +// } catch (Exception e) { +// System.err.println("阈值检查定时任务执行异常: " + e.getMessage()); +// e.printStackTrace(); +// } finally { +// long executionTime = System.currentTimeMillis() - startTime; +// System.out.println("==== 阈值检查定时任务执行完成,耗时: " + executionTime + "ms ===="); +// } +// } /** * 根据监测字段获取记录中对应的数据值 @@ -750,4 +752,59 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService public int getAlarmDataTotalCount() { return emsRecordAlarmDataMapper.getAlarmDataTotalCount(); } + + /** + * 批量保存WebSocket告警数据 + * 处理前端传来的告警数据列表,进行去重和批量插入 + * + * @param alarmDataList 告警数据列表 + * @return 实际插入的记录数量 + */ + @Override + public int saveWebSocketAlarmDataBatch(List alarmDataList) { + if (alarmDataList == null || alarmDataList.isEmpty()) { + return 0; + } + + System.out.println("==== 开始保存WebSocket告警数据 ===="); + System.out.println("接收到告警数据数量: " + alarmDataList.size()); + + // 设置默认值和创建时间 + Date currentTime = DateUtils.getNowDate(); + for (EmsRecordAlarmData alarmData : alarmDataList) { + // 设置创建时间 + if (alarmData.getCreateTime() == null) { + alarmData.setCreateTime(currentTime); + } + + // 设置默认告警状态为未处理 + if (alarmData.getAlarmStatus() == null) { + alarmData.setAlarmStatus(1L); // 1表示未处理 + } + + // 数据完整性检查 + if (StringUtils.isEmpty(alarmData.getMonitorId())) { + System.out.println("警告:跳过monitorId为空的告警数据"); + continue; + } + + if (alarmData.getCollectTime() == null) { + System.out.println("警告:collectTime为空,使用当前时间"); + alarmData.setCollectTime(currentTime); + } + + System.out.println("准备保存告警数据 - 设备:" + alarmData.getMonitorId() + + ", 原因:" + alarmData.getCause() + + ", 数值:" + alarmData.getAlarmData() + + ", 告警类型:" + alarmData.getAlarmType()); + } + + // 复用现有的严格去重逻辑 + int insertedCount = insertAlarmDataWithStrictDuplicateCheck(alarmDataList); + + System.out.println("WebSocket告警数据保存完成,实际插入: " + insertedCount + " 条记录"); + System.out.println("==== WebSocket告警数据保存结束 ===="); + + return insertedCount; + } }