diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/constant/EmsAlarmPushStatusConstants.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/constant/EmsAlarmPushStatusConstants.java
new file mode 100644
index 0000000..2eea3d2
--- /dev/null
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/constant/EmsAlarmPushStatusConstants.java
@@ -0,0 +1,38 @@
+package org.dromara.ems.record.constant;
+
+/**
+ * 告警推送状态常量。
+ *
+ *
迁移阶段统一使用字符串语义值,而不是旧库里的单字符占位,
+ * 这样统计 SQL、推送日志和前端展示才能共用同一套口径。
+ */
+public final class EmsAlarmPushStatusConstants {
+
+ private EmsAlarmPushStatusConstants() {
+ }
+
+ /**
+ * 无推送配置,不纳入推送统计。
+ */
+ public static final String NONE = "NONE";
+
+ /**
+ * 已生成待执行推送任务。
+ */
+ public static final String PENDING = "PENDING";
+
+ /**
+ * 推送执行中。
+ */
+ public static final String PROCESSING = "PROCESSING";
+
+ /**
+ * 推送成功。
+ */
+ public static final String SUCCESS = "SUCCESS";
+
+ /**
+ * 推送失败。
+ */
+ public static final String FAIL = "FAIL";
+}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/constant/EmsAlarmStatusConstants.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/constant/EmsAlarmStatusConstants.java
new file mode 100644
index 0000000..9417f64
--- /dev/null
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/constant/EmsAlarmStatusConstants.java
@@ -0,0 +1,23 @@
+package org.dromara.ems.record.constant;
+
+/**
+ * 告警状态常量。
+ *
+ * 这里先只统一当前仓库已经真实落库的主状态,
+ * 避免“已处理/未处理”在前后端之间出现魔法值漂移。
+ */
+public final class EmsAlarmStatusConstants {
+
+ private EmsAlarmStatusConstants() {
+ }
+
+ /**
+ * 已处理
+ */
+ public static final long HANDLED = 0L;
+
+ /**
+ * 未处理
+ */
+ public static final long UNHANDLED = 1L;
+}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/AlarmHandleResultVo.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/AlarmHandleResultVo.java
new file mode 100644
index 0000000..ec69de6
--- /dev/null
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/AlarmHandleResultVo.java
@@ -0,0 +1,32 @@
+package org.dromara.ems.record.domain.vo;
+
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 告警处理结果。
+ */
+@Data
+public class AlarmHandleResultVo implements Serializable {
+
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ private Integer requestCount = 0;
+
+ private Integer updatedCount = 0;
+
+ private Integer alreadyHandledCount = 0;
+
+ private Integer missingCount = 0;
+
+ private List successIds = new ArrayList<>();
+
+ private List alreadyHandledIds = new ArrayList<>();
+
+ private List missingIds = new ArrayList<>();
+}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsAlarmHandleResultVo.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsAlarmHandleResultVo.java
new file mode 100644
index 0000000..70f8195
--- /dev/null
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsAlarmHandleResultVo.java
@@ -0,0 +1,34 @@
+package org.dromara.ems.record.domain.vo;
+
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 告警处置结果。
+ *
+ * 前端需要知道“真的更新了几条”,不能再把 HTTP 200 当成业务成功。
+ */
+@Data
+public class EmsAlarmHandleResultVo implements Serializable {
+
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ private Integer requestCount = 0;
+
+ private Integer updatedCount = 0;
+
+ private Integer alreadyHandledCount = 0;
+
+ private Integer missingCount = 0;
+
+ private List successIds = new ArrayList<>();
+
+ private List alreadyHandledIds = new ArrayList<>();
+
+ private List missingIds = new ArrayList<>();
+}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsRealtimeAlarmBatchResultVo.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsRealtimeAlarmBatchResultVo.java
new file mode 100644
index 0000000..9d73dc7
--- /dev/null
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsRealtimeAlarmBatchResultVo.java
@@ -0,0 +1,44 @@
+package org.dromara.ems.record.domain.vo;
+
+import lombok.Data;
+import org.dromara.ems.record.domain.EmsRecordAlarmData;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 实时告警批量入库结果。
+ */
+@Data
+public class EmsRealtimeAlarmBatchResultVo implements Serializable {
+
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * 接收到的总条数。
+ */
+ private Integer totalCount = 0;
+
+ /**
+ * 实际新增条数。
+ */
+ private Integer insertedCount = 0;
+
+ /**
+ * 去重命中条数。
+ */
+ private Integer duplicateCount = 0;
+
+ /**
+ * 校验失败或跳过条数。
+ */
+ private Integer failedCount = 0;
+
+ /**
+ * 实际入库后的告警记录,供前端实时弹窗和后续联调直接复用。
+ */
+ private List records = new ArrayList<>();
+}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsRealtimeAlarmEventVo.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsRealtimeAlarmEventVo.java
new file mode 100644
index 0000000..6e83963
--- /dev/null
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsRealtimeAlarmEventVo.java
@@ -0,0 +1,30 @@
+package org.dromara.ems.record.domain.vo;
+
+import lombok.Data;
+import org.dromara.ems.record.domain.EmsRecordAlarmData;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * 推送给前端的实时告警事件包。
+ */
+@Data
+public class EmsRealtimeAlarmEventVo implements Serializable {
+
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ private String eventType = "ems_alarm_realtime";
+
+ private String channel = "SSE";
+
+ private String source = "saveWebSocketAlarmData";
+
+ private Date generatedAt = new Date();
+
+ private List alarms = new ArrayList<>();
+}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsRealtimeAlarmSaveResultVo.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsRealtimeAlarmSaveResultVo.java
new file mode 100644
index 0000000..bf005cd
--- /dev/null
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/domain/vo/EmsRealtimeAlarmSaveResultVo.java
@@ -0,0 +1,35 @@
+package org.dromara.ems.record.domain.vo;
+
+import lombok.Data;
+import org.dromara.ems.record.domain.EmsRecordAlarmData;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 实时告警批量入库结果。
+ *
+ * 保留插入/去重/失败明细,是为了让前端和联调脚本能准确判断本次主链到底落到了哪一步。
+ */
+@Data
+public class EmsRealtimeAlarmSaveResultVo implements Serializable {
+
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ private Integer requestCount = 0;
+
+ private Integer insertedCount = 0;
+
+ private Integer duplicateCount = 0;
+
+ private Integer failedCount = 0;
+
+ private List insertedAlarms = new ArrayList<>();
+
+ private List duplicateKeys = new ArrayList<>();
+
+ private List failureMessages = new ArrayList<>();
+}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/enums/EmsAlarmPushStatus.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/enums/EmsAlarmPushStatus.java
new file mode 100644
index 0000000..15c9f46
--- /dev/null
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/enums/EmsAlarmPushStatus.java
@@ -0,0 +1,32 @@
+package org.dromara.ems.record.enums;
+
+import cn.hutool.core.util.StrUtil;
+
+import java.util.Locale;
+
+/**
+ * 告警推送状态统一口径。
+ *
+ * 这里把历史遗留的数字值和旧字符串都收敛到标准状态,避免统计口径、日志页和实时链路各说各话。
+ */
+public enum EmsAlarmPushStatus {
+
+ PENDING,
+ PROCESSING,
+ SUCCESS,
+ FAIL;
+
+ public static String normalize(String rawValue) {
+ if (StrUtil.isBlank(rawValue)) {
+ return PENDING.name();
+ }
+ String normalized = rawValue.trim().toUpperCase(Locale.ROOT);
+ return switch (normalized) {
+ case "0", "WAIT", "WAITING", "PENDING" -> PENDING.name();
+ case "1", "PROCESSING" -> PROCESSING.name();
+ case "2", "SUCCESS", "SUCCEED" -> SUCCESS.name();
+ case "3", "FAIL", "FAILED", "ERROR" -> FAIL.name();
+ default -> normalized;
+ };
+ }
+}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/mapper/EmsRecordAlarmDataMapper.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/mapper/EmsRecordAlarmDataMapper.java
index 2a96cd8..0c91a93 100644
--- a/ruoyi-ems/src/main/java/org/dromara/ems/record/mapper/EmsRecordAlarmDataMapper.java
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/mapper/EmsRecordAlarmDataMapper.java
@@ -58,6 +58,17 @@ public interface EmsRecordAlarmDataMapper
*/
public int updateEmsRecordAlarmData(EmsRecordAlarmData emsRecordAlarmData);
+ /**
+ * 将告警记录标记为已处理。
+ *
+ * 这里单独拆接口,是为了把“仅处理未处理记录”的条件收进 SQL,
+ * 避免并发确认时因为先查后改产生双重成功的假象。
+ *
+ * @param emsRecordAlarmData 告警处理信息
+ * @return 实际更新条数
+ */
+ int markAlarmHandled(EmsRecordAlarmData emsRecordAlarmData);
+
/**
* 删除异常数据记录
*
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/mapper/EmsRecordAlarmRuleMapper.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/mapper/EmsRecordAlarmRuleMapper.java
index 2a281ca..3d0b123 100644
--- a/ruoyi-ems/src/main/java/org/dromara/ems/record/mapper/EmsRecordAlarmRuleMapper.java
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/mapper/EmsRecordAlarmRuleMapper.java
@@ -1,5 +1,6 @@
package org.dromara.ems.record.mapper;
+import org.apache.ibatis.annotations.Param;
import org.dromara.ems.record.domain.EmsRecordAlarmRule;
import java.util.List;
@@ -67,4 +68,13 @@ public interface EmsRecordAlarmRuleMapper
* @return 告警规则总数
*/
public int getEmsRecordAlarmRuleTotalCount();
+
+ /**
+ * 根据实时告警上下文匹配启用中的规则。
+ *
+ * 这里复用 SOP 命中同一套 `monitorId + cause` 语义,
+ * 避免“实时弹窗命中一套规则、落库补字段又命中另一套规则”的口径漂移。
+ */
+ List selectMatchedAlarmRulesByAlarmInfo(@Param("monitorId") String monitorId,
+ @Param("cause") String cause);
}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/service/IEmsRecordAlarmDataService.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/service/IEmsRecordAlarmDataService.java
index 5f3eb8f..1dedf0e 100644
--- a/ruoyi-ems/src/main/java/org/dromara/ems/record/service/IEmsRecordAlarmDataService.java
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/service/IEmsRecordAlarmDataService.java
@@ -1,6 +1,8 @@
package org.dromara.ems.record.service;
import org.dromara.ems.record.domain.EmsRecordAlarmData;
+import org.dromara.ems.record.domain.vo.AlarmHandleResultVo;
+import org.dromara.ems.record.domain.vo.EmsRealtimeAlarmBatchResultVo;
import org.dromara.ems.record.domain.vo.EmsRecordAlarmDataSummaryVo;
import java.util.List;
@@ -74,7 +76,7 @@ public interface IEmsRecordAlarmDataService
* @param objIds
* @return
*/
- int handleExceptionsAlarmData(Long[] objIds);
+ AlarmHandleResultVo handleExceptionsAlarmData(Long[] objIds);
/**
* 物联网数据阈值检查定时任务
@@ -93,5 +95,5 @@ public interface IEmsRecordAlarmDataService
* @param alarmDataList 告警数据列表
* @return 实际插入的记录数量
*/
- public int saveWebSocketAlarmDataBatch(List alarmDataList);
+ EmsRealtimeAlarmBatchResultVo saveWebSocketAlarmDataBatch(List alarmDataList);
}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java
index f35fedb..53a7100 100644
--- a/ruoyi-ems/src/main/java/org/dromara/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/service/impl/EmsRecordAlarmDataServiceImpl.java
@@ -2,12 +2,27 @@ package org.dromara.ems.record.service.impl;
import cn.hutool.core.util.ObjectUtil;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.core.exception.ServiceException;
import org.dromara.common.core.utils.DateUtils;
+import org.dromara.common.json.utils.JsonUtils;
+import org.dromara.common.sse.utils.SseMessageUtils;
+import org.dromara.ems.base.domain.EmsAlarmPushLog;
+import org.dromara.ems.base.domain.EmsBaseMonitorInfo;
+import org.dromara.ems.base.domain.bo.EmsAlarmNotifyGroupUserBo;
+import org.dromara.ems.base.domain.vo.EmsAlarmNotifyGroupUserVo;
+import org.dromara.ems.base.mapper.EmsAlarmPushLogMapper;
import org.dromara.ems.base.mapper.EmsBaseEnergyTypeMapper;
import org.dromara.ems.base.mapper.EmsBaseMonitorInfoMapper;
+import org.dromara.ems.base.service.IEmsAlarmNotifyGroupUserService;
+import org.dromara.ems.record.constant.EmsAlarmPushStatusConstants;
+import org.dromara.ems.record.constant.EmsAlarmStatusConstants;
import org.dromara.ems.record.domain.EmsRecordAlarmData;
import org.dromara.ems.record.domain.EmsRecordAlarmRule;
import org.dromara.ems.record.domain.RecordIotenvInstant;
+import org.dromara.ems.record.domain.vo.AlarmHandleResultVo;
+import org.dromara.ems.record.domain.vo.EmsRealtimeAlarmBatchResultVo;
+import org.dromara.ems.record.domain.vo.EmsRealtimeAlarmEventVo;
import org.dromara.ems.record.domain.vo.EmsRecordAlarmDataSummaryVo;
import org.dromara.ems.record.mapper.EmsRecordAlarmDataMapper;
import org.dromara.ems.record.mapper.EmsRecordAlarmRuleMapper;
@@ -15,10 +30,13 @@ import org.dromara.ems.record.mapper.RecordIotenvInstantMapper;
import org.dromara.ems.record.service.IEmsRecordAlarmDataService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronization;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.math.BigDecimal;
import java.util.*;
+import static org.dromara.common.satoken.utils.LoginHelper.getUserId;
import static org.dromara.common.satoken.utils.LoginHelper.getUsername;
/**
@@ -28,6 +46,7 @@ import static org.dromara.common.satoken.utils.LoginHelper.getUsername;
* @date 2024-05-15
*/
@Service
+@Slf4j
@RequiredArgsConstructor
public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService {
@@ -45,6 +64,12 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
private final EmsBaseEnergyTypeMapper emsBaseEnergyTypeMapper;
+ private final EmsAlarmPushLogMapper emsAlarmPushLogMapper;
+
+ private final IEmsAlarmNotifyGroupUserService emsAlarmNotifyGroupUserService;
+
+ private static final long REALTIME_DUPLICATE_WINDOW_MS = 5 * 60 * 1000L;
+
/**
* 监测字段字典映射
* 将前端字典值映射到数据库字段名和中文描述
@@ -171,16 +196,40 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
*/
@Override
@Transactional(rollbackFor = Exception.class)
- public int handleExceptionsAlarmData(Long[] objIds) {
- int result = 0;
+ public AlarmHandleResultVo handleExceptionsAlarmData(Long[] objIds) {
+ AlarmHandleResultVo result = new AlarmHandleResultVo();
+ if (objIds == null || objIds.length == 0) {
+ return result;
+ }
+ result.setRequestCount(objIds.length);
for (Long objId : objIds) {
+ EmsRecordAlarmData existing = emsRecordAlarmDataMapper.selectEmsRecordAlarmDataByObjId(objId);
+ if (existing == null) {
+ result.setMissingCount(result.getMissingCount() + 1);
+ result.getMissingIds().add(objId);
+ continue;
+ }
+ if (Objects.equals(existing.getAlarmStatus(), EmsAlarmStatusConstants.HANDLED)) {
+ result.setAlreadyHandledCount(result.getAlreadyHandledCount() + 1);
+ result.getAlreadyHandledIds().add(objId);
+ continue;
+ }
EmsRecordAlarmData alarmData = new EmsRecordAlarmData();
alarmData.setObjId(objId);
- alarmData.setAlarmStatus(0L);
+ alarmData.setAlarmStatus(EmsAlarmStatusConstants.HANDLED);
alarmData.setOperationName(getUsername());
alarmData.setOperationTime(DateUtils.getNowDate());
- int i = emsRecordAlarmDataMapper.updateEmsRecordAlarmData(alarmData);
- result += 1;
+ alarmData.setConfirmUserId(getUserId());
+ alarmData.setUpdateTime(DateUtils.getNowDate());
+ // 只允许从“未处理”切到“已处理”,这样并发确认时第二次提交会拿到 0 行更新。
+ int updated = emsRecordAlarmDataMapper.markAlarmHandled(alarmData);
+ if (updated > 0) {
+ result.setUpdatedCount(result.getUpdatedCount() + updated);
+ result.getSuccessIds().add(objId);
+ } else {
+ result.setAlreadyHandledCount(result.getAlreadyHandledCount() + 1);
+ result.getAlreadyHandledIds().add(objId);
+ }
}
return result;
}
@@ -792,73 +841,71 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
/**
* 批量保存WebSocket告警数据
- * 处理前端传来的告警数据列表,直接插入数据库
+ * 兼容旧入口,但补回结构化结果,便于前端实时链路和联调页直接复用。
*
* @param alarmDataList 告警数据列表
- * @return 实际插入的记录数量
+ * @return 实时告警处理结果
*/
@Override
- public int saveWebSocketAlarmDataBatch(List alarmDataList) {
+ @Transactional(rollbackFor = Exception.class)
+ public EmsRealtimeAlarmBatchResultVo saveWebSocketAlarmDataBatch(List alarmDataList) {
+ EmsRealtimeAlarmBatchResultVo result = new EmsRealtimeAlarmBatchResultVo();
if (alarmDataList == null || alarmDataList.isEmpty()) {
- return 0;
+ return result;
}
- System.out.println("==== 开始保存WebSocket告警数据 ====");
- System.out.println("接收到告警数据数量: " + alarmDataList.size());
-
- // 设置默认值和创建时间
+ result.setTotalCount(alarmDataList.size());
Date currentTime = DateUtils.getNowDate();
- int insertedCount = 0;
-
- for (EmsRecordAlarmData alarmData : alarmDataList) {
- try {
- // 设置创建时间
- if (alarmData.getCreateTime() == null) {
- alarmData.setCreateTime(currentTime);
- }
-
- // 设置默认告警状态为未处理
- if (alarmData.getAlarmStatus() == null) {
- alarmData.setAlarmStatus(1L); // 1表示未处理
- }
-
- // 数据完整性检查
- if (ObjectUtil.isEmpty(alarmData.getMonitorId())) {
- System.out.println("警告:跳过monitorId为空的告警数据");
- continue;
- }
-
- if (alarmData.getCollectTime() == null) {
- System.out.println("警告:collectTime为空,使用当前时间");
- alarmData.setCollectTime(currentTime);
- }
-
- // 推送状态、标题和内容属于新增字段,入库前补齐默认值,避免后续列表/详情出现空链路。
- fillAlarmEnhanceFields(alarmData);
-
- System.out.println("保存告警数据 - 设备:" + alarmData.getMonitorId() +
- ", 原因:" + alarmData.getCause() +
- ", 数值:" + alarmData.getAlarmData() +
- ", 告警类型:" + alarmData.getAlarmType());
-
- // 直接插入数据库,不进行去重检查
- int result = insertEmsRecordAlarmData(alarmData);
- if (result > 0) {
- insertedCount++;
- System.out.println("成功保存告警数据 - 设备:" + alarmData.getMonitorId() +
- ", 原因:" + alarmData.getCause());
- }
-
- } catch (Exception e) {
- System.err.println("保存告警数据时发生错误: " + e.getMessage());
- e.printStackTrace();
+ Set requestDeduplicationKeys = new HashSet<>();
+ for (int index = 0; index < alarmDataList.size(); index++) {
+ EmsRecordAlarmData alarmData = alarmDataList.get(index);
+ Optional invalidReason = validateRealtimeAlarm(alarmData, currentTime);
+ if (invalidReason.isPresent()) {
+ // 这里允许坏消息单条失败,但数据库写入错误仍走事务整体回滚,避免产生半闭环脏数据。
+ result.setFailedCount(result.getFailedCount() + 1);
+ log.warn("实时告警第{}条校验失败,原因={}", index + 1, invalidReason.get());
+ continue;
}
+
+ String requestKey = buildUniqueKey(alarmData);
+ if (requestKey != null && !requestDeduplicationKeys.add(requestKey)) {
+ result.setDuplicateCount(result.getDuplicateCount() + 1);
+ log.info("实时告警命中批次内去重,businessKey={}", requestKey);
+ continue;
+ }
+
+ List matchedRules = emsRecordAlarmRuleMapper.selectMatchedAlarmRulesByAlarmInfo(alarmData.getMonitorId(), alarmData.getCause());
+ enrichAlarmByMonitor(alarmData);
+ enrichAlarmByRule(alarmData, matchedRules);
+ fillAlarmEnhanceFields(alarmData);
+
+ if (isDuplicateRealtimeAlarm(alarmData)) {
+ result.setDuplicateCount(result.getDuplicateCount() + 1);
+ continue;
+ }
+
+ int inserted = insertEmsRecordAlarmData(alarmData);
+ if (inserted != 1 || alarmData.getObjId() == null) {
+ throw new ServiceException("实时告警落库失败,monitorId=" + alarmData.getMonitorId() + ", cause=" + alarmData.getCause());
+ }
+
+ int pushCount = createPendingPushLogs(alarmData, matchedRules);
+ if (pushCount > 0 || !EmsAlarmPushStatusConstants.NONE.equals(alarmData.getPushStatus())) {
+ EmsRecordAlarmData patch = new EmsRecordAlarmData();
+ patch.setObjId(alarmData.getObjId());
+ patch.setPushCount(pushCount);
+ patch.setPushStatus(alarmData.getPushStatus());
+ patch.setUpdateTime(DateUtils.getNowDate());
+ emsRecordAlarmDataMapper.updateEmsRecordAlarmData(patch);
+ alarmData.setPushCount(pushCount);
+ }
+
+ result.getRecords().add(alarmData);
}
- System.out.println("WebSocket告警数据保存完成,实际插入: " + insertedCount + " 条记录");
- System.out.println("==== WebSocket告警数据保存结束 ====");
-
- return insertedCount;
+ result.setInsertedCount(result.getRecords().size());
+ publishRealtimeAlarmEventAfterCommit(result);
+ return result;
}
private void fillAlarmEnhanceFields(EmsRecordAlarmData alarmData) {
@@ -868,10 +915,7 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
if (alarmData.getPushCount() == null) {
alarmData.setPushCount(0);
}
- if (ObjectUtil.isEmpty(alarmData.getPushStatus())) {
- // 约定 0 为未推送,先补默认值,避免新增字段因为空值影响筛选和详情展示。
- alarmData.setPushStatus("0");
- }
+ alarmData.setPushStatus(normalizePushStatus(alarmData.getPushStatus()));
if (alarmData.getActualValue() == null && ObjectUtil.isNotEmpty(alarmData.getAlarmData())) {
try {
alarmData.setActualValue(new BigDecimal(alarmData.getAlarmData()));
@@ -906,4 +950,214 @@ public class EmsRecordAlarmDataServiceImpl implements IEmsRecordAlarmDataService
}
}
}
+
+ private Optional validateRealtimeAlarm(EmsRecordAlarmData alarmData, Date fallbackTime) {
+ if (alarmData == null) {
+ return Optional.of("告警对象为空");
+ }
+ if (ObjectUtil.isEmpty(alarmData.getMonitorId())) {
+ return Optional.of("monitorId不能为空");
+ }
+ if (ObjectUtil.isEmpty(alarmData.getCause())) {
+ return Optional.of("cause不能为空,SOP 命中链无法建立");
+ }
+ if (alarmData.getCollectTime() == null) {
+ alarmData.setCollectTime(fallbackTime);
+ }
+ if (alarmData.getCreateTime() == null) {
+ alarmData.setCreateTime(fallbackTime);
+ }
+ if (alarmData.getAlarmStatus() == null) {
+ alarmData.setAlarmStatus(EmsAlarmStatusConstants.UNHANDLED);
+ }
+ return Optional.empty();
+ }
+
+ private boolean isDuplicateRealtimeAlarm(EmsRecordAlarmData alarmData) {
+ Date collectTime = alarmData.getCollectTime() != null ? alarmData.getCollectTime() : DateUtils.getNowDate();
+ Date startTime = new Date(collectTime.getTime() - REALTIME_DUPLICATE_WINDOW_MS);
+ Date endTime = new Date(collectTime.getTime() + REALTIME_DUPLICATE_WINDOW_MS);
+ Integer duplicateCount = emsRecordAlarmDataMapper.checkDuplicateAlarmData(alarmData.getMonitorId(), alarmData.getCause(), startTime, endTime);
+ return duplicateCount != null && duplicateCount > 0;
+ }
+
+ private void enrichAlarmByMonitor(EmsRecordAlarmData alarmData) {
+ if (ObjectUtil.isEmpty(alarmData.getMonitorId())) {
+ return;
+ }
+ EmsBaseMonitorInfo query = new EmsBaseMonitorInfo();
+ query.setMonitorCode(alarmData.getMonitorId());
+ List monitorInfos = emsBaseMonitorInfoMapper.selectEmsBaseMonitorInfoList(query);
+ if (monitorInfos == null || monitorInfos.isEmpty()) {
+ return;
+ }
+ EmsBaseMonitorInfo monitorInfo = monitorInfos.get(0);
+ if (ObjectUtil.isEmpty(alarmData.getMonitorName())) {
+ alarmData.setMonitorName(monitorInfo.getMonitorName());
+ }
+ if (ObjectUtil.isEmpty(alarmData.getMetricCode())) {
+ alarmData.setMetricCode(monitorInfo.getMetricCode());
+ }
+ if (ObjectUtil.isEmpty(alarmData.getTenantId())) {
+ alarmData.setTenantId(monitorInfo.getTenantId());
+ }
+ }
+
+ private void enrichAlarmByRule(EmsRecordAlarmData alarmData, List matchedRules) {
+ if (matchedRules == null || matchedRules.isEmpty()) {
+ return;
+ }
+ EmsRecordAlarmRule matchedRule = matchedRules.get(0);
+ if (ObjectUtil.isEmpty(alarmData.getTenantId())) {
+ alarmData.setTenantId(matchedRule.getTenantId());
+ }
+ if (ObjectUtil.isEmpty(alarmData.getMetricCode())) {
+ alarmData.setMetricCode(matchedRule.getMetricCode());
+ }
+ if (ObjectUtil.isEmpty(alarmData.getAlarmLevel())) {
+ alarmData.setAlarmLevel(matchedRule.getAlarmLevel());
+ }
+ if (ObjectUtil.isEmpty(alarmData.getNotifyUser())) {
+ alarmData.setNotifyUser(matchedRule.getNotifyUser());
+ }
+ if (alarmData.getThresholdValue() == null) {
+ if (matchedRule.getTriggerValue() != null) {
+ alarmData.setThresholdValue(matchedRule.getTriggerValue());
+ } else if (Objects.equals(1L, matchedRule.getTriggerRule())) {
+ alarmData.setThresholdValue(matchedRule.getAlarmLower());
+ } else {
+ alarmData.setThresholdValue(matchedRule.getAlarmUpper());
+ }
+ }
+ }
+
+ private int createPendingPushLogs(EmsRecordAlarmData alarmData, List matchedRules) {
+ Set pushTargets = new LinkedHashSet<>();
+ collectExplicitNotifyUsers(pushTargets, alarmData.getNotifyUser());
+ if (matchedRules != null) {
+ for (EmsRecordAlarmRule matchedRule : matchedRules) {
+ collectExplicitNotifyUsers(pushTargets, matchedRule.getNotifyUser());
+ if (matchedRule.getNotifyGroupId() != null) {
+ EmsAlarmNotifyGroupUserBo query = new EmsAlarmNotifyGroupUserBo();
+ query.setGroupId(matchedRule.getNotifyGroupId());
+ query.setIsEnable("0");
+ List groupUsers = emsAlarmNotifyGroupUserService.queryList(query);
+ if (groupUsers == null || groupUsers.isEmpty()) {
+ pushTargets.add("GROUP:" + matchedRule.getNotifyGroupId());
+ continue;
+ }
+ for (EmsAlarmNotifyGroupUserVo groupUser : groupUsers) {
+ if (ObjectUtil.isNotEmpty(groupUser.getPhone())) {
+ pushTargets.add(groupUser.getPhone());
+ } else if (ObjectUtil.isNotEmpty(groupUser.getEmail())) {
+ pushTargets.add(groupUser.getEmail());
+ } else if (ObjectUtil.isNotEmpty(groupUser.getNickName())) {
+ pushTargets.add(groupUser.getNickName());
+ } else if (ObjectUtil.isNotEmpty(groupUser.getUserName())) {
+ pushTargets.add(groupUser.getUserName());
+ }
+ }
+ }
+ }
+ }
+
+ for (String target : pushTargets) {
+ EmsAlarmPushLog pushLog = new EmsAlarmPushLog();
+ pushLog.setAlarmObjId(alarmData.getObjId());
+ pushLog.setChannelType(resolveChannelType(target));
+ pushLog.setTargetValue(target);
+ pushLog.setAlarmLevel(alarmData.getAlarmLevel());
+ pushLog.setPushContent(alarmData.getAlarmContent());
+ pushLog.setPushStatus(EmsAlarmPushStatusConstants.PENDING);
+ pushLog.setResponseMsg("已生成待推送日志,等待真实推送执行链接管");
+ pushLog.setPushTime(DateUtils.getNowDate());
+ emsAlarmPushLogMapper.insert(pushLog);
+ }
+
+ // 这里明确区分“已生成推送任务”和“本条告警没有通知配置”,避免统计页把未配置也算成待推送。
+ alarmData.setPushStatus(pushTargets.isEmpty() ? EmsAlarmPushStatusConstants.NONE : EmsAlarmPushStatusConstants.PENDING);
+ return pushTargets.size();
+ }
+
+ private void collectExplicitNotifyUsers(Set pushTargets, String notifyUser) {
+ if (ObjectUtil.isEmpty(notifyUser)) {
+ return;
+ }
+ String[] parts = notifyUser.split("[,;,;\\s]+");
+ for (String part : parts) {
+ if (ObjectUtil.isNotEmpty(part)) {
+ pushTargets.add(part.trim());
+ }
+ }
+ }
+
+ private String resolveChannelType(String targetValue) {
+ if (ObjectUtil.isEmpty(targetValue)) {
+ return "SITE";
+ }
+ if (targetValue.contains("@")) {
+ return "EMAIL";
+ }
+ if (targetValue.matches("^1\\d{10}$")) {
+ return "SMS";
+ }
+ if (targetValue.startsWith("http://") || targetValue.startsWith("https://")) {
+ return "WEBHOOK";
+ }
+ return "SITE";
+ }
+
+ private String normalizePushStatus(String pushStatus) {
+ if (ObjectUtil.isEmpty(pushStatus)) {
+ return EmsAlarmPushStatusConstants.NONE;
+ }
+ String normalized = pushStatus.trim().toUpperCase(Locale.ROOT);
+ if ("0".equals(normalized) || "WAIT".equals(normalized)) {
+ return EmsAlarmPushStatusConstants.PENDING;
+ }
+ if ("1".equals(normalized)) {
+ return EmsAlarmPushStatusConstants.PROCESSING;
+ }
+ if ("2".equals(normalized) || "FAILED".equals(normalized)) {
+ return EmsAlarmPushStatusConstants.FAIL;
+ }
+ if (EmsAlarmPushStatusConstants.NONE.equals(normalized)
+ || EmsAlarmPushStatusConstants.PENDING.equals(normalized)
+ || EmsAlarmPushStatusConstants.PROCESSING.equals(normalized)
+ || EmsAlarmPushStatusConstants.SUCCESS.equals(normalized)
+ || EmsAlarmPushStatusConstants.FAIL.equals(normalized)) {
+ return normalized;
+ }
+ return EmsAlarmPushStatusConstants.NONE;
+ }
+
+ private void publishRealtimeAlarmEventAfterCommit(EmsRealtimeAlarmBatchResultVo result) {
+ if (result == null || result.getInsertedCount() <= 0 || result.getRecords().isEmpty()) {
+ return;
+ }
+ Runnable publishTask = () -> {
+ try {
+ EmsRealtimeAlarmEventVo event = new EmsRealtimeAlarmEventVo();
+ // 这里使用前端约定的 eventType,避免迁移阶段再引入额外的协议转换层。
+ event.setEventType("ems_alarm_realtime");
+ event.setGeneratedAt(DateUtils.getNowDate());
+ event.setSource("saveWebSocketAlarmData");
+ event.setAlarms(result.getRecords());
+ SseMessageUtils.publishAll(JsonUtils.toJsonString(event));
+ } catch (Exception ex) {
+ // 这里不能反向影响主事务,避免“落库成功但广播失败”把记录整体回滚。
+ log.warn("实时告警 SSE 广播失败,insertedCount={}", result.getInsertedCount(), ex);
+ }
+ };
+ if (TransactionSynchronizationManager.isSynchronizationActive()) {
+ TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
+ @Override
+ public void afterCommit() {
+ publishTask.run();
+ }
+ });
+ return;
+ }
+ publishTask.run();
+ }
}
diff --git a/ruoyi-ems/src/main/java/org/dromara/ems/record/service/impl/EmsRecordAlarmRuleServiceImpl.java b/ruoyi-ems/src/main/java/org/dromara/ems/record/service/impl/EmsRecordAlarmRuleServiceImpl.java
index 841b234..54c8d8e 100644
--- a/ruoyi-ems/src/main/java/org/dromara/ems/record/service/impl/EmsRecordAlarmRuleServiceImpl.java
+++ b/ruoyi-ems/src/main/java/org/dromara/ems/record/service/impl/EmsRecordAlarmRuleServiceImpl.java
@@ -3,6 +3,8 @@ package org.dromara.ems.record.service.impl;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.utils.DateUtils;
import org.dromara.ems.base.domain.EmsAlarmActionStep;
+import org.dromara.ems.base.domain.EmsAlarmNotifyGroup;
+import org.dromara.ems.base.mapper.EmsAlarmNotifyGroupMapper;
import org.dromara.ems.base.service.IEmsAlarmActionStepService;
import org.dromara.ems.record.domain.EmsRecordAlarmRule;
import org.dromara.ems.record.mapper.EmsRecordAlarmRuleMapper;
@@ -28,6 +30,8 @@ public class EmsRecordAlarmRuleServiceImpl implements IEmsRecordAlarmRuleService
private final IEmsAlarmActionStepService emsAlarmActionStepService;
+ private final EmsAlarmNotifyGroupMapper emsAlarmNotifyGroupMapper;
+
/**
* 查询异常告警规则
*
@@ -61,6 +65,7 @@ public class EmsRecordAlarmRuleServiceImpl implements IEmsRecordAlarmRuleService
@Override
public int insertEmsRecordAlarmRule(EmsRecordAlarmRule emsRecordAlarmRule)
{
+ validateNotifyConfig(emsRecordAlarmRule);
emsRecordAlarmRule.setCreateTime(DateUtils.getNowDate());
return emsRecordAlarmRuleMapper.insertEmsRecordAlarmRule(emsRecordAlarmRule);
}
@@ -74,6 +79,7 @@ public class EmsRecordAlarmRuleServiceImpl implements IEmsRecordAlarmRuleService
@Override
public int updateEmsRecordAlarmRule(EmsRecordAlarmRule emsRecordAlarmRule)
{
+ validateNotifyConfig(emsRecordAlarmRule);
emsRecordAlarmRule.setUpdateTime(DateUtils.getNowDate());
return emsRecordAlarmRuleMapper.updateEmsRecordAlarmRule(emsRecordAlarmRule);
}
@@ -152,4 +158,17 @@ public class EmsRecordAlarmRuleServiceImpl implements IEmsRecordAlarmRuleService
{
return emsRecordAlarmRuleMapper.getEmsRecordAlarmRuleTotalCount();
}
+
+ private void validateNotifyConfig(EmsRecordAlarmRule emsRecordAlarmRule) {
+ if (emsRecordAlarmRule == null || emsRecordAlarmRule.getNotifyGroupId() == null) {
+ return;
+ }
+ EmsAlarmNotifyGroup notifyGroup = emsAlarmNotifyGroupMapper.selectById(emsRecordAlarmRule.getNotifyGroupId());
+ if (notifyGroup == null) {
+ throw new IllegalArgumentException("通知组不存在,无法保存告警规则");
+ }
+ if ("1".equals(notifyGroup.getIsEnable())) {
+ throw new IllegalArgumentException("通知组已停用,请先启用后再绑定到告警规则");
+ }
+ }
}
diff --git a/ruoyi-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmDataMapper.xml b/ruoyi-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmDataMapper.xml
index 8e0f013..9e18531 100644
--- a/ruoyi-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmDataMapper.xml
+++ b/ruoyi-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmDataMapper.xml
@@ -82,6 +82,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
and rad.collect_device_id = #{collectDeviceId}
and ebcdi.collect_device_name like concat('%', #{collectDeviceName}, '%')
and rad.collect_time = #{collectTime}
+ and rad.collect_time >= #{beginCollectTime}
+ and rad.collect_time <= #{endCollectTime}
and rad.alarm_type = #{alarmType}
and rad.alarm_status = #{alarmStatus}
and rad.alarm_data = #{alarmData}
@@ -102,6 +104,12 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
and rad.confirm_user_id = #{confirmUserId}
and rad.confirm_remark like concat('%', #{confirmRemark}, '%')
and rad.notify_user = #{notifyUser}
+
+ and rad.monitor_id in
+
+ #{monitorId}
+
+
@@ -115,7 +123,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
SELECT COUNT(1) AS totalCount,
COALESCE(SUM(CASE WHEN rad.alarm_status = 1 THEN 1 ELSE 0 END), 0) AS unhandledCount,
COALESCE(SUM(CASE WHEN rad.alarm_status = 0 THEN 1 ELSE 0 END), 0) AS handledCount,
- COALESCE(SUM(CASE WHEN UPPER(COALESCE(rad.push_status, '')) IN ('PENDING', 'WAIT') THEN 1 ELSE 0 END), 0) AS pushPendingCount,
+ COALESCE(SUM(CASE WHEN UPPER(COALESCE(rad.push_status, '')) IN ('PENDING', 'WAIT', '0') THEN 1 ELSE 0 END), 0) AS pushPendingCount,
COALESCE(SUM(CASE WHEN UPPER(COALESCE(rad.push_status, '')) = 'PROCESSING' THEN 1 ELSE 0 END), 0) AS pushProcessingCount,
COALESCE(SUM(CASE WHEN UPPER(COALESCE(rad.push_status, '')) = 'SUCCESS' THEN 1 ELSE 0 END), 0) AS pushSuccessCount,
COALESCE(SUM(CASE WHEN UPPER(COALESCE(rad.push_status, '')) IN ('FAIL', 'FAILED') THEN 1 ELSE 0 END), 0) AS pushFailCount
@@ -223,6 +231,17 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
where obj_id = #{objId}
+
+ update ems_record_alarm_data
+ set alarm_status = #{alarmStatus},
+ operation_name = #{operationName},
+ operation_time = #{operationTime},
+ confirm_user_id = #{confirmUserId},
+ update_time = #{updateTime}
+ where obj_id = #{objId}
+ and alarm_status <> #{alarmStatus}
+
+
delete from ems_record_alarm_data where obj_id = #{objId}
diff --git a/ruoyi-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmRuleMapper.xml b/ruoyi-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmRuleMapper.xml
index eddaade..111edd0 100644
--- a/ruoyi-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmRuleMapper.xml
+++ b/ruoyi-ems/src/main/resources/mapper/ems/record/EmsRecordAlarmRuleMapper.xml
@@ -106,6 +106,32 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
where RAL.obj_id = #{objId}
+
+
insert into ems_record_alarm_rule