#region << 版 本 注 释 >> /*-------------------------------------------------------------------- * 版权所有 (c) 2025 WenJY 保留所有权利。 * CLR版本:4.0.30319.42000 * 机器名称:Mr.Wen's MacBook Pro * 命名空间:Sln.Imm.Daemon.Business * 唯一标识:A7D0F481-367C-4C8F-9E27-416639212E62 * * 创建者:WenJY * 电子邮箱: * 创建时间:2025-12-19 09:57:36 * 版本:V1.0.0 * 描述: * *-------------------------------------------------------------------- * 修改人: * 时间: * 修改说明: * * 版本:V1.0.0 *--------------------------------------------------------------------*/ #endregion << 版 本 注 释 >> using Newtonsoft.Json; using Sln.Imm.Daemon.Cache; using Sln.Imm.Daemon.Model.dao; using Sln.Imm.Daemon.Model.dto; using Sln.Imm.Daemon.Opc; using Sln.Imm.Daemon.Opc.Impl; using Sln.Imm.Daemon.Repository.service.@base; using Sln.Imm.Daemon.Serilog; namespace Sln.Imm.Daemon.Business; public class DeviceDataCollector : IDisposable { private readonly SerilogHelper _serilog; private readonly SemaphoreSlim _semaphore; // 并发控制:限制同时采集的设备数(避免资源耗尽) private CancellationTokenSource _cts; // 取消令牌:用于终止所有采集任务 private readonly int _collectIntervalMs; // 循环采集间隔(毫秒) private bool _isCollecting; // 是否正在采集(防止重复启动) private readonly BaseDeviceInfoCacheService _cacheService; private readonly IBaseService _paramValService; public DeviceDataCollector(SerilogHelper serilogHelper, BaseDeviceInfoCacheService cacheService, IBaseService paramValService, int maxConcurrentDevices = 15) { _serilog = serilogHelper; _cacheService = cacheService; _paramValService = paramValService; _semaphore = new SemaphoreSlim(maxConcurrentDevices, maxConcurrentDevices); _cts = new CancellationTokenSource(); _collectIntervalMs = 1000 * 60 * 1; _isCollecting = false; } #region 循环采集 /// /// 启动15设备并行循环采集 /// /// 15台设备列表 /// 循环次数(-1=无限) public async Task StartParallelLoopCollectAsync(int loopCount = -1) { if (_isCollecting) { Console.WriteLine($"[{DateTime.Now}] 并行采集已在运行,无需重复启动"); return; } _isCollecting = true; int currentLoop = 0; try { while (!_cts.Token.IsCancellationRequested) { // 达到指定循环次数退出 if (loopCount > 0 && currentLoop >= loopCount) { Console.WriteLine($"[{DateTime.Now}] 完成指定循环次数({loopCount}次),停止采集"); break; } currentLoop++; Console.WriteLine($"\n========== 第 {currentLoop} 轮并行采集开始 [{DateTime.Now}] =========="); // 记录本轮开始时间(保证10秒间隔精准) var roundStartTime = DateTime.Now; // 核心:并行采集15台设备(真正同时启动) var collectResult = await CollectDevicesInParallelAsync(); // 输出本轮结果 OutputCollectResult(collectResult); // 计算耗时,补足10秒间隔 var roundCost = (DateTime.Now - roundStartTime).TotalMilliseconds; var waitMs = _collectIntervalMs - roundCost; if (waitMs > 0) { Console.WriteLine($"\n第 {currentLoop} 轮采集完成(耗时{roundCost:F0}ms),等待{waitMs:F0}ms后下一轮"); await Task.Delay((int)waitMs, _cts.Token); } else { Console.WriteLine($"\n第 {currentLoop} 轮采集超时(耗时{roundCost:F0}ms),立即开始下一轮"); } } } catch (OperationCanceledException) { Console.WriteLine($"[{DateTime.Now}] 并行采集已手动终止"); } catch (Exception ex) { Console.WriteLine($"[{DateTime.Now}] 并行采集异常:{ex.Message}", ex); } finally { _isCollecting = false; Console.WriteLine($"[{DateTime.Now}] 并行采集循环结束"); } } /// /// 输出采集结果 /// private void OutputCollectResult(Dictionary collectResult) { Console.WriteLine($"\n[{DateTime.Now}] 本轮采集结果汇总:"); foreach (var kvp in collectResult) { var deviceAddress = kvp.Key; var result = kvp.Value; if (result is Exception ex) { Console.WriteLine($" {deviceAddress}:失败 - {ex.Message}"); } else { Console.WriteLine($" {deviceAddress}:成功 - {JsonConvert.SerializeObject(result)}"); } } } public async Task> CollectDevicesInParallelAsync() { var deviceInfos = await _cacheService.GetValueAsync("BaseDeviceInfoCache"); if (deviceInfos == null || deviceInfos.Count == 0) throw new ArgumentException("设备列表不能为空", nameof(deviceInfos)); var resultDict = new Dictionary(); var taskList = new List(); foreach (var item in deviceInfos) { // 每个设备创建独立的采集任务 taskList.Add(CollectSingleDeviceParallelAsync(item, resultDict, _cts.Token)); } // 等待所有任务完成(无论成功/失败) await Task.WhenAll(taskList); return resultDict; } private async Task CollectSingleDeviceParallelAsync(BaseDeviceInfo device, Dictionary resultDict, CancellationToken cancellationToken) { await _semaphore.WaitAsync(cancellationToken); List opcItemValues = null; IOpcService opcUa = null; try { if (device.deviceFacture.Contains("伊之密")) { opcUa =new OpcUaService(); } else { opcUa =new OpcDaService(); } Console.WriteLine($"[{DateTime.Now}] {device.deviceName} - 开始连接并采集"); bool connectResult = await opcUa.ConnectAsync(device.networkAddress); if (!connectResult) { resultDict[device.networkAddress] = new Exception($"设备 {device.deviceName} 连接失败"); Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 连接失败"); return; } Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 连接成功"); // 2. 读取设备数据(你封装的ReadParam方法) //Console.WriteLine($"[{DateTime.Now}] 开始读取设备 {device.deviceName} 数据"); _serilog.Info($"开始采集{device.deviceName};"); opcItemValues = await ReadParam(device, opcUa); this.SaveParam(device, opcItemValues, out List paramValues); _serilog.Info($"{device.deviceName}数据采集完成:{JsonConvert.SerializeObject(opcItemValues)}"); //Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 数据读取完成"); // 3. 存储读取结果 resultDict[device.networkAddress] = opcItemValues; } catch (Exception ex) { // 捕获采集过程中的异常 resultDict[device.networkAddress] = new Exception($"设备 {device.deviceName} 采集异常:{ex.Message}", ex); Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 采集异常:{ex.Message}"); } finally { // 无论成功/失败,都断开设备连接(释放资源) try { await opcUa.DisconnectAsync(); Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 已断开连接"); } catch (Exception ex) { Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 断开连接失败:{ex.Message}"); } // 释放信号量(允许下一个设备采集) _semaphore.Release(); } } #endregion /// /// 读取设备参数 /// /// public async Task> ReadParam(BaseDeviceInfo device,IOpcService opcUa) { try { if (device == null) { throw new ArgumentNullException($"设备信息不允许为空"); } List deviceParams = device.deviceParams.Select(x => x.paramAddr).ToList(); List infos = await opcUa.ReadNodeAsync(deviceParams); return infos; } catch (Exception e) { throw new InvalidOperationException($"设备参数读取异常:{e.Message}"); } } /// /// 保存设备参数值到数据库 /// /// 设备信息 /// OPC节点值列表 /// 输出参数值DTO列表 public void SaveParam(BaseDeviceInfo device, List opcItemValues, out List paramValues) { paramValues = new List(); try { foreach (OpcNode opcItem in opcItemValues) { BaseDeviceParamVal deviceParamVal = new BaseDeviceParamVal(); var paramInfo = device.deviceParams.Where(x => x.paramAddr == opcItem.NodeId).FirstOrDefault(); if (paramInfo != null) { deviceParamVal.paramCode = paramInfo.paramCode; deviceParamVal.paramName = paramInfo.paramName; } deviceParamVal.deviceCode = device.deviceCode; deviceParamVal.deviceId = device.objid; deviceParamVal.paramValue = opcItem.Value.ToString(); deviceParamVal.paramType = opcItem.DataType; deviceParamVal.collectTime = DateTime.Now; deviceParamVal.recordTime = DateTime.Now; paramValues.Add(deviceParamVal); } var isRes = _paramValService.Insert(paramValues); if (isRes) { _serilog.Info(($"{device.deviceName} 设备参数保存成功")); } else { _serilog.Info(($"{device.deviceName} 设备参数保存失败")); } }catch (Exception e) { throw new InvalidOperationException($"设备参数保存异常:{e.Message}"); } } /// /// 终止所有采集任务 /// public void CancelAllCollectTasks() { if (!_cts.IsCancellationRequested) { _cts.Cancel(); Console.WriteLine($"[{DateTime.Now}] 已触发采集任务终止"); } } public void Dispose() { throw new NotImplementedException(); } }