You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

239 lines
8.2 KiB
C#

using log4net.Core;
using Mesnac.DeviceAdapter.RFly_I160;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Highway.Assemble.EquipClient
{
using Highway.Assemble.common;
using Mesnac.DeviceAdapterNet;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class DeviceHeartbeatManager : IDisposable
{
private readonly ConcurrentDictionary<string, CancellationTokenSource> _deviceTokens =
new ConcurrentDictionary<string, CancellationTokenSource>();
private List<DeviceInfo> _deviceIds;
private readonly SemaphoreSlim _concurrencySemaphore;
private readonly int _heartbeatIntervalMs;
private readonly int _timeoutMs;
private readonly ILogger _logger;
private readonly Func<string, MessagePack> _createHeartbeatMessage;
private readonly Func<string, MessagePack, Task<bool>> _sendMessageAsync;
private readonly Action<string, byte> _handleResult;
private Timer _heartbeatTimer;
private bool _isDisposed;
public DeviceHeartbeatManager(
int maxConcurrentDevices = 10,
int heartbeatIntervalMs = 2000,
int timeoutMs = 2000,
ILogger logger = null,
Func<string, MessagePack> createHeartbeatMessage = null,
Func<string, MessagePack, Task<bool>> sendMessageAsync = null,
Action<string, byte> handleResult = null)
{
_concurrencySemaphore = new SemaphoreSlim(maxConcurrentDevices);
_heartbeatIntervalMs = heartbeatIntervalMs;
_timeoutMs = timeoutMs;
_logger = logger ?? new NullLogger();
_createHeartbeatMessage = createHeartbeatMessage ?? CreateDefaultHeartbeatMessage;
_sendMessageAsync = sendMessageAsync ?? ((id, msg) => Task.FromResult(false));
_handleResult = handleResult ?? ((id, result) => { });
}
public void StartMonitoring(List<DeviceInfo> deviceIds)
{
if (_heartbeatTimer != null)
throw new InvalidOperationException("Heartbeat manager is already running.");
_deviceIds = deviceIds;
// 创建定时器,使用固定间隔触发心跳周期
_heartbeatTimer = new Timer(
callback: ExecuteHeartbeatCycle,
state: null,
dueTime: 0,
period: _heartbeatIntervalMs);
}
public void StopMonitoring()
{
if (_heartbeatTimer == null)
return;
_heartbeatTimer.Dispose();
_heartbeatTimer = null;
// 取消所有正在执行的设备任务
foreach (var cts in _deviceTokens.Values)
{
cts.Cancel();
cts.Dispose();
}
_deviceTokens.Clear();
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_isDisposed)
return;
if (disposing)
{
StopMonitoring();
_concurrencySemaphore.Dispose();
}
_isDisposed = true;
}
private async void ExecuteHeartbeatCycle(object state)
{
try
{
var cycleStartTime = DateTime.UtcNow;
_logger.Debug($"Starting heartbeat cycle at {cycleStartTime:O}");
// 为每个设备创建独立的取消令牌
var deviceTasks = new List<Task>();
foreach (var deviceId in _deviceIds)
{
var cts = new CancellationTokenSource();
_deviceTokens.AddOrUpdate(deviceId.m_iDeviceId.ToString(), cts, (key, oldValue) => { oldValue.Dispose(); return cts; });
deviceTasks.Add(SendHeartbeatAsync(deviceId.m_iDeviceId.ToString(), cts.Token));
}
// 并发执行所有设备的心跳任务
await Task.WhenAll(deviceTasks);
var cycleDuration = (DateTime.UtcNow - cycleStartTime).TotalMilliseconds;
_logger.Debug($"Heartbeat cycle completed in {cycleDuration:F2}ms");
}
catch (OperationCanceledException)
{
_logger.Info("Heartbeat cycle was canceled.");
}
catch (Exception ex)
{
_logger.Error($"Exception in heartbeat cycle: {ex.Message}");
}
}
private async Task SendHeartbeatAsync(string deviceId, CancellationToken cancellationToken)
{
await _concurrencySemaphore.WaitAsync(cancellationToken);
try
{
var startTime = DateTime.UtcNow;
_logger.Debug($"Sending heartbeat to device {deviceId} at {startTime:O}");
var message = _createHeartbeatMessage(deviceId);
var result = await SendWithTimeoutAsync(deviceId, message, cancellationToken);
_handleResult(deviceId, result);
var elapsed = (DateTime.UtcNow - startTime).TotalMilliseconds;
_logger.Debug($"Heartbeat to device {deviceId} processed in {elapsed:F2}ms");
}
catch (OperationCanceledException)
{
_handleResult(deviceId, 2); // 操作取消
}
catch (Exception ex)
{
_logger.Error($"Exception sending heartbeat to device {deviceId}: {ex.Message}");
_handleResult(deviceId, 2); // 发生异常
}
finally
{
_concurrencySemaphore.Release();
// 清理完成的任务令牌
if (_deviceTokens.TryGetValue(deviceId, out var cts) && !cts.IsCancellationRequested)
{
cts.Dispose();
_deviceTokens.TryRemove(deviceId, out _);
}
}
}
private async Task<byte> SendWithTimeoutAsync(string deviceId, MessagePack message, CancellationToken cancellationToken)
{
using (var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
timeoutCts.CancelAfter(_timeoutMs);
try
{
var sendTask = _sendMessageAsync(deviceId, message);
var completedTask = await Task.WhenAny(sendTask, Task.Delay(Timeout.Infinite, timeoutCts.Token));
if (completedTask == sendTask)
{
return await sendTask ? (byte)1 : (byte)2;
}
_logger.Warn($"Heartbeat to device {deviceId} timed out after {_timeoutMs}ms");
return 0; // 超时
}
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
{
_logger.Warn($"Heartbeat to device {deviceId} timed out after {_timeoutMs}ms");
return 0; // 超时
}
}
}
private MessagePack CreateDefaultHeartbeatMessage(string deviceId)
{
return new MessagePack
{
m_pData = new byte[9] { 0xAA, 0x55, 0x00, 0x90, 0x90, 0x0D, 0x00, 0x00, 0x0D }
};
}
}
// 简单的日志接口实现
public interface ILogger
{
void Debug(string message);
void Info(string message);
void Warn(string message);
void Error(string message);
}
public class NullLogger : ILogger
{
public void Debug(string message) { }
public void Info(string message) { }
public void Warn(string message) { }
public void Error(string message) { }
}
// 消息包结构
public class MessagePack
{
public byte[] m_pData { get; set; }
}
}