#region << Version Notes >>

/*--------------------------------------------------------------------
* Copyright (c) 2025 WenJY. All rights reserved.
* CLR version: 4.0.30319.42000
* Machine name: Mr.Wen's MacBook Pro
* Namespace: Sln.Iot.Business
* Unique ID: D5F7092D-7C5C-42B9-A373-1332CFD77169
*
* Author: WenJY
* Email:
* Created: 2025-05-20 14:45:41
* Version: V1.0.0
* Description:
*
*--------------------------------------------------------------------
* Modified by:
* Date:
* Change notes:
*
* Version: V1.0.0
*--------------------------------------------------------------------*/

#endregion << Version Notes >>

using System;
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json;
using Sln.Iot.Business.@base;
using Sln.Iot.Common;
using Sln.Iot.Config;
using Sln.Iot.Model.dao;
using Sln.Iot.Model.dto;
using Sln.Iot.Repository.service;
using Sln.Iot.Serilog;
using Sln.Iot.Socket.Adapter;
using TouchSocket.Core;
using TouchSocket.Sockets;

namespace Sln.Iot.Business
{
    /// <summary>
    /// Business handler for fluid-meter frames: decodes each meter record from the
    /// request body, throttles how often a given meter is saved, persists the
    /// remaining records through <see cref="IRecordFluidInstantService"/>, and
    /// sends the 0xB4 response.
    /// </summary>
    public class FluidBusiness : BaseBusiness
    {
        /// <summary>
        /// Time (local clock) at which a record was last accepted for saving, keyed by monitor id.
        /// </summary>
        private readonly Dictionary<string, DateTime> _lastCollectTimeDict = new Dictionary<string, DateTime>();

        private readonly IRecordFluidInstantService? _service;

        /// <summary>
        /// Creates the handler.
        /// </summary>
        /// <param name="logger">Logger forwarded to the base class.</param>
        /// <param name="appConfig">Application configuration forwarded to the base class.</param>
        /// <param name="stringChange">Byte/string conversion helper forwarded to the base class.</param>
        /// <param name="service">Storage service used to insert the decoded records; may be null.</param>
        public FluidBusiness(SerilogHelper logger, AppConfig appConfig, StringChange stringChange, IRecordFluidInstantService? service)
            : base(logger, appConfig, stringChange)
        {
            _service = service;
        }

