diff --git a/Sln.Imm.Daemon.Business/DeviceDataCollector.cs b/Sln.Imm.Daemon.Business/DeviceDataCollector.cs new file mode 100644 index 0000000..7145712 --- /dev/null +++ b/Sln.Imm.Daemon.Business/DeviceDataCollector.cs @@ -0,0 +1,343 @@ +#region << 版 本 注 释 >> + +/*-------------------------------------------------------------------- +* 版权所有 (c) 2025 WenJY 保留所有权利。 +* CLR版本:4.0.30319.42000 +* 机器名称:Mr.Wen's MacBook Pro +* 命名空间:Sln.Imm.Daemon.Business +* 唯一标识:A7D0F481-367C-4C8F-9E27-416639212E62 +* +* 创建者:WenJY +* 电子邮箱: +* 创建时间:2025-12-19 09:57:36 +* 版本:V1.0.0 +* 描述: +* +*-------------------------------------------------------------------- +* 修改人: +* 时间: +* 修改说明: +* +* 版本:V1.0.0 +*--------------------------------------------------------------------*/ + +#endregion << 版 本 注 释 >> + +using Newtonsoft.Json; +using Sln.Imm.Daemon.Cache; +using Sln.Imm.Daemon.Model.dao; +using Sln.Imm.Daemon.Model.dto; +using Sln.Imm.Daemon.Opc; +using Sln.Imm.Daemon.Opc.Impl; +using Sln.Imm.Daemon.Repository.service.@base; +using Sln.Imm.Daemon.Serilog; + +namespace Sln.Imm.Daemon.Business; + +public class DeviceDataCollector : IDisposable +{ + + private readonly SerilogHelper _serilog; + + private readonly SemaphoreSlim _semaphore; // 并发控制:限制同时采集的设备数(避免资源耗尽) + + private CancellationTokenSource _cts; // 取消令牌:用于终止所有采集任务 + + private readonly int _collectIntervalMs; // 循环采集间隔(毫秒) + + private bool _isCollecting; // 是否正在采集(防止重复启动) + + private readonly BaseDeviceInfoCacheService _cacheService; + + private readonly IBaseService _paramValService; + + public DeviceDataCollector(SerilogHelper serilogHelper, + BaseDeviceInfoCacheService cacheService, + IBaseService paramValService, int maxConcurrentDevices = 15) + { + _serilog = serilogHelper; + _cacheService = cacheService; + _paramValService = paramValService; + + _semaphore = new SemaphoreSlim(maxConcurrentDevices, maxConcurrentDevices); + + _cts = new CancellationTokenSource(); + _collectIntervalMs = 1000 * 60 * 1; + _isCollecting = false; + } + + #region 循环采集 + + /// + /// 启动15设备并行循环采集 + /// + /// 15台设备列表 + /// 循环次数(-1=无限) + public async Task StartParallelLoopCollectAsync(int loopCount = -1) + { + if (_isCollecting) + { + Console.WriteLine($"[{DateTime.Now}] 并行采集已在运行,无需重复启动"); + return; + } + + _isCollecting = true; + int currentLoop = 0; + + try + { + while (!_cts.Token.IsCancellationRequested) + { + // 达到指定循环次数退出 + if (loopCount > 0 && currentLoop >= loopCount) + { + Console.WriteLine($"[{DateTime.Now}] 完成指定循环次数({loopCount}次),停止采集"); + break; + } + + currentLoop++; + Console.WriteLine($"\n========== 第 {currentLoop} 轮并行采集开始 [{DateTime.Now}] =========="); + + // 记录本轮开始时间(保证10秒间隔精准) + var roundStartTime = DateTime.Now; + + // 核心:并行采集15台设备(真正同时启动) + var collectResult = await CollectDevicesInParallelAsync(); + + // 输出本轮结果 + OutputCollectResult(collectResult); + + // 计算耗时,补足10秒间隔 + var roundCost = (DateTime.Now - roundStartTime).TotalMilliseconds; + var waitMs = _collectIntervalMs - roundCost; + + if (waitMs > 0) + { + Console.WriteLine($"\n第 {currentLoop} 轮采集完成(耗时{roundCost:F0}ms),等待{waitMs:F0}ms后下一轮"); + await Task.Delay((int)waitMs, _cts.Token); + } + else + { + Console.WriteLine($"\n第 {currentLoop} 轮采集超时(耗时{roundCost:F0}ms),立即开始下一轮"); + } + } + } + catch (OperationCanceledException) + { + Console.WriteLine($"[{DateTime.Now}] 并行采集已手动终止"); + } + catch (Exception ex) + { + Console.WriteLine($"[{DateTime.Now}] 并行采集异常:{ex.Message}", ex); + } + finally + { + _isCollecting = false; + Console.WriteLine($"[{DateTime.Now}] 并行采集循环结束"); + } + } + + /// + /// 输出采集结果 + /// + private void OutputCollectResult(Dictionary collectResult) + { + Console.WriteLine($"\n[{DateTime.Now}] 本轮采集结果汇总:"); + foreach (var kvp in collectResult) + { + var deviceAddress = kvp.Key; + var result = kvp.Value; + + if (result is Exception ex) + { + Console.WriteLine($" {deviceAddress}:失败 - {ex.Message}"); + } + else + { + Console.WriteLine($" {deviceAddress}:成功 - {JsonConvert.SerializeObject(result)}"); + } + } + } + + + public async Task> CollectDevicesInParallelAsync() + { + var deviceInfos = await _cacheService.GetValueAsync("BaseDeviceInfoCache"); + + if (deviceInfos == null || deviceInfos.Count == 0) + throw new ArgumentException("设备列表不能为空", nameof(deviceInfos)); + + var resultDict = new Dictionary(); + var taskList = new List(); + + foreach (var item in deviceInfos) + { + // 每个设备创建独立的采集任务 + taskList.Add(CollectSingleDeviceParallelAsync(item, resultDict, _cts.Token)); + } + + // 等待所有任务完成(无论成功/失败) + await Task.WhenAll(taskList); + + return resultDict; + } + + private async Task CollectSingleDeviceParallelAsync(BaseDeviceInfo device, Dictionary resultDict, + CancellationToken cancellationToken) + { + await _semaphore.WaitAsync(cancellationToken); + List opcItemValues = null; + + IOpcService opcUa = null; + try + { + if (device.deviceFacture.Contains("伊之密")) + { + opcUa =new OpcUaService(); + } + else + { + opcUa =new OpcDaService(); + } + + Console.WriteLine($"[{DateTime.Now}] {device.deviceName} - 开始连接并采集"); + bool connectResult = await opcUa.ConnectAsync(device.networkAddress); + if (!connectResult) + { + resultDict[device.networkAddress] = new Exception($"设备 {device.deviceName} 连接失败"); + Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 连接失败"); + return; + } + Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 连接成功"); + + // 2. 读取设备数据(你封装的ReadParam方法) + //Console.WriteLine($"[{DateTime.Now}] 开始读取设备 {device.deviceName} 数据"); + _serilog.Info($"开始采集{device.deviceName};"); + opcItemValues = await ReadParam(device, opcUa); + + this.SaveParam(device, opcItemValues, out List paramValues); + + _serilog.Info($"{device.deviceName}数据采集完成:{JsonConvert.SerializeObject(opcItemValues)}"); + + //Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 数据读取完成"); + + // 3. 存储读取结果 + resultDict[device.networkAddress] = opcItemValues; + } + catch (Exception ex) + { + // 捕获采集过程中的异常 + resultDict[device.networkAddress] = new Exception($"设备 {device.deviceName} 采集异常:{ex.Message}", ex); + Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 采集异常:{ex.Message}"); + } + finally + { + // 无论成功/失败,都断开设备连接(释放资源) + try + { + await opcUa.DisconnectAsync(); + Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 已断开连接"); + } + catch (Exception ex) + { + Console.WriteLine($"[{DateTime.Now}] 设备 {device.deviceName} 断开连接失败:{ex.Message}"); + } + + // 释放信号量(允许下一个设备采集) + _semaphore.Release(); + } + } + + #endregion + + /// + /// 读取设备参数 + /// + /// + public async Task> ReadParam(BaseDeviceInfo device,IOpcService opcUa) + { + try + { + if (device == null) + { + throw new ArgumentNullException($"设备信息不允许为空"); + } + + List deviceParams = device.deviceParams.Select(x => x.paramAddr).ToList(); + + List infos = await opcUa.ReadNodeAsync(deviceParams); + + return infos; + + } + catch (Exception e) + { + throw new InvalidOperationException($"设备参数读取异常:{e.Message}"); + } + } + + /// + /// 保存设备参数值到数据库 + /// + /// 设备信息 + /// OPC节点值列表 + /// 输出参数值DTO列表 + public void SaveParam(BaseDeviceInfo device, List opcItemValues, + out List paramValues) + { + paramValues = new List(); + try + { + foreach (OpcNode opcItem in opcItemValues) + { + BaseDeviceParamVal deviceParamVal = new BaseDeviceParamVal(); + + var paramInfo = device.deviceParams.Where(x => x.paramAddr == opcItem.NodeId).FirstOrDefault(); + if (paramInfo != null) + { + deviceParamVal.paramCode = paramInfo.paramCode; + deviceParamVal.paramName = paramInfo.paramName; + } + deviceParamVal.deviceCode = device.deviceCode; + deviceParamVal.deviceId = device.objid; + deviceParamVal.paramValue = opcItem.Value.ToString(); + deviceParamVal.paramType = opcItem.DataType; + deviceParamVal.collectTime = DateTime.Now; + deviceParamVal.recordTime = DateTime.Now; + + paramValues.Add(deviceParamVal); + } + + var isRes = _paramValService.Insert(paramValues); + if (isRes) + { + _serilog.Info(($"{device.deviceName} 设备参数保存成功")); + } + else + { + _serilog.Info(($"{device.deviceName} 设备参数保存失败")); + } + }catch (Exception e) + { + throw new InvalidOperationException($"设备参数保存异常:{e.Message}"); + } + + } + + /// + /// 终止所有采集任务 + /// + public void CancelAllCollectTasks() + { + if (!_cts.IsCancellationRequested) + { + _cts.Cancel(); + Console.WriteLine($"[{DateTime.Now}] 已触发采集任务终止"); + } + } + + public void Dispose() + { + throw new NotImplementedException(); + } +} \ No newline at end of file diff --git a/Sln.Imm.Daemon/Program.cs b/Sln.Imm.Daemon/Program.cs index 9e72876..8f1d050 100644 --- a/Sln.Imm.Daemon/Program.cs +++ b/Sln.Imm.Daemon/Program.cs @@ -47,9 +47,9 @@ namespace Sln.Imm.Daemon }; - var deviceCollectionBusiness = ServiceProvider.GetService(); + var deviceCollectionBusiness = ServiceProvider.GetService(); - deviceCollectionBusiness?.Handle(); + deviceCollectionBusiness?.StartParallelLoopCollectAsync(); @@ -99,7 +99,6 @@ namespace Sln.Imm.Daemon Assembly.LoadFrom("Sln.Imm.Daemon.Cache.dll"), Assembly.LoadFrom("Sln.Imm.Daemon.Business.dll"), Assembly.LoadFrom("Sln.Imm.Daemon.Opc.dll"), - // Assembly.LoadFrom("Sln.Iot.Business.dll"), }; services.Scan(scan => scan.FromAssemblies(assemblies) @@ -122,7 +121,7 @@ namespace Sln.Imm.Daemon CachePath = "D:\\working_area\\project\\澳柯玛注塑采集\\cache\\FusionCache.db" })); - services.AddOpcDeviceFactorySetup(); + //services.AddOpcDeviceFactorySetup(); } } } \ No newline at end of file