using log4net.Core;
using Mesnac.DeviceAdapter.RFly_I160;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Highway.Assemble.EquipClient
{
    using Highway.Assemble.common;
    using Mesnac.DeviceAdapterNet;
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// Sends a heartbeat message to every monitored device on a fixed timer period,
    /// limits how many sends are in flight at once, and reports one result code per
    /// device and cycle through a callback.
    /// </summary>
    /// <remarks>
    /// Result codes passed to the result callback:
    /// 1 - the send delegate completed and returned <c>true</c>;
    /// 2 - the send delegate returned <c>false</c>, threw, or the heartbeat was cancelled;
    /// 0 - the send delegate did not complete within the configured timeout.
    ///
    /// NOTE(review): the generic type arguments in this file were missing from the text
    /// under review and have been reconstructed from usage. <c>Action&lt;string, byte&gt;</c>
    /// for the result callback is inferred from the byte casts in the send path - confirm
    /// against the callers.
    /// </remarks>
    public class DeviceHeartbeatManager : IDisposable
    {
        private const byte ResultTimeout = 0;
        private const byte ResultSuccess = 1;
        private const byte ResultFailure = 2;

        // One cancellation source per heartbeat cycle that is currently running. Each
        // cycle owns (and disposes) its own source; StopMonitoring only cancels them.
        private readonly ConcurrentDictionary<CancellationTokenSource, byte> _activeCycles =
            new ConcurrentDictionary<CancellationTokenSource, byte>();

        // NOTE(review): the element type of this list was lost from the text under review
        // and cannot be recovered from this file. It must be the project type that exposes
        // the m_iDeviceId member read in RunHeartbeatCycleAsync; restore it here and on the
        // StartMonitoring parameter.
        private List _deviceIds;

        private readonly SemaphoreSlim _concurrencySemaphore;
        private readonly int _heartbeatIntervalMs;
        private readonly int _timeoutMs;
        private readonly ILogger _logger;
        private readonly Func<string, MessagePack> _createHeartbeatMessage;
        private readonly Func<string, MessagePack, Task<bool>> _sendMessageAsync;
        private readonly Action<string, byte> _handleResult;
        private Timer _heartbeatTimer;

        // Written by Start/Stop and read on timer threads, hence volatile.
        private volatile bool _isRunning;
        private bool _isDisposed;

        /// <summary>
        /// Creates a heartbeat manager.
        /// </summary>
        /// <param name="maxConcurrentDevices">Maximum number of heartbeats in flight at once; must be at least 1.</param>
        /// <param name="heartbeatIntervalMs">Timer period between heartbeat cycles, in milliseconds.</param>
        /// <param name="timeoutMs">Time allowed for one send before it is reported as timed out, in milliseconds.</param>
        /// <param name="logger">Log sink; a no-op logger is used when null.</param>
        /// <param name="createHeartbeatMessage">Builds the message for a device id; a fixed default frame is used when null.</param>
        /// <param name="sendMessageAsync">Sends a message to a device id. When null, a stub that always completes with <c>false</c> is used, so every heartbeat reports code 2.</param>
        /// <param name="handleResult">Receives the device id and result code; ignored when null.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrentDevices"/> is less than 1.</exception>
        public DeviceHeartbeatManager(
            int maxConcurrentDevices = 10,
            int heartbeatIntervalMs = 2000,
            int timeoutMs = 2000,
            ILogger logger = null,
            Func<string, MessagePack> createHeartbeatMessage = null,
            Func<string, MessagePack, Task<bool>> sendMessageAsync = null,
            Action<string, byte> handleResult = null)
        {
            // A zero-slot semaphore would make every heartbeat wait forever.
            if (maxConcurrentDevices < 1)
            {
                throw new ArgumentOutOfRangeException(
                    nameof(maxConcurrentDevices),
                    maxConcurrentDevices,
                    "At least one concurrent device must be allowed.");
            }

            _concurrencySemaphore = new SemaphoreSlim(maxConcurrentDevices);
            _heartbeatIntervalMs = heartbeatIntervalMs;
            _timeoutMs = timeoutMs;
            _logger = logger ?? new NullLogger();
            _createHeartbeatMessage = createHeartbeatMessage ?? CreateDefaultHeartbeatMessage;
            _sendMessageAsync = sendMessageAsync ?? ((id, msg) => Task.FromResult(false));
            _handleResult = handleResult ?? ((id, result) => { });
        }

        /// <summary>
        /// Starts the heartbeat timer. The first cycle is scheduled immediately and
        /// further cycles follow at the configured interval.
        /// </summary>
        /// <param name="deviceIds">Devices to monitor. The list is enumerated again on every cycle.</param>
        /// <exception cref="InvalidOperationException">Monitoring is already running.</exception>
        /// <exception cref="ObjectDisposedException">The manager has been disposed.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="deviceIds"/> is null.</exception>
        public void StartMonitoring(List deviceIds)
        {
            if (_heartbeatTimer != null)
            {
                throw new InvalidOperationException("Heartbeat manager is already running.");
            }

            if (_isDisposed)
            {
                throw new ObjectDisposedException(nameof(DeviceHeartbeatManager));
            }

            if (deviceIds == null)
            {
                throw new ArgumentNullException(nameof(deviceIds));
            }

            _deviceIds = deviceIds;

            // Must be set before the timer exists: the first callback is due immediately.
            _isRunning = true;

            // Fixed-period timer that triggers one heartbeat cycle per tick.
            _heartbeatTimer = new Timer(
                callback: ExecuteHeartbeatCycle,
                state: null,
                dueTime: 0,
                period: _heartbeatIntervalMs);
        }

        /// <summary>
        /// Stops the timer and cancels every heartbeat cycle that is still running.
        /// Does nothing when monitoring is not running.
        /// </summary>
        public void StopMonitoring()
        {
            var timer = _heartbeatTimer;
            if (timer == null)
            {
                return;
            }

            // Cleared before the cycles are enumerated so that a tick racing with this
            // call either sees the flag and exits, or is already registered and cancelled.
            _isRunning = false;
            _heartbeatTimer = null;
            timer.Dispose();

            foreach (var cycleCts in _activeCycles.Keys)
            {
                try
                {
                    cycleCts.Cancel();
                }
                catch (ObjectDisposedException)
                {
                    // The cycle finished and disposed its own source concurrently.
                }
            }
        }

        /// <summary>
        /// Stops monitoring and releases the resources held by this instance.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases resources; when <paramref name="disposing"/> is true, monitoring is
        /// stopped and the concurrency semaphore is disposed.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (_isDisposed)
            {
                return;
            }

            if (disposing)
            {
                StopMonitoring();
                _concurrencySemaphore.Dispose();
            }

            _isDisposed = true;
        }

        // Timer callback. Starts one cycle and returns; the cycle handles its own errors.
        private void ExecuteHeartbeatCycle(object state)
        {
            _ = RunHeartbeatCycleAsync();
        }

        // Runs one heartbeat cycle: one heartbeat per device, awaited together.
        private async Task RunHeartbeatCycleAsync()
        {
            if (!_isRunning)
            {
                return;
            }

            // Each cycle has its own cancellation source, so overlapping cycles (a slow
            // cycle still running when the next tick fires) never touch each other's tokens.
            var cycleCts = new CancellationTokenSource();
            _activeCycles.TryAdd(cycleCts, 0);
            try
            {
                // Re-check after registering: StopMonitoring may have run in between.
                if (!_isRunning)
                {
                    return;
                }

                var cycleStartTime = DateTime.UtcNow;
                _logger.Debug($"Starting heartbeat cycle at {cycleStartTime:O}");

                // Read every id before starting any send, so a failure while enumerating
                // the caller's list cannot leave heartbeats running on a disposed source.
                var ids = new List<string>();
                foreach (var device in _deviceIds)
                {
                    ids.Add(device.m_iDeviceId.ToString());
                }

                var deviceTasks = new List<Task>(ids.Count);
                foreach (var id in ids)
                {
                    deviceTasks.Add(SendHeartbeatAsync(id, cycleCts.Token));
                }

                // Heartbeats run concurrently, throttled by the semaphore.
                await Task.WhenAll(deviceTasks);

                var cycleDuration = (DateTime.UtcNow - cycleStartTime).TotalMilliseconds;
                _logger.Debug($"Heartbeat cycle completed in {cycleDuration:F2}ms");
            }
            catch (OperationCanceledException)
            {
                _logger.Info("Heartbeat cycle was canceled.");
            }
            catch (Exception ex)
            {
                _logger.Error($"Exception in heartbeat cycle: {ex.Message}");
            }
            finally
            {
                // All heartbeats of this cycle have completed; the source is no longer in use.
                _activeCycles.TryRemove(cycleCts, out _);
                cycleCts.Dispose();
            }
        }

        // Sends one heartbeat to one device under the concurrency limit and reports the
        // outcome exactly once. Cancellation while waiting for a slot propagates to the caller.
        private async Task SendHeartbeatAsync(string deviceId, CancellationToken cancellationToken)
        {
            await _concurrencySemaphore.WaitAsync(cancellationToken);
            try
            {
                var startTime = DateTime.UtcNow;
                _logger.Debug($"Sending heartbeat to device {deviceId} at {startTime:O}");

                byte result;
                try
                {
                    var message = _createHeartbeatMessage(deviceId);
                    result = await SendWithTimeoutAsync(deviceId, message, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    // Operation cancelled.
                    result = ResultFailure;
                }
                catch (Exception ex)
                {
                    _logger.Error($"Exception sending heartbeat to device {deviceId}: {ex.Message}");
                    result = ResultFailure;
                }

                ReportResult(deviceId, result);

                var elapsed = (DateTime.UtcNow - startTime).TotalMilliseconds;
                _logger.Debug($"Heartbeat to device {deviceId} processed in {elapsed:F2}ms");
            }
            finally
            {
                try
                {
                    _concurrencySemaphore.Release();
                }
                catch (ObjectDisposedException)
                {
                    // The manager was disposed while this heartbeat was in flight.
                }
            }
        }

        // Invokes the result callback; a failing callback is logged, not retried.
        private void ReportResult(string deviceId, byte result)
        {
            try
            {
                _handleResult(deviceId, result);
            }
            catch (Exception ex)
            {
                _logger.Error($"Exception in heartbeat result handler for device {deviceId}: {ex.Message}");
            }
        }

        // Runs the send delegate and waits for it for at most the configured timeout.
        // Returns 1 or 2 from the delegate's result, or 0 on timeout. Throws
        // OperationCanceledException when the caller's token (not the timeout) was cancelled.
        // The delegate takes no token, so a timed-out send is abandoned rather than aborted.
        private async Task<byte> SendWithTimeoutAsync(string deviceId, MessagePack message, CancellationToken cancellationToken)
        {
            using (var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
            {
                timeoutCts.CancelAfter(_timeoutMs);
                try
                {
                    var sendTask = _sendMessageAsync(deviceId, message);
                    var completedTask = await Task.WhenAny(sendTask, Task.Delay(Timeout.Infinite, timeoutCts.Token));
                    if (completedTask == sendTask)
                    {
                        return await sendTask ? ResultSuccess : ResultFailure;
                    }

                    // The send is abandoned; observe a later fault so it does not surface
                    // as an unobserved task exception.
                    _ = sendTask.ContinueWith(
                        t => { _ = t.Exception; },
                        CancellationToken.None,
                        TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                        TaskScheduler.Default);
                }
                catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
                {
                    // The send itself was cancelled after the deadline or after a stop;
                    // which of the two is decided below.
                }

                // The linked source also fires when monitoring is stopped. That is a
                // cancellation, not a device timeout.
                cancellationToken.ThrowIfCancellationRequested();

                _logger.Warn($"Heartbeat to device {deviceId} timed out after {_timeoutMs}ms");
                return ResultTimeout;
            }
        }

        // Default message factory: the same fixed 9-byte frame for every device.
        private MessagePack CreateDefaultHeartbeatMessage(string deviceId)
        {
            return new MessagePack
            {
                m_pData = new byte[9] { 0xAA, 0x55, 0x00, 0x90, 0x90, 0x0D, 0x00, 0x00, 0x0D }
            };
        }
    }

    /// <summary>
    /// Minimal logging interface used by <see cref="DeviceHeartbeatManager"/>.
    /// </summary>
    public interface ILogger
    {
        void Debug(string message);
        void Info(string message);
        void Warn(string message);
        void Error(string message);
    }

    /// <summary>
    /// Logger that discards every message.
    /// </summary>
    public class NullLogger : ILogger
    {
        public void Debug(string message) { }
        public void Info(string message) { }
        public void Warn(string message) { }
        public void Error(string message) { }
    }

    /// <summary>
    /// Message container holding the raw bytes to send.
    /// </summary>
    public class MessagePack
    {
        public byte[] m_pData { get; set; }
    }
}