change - 采集逻辑修改

master
WenJY 5 days ago
parent eacc913d47
commit 7d611f471c

@ -0,0 +1,343 @@
#region << 版 本 注 释 >>
/*--------------------------------------------------------------------
* (c) 2025 WenJY
* CLR4.0.30319.42000
* Mr.Wen's MacBook Pro
* Sln.Imm.Daemon.Business
* A7D0F481-367C-4C8F-9E27-416639212E62
*
* WenJY
*
* 2025-12-19 09:57:36
* V1.0.0
*
*
*--------------------------------------------------------------------
*
*
*
*
* V1.0.0
*--------------------------------------------------------------------*/
#endregion << 版 本 注 释 >>
using Newtonsoft.Json;
using Sln.Imm.Daemon.Cache;
using Sln.Imm.Daemon.Model.dao;
using Sln.Imm.Daemon.Model.dto;
using Sln.Imm.Daemon.Opc;
using Sln.Imm.Daemon.Opc.Impl;
using Sln.Imm.Daemon.Repository.service.@base;
using Sln.Imm.Daemon.Serilog;
namespace Sln.Imm.Daemon.Business;
public class DeviceDataCollector : IDisposable
{
private readonly SerilogHelper _serilog;
private readonly SemaphoreSlim _semaphore; // 并发控制:限制同时采集的设备数(避免资源耗尽)
private CancellationTokenSource _cts; // 取消令牌:用于终止所有采集任务
private readonly int _collectIntervalMs; // 循环采集间隔(毫秒)
private bool _isCollecting; // 是否正在采集(防止重复启动)
private readonly BaseDeviceInfoCacheService _cacheService;
private readonly IBaseService<BaseDeviceParamVal> _paramValService;
public DeviceDataCollector(SerilogHelper serilogHelper,
BaseDeviceInfoCacheService cacheService,
IBaseService<BaseDeviceParamVal> paramValService, int maxConcurrentDevices = 15)
{
_serilog = serilogHelper;
_cacheService = cacheService;
_paramValService = paramValService;
_semaphore = new SemaphoreSlim(maxConcurrentDevices, maxConcurrentDevices);
_cts = new CancellationTokenSource();
_collectIntervalMs = 1000 * 60 * 1;
_isCollecting = false;
}
#region 循环采集
/// <summary>
/// 启动15设备并行循环采集
/// </summary>
/// <param name="devices">15台设备列表</param>
/// <param name="loopCount">循环次数(-1=无限)</param>
public async Task StartParallelLoopCollectAsync(int loopCount = -1)
{
if (_isCollecting)
{
Console.WriteLine($"[{DateTime.Now}] 并行采集已在运行,无需重复启动");
return;
}
_isCollecting = true;
int currentLoop = 0;
try
{
while (!_cts.Token.IsCancellationRequested)
{
// 达到指定循环次数退出
if (loopCount > 0 && currentLoop >= loopCount)
{
Console.WriteLine($"[{DateTime.Now}] 完成指定循环次数({loopCount}次),停止采集");
break;
}
currentLoop++;
Console.WriteLine($"\n========== 第 {currentLoop} 轮并行采集开始 [{DateTime.Now}] ==========");
// 记录本轮开始时间保证10秒间隔精准
var roundStartTime = DateTime.Now;
// 核心并行采集15台设备真正同时启动
var collectResult = await CollectDevicesInParallelAsync();
// 输出本轮结果
OutputCollectResult(collectResult);
// 计算耗时补足10秒间隔
var roundCost = (DateTime.Now - roundStartTime).TotalMilliseconds;
var waitMs = _collectIntervalMs - roundCost;
if (waitMs > 0)
{
Console.WriteLine($"\n第 {currentLoop} 轮采集完成(耗时{roundCost:F0}ms等待{waitMs:F0}ms后下一轮");
await Task.Delay((int)waitMs, _cts.Token);
}
else
{
Console.WriteLine($"\n第 {currentLoop} 轮采集超时(耗时{roundCost:F0}ms立即开始下一轮");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"[{DateTime.Now}] 并行采集已手动终止");
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.Now}] 并行采集异常:{ex.Message}", ex);
}
finally
{
_isCollecting = false;
Console.WriteLine($"[{DateTime.Now}] 并行采集循环结束");
}
}
/// <summary>
/// 输出采集结果
/// </summary>
private void OutputCollectResult(Dictionary<string, object> collectResult)
{
Console.WriteLine($"\n[{DateTime.Now}] 本轮采集结果汇总:");
foreach (var kvp in collectResult)
{
var deviceAddress = kvp.Key;
var result = kvp.Value;
if (result is Exception ex)
{
Console.WriteLine($" {deviceAddress}:失败 - {ex.Message}");
}
else
{
Console.WriteLine($" {deviceAddress}:成功 - {JsonConvert.SerializeObject(result)}");
}
}
}
public async Task<Dictionary<string, object>> CollectDevicesInParallelAsync()
{
var deviceInfos = await _cacheService.GetValueAsync("BaseDeviceInfoCache");
if (deviceInfos == null || deviceInfos.Count == 0)
throw new ArgumentException("设备列表不能为空", nameof(deviceInfos));
var resultDict = new Dictionary<string, object>();
var taskList = new List<Task>();
foreach (var item in deviceInfos)
{
// 每个设备创建独立的采集任务
taskList.Add(CollectSingleDeviceParallelAsync(item, resultDict, _cts.Token));
}
// 等待所有任务完成(无论成功/失败)
await Task.WhenAll(taskList);
return resultDict;
}
private async Task CollectSingleDeviceParallelAsync(BaseDeviceInfo device, Dictionary<string, object> resultDict,
CancellationToken cancellationToken)
{
await _semaphore.WaitAsync(cancellationToken);
List<OpcNode> opcItemValues = null;
IOpcService opcUa = null;
try
{
if (device.deviceFacture.Contains("伊之密"))
{
opcUa =new OpcUaService();
}
else
{
opcUa =new OpcDaService();
}
Console.WriteLine($"[{DateTime.Now}] {device.deviceName} - 开始连接并采集");
bool connectResult = await opcUa.ConnectAsync(device.networkAddress);
if (!connectResult)
{
resultDict[device.networkAddress] = new Exception($"设备 {device.deviceName} 连接失败");
Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 连接失败");
return;
}
Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 连接成功");
// 2. 读取设备数据你封装的ReadParam方法
//Console.WriteLine($"[{DateTime.Now}] 开始读取设备 {device.deviceName} 数据");
_serilog.Info($"开始采集{device.deviceName}");
opcItemValues = await ReadParam(device, opcUa);
this.SaveParam(device, opcItemValues, out List<BaseDeviceParamVal> paramValues);
_serilog.Info($"{device.deviceName}数据采集完成:{JsonConvert.SerializeObject(opcItemValues)}");
//Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 数据读取完成");
// 3. 存储读取结果
resultDict[device.networkAddress] = opcItemValues;
}
catch (Exception ex)
{
// 捕获采集过程中的异常
resultDict[device.networkAddress] = new Exception($"设备 {device.deviceName} 采集异常:{ex.Message}", ex);
Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 采集异常:{ex.Message}");
}
finally
{
// 无论成功/失败,都断开设备连接(释放资源)
try
{
await opcUa.DisconnectAsync();
Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 已断开连接");
}
catch (Exception ex)
{
Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 断开连接失败:{ex.Message}");
}
// 释放信号量(允许下一个设备采集)
_semaphore.Release();
}
}
#endregion
/// <summary>
/// 读取设备参数
/// </summary>
/// <param name="device"></param>
public async Task<List<OpcNode>> ReadParam(BaseDeviceInfo device,IOpcService opcUa)
{
try
{
if (device == null)
{
throw new ArgumentNullException($"设备信息不允许为空");
}
List<string> deviceParams = device.deviceParams.Select(x => x.paramAddr).ToList();
List<OpcNode> infos = await opcUa.ReadNodeAsync(deviceParams);
return infos;
}
catch (Exception e)
{
throw new InvalidOperationException($"设备参数读取异常:{e.Message}");
}
}
/// <summary>
/// 保存设备参数值到数据库
/// </summary>
/// <param name="device">设备信息</param>
/// <param name="opcItemValues">OPC节点值列表</param>
/// <param name="paramValues">输出参数值DTO列表</param>
public void SaveParam(BaseDeviceInfo device, List<OpcNode> opcItemValues,
out List<BaseDeviceParamVal> paramValues)
{
paramValues = new List<BaseDeviceParamVal>();
try
{
foreach (OpcNode opcItem in opcItemValues)
{
BaseDeviceParamVal deviceParamVal = new BaseDeviceParamVal();
var paramInfo = device.deviceParams.Where(x => x.paramAddr == opcItem.NodeId).FirstOrDefault();
if (paramInfo != null)
{
deviceParamVal.paramCode = paramInfo.paramCode;
deviceParamVal.paramName = paramInfo.paramName;
}
deviceParamVal.deviceCode = device.deviceCode;
deviceParamVal.deviceId = device.objid;
deviceParamVal.paramValue = opcItem.Value.ToString();
deviceParamVal.paramType = opcItem.DataType;
deviceParamVal.collectTime = DateTime.Now;
deviceParamVal.recordTime = DateTime.Now;
paramValues.Add(deviceParamVal);
}
var isRes = _paramValService.Insert(paramValues);
if (isRes)
{
_serilog.Info(($"{device.deviceName} 设备参数保存成功"));
}
else
{
_serilog.Info(($"{device.deviceName} 设备参数保存失败"));
}
}catch (Exception e)
{
throw new InvalidOperationException($"设备参数保存异常:{e.Message}");
}
}
/// <summary>
/// 终止所有采集任务
/// </summary>
public void CancelAllCollectTasks()
{
if (!_cts.IsCancellationRequested)
{
_cts.Cancel();
Console.WriteLine($"[{DateTime.Now}] 已触发采集任务终止");
}
}
public void Dispose()
{
throw new NotImplementedException();
}
}

@ -47,9 +47,9 @@ namespace Sln.Imm.Daemon
};
var deviceCollectionBusiness = ServiceProvider.GetService<DeviceCollectionBusiness>();
var deviceCollectionBusiness = ServiceProvider.GetService<DeviceDataCollector>();
deviceCollectionBusiness?.Handle();
deviceCollectionBusiness?.StartParallelLoopCollectAsync();
@ -99,7 +99,6 @@ namespace Sln.Imm.Daemon
Assembly.LoadFrom("Sln.Imm.Daemon.Cache.dll"),
Assembly.LoadFrom("Sln.Imm.Daemon.Business.dll"),
Assembly.LoadFrom("Sln.Imm.Daemon.Opc.dll"),
// Assembly.LoadFrom("Sln.Iot.Business.dll"),
};
services.Scan(scan => scan.FromAssemblies(assemblies)
@ -122,7 +121,7 @@ namespace Sln.Imm.Daemon
CachePath = "D:\\working_area\\project\\澳柯玛注塑采集\\cache\\FusionCache.db"
}));
services.AddOpcDeviceFactorySetup();
//services.AddOpcDeviceFactorySetup();
}
}
}
Loading…
Cancel
Save