        /// <summary>
        /// Decodes the fluid-meter records contained in <paramref name="requestInfo"/> and saves them.
        /// </summary>
        /// <param name="client">The socket client the data came from (not used here).</param>
        /// <param name="requestInfo">The received frame; its body holds the meter records.</param>
        /// <param name="bodyLength">Length in bytes of one meter record.</param>
        /// <returns>
        /// <see cref="FilterResult.Success"/> when the frame was processed;
        /// <see cref="FilterResult.Cache"/> when the body is empty or any exception occurred.
        /// </returns>
        public override FilterResult BufferAnalysis(ISocketClient client, BufferRequestInfo requestInfo, int bodyLength)
        {
            ByteBlock byteBlock = new ByteBlock(requestInfo.Body);
            if (byteBlock.CanReadLen < 1)
            {
                return FilterResult.Cache;
            }

            // Reads the next 4 bytes, swaps their order, interprets them as a float,
            // runs the inherited ValueIsNan fix-up and converts the result to decimal.
            decimal ReadDecimalValue()
            {
                byteBlock.Read(out byte[] raw, 4);
                _stringChange.SwapBytes(ref raw);
                float value = BitConverter.ToSingle(raw, 0);
                ValueIsNan(ref value);
                return (decimal)value;
            }

            try
            {
                var result = new List<RecordFluidInstant>();
                var amount = requestInfo.BufferLength / bodyLength;
                _logger.Info($"收到{amount}个流体数据,开始循环解析......");

                for (int i = 0; i < amount; i++)
                {
                    RecordFluidInstant fluidInstant = new RecordFluidInstant();

                    // Meter number: two bytes, each rendered as a two-digit decimal and
                    // appended to the collecting equipment code.
                    byteBlock.Read(out byte[] b_MeterID, 2);
                    fluidInstant.monitorId = $"{requestInfo.ColletEquipCode}_{b_MeterID[0]:D2}{b_MeterID[1]:D2}";

                    // Tagged fields follow until the read position reaches a multiple of bodyLength.
                    // NOTE(review): a tag that matches no case consumes only its 2 bytes and is
                    // skipped silently — presumably padding; confirm against the protocol.
                    do
                    {
                        byteBlock.Read(out byte[] b_UA_flag, 2);
                        _stringChange.ConvertBytesToUInt16(b_UA_flag, out uint flag);
                        switch (flag)
                        {
                            case CommParams.Press: // pressure
                                fluidInstant.press = ReadDecimalValue();
                                break;
                            case CommParams.STemperature: // temperature
                                fluidInstant.temperature = ReadDecimalValue();
                                break;
                            case CommParams.Frequency: // frequency
                                fluidInstant.frequency = ReadDecimalValue();
                                break;
                            case CommParams.Density: // density
                                fluidInstant.density = ReadDecimalValue();
                                break;
                            case CommParams.FluxInstantValue: // instantaneous flow
                                fluidInstant.instantFlow = ReadDecimalValue();
                                break;
                            case CommParams.FluxEyeableTotalValue: // cumulative flow
                                fluidInstant.totalFlow = ReadDecimalValue();
                                break;
                            case CommParams.HeatInstantValue: // instantaneous heat
                                fluidInstant.instantHeat = ReadDecimalValue();
                                break;
                            case CommParams.HeatToftalValue: // cumulative heat
                                fluidInstant.totalHeat = ReadDecimalValue();
                                break;
                            case CommParams.CJSJ: // collection time
                                // Six bytes ordered second, minute, hour, day, month, year; the two
                                // hex digits of each byte are used directly as the decimal digits.
                                byteBlock.Read(out byte[] b_CJSJ, 6);
                                string strDateTime =
                                    $"20{b_CJSJ[5]:x2}-{b_CJSJ[4]:x2}-{b_CJSJ[3]:x2} {b_CJSJ[2]:x2}:{b_CJSJ[1]:x2}:{b_CJSJ[0]:x2}";
                                // Machine-generated text: parse it independently of the host culture.
                                fluidInstant.collectTime = Convert.ToDateTime(
                                    strDateTime, System.Globalization.CultureInfo.InvariantCulture);
                                break;
                        }
                    } while (byteBlock.Pos % bodyLength != 0);

                    // One clock sample per record so recordTime and the throttle bookkeeping agree.
                    DateTime now = DateTime.Now;
                    fluidInstant.recordTime = now;

                    var serializeObject = JsonConvert.SerializeObject(fluidInstant);
                    _logger.Info($"第{i + 1}个流体仪表{fluidInstant.monitorId}解析完成:{serializeObject}");

                    DateTime? lastCollectTime = GetLastCollectTime(fluidInstant.monitorId);
                    if (lastCollectTime != null
                        && now - lastCollectTime < TimeSpan.FromMinutes(_appConfig.fluidTimeInterval))
                    {
                        // Less than the configured interval since this meter was last accepted: do not save.
                        continue;
                    }

                    // Adds the key or overwrites the previous timestamp.
                    _lastCollectTimeDict[fluidInstant.monitorId] = now;
                    result.Add(fluidInstant);
                }

                if (result.Count > 0)
                {
                    // Optional filtering of records whose cumulative flow equals the configured placeholder value.
                    if (_appConfig.virtualFlag)
                    {
                        ParamVerification(result);
                    }

                    if (_service is null)
                    {
                        // Thrown inside the try so it is logged and handled like any other failure,
                        // but with a meaningful message instead of a NullReferenceException.
                        throw new InvalidOperationException("流体数据存储服务未注入,无法保存数据");
                    }

                    var inRes = _service.SplitInsert(result, out _);
                    _logger.Info($"{result.Count}个流体数据解析处理完成,数据保存{(inRes ? "成功" : "失败")}");
                }
                else
                {
                    _logger.Info($"{amount}个流体数据解析处理完成,没有需要保存的数据");
                }

                return FilterResult.Success;
            }
            catch (Exception e)
            {
                _logger.Error($"流体数据解析异常:{e.Message}");
            }

            // NOTE(review): Cache is also returned after a failure — confirm the adapter
            // does not keep re-delivering the same faulty frame.
            return FilterResult.Cache;
        }

        /// <summary>
        /// Removes, in place, every record whose cumulative flow equals the configured
        /// placeholder value (<c>_appConfig.virtualValue</c>), logging each removal.
        /// </summary>
        /// <param name="fluidInstants">The records to filter; modified in place.</param>
        /// <exception cref="ArgumentNullException"><paramref name="fluidInstants"/> is null.</exception>
        private void ParamVerification(List<RecordFluidInstant> fluidInstants)
        {
            if (fluidInstants == null)
            {
                throw new ArgumentNullException(nameof(fluidInstants), "过滤参数方法异常,传入参数为空");
            }

            // Walk backwards so RemoveAt does not shift the items still to be visited.
            for (int i = fluidInstants.Count - 1; i >= 0; i--)
            {
                var item = fluidInstants[i];
                if (item.totalFlow == _appConfig.virtualValue)
                {
                    _logger.Info($"MonitorId:{item.monitorId},累计流量为 FF FF FF FF,已启用过滤不保存该表数据");
                    fluidInstants.RemoveAt(i);
                }
            }
        }

        /// <summary>
        /// Looks up when a record for the given monitor id was last accepted for saving.
        /// </summary>
        /// <param name="key">The monitor id.</param>
        /// <returns>The stored timestamp, or null when the id has not been seen.</returns>
        private DateTime? GetLastCollectTime(string key)
        {
            return _lastCollectTimeDict.TryGetValue(key, out DateTime collectTime)
                ? collectTime
                : (DateTime?)null;
        }

        /// <summary>
        /// Builds a response pack with message type 0xB4, fills it from the received
        /// buffer via the base class and sends it to the client.
        /// </summary>
        /// <param name="client">The client to answer.</param>
        /// <param name="buffer">The received bytes the response is derived from.</param>
        public override void ResponseHandle(ISocketClient client, byte[] buffer)
        {
            ResponsePack sendResponsePackInfo = new ResponsePack()
            {
                m_MessageType = 0xB4
            };

            base.GetMessagePack(ref sendResponsePackInfo, buffer);
            base.SendMessageAsync(client, sendResponsePackInfo);
        }
    }
}