feat(ems): 实现 WebSocket告警数据批量保存功能

- 新增 saveWebSocketAlarmData 方法处理 WebSocket 告警数据
- 实现 saveWebSocketAlarmDataBatch 方法进行批量保存
- 优化数据完整性检查和默认值设置
- 复用严格去重逻辑确保数据唯一性
- 完全停用定时任务检查阈值告警,转而依赖 WebSocket 实时推送
boardTest
zch 4 weeks ago
parent fcee8e3347
commit 2a1af76338

@ -124,4 +124,38 @@ public class EmsRecordAlarmDataController extends BaseController
return emsRecordAlarmDataService.getAlarmDataTotalCount();
}
/**
* WebSocket
* EmsRecordAlarmData
*
* @param alarmDataList
* @return
*/
@Log(title = "WebSocket告警数据", businessType = BusinessType.INSERT)
@PostMapping("/saveWebSocketAlarmData")
public AjaxResult saveWebSocketAlarmData(@RequestBody List<EmsRecordAlarmData> alarmDataList)
{
if (alarmDataList == null || alarmDataList.isEmpty()) {
return error("告警数据不能为空");
}
try {
// 设置创建人信息
String username = getUsername();
for (EmsRecordAlarmData alarmData : alarmDataList) {
alarmData.setCreateBy(username);
}
int result = emsRecordAlarmDataService.saveWebSocketAlarmDataBatch(alarmDataList);
if (result > 0) {
return success("保存成功,共保存 " + result + " 条告警记录");
} else {
return error("保存失败,没有新增任何记录");
}
} catch (Exception e) {
logger.error("保存WebSocket告警数据异常", e);
return error("保存失败:" + e.getMessage());
}
}
}

@ -69,10 +69,19 @@ public interface IEmsRecordAlarmDataService
/**
*
*/
void checkIotenvThresholdAlarms();
// void checkIotenvThresholdAlarms();
/**
*
*/
public int getAlarmDataTotalCount();
/**
* WebSocket
*
*
* @param alarmDataList
* @return
*/
public int saveWebSocketAlarmDataBatch(List<EmsRecordAlarmData> alarmDataList);
}

