You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

276 lines
12 KiB
C#

#region << 版 本 注 释 >>
/*--------------------------------------------------------------------
* (c) 2025 WenJY
* CLR4.0.30319.42000
* Mr.Wen's MacBook Pro
* Sln.Iot.Business
* D5F7092D-7C5C-42B9-A373-1332CFD77169
*
* WenJY
*
* 2025-05-20 14:45:41
* V1.0.0
*
*
*--------------------------------------------------------------------
*
*
*
*
* V1.0.0
*--------------------------------------------------------------------*/
#endregion << 版 本 注 释 >>
using System;
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json;
using Sln.Iot.Business.@base;
using Sln.Iot.Common;
using Sln.Iot.Config;
using Sln.Iot.Model.dao;
using Sln.Iot.Model.dto;
using Sln.Iot.Repository.service;
using Sln.Iot.Serilog;
using Sln.Iot.Socket.Adapter;
using TouchSocket.Core;
using TouchSocket.Sockets;
namespace Sln.Iot.Business
{
public class FluidBusiness:BaseBusiness
{
private Dictionary<string ,DateTime> _lastCollectTimeDict = new Dictionary<string, DateTime>();
private readonly IRecordFluidInstantService? _service;
public FluidBusiness(SerilogHelper logger, AppConfig appConfig, StringChange stringChange, IRecordFluidInstantService? service) : base(logger, appConfig, stringChange)
{
_service = service;
}
public override FilterResult BufferAnalysis(ISocketClient client, BufferRequestInfo requestInfo, int bodyLength)
{
ByteBlock byteBlock = new ByteBlock(requestInfo.Body);
if (byteBlock.CanReadLen < 1)
{
return FilterResult.Cache;
}
int pos = byteBlock.Pos;
try
{
List<RecordFluidInstant> result = new List<RecordFluidInstant>();
var amount = requestInfo.BufferLength / bodyLength;
_logger.Info($"收到{amount}个流体数据,开始循环解析......");
for (int i = 0; i < amount; i++)
{
RecordFluidInstant fluidInstant = new RecordFluidInstant();
#region 表号解析
byteBlock.Read(out byte[] b_MeterID, 2);
var decParts = b_MeterID.Select(b => $"{b:D2}").ToArray();
var equipId = $"{requestInfo.ColletEquipCode}_{decParts[0]}{decParts[1]}";
#endregion
fluidInstant.monitorId = equipId;
do
{
byteBlock.Read(out byte[] b_UA_flag, 2);
base._stringChange.ConvertBytesToUInt16(b_UA_flag, out uint flag);
switch (flag)
{
case CommParams.Press: //压力值
byteBlock.Read(out byte[] b_Press, 4);
base._stringChange.SwapBytes(ref b_Press);
float f_Press = BitConverter.ToSingle(b_Press, 0);
ValueIsNan(ref f_Press);
fluidInstant.press = (decimal)f_Press;
break;
case CommParams.STemperature: //温度值
byteBlock.Read(out byte[] b_Temperature, 4);
base._stringChange.SwapBytes(ref b_Temperature);
float f_Temperature = BitConverter.ToSingle(b_Temperature, 0);
ValueIsNan(ref f_Temperature);
fluidInstant.temperature = (decimal)f_Temperature;
break;
case CommParams.Frequency: //频率值
byteBlock.Read(out byte[] b_Frequency, 4);
base._stringChange.SwapBytes(ref b_Frequency);
float f_Frequency = BitConverter.ToSingle(b_Frequency, 0);
ValueIsNan(ref f_Frequency);
fluidInstant.frequency = (decimal)f_Frequency;
break;
case CommParams.Density: //密度值
byteBlock.Read(out byte[] b_Density, 4);
base._stringChange.SwapBytes(ref b_Density);
float f_Density = BitConverter.ToSingle(b_Density, 0);
ValueIsNan(ref f_Density);
fluidInstant.density = (decimal)f_Density;
break;
case CommParams.FluxInstantValue: //瞬时流值
byteBlock.Read(out byte[] b_FluxInstantValue, 4);
base._stringChange.SwapBytes(ref b_FluxInstantValue);
float f_FluxInstantValue = BitConverter.ToSingle(b_FluxInstantValue, 0);
ValueIsNan(ref f_FluxInstantValue);
fluidInstant.instantFlow = (decimal)f_FluxInstantValue;
break;
case CommParams.FluxEyeableTotalValue: //累计流量值
byteBlock.Read(out byte[] b_FluxEyeableTotalValue, 4);
base._stringChange.SwapBytes(ref b_FluxEyeableTotalValue);
float f_FluxEyeableTotalValue = BitConverter.ToSingle(b_FluxEyeableTotalValue, 0);
ValueIsNan(ref f_FluxEyeableTotalValue);
fluidInstant.totalFlow = (decimal)f_FluxEyeableTotalValue;
break;
case CommParams.HeatInstantValue: //瞬时热量
byteBlock.Read(out byte[] b_HeatInstantValue, 4);
base._stringChange.SwapBytes(ref b_HeatInstantValue);
float f_HeatInstantValue = BitConverter.ToSingle(b_HeatInstantValue, 0);
ValueIsNan(ref f_HeatInstantValue);
fluidInstant.instantHeat = (decimal)f_HeatInstantValue;
break;
case CommParams.HeatToftalValue: //累计热量值
byteBlock.Read(out byte[] b_HeatToftalValue, 4);
base._stringChange.SwapBytes(ref b_HeatToftalValue);
float f_HeatToftalValue = BitConverter.ToSingle(b_HeatToftalValue, 0);
ValueIsNan(ref f_HeatToftalValue);
fluidInstant.totalHeat = (decimal)f_HeatToftalValue;
break;
case CommParams.CJSJ: //采集时间
byteBlock.Read(out byte[] b_CJSJ, 6);
string strDateTime = "20" + b_CJSJ[5].ToString("x2")
+ "-" + b_CJSJ[4].ToString("x2")
+ "-" + b_CJSJ[3].ToString("x2")
+ " " + b_CJSJ[2].ToString("x2")
+ ":" + b_CJSJ[1].ToString("x2")
+ ":" + b_CJSJ[0].ToString("x2");
fluidInstant.collectTime = Convert.ToDateTime(strDateTime);
break;
}
} while (byteBlock.Pos % bodyLength != 0);
fluidInstant.recordTime = DateTime.Now;
var serializeObject = JsonConvert.SerializeObject(fluidInstant);
_logger.Info($"第{i+1}个流体仪表{fluidInstant.monitorId}解析完成:{serializeObject}");
DateTime? lastCollectTime = GetLastCollectTime(fluidInstant.monitorId);
if (lastCollectTime != null && DateTime.Now -lastCollectTime < TimeSpan.FromMinutes(_appConfig.fluidTimeInterval))
{
//时间间隔小于采集间隔,不保存
continue;
}
// 如果字典中已有该键,则更新;否则新增
_lastCollectTimeDict[fluidInstant.monitorId] = DateTime.Now;
result.Add(fluidInstant);
}
if (result.Count > 0)
{
//是否开启 FF 异常值过滤
if (_appConfig.virtualFlag)
{
ParamVerification(ref result);
}
var inRes = _service.SplitInsert(result,out List<long> insertIds);
_logger.Info($"{result.Count}个流体数据解析处理完成,数据保存{(inRes ? "" : "")}");
}
else
{
_logger.Info($"{amount}个流体数据解析处理完成,没有需要保存的数据");
}
return FilterResult.Success;
}
catch (Exception e)
{
base._logger.Error($"流体数据解析异常:{e.Message}");
}
return FilterResult.Cache;
}
/// <summary>
/// FF FF参数过滤
/// </summary>
/// <param name="dnbInstants"></param>
/// <exception cref="ArgumentNullException"></exception>
private void ParamVerification(ref List<RecordFluidInstant> fluidInstants)
{
if (fluidInstants == null)
{
throw new ArgumentNullException($"过滤参数方法异常,传入参数为空");
}
for (int i = fluidInstants.Count - 1; i >= 0; i--)
{
var item = fluidInstants[i];
if (item.totalFlow == _appConfig.virtualValue)
{
_logger.Info($"MonitorId:{item.monitorId},累计流量为 FF FF FF FF已启用过滤不保存该表数据");
fluidInstants.RemoveAt(i);
continue;
}
}
}
private DateTime? GetLastCollectTime(string key)
{
// 检查键是否存在
if (_lastCollectTimeDict.TryGetValue(key, out DateTime collectTime))
{
return collectTime;
}
else
{
return null;
}
}
/// <summary>
/// 应答处理
/// </summary>
/// <param name="client"></param>
/// <param name="buffer"></param>
public override void ResponseHandle(ISocketClient client, byte[] buffer)
{
ResponsePack sendResponsePackInfo = new ResponsePack()
{
m_MessageType = 0xB4
};
base.GetMessagePack(ref sendResponsePackInfo, buffer);
base.SendMessageAsync(client, sendResponsePackInfo);
}
}
}