You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

276 lines
12 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#region << 版 本 注 释 >>
/*--------------------------------------------------------------------
* 版权所有 (c) 2025 WenJY 保留所有权利。
* CLR版本4.0.30319.42000
* 机器名称Mr.Wen's MacBook Pro
* 命名空间Sln.Iot.Business
* 唯一标识D5F7092D-7C5C-42B9-A373-1332CFD77169
*
* 创建者WenJY
* 电子邮箱:
* 创建时间2025-05-20 14:45:41
* 版本V1.0.0
* 描述:
*
*--------------------------------------------------------------------
* 修改人:
* 时间:
* 修改说明:
*
* 版本V1.0.0
*--------------------------------------------------------------------*/
#endregion << 版 本 注 释 >>
using System;
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json;
using Sln.Iot.Business.@base;
using Sln.Iot.Common;
using Sln.Iot.Config;
using Sln.Iot.Model.dao;
using Sln.Iot.Model.dto;
using Sln.Iot.Repository.service;
using Sln.Iot.Serilog;
using Sln.Iot.Socket.Adapter;
using TouchSocket.Core;
using TouchSocket.Sockets;
namespace Sln.Iot.Business
{
public class FluidBusiness:BaseBusiness
{
private Dictionary<string ,DateTime> _lastCollectTimeDict = new Dictionary<string, DateTime>();
private readonly IRecordFluidInstantService? _service;
public FluidBusiness(SerilogHelper logger, AppConfig appConfig, StringChange stringChange, IRecordFluidInstantService? service) : base(logger, appConfig, stringChange)
{
_service = service;
}
public override FilterResult BufferAnalysis(ISocketClient client, BufferRequestInfo requestInfo, int bodyLength)
{
ByteBlock byteBlock = new ByteBlock(requestInfo.Body);
if (byteBlock.CanReadLen < 1)
{
return FilterResult.Cache;
}
int pos = byteBlock.Pos;
try
{
List<RecordFluidInstant> result = new List<RecordFluidInstant>();
var amount = requestInfo.BufferLength / bodyLength;
_logger.Info($"收到{amount}个流体数据,开始循环解析......");
for (int i = 0; i < amount; i++)
{
RecordFluidInstant fluidInstant = new RecordFluidInstant();
#region 表号解析
byteBlock.Read(out byte[] b_MeterID, 2);
var decParts = b_MeterID.Select(b => $"{b:D2}").ToArray();
var equipId = $"{requestInfo.ColletEquipCode}_{decParts[0]}{decParts[1]}";
#endregion
fluidInstant.monitorId = equipId;
do
{
byteBlock.Read(out byte[] b_UA_flag, 2);
base._stringChange.ConvertBytesToUInt16(b_UA_flag, out uint flag);
switch (flag)
{
case CommParams.Press: //压力值
byteBlock.Read(out byte[] b_Press, 4);
base._stringChange.SwapBytes(ref b_Press);
float f_Press = BitConverter.ToSingle(b_Press, 0);
ValueIsNan(ref f_Press);
fluidInstant.press = (decimal)f_Press;
break;
case CommParams.STemperature: //温度值
byteBlock.Read(out byte[] b_Temperature, 4);
base._stringChange.SwapBytes(ref b_Temperature);
float f_Temperature = BitConverter.ToSingle(b_Temperature, 0);
ValueIsNan(ref f_Temperature);
fluidInstant.temperature = (decimal)f_Temperature;
break;
case CommParams.Frequency: //频率值
byteBlock.Read(out byte[] b_Frequency, 4);
base._stringChange.SwapBytes(ref b_Frequency);
float f_Frequency = BitConverter.ToSingle(b_Frequency, 0);
ValueIsNan(ref f_Frequency);
fluidInstant.frequency = (decimal)f_Frequency;
break;
case CommParams.Density: //密度值
byteBlock.Read(out byte[] b_Density, 4);
base._stringChange.SwapBytes(ref b_Density);
float f_Density = BitConverter.ToSingle(b_Density, 0);
ValueIsNan(ref f_Density);
fluidInstant.density = (decimal)f_Density;
break;
case CommParams.FluxInstantValue: //瞬时流值
byteBlock.Read(out byte[] b_FluxInstantValue, 4);
base._stringChange.SwapBytes(ref b_FluxInstantValue);
float f_FluxInstantValue = BitConverter.ToSingle(b_FluxInstantValue, 0);
ValueIsNan(ref f_FluxInstantValue);
fluidInstant.instantFlow = (decimal)f_FluxInstantValue;
break;
case CommParams.FluxEyeableTotalValue: //累计流量值
byteBlock.Read(out byte[] b_FluxEyeableTotalValue, 4);
base._stringChange.SwapBytes(ref b_FluxEyeableTotalValue);
float f_FluxEyeableTotalValue = BitConverter.ToSingle(b_FluxEyeableTotalValue, 0);
ValueIsNan(ref f_FluxEyeableTotalValue);
fluidInstant.totalFlow = (decimal)f_FluxEyeableTotalValue;
break;
case CommParams.HeatInstantValue: //瞬时热量
byteBlock.Read(out byte[] b_HeatInstantValue, 4);
base._stringChange.SwapBytes(ref b_HeatInstantValue);
float f_HeatInstantValue = BitConverter.ToSingle(b_HeatInstantValue, 0);
ValueIsNan(ref f_HeatInstantValue);
fluidInstant.instantHeat = (decimal)f_HeatInstantValue;
break;
case CommParams.HeatToftalValue: //累计热量值
byteBlock.Read(out byte[] b_HeatToftalValue, 4);
base._stringChange.SwapBytes(ref b_HeatToftalValue);
float f_HeatToftalValue = BitConverter.ToSingle(b_HeatToftalValue, 0);
ValueIsNan(ref f_HeatToftalValue);
fluidInstant.totalHeat = (decimal)f_HeatToftalValue;
break;
case CommParams.CJSJ: //采集时间
byteBlock.Read(out byte[] b_CJSJ, 6);
string strDateTime = "20" + b_CJSJ[5].ToString("x2")
+ "-" + b_CJSJ[4].ToString("x2")
+ "-" + b_CJSJ[3].ToString("x2")
+ " " + b_CJSJ[2].ToString("x2")
+ ":" + b_CJSJ[1].ToString("x2")
+ ":" + b_CJSJ[0].ToString("x2");
fluidInstant.collectTime = Convert.ToDateTime(strDateTime);
break;
}
} while (byteBlock.Pos % bodyLength != 0);
fluidInstant.recordTime = DateTime.Now;
var serializeObject = JsonConvert.SerializeObject(fluidInstant);
_logger.Info($"第{i+1}个流体仪表{fluidInstant.monitorId}解析完成:{serializeObject}");
DateTime? lastCollectTime = GetLastCollectTime(fluidInstant.monitorId);
if (lastCollectTime != null && DateTime.Now -lastCollectTime < TimeSpan.FromMinutes(_appConfig.fluidTimeInterval))
{
//时间间隔小于采集间隔,不保存
continue;
}
// 如果字典中已有该键,则更新;否则新增
_lastCollectTimeDict[fluidInstant.monitorId] = DateTime.Now;
result.Add(fluidInstant);
}
if (result.Count > 0)
{
//是否开启 FF 异常值过滤
if (_appConfig.virtualFlag)
{
ParamVerification(ref result);
}
var inRes = _service.SplitInsert(result,out List<long> insertIds);
_logger.Info($"{result.Count}个流体数据解析处理完成,数据保存{(inRes ? "" : "")}");
}
else
{
_logger.Info($"{amount}个流体数据解析处理完成,没有需要保存的数据");
}
return FilterResult.Success;
}
catch (Exception e)
{
base._logger.Error($"流体数据解析异常:{e.Message}");
}
return FilterResult.Cache;
}
/// <summary>
/// FF FF参数过滤
/// </summary>
/// <param name="dnbInstants"></param>
/// <exception cref="ArgumentNullException"></exception>
private void ParamVerification(ref List<RecordFluidInstant> fluidInstants)
{
if (fluidInstants == null)
{
throw new ArgumentNullException($"过滤参数方法异常,传入参数为空");
}
for (int i = fluidInstants.Count - 1; i >= 0; i--)
{
var item = fluidInstants[i];
if (item.totalFlow == _appConfig.virtualValue)
{
_logger.Info($"MonitorId:{item.monitorId},累计流量为 FF FF FF FF已启用过滤不保存该表数据");
fluidInstants.RemoveAt(i);
continue;
}
}
}
private DateTime? GetLastCollectTime(string key)
{
// 检查键是否存在
if (_lastCollectTimeDict.TryGetValue(key, out DateTime collectTime))
{
return collectTime;
}
else
{
return null;
}
}
/// <summary>
/// 应答处理
/// </summary>
/// <param name="client"></param>
/// <param name="buffer"></param>
public override void ResponseHandle(ISocketClient client, byte[] buffer)
{
ResponsePack sendResponsePackInfo = new ResponsePack()
{
m_MessageType = 0xB4
};
base.GetMessagePack(ref sendResponsePackInfo, buffer);
base.SendMessageAsync(client, sendResponsePackInfo);
}
}
}