@ -170,206 +170,208 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
* 4. monitor_idmonitor_code
* 5.
* 6. trigger_rule: 0=, 1=
*
* WebSocket
*/
@Scheduled(cron = "0 */1 * * * ?") // 每分钟执行一次
@Override
public void checkIotenvThresholdAlarms() {
System.out.println("==== 开始执行阈值检查定时任务 ====");
long startTime = System.currentTimeMillis();
try {
// 1. 获取当天的分表名
SimpleDateFormat tableFormat = new SimpleDateFormat("yyyyMMdd");
String tableName = "record_iotenv_instant_" + tableFormat.format(new Date());
// 检查表是否存在
if (!checkTableExists(tableName)) {
System.out.println("当天分表不存在: " + tableName);
return;
}
// 2. 查询所有启用的告警规则(包括大于阈值和小于阈值)
List<EmsRecordAlarmRule> alarmRules = emsRecordAlarmRuleMapper.selectEmsRecordAlarmRuleList(new EmsRecordAlarmRule());
// 按触发规则分类
List<EmsRecordAlarmRule> greaterThanRules = new ArrayList<>();
List<EmsRecordAlarmRule> lessThanRules = new ArrayList<>();
for (EmsRecordAlarmRule rule : alarmRules) {
if (rule.getTriggerRule() != null) {
if (rule.getTriggerRule() == 0L) {
greaterThanRules.add(rule); // 大于阈值
} else if (rule.getTriggerRule() == 1L) {
lessThanRules.add(rule); // 小于阈值
}
}
}
System.out.println("查询到的告警规则 - 大于阈值规则: " + greaterThanRules.size() +
" 条, 小于阈值规则: " + lessThanRules.size() + " 条");
if (greaterThanRules.isEmpty() && lessThanRules.isEmpty()) {
System.out.println("没有启用的阈值告警规则");
return;
}
// 3. 按设备编码分组告警规则,每个设备可能有多个字段的规则
// 注意monitor_id字段存储的实际是monitor_code值与采集数据分表的monitorId保持一致
Map<String, List<EmsRecordAlarmRule>> rulesByMonitor = alarmRules.stream()
.filter(rule -> StringUtils.isNotEmpty(rule.getMonitorId()))
.collect(Collectors.groupingBy(EmsRecordAlarmRule::getMonitorId));
// 4. 只查询最近2分钟的数据确保每分钟执行时不重复处理
Date twoMinutesAgo = new Date(System.currentTimeMillis() - 2 * 60 * 1000);
List<RecordIotenvInstant> recentRecords = recordIotenvInstantMapper.selectRecentRecordsFromTable(tableName, twoMinutesAgo);
System.out.println("从分表 " + tableName + " 查询到最近2分钟的数据条数: " + recentRecords.size());
if (!recentRecords.isEmpty()) {
System.out.println("数据时间范围: " + recentRecords.get(recentRecords.size()-1).getRecodeTime() +
" 到 " + recentRecords.get(0).getRecodeTime());
}
if (recentRecords.isEmpty()) {
System.out.println("当天分表最近2分钟无新数据: " + tableName);
return;
}
// 5. 获取所有相关设备信息(用于验证设备类型,确保规则合理性)
// 这里需要按monitor_code查询因为rulesByMonitor的key是monitor_code
Set<String> monitorCodes = rulesByMonitor.keySet();
List<EmsBaseMonitorInfo> monitorInfos = emsBaseMonitorInfoMapper.selectMonitorInfoByCodes(new ArrayList<>(monitorCodes));
Map<String, EmsBaseMonitorInfo> monitorInfoMap = monitorInfos.stream()
.collect(Collectors.toMap(EmsBaseMonitorInfo::getMonitorCode, m -> m));
// 6. 处理每条记录,基于精确的字段规则进行阈值检查
List<EmsRecordAlarmData> candidateAlarms = new ArrayList<>();
for (RecordIotenvInstant record : recentRecords) {
String monitorId = record.getMonitorId(); // 采集数据中的monitorId实际是monitor_code
List<EmsRecordAlarmRule> rules = rulesByMonitor.get(monitorId);
if (rules == null || rules.isEmpty()) {
// 输出更详细的调试信息
if (rulesByMonitor.containsKey(monitorId)) {
System.out.println("设备 " + monitorId + " 的规则列表为空");
} else {
System.out.println("设备 " + monitorId + " 未找到对应的告警规则");
}
continue;
}
System.out.println("处理设备 " + monitorId + " 的数据,找到 " + rules.size() + " 条规则");
EmsBaseMonitorInfo monitorInfo = monitorInfoMap.get(monitorId);
if (monitorInfo == null) {
System.out.println("警告:设备 " + monitorId + " 未找到设备信息");
continue;
}
// 7. 遍历该设备的所有规则支持新版本有monitor_field和历史版本monitor_field为NULL
for (EmsRecordAlarmRule rule : rules) {
// 检查规则的基本有效性 - 只要有阈值就可以处理
if (rule.getTriggerValue() == null) {
System.out.println("跳过无阈值的规则 - 设备:" + monitorId + ", 规则ID:" + rule.getObjId());
continue;
}
BigDecimal threshold = rule.getTriggerValue();
List<BigDecimal> valuesToCheck = new ArrayList<>();
List<String> fieldDescriptions = new ArrayList<>();
// 8. 根据monitor_field字段处理不同情况
if (rule.getMonitorField() != null) {
// 新版本有明确的monitor_field精确检查指定字段
// 验证监测字段是否符合设备类型(可选的业务校验)
if (!isValidMonitorFieldForDeviceType(rule.getMonitorField(), monitorInfo.getMonitorType())) {
System.out.println("警告:设备类型与监测字段不匹配 - 设备:" + monitorId +
", 设备类型:" + monitorInfo.getMonitorType() +
", 监测字段:" + rule.getMonitorField());
continue;
}
BigDecimal actualValue = getFieldValue(record, rule.getMonitorField());
if (actualValue != null) {
valuesToCheck.add(actualValue);
String fieldDesc = MONITOR_FIELD_TO_DESC.get(rule.getMonitorField());
fieldDescriptions.add(fieldDesc != null ? fieldDesc : "未知字段");
}
} else {
// 历史版本monitor_field为NULL根据设备类型检查相应的默认字段
List<BigDecimal> defaultValues = getDefaultFieldValues(record, monitorInfo.getMonitorType());
List<String> defaultDescriptions = getDefaultFieldDescriptions(monitorInfo.getMonitorType());
if (!defaultValues.isEmpty()) {
valuesToCheck.addAll(defaultValues);
fieldDescriptions.addAll(defaultDescriptions);
System.out.println("历史规则兼容处理 - 设备:" + monitorId +
", 设备类型:" + monitorInfo.getMonitorType() +
", 检查字段数:" + defaultValues.size());
} else {
System.out.println("警告:无法确定历史规则的检查字段 - 设备:" + monitorId +
", 设备类型:" + monitorInfo.getMonitorType());
continue;
}
}
// 9. 阈值比较:根据触发规则检查所有相关字段
for (int i = 0; i < valuesToCheck.size(); i++) {
BigDecimal actualValue = valuesToCheck.get(i);
String fieldDesc = fieldDescriptions.get(i);
if (actualValue != null) {
boolean isAlarmTriggered = false;
String alarmDescription = "";
// 根据触发规则判断是否触发告警
if (rule.getTriggerRule() == 0L) {
// 大于阈值检查
if (actualValue.compareTo(threshold) > 0) {
isAlarmTriggered = true;
alarmDescription = "大于阈值";
}
} else if (rule.getTriggerRule() == 1L) {
// 小于阈值检查
if (actualValue.compareTo(threshold) < 0) {
isAlarmTriggered = true;
alarmDescription = "小于阈值";
}
}
if (isAlarmTriggered) {
EmsRecordAlarmData alarmData = createAlarmData(record, rule, actualValue.toString(), fieldDesc);
candidateAlarms.add(alarmData);
System.out.println("检测到阈值异常 - 设备:" + monitorId +
", 字段:" + fieldDesc +
", 实际值:" + actualValue +
", 阈值:" + threshold +
", 异常类型:" + alarmDescription +
", 设备采集时间:" + record.getCollectTime() +
", 系统记录时间:" + record.getRecodeTime());
}
}
}
}
}
// 10. 使用严格的去重逻辑插入异常数据
if (!candidateAlarms.isEmpty()) {
int insertedCount = insertAlarmDataWithStrictDuplicateCheck(candidateAlarms);
System.out.println("严格去重后实际插入异常数据: " + insertedCount + " 条");
} else {
System.out.println("未检测到异常数据");
}
} catch (Exception e) {
System.err.println("阈值检查定时任务执行异常: " + e.getMessage());
e.printStackTrace();
} finally {
long executionTime = System.currentTimeMillis() - startTime;
System.out.println("==== 阈值检查定时任务执行完成,耗时: " + executionTime + "ms ====");
}
}
// @Scheduled(cron = "0 */1 * * * ?") // 已停用完全依赖WebSocket不再使用定时任务
// @Override
// public void checkIotenvThresholdAlarms() {
// System.out.println("==== 开始执行阈值检查定时任务 ====");
// long startTime = System.currentTimeMillis();
// try {
// // 1. 获取当天的分表名
// SimpleDateFormat tableFormat = new SimpleDateFormat("yyyyMMdd");
// String tableName = "record_iotenv_instant_" + tableFormat.format(new Date());
//
// // 检查表是否存在
// if (!checkTableExists(tableName)) {
// System.out.println("当天分表不存在: " + tableName);
// return;
// }
//
// // 2. 查询所有启用的告警规则(包括大于阈值和小于阈值)
// List<EmsRecordAlarmRule> alarmRules = emsRecordAlarmRuleMapper.selectEmsRecordAlarmRuleList(new EmsRecordAlarmRule());
//
// // 按触发规则分类
// List<EmsRecordAlarmRule> greaterThanRules = new ArrayList<>();
// List<EmsRecordAlarmRule> lessThanRules = new ArrayList<>();
//
// for (EmsRecordAlarmRule rule : alarmRules) {
// if (rule.getTriggerRule() != null) {
// if (rule.getTriggerRule() == 0L) {
// greaterThanRules.add(rule); // 大于阈值
// } else if (rule.getTriggerRule() == 1L) {
// lessThanRules.add(rule); // 小于阈值
// }
// }
// }
//
// System.out.println("查询到的告警规则 - 大于阈值规则: " + greaterThanRules.size() +
// " 条, 小于阈值规则: " + lessThanRules.size() + " 条");
//
// if (greaterThanRules.isEmpty() && lessThanRules.isEmpty()) {
// System.out.println("没有启用的阈值告警规则");
// return;
// }
//
// // 3. 按设备编码分组告警规则,每个设备可能有多个字段的规则
// // 注意monitor_id字段存储的实际是monitor_code值与采集数据分表的monitorId保持一致
// Map<String, List<EmsRecordAlarmRule>> rulesByMonitor = alarmRules.stream()
// .filter(rule -> StringUtils.isNotEmpty(rule.getMonitorId()))
// .collect(Collectors.groupingBy(EmsRecordAlarmRule::getMonitorId));
//
// // 4. 只查询最近2分钟的数据确保每分钟执行时不重复处理
// Date twoMinutesAgo = new Date(System.currentTimeMillis() - 2 * 60 * 1000);
// List<RecordIotenvInstant> recentRecords = recordIotenvInstantMapper.selectRecentRecordsFromTable(tableName, twoMinutesAgo);
//
// System.out.println("从分表 " + tableName + " 查询到最近2分钟的数据条数: " + recentRecords.size());
// if (!recentRecords.isEmpty()) {
// System.out.println("数据时间范围: " + recentRecords.get(recentRecords.size()-1).getRecodeTime() +
// " 到 " + recentRecords.get(0).getRecodeTime());
// }
//
// if (recentRecords.isEmpty()) {
// System.out.println("当天分表最近2分钟无新数据: " + tableName);
// return;
// }
//
// // 5. 获取所有相关设备信息(用于验证设备类型,确保规则合理性)
// // 这里需要按monitor_code查询因为rulesByMonitor的key是monitor_code
// Set<String> monitorCodes = rulesByMonitor.keySet();
// List<EmsBaseMonitorInfo> monitorInfos = emsBaseMonitorInfoMapper.selectMonitorInfoByCodes(new ArrayList<>(monitorCodes));
// Map<String, EmsBaseMonitorInfo> monitorInfoMap = monitorInfos.stream()
// .collect(Collectors.toMap(EmsBaseMonitorInfo::getMonitorCode, m -> m));
//
// // 6. 处理每条记录,基于精确的字段规则进行阈值检查
// List<EmsRecordAlarmData> candidateAlarms = new ArrayList<>();
// for (RecordIotenvInstant record : recentRecords) {
// String monitorId = record.getMonitorId(); // 采集数据中的monitorId实际是monitor_code
// List<EmsRecordAlarmRule> rules = rulesByMonitor.get(monitorId);
//
// if (rules == null || rules.isEmpty()) {
// // 输出更详细的调试信息
// if (rulesByMonitor.containsKey(monitorId)) {
// System.out.println("设备 " + monitorId + " 的规则列表为空");
// } else {
// System.out.println("设备 " + monitorId + " 未找到对应的告警规则");
// }
// continue;
// }
//
// System.out.println("处理设备 " + monitorId + " 的数据,找到 " + rules.size() + " 条规则");
//
// EmsBaseMonitorInfo monitorInfo = monitorInfoMap.get(monitorId);
// if (monitorInfo == null) {
// System.out.println("警告:设备 " + monitorId + " 未找到设备信息");
// continue;
// }
//
// // 7. 遍历该设备的所有规则支持新版本有monitor_field和历史版本monitor_field为NULL
// for (EmsRecordAlarmRule rule : rules) {
// // 检查规则的基本有效性 - 只要有阈值就可以处理
// if (rule.getTriggerValue() == null) {
// System.out.println("跳过无阈值的规则 - 设备:" + monitorId + ", 规则ID:" + rule.getObjId());
// continue;
// }
//
// BigDecimal threshold = rule.getTriggerValue();
// List<BigDecimal> valuesToCheck = new ArrayList<>();
// List<String> fieldDescriptions = new ArrayList<>();
//
// // 8. 根据monitor_field字段处理不同情况
// if (rule.getMonitorField() != null) {
// // 新版本有明确的monitor_field精确检查指定字段
//
// // 验证监测字段是否符合设备类型(可选的业务校验)
// if (!isValidMonitorFieldForDeviceType(rule.getMonitorField(), monitorInfo.getMonitorType())) {
// System.out.println("警告:设备类型与监测字段不匹配 - 设备:" + monitorId +
// ", 设备类型:" + monitorInfo.getMonitorType() +
// ", 监测字段:" + rule.getMonitorField());
// continue;
// }
//
// BigDecimal actualValue = getFieldValue(record, rule.getMonitorField());
// if (actualValue != null) {
// valuesToCheck.add(actualValue);
// String fieldDesc = MONITOR_FIELD_TO_DESC.get(rule.getMonitorField());
// fieldDescriptions.add(fieldDesc != null ? fieldDesc : "未知字段");
// }
//
// } else {
// // 历史版本monitor_field为NULL根据设备类型检查相应的默认字段
// List<BigDecimal> defaultValues = getDefaultFieldValues(record, monitorInfo.getMonitorType());
// List<String> defaultDescriptions = getDefaultFieldDescriptions(monitorInfo.getMonitorType());
//
// if (!defaultValues.isEmpty()) {
// valuesToCheck.addAll(defaultValues);
// fieldDescriptions.addAll(defaultDescriptions);
// System.out.println("历史规则兼容处理 - 设备:" + monitorId +
// ", 设备类型:" + monitorInfo.getMonitorType() +
// ", 检查字段数:" + defaultValues.size());
// } else {
// System.out.println("警告:无法确定历史规则的检查字段 - 设备:" + monitorId +
// ", 设备类型:" + monitorInfo.getMonitorType());
// continue;
// }
// }
//
// // 9. 阈值比较:根据触发规则检查所有相关字段
// for (int i = 0; i < valuesToCheck.size(); i++) {
// BigDecimal actualValue = valuesToCheck.get(i);
// String fieldDesc = fieldDescriptions.get(i);
//
// if (actualValue != null) {
// boolean isAlarmTriggered = false;
// String alarmDescription = "";
//
// // 根据触发规则判断是否触发告警
// if (rule.getTriggerRule() == 0L) {
// // 大于阈值检查
// if (actualValue.compareTo(threshold) > 0) {
// isAlarmTriggered = true;
// alarmDescription = "大于阈值";
// }
// } else if (rule.getTriggerRule() == 1L) {
// // 小于阈值检查
// if (actualValue.compareTo(threshold) < 0) {
// isAlarmTriggered = true;
// alarmDescription = "小于阈值";
// }
// }
//
// if (isAlarmTriggered) {
// EmsRecordAlarmData alarmData = createAlarmData(record, rule, actualValue.toString(), fieldDesc);
// candidateAlarms.add(alarmData);
//
// System.out.println("检测到阈值异常 - 设备:" + monitorId +
// ", 字段:" + fieldDesc +
// ", 实际值:" + actualValue +
// ", 阈值:" + threshold +
// ", 异常类型:" + alarmDescription +
// ", 设备采集时间:" + record.getCollectTime() +
// ", 系统记录时间:" + record.getRecodeTime());
// }
// }
// }
// }
// }
//
// // 10. 使用严格的去重逻辑插入异常数据
// if (!candidateAlarms.isEmpty()) {
// int insertedCount = insertAlarmDataWithStrictDuplicateCheck(candidateAlarms);
// System.out.println("严格去重后实际插入异常数据: " + insertedCount + " 条");
// } else {
// System.out.println("未检测到异常数据");
// }
//
// } catch (Exception e) {
// System.err.println("阈值检查定时任务执行异常: " + e.getMessage());
// e.printStackTrace();
// } finally {
// long executionTime = System.currentTimeMillis() - startTime;
// System.out.println("==== 阈值检查定时任务执行完成,耗时: " + executionTime + "ms ====");
// }
// }
/**
*
@ -750,4 +752,59 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
public int getAlarmDataTotalCount() {
return emsRecordAlarmDataMapper.getAlarmDataTotalCount();
}
/**
* WebSocket
*
*
* @param alarmDataList
* @return
*/
@Override
public int saveWebSocketAlarmDataBatch(List<EmsRecordAlarmData> alarmDataList) {
if (alarmDataList == null || alarmDataList.isEmpty()) {
return 0;
}
System.out.println("==== 开始保存WebSocket告警数据 ====");
System.out.println("接收到告警数据数量: " + alarmDataList.size());
// 设置默认值和创建时间
Date currentTime = DateUtils.getNowDate();
for (EmsRecordAlarmData alarmData : alarmDataList) {
// 设置创建时间
if (alarmData.getCreateTime() == null) {
alarmData.setCreateTime(currentTime);
}
// 设置默认告警状态为未处理
if (alarmData.getAlarmStatus() == null) {
alarmData.setAlarmStatus(1L); // 1表示未处理
}
// 数据完整性检查
if (StringUtils.isEmpty(alarmData.getMonitorId())) {
System.out.println("警告跳过monitorId为空的告警数据");
continue;
}
if (alarmData.getCollectTime() == null) {
System.out.println("警告collectTime为空使用当前时间");
alarmData.setCollectTime(currentTime);
}
System.out.println("准备保存告警数据 - 设备:" + alarmData.getMonitorId() +
", 原因:" + alarmData.getCause() +
", 数值:" + alarmData.getAlarmData() +
", 告警类型:" + alarmData.getAlarmType());
}
// 复用现有的严格去重逻辑
int insertedCount = insertAlarmDataWithStrictDuplicateCheck(alarmDataList);
System.out.println("WebSocket告警数据保存完成实际插入: " + insertedCount + " 条记录");
System.out.println("==== WebSocket告警数据保存结束 ====");
return insertedCount;
}
}

Loading…
Cancel
Save