diff --git a/package.json b/package.json index ced938a..83c5ce0 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,8 @@ "gif-encoder-2": "^1.0.5", "helmet": "^7.1.0", "jsonwebtoken": "^9.0.2", - "mysql2": "^3.12.0" + "mysql2": "^3.12.0", + "node-cron": "^4.2.1", + "ws": "^8.21.0" } } diff --git a/src/server.js b/src/server.js index 3a3aafc..f89f9aa 100644 --- a/src/server.js +++ b/src/server.js @@ -1,4 +1,5 @@ const express = require('express'); +const http = require('http'); const cors = require('cors'); const helmet = require('helmet'); const fs = require('fs/promises'); @@ -14,6 +15,7 @@ const dictService = require('./services/system/dictService'); const systemService = require('./services/system/systemService'); const extraService = require('./services/system/extraService'); const reportService = require('./services/ems/common/reportService'); +const analysisReportService = require('./services/report/analysisReportService'); const temperatureReportService = require('./services/ems/temperature/reportService'); const electricityReportService = require('./services/ems/electricity/reportService'); const waterReportService = require('./services/ems/water/reportService'); @@ -22,12 +24,20 @@ const energyMetricService = require('./services/ems/energyMetricService'); const shiftTeamService = require('./services/ems/shiftTeamService'); const electricityPriceService = require('./services/ems/electricityPriceService'); const electricityBillService = require('./services/ems/electricityBillService'); +const electricityTouService = require('./services/ems/electricityTouService'); +const electricityLineLossService = require('./services/ems/electricityLineLossService'); const measuringDeviceService = require('./services/ems/measuringDeviceService'); const measurementControlPointService = require('./services/ems/measurementControlPointService'); +const collectingDeviceService = require('./services/ems/collectingDeviceService'); +const alarmService = require('./services/ems/alarmService'); const floorInfoService = require('./services/floorInfoService'); +const lineInfoService = require('./services/lineInfoService'); const { authenticate } = require('./middleware/auth'); const { parseRequestBody } = require('./cryptoBody'); const { nextId } = require('./id'); +const wsService = require('./wsService'); +const alarmCron = require('./alarmCron'); +const menuInitializer = require('./services/system/menuInitializer'); const app = express(); @@ -938,7 +948,7 @@ app.put('/ems/energyMetric', authenticate, parseRequestBody, routeOk(async (req) // 接口: DELETE /ems/energyMetric/:ids;入参: path(ids, 逗号分隔);出参: R.ok(null)。 app.delete('/ems/energyMetric/:ids', authenticate, routeOk(async (req) => { await energyMetricService.remove(req.params.ids); return null; })); -// 接口: GET /ems/measuringDevice/list;入参: query(pageNum,pageSize,deviceName,deviceCode,deviceId,deviceType,installPosition);出参: TableDataInfo(计量器具台账)。 +// 接口: GET /ems/measuringDevice/list;入参: query(pageNum,pageSize,keyword,deviceName,deviceCode,deviceId,deviceType,installPosition);出参: TableDataInfo(计量器具台账)。 app.get('/ems/measuringDevice/list', authenticate, routeRaw((req) => measuringDeviceService.list(req.query))); // 接口: GET /ems/measuringDevice/:id;入参: path(id);出参: R.ok(计量器具详情)。 @@ -950,6 +960,12 @@ app.post('/ems/measuringDevice', authenticate, parseRequestBody, routeOk(async ( // 接口: PUT /ems/measuringDevice;入参: body(id 必填及计量器具字段);出参: R.ok(null)。 app.put('/ems/measuringDevice', authenticate, parseRequestBody, routeOk(async (req) => { await measuringDeviceService.update(req.body); return null; })); +// 接口: PUT /ems/measuringDevice/mountChildren;入参: body(parentDeviceGroupId,childIds[]);出参: R.ok(null)。 +app.put('/ems/measuringDevice/mountChildren', authenticate, parseRequestBody, routeOk(async (req) => { await measuringDeviceService.mountChildren(req.body); return null; })); + +// 接口: PUT /ems/measuringDevice/unmountChild;入参: body(parentDeviceGroupId,childId);出参: R.ok(null)。 +app.put('/ems/measuringDevice/unmountChild', authenticate, parseRequestBody, routeOk(async (req) => { await measuringDeviceService.unmountChild(req.body); return null; })); + // 接口: DELETE /ems/measuringDevice/:ids;入参: path(ids, 逗号分隔);出参: R.ok(null)。 app.delete('/ems/measuringDevice/:ids', authenticate, routeOk(async (req) => { await measuringDeviceService.remove(req.params.ids); return null; })); @@ -968,6 +984,51 @@ app.put('/ems/measurementControlPoint', authenticate, parseRequestBody, routeOk( // 接口: DELETE /ems/measurementControlPoint/:ids;入参: path(ids, 逗号分隔);出参: R.ok(null)。 app.delete('/ems/measurementControlPoint/:ids', authenticate, routeOk(async (req) => { await measurementControlPointService.remove(req.params.ids); return null; })); +// 接口: GET /ems/collectingDevice/list;入参: query(pageNum,pageSize,deviceCode,deviceName,deviceType,installPosition,status);出参: TableDataInfo(采集设备信息)。 +app.get('/ems/collectingDevice/list', authenticate, routeRaw((req) => collectingDeviceService.list(req.query))); + +// 接口: GET /ems/collectingDevice/:id;入参: path(id);出参: R.ok(采集设备详情)。 +app.get('/ems/collectingDevice/:id', authenticate, routeOk((req) => collectingDeviceService.get(req.params.id))); + +// 接口: POST /ems/collectingDevice;入参: body(deviceCode,deviceName,deviceType,manufacturer,specificationModel,serialNo,assetNo,installPosition,status,responsiblePerson);出参: R.ok({ id })。 +app.post('/ems/collectingDevice', authenticate, parseRequestBody, routeOk(async (req) => ({ id: await collectingDeviceService.add(req.body) }))); + +// 接口: PUT /ems/collectingDevice;入参: body(id 必填及采集设备字段);出参: R.ok(null)。 +app.put('/ems/collectingDevice', authenticate, parseRequestBody, routeOk(async (req) => { await collectingDeviceService.update(req.body); return null; })); + +// 接口: PUT /ems/collectingDevice/bindMeasuringDevices;入参: body(id,measuringDeviceIds[]);出参: R.ok(null)。 +app.put('/ems/collectingDevice/bindMeasuringDevices', authenticate, parseRequestBody, routeOk(async (req) => { await collectingDeviceService.bindMeasuringDevices(req.body); return null; })); + +// 接口: DELETE /ems/collectingDevice/:ids;入参: path(ids, 逗号分隔);出参: R.ok(null)。 +app.delete('/ems/collectingDevice/:ids', authenticate, routeOk(async (req) => { await collectingDeviceService.remove(req.params.ids); return null; })); + +// 接口: GET /ems/alarmRule/list;入参: query(pageNum,pageSize,ruleName,alarmType,enabled);出参: TableDataInfo(报警规则)。 +app.get('/ems/alarmRule/list', authenticate, routeRaw((req) => alarmService.listRules(req.query))); + +// 接口: GET /ems/alarmRule/:id;入参: path(id);出参: R.ok(报警规则详情)。 +app.get('/ems/alarmRule/:id', authenticate, routeOk((req) => alarmService.getRule(req.params.id))); + +// 接口: POST /ems/alarmRule;入参: body(ruleName,version,enabled,effectiveStart,effectiveEnd,alarmType,alarmRule,alarmLevel,bindDeviceIds);出参: R.ok({ id })。 +app.post('/ems/alarmRule', authenticate, parseRequestBody, routeOk(async (req) => ({ id: await alarmService.addRule(req.body) }))); + +// 接口: PUT /ems/alarmRule;入参: body(id 必填,ruleName,version,enabled,effectiveStart,effectiveEnd,alarmType,alarmRule,alarmLevel,bindDeviceIds);出参: R.ok(null)。 +app.put('/ems/alarmRule', authenticate, parseRequestBody, routeOk(async (req) => { await alarmService.updateRule(req.body); return null; })); + +// 接口: PUT /ems/alarmRule/bindDevices;入参: body(id,deviceIds[]);出参: R.ok(null)。 +app.put('/ems/alarmRule/bindDevices', authenticate, parseRequestBody, routeOk(async (req) => { await alarmService.bindRuleDevices(req.body); return null; })); + +// 接口: DELETE /ems/alarmRule/:ids;入参: path(ids, 逗号分隔);出参: R.ok(null)。 +app.delete('/ems/alarmRule/:ids', authenticate, routeOk(async (req) => { await alarmService.removeRule(req.params.ids); return null; })); + +// 接口: GET /ems/alarmDetail/list;入参: query(pageNum,pageSize,alarmDeviceName,alarmType,alarmLevel,unhandled);出参: TableDataInfo(报警明细)。 +app.get('/ems/alarmDetail/list', authenticate, routeRaw((req) => alarmService.listDetails(req.query))); + +// 接口: PUT /ems/alarmDetail/handle;入参: body(id,handler);出参: R.ok(null)。 +app.put('/ems/alarmDetail/handle', authenticate, parseRequestBody, routeOk(async (req) => { await alarmService.handleDetail(req.body); return null; })); + +// 接口: POST /ems/alarmRule/scan;入参: 无;出参: R.ok({ created })。 +app.post('/ems/alarmRule/scan', authenticate, routeRaw(() => alarmService.scan())); + // 接口: GET /ems/shiftTeam/list;入参: query(pageNum,pageSize,name);出参: TableDataInfo(班组信息)。 app.get('/ems/shiftTeam/list', authenticate, routeRaw((req) => shiftTeamService.list(req.query))); @@ -1001,33 +1062,51 @@ app.delete('/electricityManagement/electricityPrices/:ids', authenticate, routeO // 接口: GET /electricityManagement/electricityBill;入参: query(beginTime,endTime,interval(minute/hour/day/month/shift));出参: R.ok({ buckets, series, tableRows })。 app.get('/electricityManagement/electricityBill', authenticate, routeRaw((req) => electricityBillService.bill(req.query))); +// 接口: GET /electricityManagement/TOU;入参: query(nodeIds 逗号分隔,beginTime,endTime,interval(minute/hour/day/month/year/shift),unified);出参: R.ok({ unified, buckets, nodes })。 +app.get('/electricityManagement/TOU', authenticate, routeRaw((req) => electricityTouService.tou(req.query))); + +// 接口: GET /electricityManagement/lineLoss;入参: query(nodeId,timeType(day/month/year),date);出参: R.ok({ beginTime,endTime,timeType,date,rows })。 +app.get('/electricityManagement/lineLoss', authenticate, routeRaw((req) => electricityLineLossService.lineLoss(req.query))); + // 接口: GET /ems/report/energy;入参: query(deviceType,nodeIds,timeType(hour/day/month/year/shift/latest),date,startYear,endYear,latestCount);出参: R.ok({ columns, rows, buckets, fields, deviceType, deviceTypeLabel })。 app.get('/ems/report/energy', authenticate, routeRaw((req) => reportService.energy(req.query))); +// 接口: GET /ems/report/compare;入参: query(deviceType,nodeIds,compareTypes,timeType,date,startYear,endYear);出参: R.ok(能源对比与分析图表数据)。 +app.get('/ems/report/compare', authenticate, routeRaw((req) => analysisReportService.compare(req.query))); + +// 接口: GET /ems/report/trend;入参: query(deviceType,deviceIds,interval,beginTime,endTime,aggregateType);出参: R.ok(趋势曲线数据)。 +app.get('/ems/report/trend', authenticate, routeRaw((req) => analysisReportService.trend(req.query))); + // 接口: GET /ems/temperature/devices;入参: 无;出参: R.ok(Array<{ label,value }>),返回温度设备下拉选项。 app.get('/ems/temperature/devices', authenticate, routeRaw(() => temperatureReportService.devices())); -// 接口: GET /ems/temperature/line;入参: query(beginTime,endTime,deviceIds 逗号分隔,metricId);出参: R.ok({ xAxis, series, devices, metric, beginTime, endTime })。 +// 接口: GET /ems/temperature/line;入参: query(beginTime,endTime,deviceIds 逗号分隔,metricId,interval 秒数可选);出参: R.ok({ xAxis, series, devices, metric, beginTime, endTime, interval })。 app.get('/ems/temperature/line', authenticate, routeRaw((req) => temperatureReportService.line(req.query))); // 接口: GET /ems/electricity/devices;入参: 无;出参: R.ok(Array<{ label,value }>),返回用电设备下拉选项。 app.get('/ems/electricity/devices', authenticate, routeRaw(() => electricityReportService.devices())); -// 接口: GET /ems/electricity/line;入参: query(beginTime,endTime,deviceIds 逗号分隔,metricId);出参: R.ok({ xAxis, series, devices, metric, beginTime, endTime })。 +// 接口: GET /ems/electricity/line;入参: query(beginTime,endTime,deviceIds 逗号分隔,metricId,interval 秒数可选);出参: R.ok({ xAxis, series, devices, metric, beginTime, endTime, interval })。 app.get('/ems/electricity/line', authenticate, routeRaw((req) => electricityReportService.line(req.query))); // 接口: GET /ems/water/devices;入参: 无;出参: R.ok(Array<{ label,value }>),返回用水设备下拉选项。 app.get('/ems/water/devices', authenticate, routeRaw(() => waterReportService.devices())); -// 接口: GET /ems/water/line;入参: query(beginTime,endTime,deviceIds 逗号分隔,metricId);出参: R.ok({ xAxis, series, devices, metric, beginTime, endTime })。 +// 接口: GET /ems/water/line;入参: query(beginTime,endTime,deviceIds 逗号分隔,metricId,interval 秒数可选);出参: R.ok({ xAxis, series, devices, metric, beginTime, endTime, interval })。 app.get('/ems/water/line', authenticate, routeRaw((req) => waterReportService.line(req.query))); // 接口: GET /ems/senior/power;入参: query(beginTime,endTime,showGroup,reportType);出参: R.ok({ riverData, treemapData, devices, beginTime, endTime, showGroup })。 app.get('/ems/senior/power', authenticate, routeRaw((req) => seniorReportService.power(req.query))); +// 接口: GET /ems/senior/flow;入参: query(beginTime,endTime,reportType);出参: R.ok({ sankeyData, devices, beginTime, endTime, reportType })。 +app.get('/ems/senior/flow', authenticate, routeRaw((req) => seniorReportService.flow(req.query))); + // 接口: GET /floorInfo/list;入参: query(name);出参: TableDataInfo,rows 为按 children 无限嵌套的楼层树,每个节点包含 level。 app.get('/floorInfo/list', authenticate, routeRaw((req) => floorInfoService.list(req.query))); +// 接口: POST /floorInfo/syncMountedDevices;入参: 无;出参: R.ok({ inserted }),将楼层树已绑定设备组的下级设备同步挂到楼层树。 +app.post('/floorInfo/syncMountedDevices', authenticate, routeOk(() => floorInfoService.syncMountedDevices())); + // 接口: GET /floorInfo/:id;入参: path(id);出参: R.ok(floorInfo 行)。 app.get('/floorInfo/:id', authenticate, routeOk((req) => floorInfoService.get(req.params.id))); @@ -1043,19 +1122,44 @@ app.post('/floorInfo/mountDevice', authenticate, parseRequestBody, routeOk(async // 接口: DELETE /floorInfo/:id;入参: path(id);出参: R.ok(null),递归逻辑删除当前节点及全部子孙节点。 app.delete('/floorInfo/:id', authenticate, routeOk(async (req) => { await floorInfoService.remove(req.params.id); return null; })); +// 接口: GET /lineInfo/list;入参: query(name);出参: TableDataInfo,rows 为按 children 无限嵌套的线路树,每个节点包含 level。 +app.get('/lineInfo/list', authenticate, routeRaw((req) => lineInfoService.list(req.query))); + +// 接口: POST /lineInfo/syncMountedDevices;入参: 无;出参: R.ok({ inserted }),将线路树已绑定设备组的下级设备同步挂到线路树。 +app.post('/lineInfo/syncMountedDevices', authenticate, routeOk(() => lineInfoService.syncMountedDevices())); + +// 接口: GET /lineInfo/:id;入参: path(id);出参: R.ok(lineInfo 行)。 +app.get('/lineInfo/:id', authenticate, routeOk((req) => lineInfoService.get(req.params.id))); + +// 接口: POST /lineInfo;入参: body(pid,name,isDevice,deviceId,isDeviceGroup,deviceGroupId,deviceType,reserve5);出参: R.ok({ id })。 +app.post('/lineInfo', authenticate, parseRequestBody, routeOk(async (req) => ({ id: await lineInfoService.add(req.body) }))); + +// 接口: PUT /lineInfo;入参: body(id 必填,pid,name,isDevice,deviceId,isDeviceGroup,deviceGroupId,deviceType,reserve5);出参: R.ok(null)。 +app.put('/lineInfo', authenticate, parseRequestBody, routeOk(async (req) => { await lineInfoService.update(req.body); return null; })); + +// 接口: POST /lineInfo/mountDevice;入参: body(pid 必填,isDeviceGroup,deviceId,deviceGroupId,deviceType,name);出参: R.ok({ id })。 +app.post('/lineInfo/mountDevice', authenticate, parseRequestBody, routeOk(async (req) => ({ id: await lineInfoService.mountDevice(req.body) }))); + +// 接口: DELETE /lineInfo/:id;入参: path(id);出参: R.ok(null),递归逻辑删除当前节点及全部子孙节点。 +app.delete('/lineInfo/:id', authenticate, routeOk(async (req) => { await lineInfoService.remove(req.params.id); return null; })); + app.use((req, res) => { res.status(404).json(fail('接口不存在', 404)); }); if (require.main === module) { + const server = http.createServer(app); + wsService.init(server); pool.query('select 1') - .then(() => { + .then(async () => { + await menuInitializer.ensureMenus(); setInterval(() => { pool.query('select 1').catch((error) => { console.error(`database keepalive failed: ${error.message}`); }); }, 30000).unref(); - app.listen(config.port, () => { + server.listen(config.port, () => { + alarmCron.start(); console.log(`EMS Node auth server listening on port ${config.port}`); }); }) diff --git a/src/services/ems/measuringDeviceService.js b/src/services/ems/measuringDeviceService.js index 72e51dd..1b5ada2 100644 --- a/src/services/ems/measuringDeviceService.js +++ b/src/services/ems/measuringDeviceService.js @@ -31,7 +31,13 @@ function mapRow(row) { deviceCode: row.device_code, installPosition: row.install_position, installPositionName: row.install_position_name, + collectingDeviceId: row.collecting_device_id, + collectingDeviceName: row.collecting_device_name, deviceType: row.device_type, + isDeviceGroup: row.is_device_group || '0', + parentDeviceGroupId: row.parent_device_group_id, + parentDeviceGroupName: row.parent_device_group_name, + childDevices: row.child_devices || [], measuringUnit: row.measuring_unit_summary || row.measuring_unit, standardUnit: row.standard_unit, standardUnitName: row.standard_unit_name_summary || row.standard_unit_name, @@ -45,6 +51,9 @@ function mapRow(row) { communicationProtocol: row.communication_protocol, communicationAddress: row.communication_address, collectionCycle: row.collection_cycle, + lastCollectionTime: row.last_collection_time, + alarmRules: row.alarm_rules || [], + alarmRuleNames: row.alarm_rule_names || '', status: row.status, lastVerificationDate: row.last_verification_date, responsibleDept: row.responsible_dept, @@ -85,12 +94,16 @@ function normalizeUnits(body = {}) { function normalizeBody(body = {}) { const units = normalizeUnits(body); const firstUnit = units[0] || {}; - return { + const isDeviceGroup = body.isDeviceGroup === true || body.isDeviceGroup === '1' || body.isDeviceGroup === 1 ? '1' : '0'; + const data = { deviceId: body.deviceId || null, deviceName: body.deviceName || null, deviceCode: body.deviceCode || null, installPosition: body.installPosition || null, + collectingDeviceId: body.collectingDeviceId || null, deviceType: body.deviceType === undefined || body.deviceType === null || body.deviceType === '' ? null : String(body.deviceType), + isDeviceGroup, + parentDeviceGroupId: body.parentDeviceGroupId || null, measuringUnit: firstUnit.measuringUnit || null, standardUnit: firstUnit.standardUnit || null, conversionFactor: firstUnit.conversionFactor ?? null, @@ -103,16 +116,39 @@ function normalizeBody(body = {}) { communicationProtocol: body.communicationProtocol || null, communicationAddress: body.communicationAddress || null, collectionCycle: body.collectionCycle || null, - status: body.status || null, + status: body.status === undefined || body.status === null || body.status === '' ? null : String(body.status), lastVerificationDate: body.lastVerificationDate || null, responsibleDept: body.responsibleDept || null, responsiblePerson: body.responsiblePerson || null, remark: body.remark || null }; + if (isDeviceGroup === '1') { + data.installPosition = null; + data.collectingDeviceId = null; + data.specificationModel = null; + data.manufacturer = null; + data.factoryNo = null; + data.accuracyLevel = null; + data.measureRange = null; + data.communicationProtocol = null; + data.communicationAddress = null; + data.collectionCycle = null; + data.lastVerificationDate = null; + data.responsibleDept = null; + data.responsiblePerson = null; + } + return data; } const TABLE = 'measuring_device'; -const selectSql = `select md.* from ${TABLE} md`; +const selectSql = ` + select md.*, + cd.device_name as collecting_device_name, + parent.device_name as parent_device_group_name + from ${TABLE} md + left join collecting_device cd on cd.id collate utf8mb4_general_ci = md.collecting_device_id collate utf8mb4_general_ci and cd.del_flag = '0' + left join ${TABLE} parent on parent.id collate utf8mb4_general_ci = md.parent_device_group_id collate utf8mb4_general_ci and parent.del_flag = '0' +`; async function loadFloorPathMap() { const rows = await query( @@ -201,17 +237,123 @@ async function loadUnitsMap(deviceIds, metricNameMap, deviceTypeMap) { return map; } +async function loadAlarmRuleMap(deviceDeviceIds = []) { + const validDeviceIds = [...new Set(deviceDeviceIds.map((item) => String(item || '').trim()).filter(Boolean))]; + if (!validDeviceIds.length) return new Map(); + const rows = await query( + `select id, + rule_name, + alarm_type, + alarm_level, + enabled, + bind_device_ids + from alarm_rule_info + where del_flag = '0' + and bind_device_ids is not null + and bind_device_ids <> ''` + ); + const map = new Map(); + rows.forEach((row) => { + const bindDeviceIds = String(row.bind_device_ids || '') + .split(',') + .map((item) => item.trim()) + .filter(Boolean); + bindDeviceIds.forEach((deviceId) => { + if (!validDeviceIds.includes(deviceId)) return; + if (!map.has(deviceId)) map.set(deviceId, []); + map.get(deviceId).push({ + id: row.id, + ruleName: row.rule_name, + alarmType: row.alarm_type, + alarmLevel: row.alarm_level, + enabled: row.enabled + }); + }); + }); + return map; +} + +async function loadChildDeviceMap(deviceIds = []) { + const ids = [...new Set(deviceIds.map((item) => String(item || '').trim()).filter(Boolean))]; + if (!ids.length) return new Map(); + const params = {}; + const placeholders = ids.map((id, index) => { + params[`id${index}`] = id; + return `:id${index}`; + }); + const rows = await query( + `select id, + device_id, + device_name, + device_code, + device_type, + is_device_group, + parent_device_group_id, + specification_model, + manufacturer, + factory_no, + accuracy_level, + measure_range, + communication_protocol, + communication_address, + collection_cycle, + status, + last_verification_date, + responsible_person, + remark + from ${TABLE} + where del_flag = '0' + and parent_device_group_id in (${placeholders.join(', ')}) + order by is_device_group desc, create_time asc, id asc`, + params + ); + const map = new Map(); + rows.forEach((row) => { + const parentId = String(row.parent_device_group_id || ''); + if (!map.has(parentId)) map.set(parentId, []); + map.get(parentId).push({ + id: row.id, + deviceId: row.device_id, + deviceName: row.device_name, + deviceCode: row.device_code, + deviceType: row.device_type, + isDeviceGroup: row.is_device_group || '0', + parentDeviceGroupId: row.parent_device_group_id, + specificationModel: row.specification_model, + manufacturer: row.manufacturer, + factoryNo: row.factory_no, + accuracyLevel: row.accuracy_level, + measureRange: row.measure_range, + communicationProtocol: row.communication_protocol, + communicationAddress: row.communication_address, + collectionCycle: row.collection_cycle, + status: row.status, + lastVerificationDate: row.last_verification_date, + responsiblePerson: row.responsible_person, + remark: row.remark + }); + }); + return map; +} + async function decorateRows(rows) { const [floorPathMap, metricNameMap] = await Promise.all([loadFloorPathMap(), loadMetricNameMap()]); const deviceTypeMap = new Map(rows.map((row) => [String(row.id), row.device_type])); - const unitsMap = await loadUnitsMap(rows.map((row) => row.id), metricNameMap, deviceTypeMap); + const [unitsMap, alarmRuleMap, childDeviceMap] = await Promise.all([ + loadUnitsMap(rows.map((row) => row.id), metricNameMap, deviceTypeMap), + loadAlarmRuleMap(rows.map((row) => row.device_id)), + loadChildDeviceMap(rows.filter((row) => row.is_device_group === '1').map((row) => row.id)) + ]); return rows.map((row) => ({ ...row, install_position_name: floorPathMap.get(String(row.install_position)) || '', standard_unit_name: metricNameMap.get(`${String(row.device_type)}:${row.standard_unit}`) || row.standard_unit, measuring_unit_summary: (unitsMap.get(String(row.id)) || []).map((item) => item.unit.measuringUnit).filter(Boolean).join('、'), standard_unit_name_summary: (unitsMap.get(String(row.id)) || []).map((item) => item.label).filter(Boolean).join(';'), - units: (unitsMap.get(String(row.id)) || []).map((item) => item.unit) + units: (unitsMap.get(String(row.id)) || []).map((item) => item.unit), + alarm_rules: alarmRuleMap.get(String(row.device_id)) || [], + alarm_rule_names: (alarmRuleMap.get(String(row.device_id)) || []).map((item) => item.ruleName).filter(Boolean).join('、'), + child_devices: childDeviceMap.get(String(row.id)) || [] })); } @@ -235,6 +377,8 @@ async function saveUnits(deviceId, units = []) { } async function syncInstallPositionByDeviceId() { + // floorInfo 现在只维护普通楼层节点,设备树以 measuring_device 为准。 + return; const [deviceRows, floorRows] = await Promise.all([ query( `select id, device_id, install_position @@ -301,6 +445,10 @@ async function list(params = {}) { where.push('md.device_name like :deviceName'); sqlParams.deviceName = `%${String(params.deviceName).trim()}%`; } + if (params.keyword) { + where.push('(md.device_id like :keyword or md.device_name like :keyword or md.device_code like :keyword)'); + sqlParams.keyword = `%${String(params.keyword).trim()}%`; + } if (params.deviceCode) { where.push('md.device_code like :deviceCode'); sqlParams.deviceCode = `%${String(params.deviceCode).trim()}%`; @@ -313,6 +461,14 @@ async function list(params = {}) { where.push('md.device_type = :deviceType'); sqlParams.deviceType = String(params.deviceType); } + if (params.isDeviceGroup !== undefined && params.isDeviceGroup !== '') { + where.push('md.is_device_group = :isDeviceGroup'); + sqlParams.isDeviceGroup = String(params.isDeviceGroup); + } + if (params.parentDeviceGroupId !== undefined && params.parentDeviceGroupId !== '') { + where.push('md.parent_device_group_id = :parentDeviceGroupId'); + sqlParams.parentDeviceGroupId = String(params.parentDeviceGroupId); + } if (params.installPosition) { where.push('md.install_position = :installPosition'); sqlParams.installPosition = String(params.installPosition); @@ -345,13 +501,14 @@ async function get(id) { async function add(body = {}) { const id = body.id ? String(body.id) : nextId(); const data = normalizeBody(body); + await assertParentDeviceGroup(data.parentDeviceGroupId, id); await query( `insert into ${TABLE} - (id, device_id, device_name, device_code, install_position, device_type, measuring_unit, standard_unit, conversion_factor, + (id, device_id, device_name, device_code, install_position, collecting_device_id, device_type, is_device_group, parent_device_group_id, measuring_unit, standard_unit, conversion_factor, specification_model, manufacturer, factory_no, accuracy_level, measure_range, communication_protocol, communication_address, collection_cycle, status, last_verification_date, responsible_dept, responsible_person, remark, create_time, update_time, del_flag) values - (:id, :deviceId, :deviceName, :deviceCode, :installPosition, :deviceType, :measuringUnit, :standardUnit, :conversionFactor, + (:id, :deviceId, :deviceName, :deviceCode, :installPosition, :collectingDeviceId, :deviceType, :isDeviceGroup, :parentDeviceGroupId, :measuringUnit, :standardUnit, :conversionFactor, :specificationModel, :manufacturer, :factoryNo, :accuracyLevel, :measureRange, :communicationProtocol, :communicationAddress, :collectionCycle, :status, :lastVerificationDate, :responsibleDept, :responsiblePerson, :remark, now(), now(), '0')`, { id, ...data } @@ -363,13 +520,17 @@ async function add(body = {}) { async function update(body = {}) { if (!body.id) throw new Error('ID不能为空'); const data = normalizeBody(body); + await assertParentDeviceGroup(data.parentDeviceGroupId, body.id); await query( `update ${TABLE} set device_id = :deviceId, device_name = :deviceName, device_code = :deviceCode, install_position = :installPosition, + collecting_device_id = :collectingDeviceId, device_type = :deviceType, + is_device_group = :isDeviceGroup, + parent_device_group_id = :parentDeviceGroupId, measuring_unit = :measuringUnit, standard_unit = :standardUnit, conversion_factor = :conversionFactor, @@ -393,6 +554,294 @@ async function update(body = {}) { await saveUnits(body.id, data.units); } +async function assertParentDeviceGroup(parentDeviceGroupId, selfId) { + if (!parentDeviceGroupId) return; + if (String(parentDeviceGroupId) === String(selfId)) { + throw new Error('所属设备组不能选择自己'); + } + const parent = await queryOne( + `select id + from ${TABLE} + where id = :id + and del_flag = '0' + and is_device_group = '1' + limit 1`, + { id: parentDeviceGroupId } + ); + if (!parent) throw new Error('所属设备组不存在'); +} + +async function descendantGroupIds(parentId, result = new Set()) { + const children = await query( + `select id + from ${TABLE} + where del_flag = '0' + and is_device_group = '1' + and parent_device_group_id = :parentId`, + { parentId } + ); + for (const child of children) { + const id = String(child.id); + if (result.has(id)) continue; + result.add(id); + await descendantGroupIds(id, result); + } + return result; +} + +async function mountChildren(body = {}) { + if (!body.parentDeviceGroupId) throw new Error('设备组不能为空'); + const parentId = String(body.parentDeviceGroupId); + const parent = await queryOne( + `select id + from ${TABLE} + where id = :id + and del_flag = '0' + and is_device_group = '1' + limit 1`, + { id: parentId } + ); + if (!parent) throw new Error('设备组不存在'); + + const childIds = Array.isArray(body.childIds) + ? body.childIds.map((item) => String(item).trim()).filter(Boolean) + : String(body.childIds || '').split(',').map((item) => item.trim()).filter(Boolean); + const uniqueChildIds = [...new Set(childIds)]; + if (!uniqueChildIds.length) return; + + const descendants = await descendantGroupIds(parentId); + for (const childId of uniqueChildIds) { + if (childId === parentId) throw new Error('不能挂载自身'); + if (descendants.has(childId)) throw new Error('不能重复挂载当前设备组的下级设备组'); + const childDescendants = await descendantGroupIds(childId); + if (childDescendants.has(parentId)) throw new Error('不能将上级设备组挂载到下级设备组下'); + await query( + `update ${TABLE} + set parent_device_group_id = :parentId, + update_time = now() + where id = :childId + and del_flag = '0'`, + { parentId, childId } + ); + } +} + +async function unmountChild(body = {}) { + if (!body.parentDeviceGroupId) throw new Error('设备组不能为空'); + if (!body.childId) throw new Error('设备不能为空'); + await query( + `update ${TABLE} + set parent_device_group_id = null, + update_time = now() + where id = :childId + and parent_device_group_id = :parentDeviceGroupId + and del_flag = '0'`, + { + parentDeviceGroupId: String(body.parentDeviceGroupId), + childId: String(body.childId) + } + ); +} + +function deviceKey(row = {}) { + return row.device_id || row.deviceId || row.device_code || row.deviceCode || row.id || ''; +} + +async function syncFloorNodesForMeasuringDevice(oldRow, newRow) { + const oldKey = deviceKey(oldRow); + const newKey = deviceKey(newRow); + if (!oldKey && !newKey) return; + const isGroup = newRow.isDeviceGroup === '1' || oldRow?.is_device_group === '1'; + const idColumn = isGroup ? 'device_group_id' : 'device_id'; + const flagColumn = isGroup ? 'is_device_group' : 'is_device'; + await query( + `update floorInfo + set ${idColumn} = :newKey, + name = :name, + device_type = :deviceType, + update_time = now() + where del_flag = '0' + and ${flagColumn} = '1' + and ${idColumn} collate utf8mb4_general_ci = :oldKey collate utf8mb4_general_ci`, + { + oldKey: String(oldKey || newKey), + newKey: String(newKey || oldKey), + name: newRow.deviceName || oldRow?.device_name || '', + deviceType: newRow.deviceType === undefined || newRow.deviceType === null || newRow.deviceType === '' ? null : Number(newRow.deviceType) + } + ); +} + +async function markFloorNodesRemovedForMeasuringDevices(rows = []) { + for (const row of rows) { + const key = deviceKey(row); + if (!key) continue; + const isGroup = row.is_device_group === '1'; + const idColumn = isGroup ? 'device_group_id' : 'device_id'; + const flagColumn = isGroup ? 'is_device_group' : 'is_device'; + const matched = await query( + `select id + from floorInfo + where del_flag = '0' + and ${flagColumn} = '1' + and ${idColumn} collate utf8mb4_general_ci = :deviceKey collate utf8mb4_general_ci`, + { deviceKey: String(key) } + ); + await removeFloorNodes(matched.map((item) => item.id)); + } +} + +async function removeFloorNodes(ids = []) { + const uniqueIds = [...new Set(ids.map((item) => String(item || '').trim()).filter(Boolean))]; + if (!uniqueIds.length) return; + const rows = await query(`select id, pid from floorInfo where del_flag = '0'`); + const removeIds = []; + const visit = (id) => { + if (removeIds.includes(String(id))) return; + removeIds.push(String(id)); + rows.filter((row) => String(row.pid || '0') === String(id)).forEach((row) => visit(row.id)); + }; + uniqueIds.forEach(visit); + const params = {}; + const placeholders = removeIds.map((id, index) => { + params[`id${index}`] = id; + return `:id${index}`; + }); + await query( + `update floorInfo + set del_flag = '1', + update_time = now() + where id in (${placeholders.join(',')})`, + params + ); +} + +async function syncFloorGroupsByMeasuringGroupIds(groupIds = [], visited = new Set()) { + const ids = [...new Set(groupIds.map((item) => String(item || '').trim()).filter(Boolean))]; + for (const id of ids) { + if (visited.has(id)) continue; + visited.add(id); + const group = await queryOne( + `select id, device_id, device_name, device_code, device_type + from ${TABLE} + where id = :id + and del_flag = '0' + and is_device_group = '1' + limit 1`, + { id } + ); + if (!group) continue; + await syncOneFloorGroup(group, visited); + } +} + +async function syncOneFloorGroup(group, visited) { + const groupKeys = [group.id, group.device_id, group.device_code].map((item) => String(item || '').trim()).filter(Boolean); + if (!groupKeys.length) return; + const params = {}; + const groupPlaceholders = groupKeys.map((key, index) => { + params[`groupKey${index}`] = key; + return `:groupKey${index}`; + }); + const floorGroups = await query( + `select id + from floorInfo + where del_flag = '0' + and is_device_group = '1' + and device_group_id in (${groupPlaceholders.join(',')})`, + params + ); + if (!floorGroups.length) return; + + const children = await query( + `select id, device_id, device_name, device_code, device_type, is_device_group + from ${TABLE} + where del_flag = '0' + and parent_device_group_id = :groupId + order by is_device_group desc, create_time asc, id asc`, + { groupId: group.id } + ); + const expected = new Set(children.map((child) => `${child.is_device_group === '1' ? 'group' : 'device'}:${deviceKey(child)}`)); + + for (const floorGroup of floorGroups) { + await query( + `update floorInfo + set name = :name, + device_type = :deviceType, + update_time = now() + where id = :id`, + { + id: floorGroup.id, + name: group.device_name || group.device_code || group.device_id || '', + deviceType: group.device_type === undefined || group.device_type === null ? null : Number(group.device_type) + } + ); + const currentChildren = await query( + `select id, is_device, device_id, is_device_group, device_group_id + from floorInfo + where del_flag = '0' + and pid = :pid + and (is_device = '1' or is_device_group = '1')`, + { pid: floorGroup.id } + ); + const staleIds = currentChildren + .filter((child) => { + const key = child.is_device_group === '1' ? `group:${child.device_group_id}` : `device:${child.device_id}`; + return !expected.has(key); + }) + .map((child) => child.id); + await removeFloorNodes(staleIds); + + for (const child of children) { + const isGroup = child.is_device_group === '1'; + const childKey = deviceKey(child); + const existing = await queryOne( + `select id + from floorInfo + where del_flag = '0' + and ${isGroup ? 'is_device_group' : 'is_device'} = '1' + and ${isGroup ? 'device_group_id' : 'device_id'} collate utf8mb4_general_ci = :childKey collate utf8mb4_general_ci + limit 1`, + { childKey: String(childKey) } + ); + if (existing) { + await query( + `update floorInfo + set pid = :pid, + name = :name, + device_type = :deviceType, + update_time = now() + where id = :id`, + { + id: existing.id, + pid: floorGroup.id, + name: child.device_name || child.device_code || child.device_id || '', + deviceType: child.device_type === undefined || child.device_type === null ? null : Number(child.device_type) + } + ); + } else { + await query( + `insert into floorInfo + (id, pid, name, create_time, update_time, del_flag, is_device, device_id, is_device_group, device_group_id, device_type, reserve5) + values + (:id, :pid, :name, now(), now(), '0', :isDevice, :deviceId, :isDeviceGroup, :deviceGroupId, :deviceType, null)`, + { + id: nextId(), + pid: floorGroup.id, + name: child.device_name || child.device_code || child.device_id || '', + isDevice: isGroup ? '0' : '1', + deviceId: isGroup ? null : childKey, + isDeviceGroup: isGroup ? '1' : '0', + deviceGroupId: isGroup ? childKey : null, + deviceType: child.device_type === undefined || child.device_type === null ? null : Number(child.device_type) + } + ); + } + if (isGroup) await syncFloorGroupsByMeasuringGroupIds([child.id], visited); + } + } +} + async function remove(ids) { const list = String(ids || '') .split(',') @@ -419,5 +868,7 @@ module.exports = { get, add, update, + mountChildren, + unmountChild, remove }; diff --git a/src/services/floorInfoService.js b/src/services/floorInfoService.js index d84b327..e2f7607 100644 --- a/src/services/floorInfoService.js +++ b/src/services/floorInfoService.js @@ -17,7 +17,9 @@ function camelRow(row) { isDeviceGroup: row.is_device_group, deviceGroupId: row.device_group_id, deviceType: row.device_type, - reserve5: row.reserve5 + reserve5: row.reserve5, + source: row.source, + measuringDeviceId: row.measuring_device_id }; } @@ -30,11 +32,95 @@ function buildTree(rows, pid = '0', level = 1) { .filter((row) => String(row.pid || '0') === String(pid)) .map((row) => { const item = { ...camelRow(row), level }; - item.children = buildTree(rows, row.id, level + 1); + const children = buildTree(rows, row.id, level + 1); + if (children.length) { + item.children = children; + } return item; }); } +function deviceKey(row = {}) { + return row.device_id || row.device_code || row.id || ''; +} + +async function cleanupAutoMountedRows() { + await query( + `update ${TABLE} + set del_flag = '1', + update_time = now() + where del_flag = '0' + and reserve5 = 'auto_mounted'` + ); +} + +async function loadMeasuringDeviceChildren(parentDeviceIds = []) { + const ids = [...new Set(parentDeviceIds.map((item) => String(item || '').trim()).filter(Boolean))]; + if (!ids.length) return new Map(); + const params = {}; + const placeholders = ids.map((id, index) => { + params[`id${index}`] = id; + return `:id${index}`; + }); + const rows = await query( + `select id, + parent_device_group_id, + device_id, + device_name, + device_code, + device_type, + is_device_group, + create_time, + update_time + from measuring_device + where del_flag = '0' + and parent_device_group_id in (${placeholders.join(', ')}) + order by is_device_group desc, create_time asc, id asc`, + params + ); + const map = new Map(); + rows.forEach((row) => { + const parentId = String(row.parent_device_group_id || ''); + if (!map.has(parentId)) map.set(parentId, []); + map.get(parentId).push(row); + }); + return map; +} + +async function findMeasuringDeviceGroupsByKeys(keys = []) { + const validKeys = [...new Set(keys.map((item) => String(item || '').trim()).filter(Boolean))]; + if (!validKeys.length) return new Map(); + const params = {}; + const placeholders = validKeys.map((key, index) => { + params[`key${index}`] = key; + return `:key${index}`; + }); + const rows = await query( + `select id, + device_id, + device_code + from measuring_device + where del_flag = '0' + and is_device_group = '1' + and ( + id in (${placeholders.join(', ')}) + or device_id in (${placeholders.join(', ')}) + or device_code in (${placeholders.join(', ')}) + )`, + params + ); + const map = new Map(); + rows.forEach((row) => { + [row.id, row.device_id, row.device_code] + .map((item) => String(item || '').trim()) + .filter(Boolean) + .forEach((key) => { + map.set(key, row.id); + }); + }); + return map; +} + async function list(params = {}) { const where = ["del_flag = '0'"]; const sqlParams = {}; @@ -98,16 +184,92 @@ async function update(body = {}) { async function mountDevice(body = {}) { if (!body.pid) throw new Error('pid不能为空'); - const normalized = normalizeDeviceFields({ - ...body, - isDevice: truthy(body.isDeviceGroup) ? '0' : '1', - isDeviceGroup: truthy(body.isDeviceGroup) ? '1' : '0' - }); - return add({ - ...body, - ...normalized, - name: body.name || (truthy(body.isDeviceGroup) ? `设备组-${normalized.deviceGroupId}` : `设备-${normalized.deviceId}`) + if (!body.measuringDeviceId) throw new Error('计量设备不能为空'); + const device = await queryOne( + `select id, device_id, device_name, device_code, device_type, is_device_group + from measuring_device + where id = :id + and del_flag = '0' + limit 1`, + { id: body.measuringDeviceId } + ); + if (!device) throw new Error('计量设备不存在'); + const isGroup = device.is_device_group === '1'; + const deviceKey = device.device_id || device.device_code || device.id; + const floorId = await add({ + pid: body.pid, + name: device.device_name || device.device_code || device.device_id || '', + isDevice: isGroup ? '0' : '1', + deviceId: isGroup ? null : deviceKey, + isDeviceGroup: isGroup ? '1' : '0', + deviceGroupId: isGroup ? deviceKey : null, + deviceType: device.device_type }); + return floorId; +} + +async function syncMountedDevices() { + await cleanupAutoMountedRows(); + const floorGroups = await query( + `select id, + device_group_id + from ${TABLE} + where del_flag = '0' + and is_device_group = '1' + and device_group_id is not null + and device_group_id <> '' + and (reserve5 is null or reserve5 <> 'auto_mounted') + order by create_time asc, id asc` + ); + const groupMap = await findMeasuringDeviceGroupsByKeys(floorGroups.map((row) => row.device_group_id)); + let inserted = 0; + for (const floorGroup of floorGroups) { + const measuringGroupId = groupMap.get(String(floorGroup.device_group_id || '')); + if (!measuringGroupId) continue; + inserted += await syncMeasuringDeviceGroupChildren(floorGroup.id, measuringGroupId, new Set()); + } + return { inserted }; +} + +async function syncMeasuringDeviceGroupChildren(floorParentId, measuringGroupId, visited) { + const visitKey = `${floorParentId}:${measuringGroupId}`; + if (visited.has(visitKey)) return 0; + visited.add(visitKey); + const childMap = await loadMeasuringDeviceChildren([measuringGroupId]); + const children = childMap.get(String(measuringGroupId)) || []; + let inserted = 0; + for (const child of children) { + const isGroup = child.is_device_group === '1'; + const childKey = deviceKey(child); + const existing = await queryOne( + `select id + from ${TABLE} + where del_flag = '0' + and pid = :pid + and ${isGroup ? 'is_device_group' : 'is_device'} = '1' + and ${isGroup ? 'device_group_id' : 'device_id'} collate utf8mb4_general_ci = :childKey collate utf8mb4_general_ci + limit 1`, + { + pid: floorParentId, + childKey: String(childKey) + } + ); + const floorChildId = existing?.id || await add({ + pid: floorParentId, + name: child.device_name || child.device_code || child.device_id || '', + isDevice: isGroup ? '0' : '1', + deviceId: isGroup ? null : childKey, + isDeviceGroup: isGroup ? '1' : '0', + deviceGroupId: isGroup ? childKey : null, + deviceType: child.device_type, + reserve5: 'auto_mounted' + }); + if (!existing) inserted += 1; + if (isGroup) { + inserted += await syncMeasuringDeviceGroupChildren(floorChildId, child.id, visited); + } + } + return inserted; } async function remove(id) { @@ -175,6 +337,7 @@ async function assertCanUseParent(pid, body = {}, selfId = null) { module.exports = { list, + syncMountedDevices, get, add, update, diff --git a/src/services/report/metricLineService.js b/src/services/report/metricLineService.js index 974d9ba..0eaee6a 100644 --- a/src/services/report/metricLineService.js +++ b/src/services/report/metricLineService.js @@ -29,6 +29,7 @@ const AGGREGATION_SQL = { const AGGREGATION_RULES = new Set([...Object.keys(AGGREGATION_SQL), 'subtract']); const COLLECTION_FIELDS = new Set(Object.values(METRIC_FIELD_MAP)); +const LINE_INTERVAL_SECONDS = new Set([5, 10, 15, 30, 60, 300, 600, 1800, 3600, 86400]); function splitIds(value) { if (Array.isArray(value)) { @@ -82,7 +83,7 @@ function ok(data) { }; } -function emptyLine(beginTime, endTime, metric) { +function emptyLine(beginTime, endTime, metric, interval) { return ok({ xAxis: [], series: [], @@ -90,7 +91,8 @@ function emptyLine(beginTime, endTime, metric) { records: [], metric, beginTime, - endTime + endTime, + interval }); } @@ -131,6 +133,21 @@ function normalizeAggregationRule(rule) { return AGGREGATION_RULES.has(rule) ? rule : 'avg'; } +function normalizeInterval(value) { + if (value === null || value === undefined || value === '') { + return undefined; + } + const interval = Number(value); + return LINE_INTERVAL_SECONDS.has(interval) ? interval : undefined; +} + +function buildReportTimeSql(interval) { + if (!interval) { + return "date_format(report_time, '%Y-%m-%d %H:%i:%s')"; + } + return `date_format(from_unixtime(floor(unix_timestamp(report_time) / ${interval}) * ${interval}), '%Y-%m-%d %H:%i:%s')`; +} + function buildLocationMaps(rows) { const byId = new Map(rows.map((row) => [String(row.id), row])); const nameMap = new Map(); @@ -187,12 +204,14 @@ function createMetricLineService(config) { const endTime = normalizeDateTime(queryParams.endTime, new Date(range.endTime)); const deviceIds = splitIds(queryParams.deviceIds); const metric = await getMetric(config, queryParams.metricId); + const interval = normalizeInterval(queryParams.interval); if (!metric.field || !deviceIds.length) { - return emptyLine(beginTime, endTime, metric); + return emptyLine(beginTime, endTime, metric, interval); } const aggregationRule = normalizeAggregationRule(metric.aggregationRule); + const reportTimeSql = buildReportTimeSql(interval); const params = { beginTime, endTime }; const placeholders = deviceIds.map((deviceId, index) => { params[`deviceId${index}`] = deviceId; @@ -212,21 +231,35 @@ function createMetricLineService(config) { ); const { nameMap, locationMap } = buildLocationMaps(nodeRows); - const rows = aggregationRule === 'subtract' - ? await query( + let rows; + if (!interval) { + rows = await query( + `select device_id as deviceId, + ${reportTimeSql} as reportTime, + ${metric.field} as value + from device_collection_data_info + where report_time >= :beginTime + and report_time <= :endTime + and ${metric.field} is not null + and device_id in (${placeholders.join(', ')}) + order by report_time asc, id asc, device_id asc`, + params + ); + } else if (aggregationRule === 'subtract') { + rows = await query( `select deviceId, reportTime, max(case when rnDesc = 1 then metricValue end) - max(case when rnAsc = 1 then metricValue end) as value from ( select device_id as deviceId, - date_format(report_time, '%Y-%m-%d %H:%i') as reportTime, + ${reportTimeSql} as reportTime, ${metric.field} as metricValue, row_number() over ( - partition by device_id, date_format(report_time, '%Y-%m-%d %H:%i') + partition by device_id, ${reportTimeSql} order by report_time asc, id asc ) as rnAsc, row_number() over ( - partition by device_id, date_format(report_time, '%Y-%m-%d %H:%i') + partition by device_id, ${reportTimeSql} order by report_time desc, id desc ) as rnDesc from device_collection_data_info @@ -238,10 +271,11 @@ function createMetricLineService(config) { group by deviceId, reportTime order by reportTime asc, deviceId asc`, params - ) - : await query( + ); + } else { + rows = await query( `select device_id as deviceId, - date_format(report_time, '%Y-%m-%d %H:%i') as reportTime, + ${reportTimeSql} as reportTime, ${AGGREGATION_SQL[aggregationRule](metric.field)} as value from device_collection_data_info where report_time >= :beginTime @@ -252,6 +286,7 @@ function createMetricLineService(config) { order by reportTime asc, device_id asc`, params ); + } const xAxis = [...new Set(rows.map((row) => row.reportTime))]; const byDevice = new Map(); @@ -269,7 +304,7 @@ function createMetricLineService(config) { name: nameMap.get(deviceId) || deviceId, type: 'line', smooth: true, - connectNulls: false, + connectNulls: true, data: xAxis.map((time) => values.get(time) ?? null) }; }); @@ -298,7 +333,8 @@ function createMetricLineService(config) { }, records, beginTime, - endTime + endTime, + interval }); } diff --git a/src/services/report/senior.js b/src/services/report/senior.js index 071f031..2ce79e3 100644 --- a/src/services/report/senior.js +++ b/src/services/report/senior.js @@ -9,6 +9,12 @@ const REPORT_HANDLERS = { 3: seniorTemperature.power }; +const FLOW_HANDLERS = { + 1: seniorWater.flow, + 2: seniorElectricity.flow, + 3: seniorTemperature.flow +}; + function power(queryParams = {}) { const reportType = String(queryParams.reportType || ''); const handler = REPORT_HANDLERS[reportType]; @@ -18,6 +24,16 @@ function power(queryParams = {}) { return handler(queryParams); } +function flow(queryParams = {}) { + const reportType = String(queryParams.reportType || ''); + const handler = FLOW_HANDLERS[reportType]; + if (!handler) { + return emptyResult(queryParams); + } + return handler(queryParams); +} + module.exports = { - power + power, + flow }; diff --git a/src/services/report/seniorCommon.js b/src/services/report/seniorCommon.js index f2c90f2..5568d39 100644 --- a/src/services/report/seniorCommon.js +++ b/src/services/report/seniorCommon.js @@ -59,6 +59,10 @@ function emptyResult(queryParams = {}) { unit: '', riverData: [], treemapData: [], + sankeyData: { + nodes: [], + links: [] + }, devices: [] }); } @@ -150,6 +154,71 @@ async function loadValueRows(deviceIds, beginTime, endTime, config) { ); } +async function loadTotalRows(deviceIds, beginTime, endTime, config) { + if (!deviceIds.length) { + return []; + } + + const params = { beginTime, endTime }; + const placeholders = deviceIds.map((deviceId, index) => { + params[`deviceId${index}`] = deviceId; + return `:deviceId${index}`; + }); + const deviceSql = placeholders.join(', '); + + if (config.totalMode === 'subtract') { + return query( + `select latest.device_id as deviceId, + greatest(coalesce(latest.value, 0) - coalesce(earliest.value, 0), 0) as value + from ( + select source.device_id, + source.${config.valueField} as value + from device_collection_data_info source + join ( + select device_id, + max(report_time) as report_time + from device_collection_data_info + where report_time >= :beginTime + and report_time <= :endTime + and ${config.valueField} is not null + and device_id in (${deviceSql}) + group by device_id + ) marker on marker.device_id = source.device_id and marker.report_time = source.report_time + ) latest + join ( + select source.device_id, + source.${config.valueField} as value + from device_collection_data_info source + join ( + select device_id, + min(report_time) as report_time + from device_collection_data_info + where report_time >= :beginTime + and report_time <= :endTime + and ${config.valueField} is not null + and device_id in (${deviceSql}) + group by device_id + ) marker on marker.device_id = source.device_id and marker.report_time = source.report_time + ) earliest on earliest.device_id = latest.device_id + order by latest.device_id asc`, + params + ); + } + + return query( + `select device_id as deviceId, + ${config.totalAggregate || config.aggregate || 'sum'}(${config.valueField}) as value + from device_collection_data_info + where report_time >= :beginTime + and report_time <= :endTime + and ${config.valueField} is not null + and device_id in (${deviceSql}) + group by device_id + order by device_id asc`, + params + ); +} + function buildDeviceValueMap(rows) { const byDevice = new Map(); rows.forEach((row) => { @@ -178,6 +247,18 @@ function sumDeviceTotal(deviceIds, byDevice) { }, 0); } +function buildDeviceTotalMap(rows) { + const map = new Map(); + rows.forEach((row) => { + map.set(row.deviceId, Number(row.value || 0)); + }); + return map; +} + +function sumTotalByDevices(deviceIds, totalMap) { + return deviceIds.reduce((sum, deviceId) => sum + Number(totalMap.get(deviceId) || 0), 0); +} + function buildDisplayEntries({ roots, typedDevices, showGroup, deviceType }) { const entries = typedDevices.map((device) => ({ id: String(device.id), @@ -276,6 +357,58 @@ function buildTreemapData({ roots, typedDevices, showGroup, byDevice, deviceType return roots.map(buildNode).filter(Boolean); } +function buildSankeyData({ roots, deviceType, totalMap }) { + const nodes = new Map(); + const links = []; + const rootName = '根节点'; + nodes.set(rootName, { name: rootName, value: 0 }); + + function normalChildren(node) { + return node.children.filter((child) => child.isDevice !== '1' && child.isDeviceGroup !== '1'); + } + + function nodeDeviceIds(node) { + return uniqueDevices(collectTypedDevices(node, deviceType, [])).map((device) => device.deviceId); + } + + function nodeValue(node) { + return sumTotalByDevices(nodeDeviceIds(node), totalMap); + } + + function nodeName(node) { + return `${node.name || node.id}`; + } + + function addNormalNode(node, parentName) { + const value = Number(nodeValue(node).toFixed(3)); + if (value <= 0) { + return; + } + + const name = nodeName(node); + nodes.set(name, { name, value }); + links.push({ + source: parentName, + target: name, + value + }); + + normalChildren(node).forEach((child) => addNormalNode(child, name)); + } + + roots + .filter((node) => node.isDevice !== '1' && node.isDeviceGroup !== '1') + .forEach((node) => addNormalNode(node, rootName)); + + const total = links.filter((link) => link.source === rootName).reduce((sum, link) => sum + Number(link.value || 0), 0); + nodes.set(rootName, { name: rootName, value: Number(total.toFixed(3)) }); + + return { + nodes: [...nodes.values()], + links + }; +} + async function buildSeniorPower(queryParams = {}, config) { const range = defaultRange(); const beginTime = normalizeDateTime(queryParams.beginTime, new Date(range.beginTime)); @@ -306,7 +439,33 @@ async function buildSeniorPower(queryParams = {}, config) { }); } +async function buildSeniorFlow(queryParams = {}, config) { + const range = defaultRange(); + const beginTime = normalizeDateTime(queryParams.beginTime, new Date(range.beginTime)); + const endTime = normalizeDateTime(queryParams.endTime, new Date(range.endTime)); + const { roots } = await loadFloorTree(); + const typedDevices = uniqueDevices(roots.flatMap((node) => collectTypedDevices(node, config.deviceType, []))); + const deviceIds = typedDevices.map((device) => device.deviceId); + const rows = await loadTotalRows(deviceIds, beginTime, endTime, config); + const totalMap = buildDeviceTotalMap(rows); + + return ok({ + beginTime, + endTime, + reportType: config.reportType, + valueName: config.flowValueName || config.valueName, + unit: config.unit, + sankeyData: buildSankeyData({ roots, deviceType: config.deviceType, totalMap }), + devices: typedDevices.map((device) => ({ + id: device.id, + name: device.name || device.deviceId, + deviceId: device.deviceId + })) + }); +} + module.exports = { buildSeniorPower, + buildSeniorFlow, emptyResult }; diff --git a/src/services/report/seniorElectricity.js b/src/services/report/seniorElectricity.js index c12e988..1eaa70b 100644 --- a/src/services/report/seniorElectricity.js +++ b/src/services/report/seniorElectricity.js @@ -1,4 +1,4 @@ -const { buildSeniorPower } = require('./seniorCommon'); +const { buildSeniorFlow, buildSeniorPower } = require('./seniorCommon'); function power(queryParams = {}) { return buildSeniorPower(queryParams, { @@ -11,6 +11,19 @@ function power(queryParams = {}) { }); } +function flow(queryParams = {}) { + return buildSeniorFlow(queryParams, { + reportType: '2', + deviceType: '2', + valueField: 'forward_active_total_energy', + totalMode: 'subtract', + flowValueName: '用电', + valueName: '用电', + unit: 'kWh' + }); +} + module.exports = { - power + power, + flow }; diff --git a/src/services/report/seniorTemperature.js b/src/services/report/seniorTemperature.js index c39c5dd..0c1de5a 100644 --- a/src/services/report/seniorTemperature.js +++ b/src/services/report/seniorTemperature.js @@ -1,4 +1,4 @@ -const { buildSeniorPower } = require('./seniorCommon'); +const { buildSeniorFlow, buildSeniorPower } = require('./seniorCommon'); function power(queryParams = {}) { return buildSeniorPower(queryParams, { @@ -11,6 +11,19 @@ function power(queryParams = {}) { }); } +function flow(queryParams = {}) { + return buildSeniorFlow(queryParams, { + reportType: '3', + deviceType: '3', + valueField: 'temperature_reading', + totalAggregate: 'avg', + flowValueName: '温度', + valueName: '温度', + unit: '℃' + }); +} + module.exports = { - power + power, + flow }; diff --git a/src/services/report/seniorWater.js b/src/services/report/seniorWater.js index ff0f58e..d685d02 100644 --- a/src/services/report/seniorWater.js +++ b/src/services/report/seniorWater.js @@ -1,4 +1,4 @@ -const { buildSeniorPower } = require('./seniorCommon'); +const { buildSeniorFlow, buildSeniorPower } = require('./seniorCommon'); function power(queryParams = {}) { return buildSeniorPower(queryParams, { @@ -11,6 +11,19 @@ function power(queryParams = {}) { }); } +function flow(queryParams = {}) { + return buildSeniorFlow(queryParams, { + reportType: '1', + deviceType: '1', + valueField: 'water_meter_reading', + totalMode: 'subtract', + flowValueName: '用水', + valueName: '用水', + unit: 'm³' + }); +} + module.exports = { - power + power, + flow }; diff --git a/yarn.lock b/yarn.lock index 2a22365..c3075f2 100644 --- a/yarn.lock +++ b/yarn.lock @@ -493,6 +493,11 @@ negotiator@0.6.3: resolved "https://registry.npmmirror.com/negotiator/-/negotiator-0.6.3.tgz#58e323a72fedc0d6f9cd4d31fe49f51479590ccd" integrity sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg== +node-cron@^4.2.1: + version "4.2.1" + resolved "https://registry.npmmirror.com/node-cron/-/node-cron-4.2.1.tgz#6979be4aee4702f06322d21220df8de252c8e265" + integrity sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg== + object-assign@^4: version "4.1.1" resolved "https://registry.npmmirror.com/object-assign/-/object-assign-4.1.1.tgz#2109adc7965887cfc05cbbd442cac8bfbb360863" @@ -676,3 +681,8 @@ vary@^1, vary@~1.1.2: version "1.1.2" resolved "https://registry.npmmirror.com/vary/-/vary-1.1.2.tgz#2299f02c6ded30d4a5961b0b9f74524a18f634fc" integrity sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg== + +ws@^8.21.0: + version "8.21.0" + resolved "https://registry.npmmirror.com/ws/-/ws-8.21.0.tgz#012e413fc07429945121b0c153158c4343086951" + integrity sha512-Vsp28b7DRcimFQvrqu2Wek3z1iYxDCWqHYB8Qsnk/S4RfaCQzPGPyBNuVjJV3cd6UiKtUtp6sNM77gWvzcCH+g==