You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
wcs_core/Sln.Wcs.Strategy/MaterialInStoreExecutor.cs

203 lines
6.9 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System.Collections.Concurrent;
using Sln.Wcs.Model.Domain;
using Sln.Wcs.Repository.service;
using Sln.Wcs.Serilog;
namespace Sln.Wcs.Strategy;
/// <summary>
/// Executor for packaging-material inbound tasks (taskCategory = 1, taskType = 1).
/// Polls for pending tasks, runs several tasks in parallel, and dispatches the
/// details of each individual task strictly one after another.
/// </summary>
public class MaterialInStoreExecutor
{
    private readonly SerilogHelper _logger;
    private readonly ILiveTaskQueueService _taskQueueService;
    private readonly ILiveTaskDetailService _taskDetailService;

    // At most 2 concurrent hoist moves per building (keyed by building number); AGVs are not limited.
    private static readonly ConcurrentDictionary<int, SemaphoreSlim> HoistSemaphores = new();

    // Mutex for DB writes (original author's note: the SqlSugar context is shared, so writes must not run concurrently).
    private static readonly object DbWriteLock = new();

    // Maximum number of tasks executing at the same time.
    private static readonly SemaphoreSlim TaskSemaphore = new(10, 10);

    // Leading run of digits in a point code; cached so it is not rebuilt on every hoist dispatch.
    private static readonly System.Text.RegularExpressions.Regex BuildingPrefix = new(@"^(\d+)");

    // Guards _cts and the _isRunning transition in Start/Stop.
    private readonly object _lock = new();
    private CancellationTokenSource? _cts;
    private volatile bool _isRunning;

    /// <summary>True between a successful <see cref="Start"/> and the next <see cref="Stop"/>.</summary>
    public bool IsRunning => _isRunning;

    /// <summary>Creates the executor with its logger and the task / task-detail services.</summary>
    public MaterialInStoreExecutor(
        SerilogHelper logger,
        ILiveTaskQueueService taskQueueService,
        ILiveTaskDetailService taskDetailService)
    {
        _logger = logger;
        _taskQueueService = taskQueueService;
        _taskDetailService = taskDetailService;
    }

    /// <summary>
    /// Starts the background polling loop. Does nothing when the executor is already running.
    /// </summary>
    public void Start()
    {
        CancellationToken token;
        lock (_lock)
        {
            if (_isRunning) return;
            _cts = new CancellationTokenSource();
            // Capture the token while holding the lock: reading _cts later from the
            // background lambda could observe a source created by a subsequent Start().
            token = _cts.Token;
            _isRunning = true;
        }
        _ = Task.Run(() => RunLoopAsync(token));
        _logger.Info("包材入库调度已启动");
    }

    /// <summary>
    /// Requests cancellation of the polling loop. Does nothing when the executor is not running.
    /// Does not wait for in-flight work to finish.
    /// </summary>
    public void Stop()
    {
        CancellationTokenSource? cts;
        lock (_lock)
        {
            if (!_isRunning) return;
            cts = _cts;
            _isRunning = false;
        }
        // Cancel outside the lock so cancellation callbacks never run while _lock is held.
        cts?.Cancel();
        _logger.Info("包材入库调度已停止");
    }

    /// <summary>
    /// Polling loop: runs one scheduling pass, then waits 5 seconds, until <paramref name="ct"/> is cancelled.
    /// </summary>
    private async Task RunLoopAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            try { await ExecuteAsync(ct); }
            // Only our own cancellation ends the loop; an unrelated OperationCanceledException
            // (e.g. a timeout surfaced by a dependency) is logged below like any other failure,
            // otherwise the loop would die while IsRunning still reports true.
            catch (OperationCanceledException) when (ct.IsCancellationRequested) { break; }
            catch (Exception ex) { _logger.Error($"调度异常: {ex.Message}"); }

            try { await Task.Delay(5000, ct); }
            catch (OperationCanceledException) { break; }
        }
    }

    /// <summary>
    /// One scheduling pass: loads the tasks with taskType 1, taskCategory 1 and taskStatus 1,
    /// runs them concurrently (bounded by <see cref="TaskSemaphore"/>) and completes when
    /// every task of this batch has finished.
    /// </summary>
    private async Task ExecuteAsync(CancellationToken ct)
    {
        var tasks = _taskQueueService.getLiveTaskQueues(x =>
            x.taskType == 1 && x.taskCategory == 1 && x.taskStatus == 1);
        if (tasks.Count == 0) return;

        _logger.Info($"查询到 {tasks.Count} 条待执行包材入库任务");

        var workers = new List<Task>();
        foreach (var task in tasks)
        {
            var captured = task;
            // Task.Run keeps the synchronous DB prefix of each task off the polling thread,
            // so tasks start in parallel instead of one after another.
            workers.Add(Task.Run(() => RunTaskAsync(captured, ct)));
        }
        await Task.WhenAll(workers);
    }

    /// <summary>
    /// Runs a single task under the global concurrency limit. Never throws: failures are logged.
    /// </summary>
    private async Task RunTaskAsync(LiveTaskQueue task, CancellationToken ct)
    {
        // Cancellable wait: a queued task gives up promptly on Stop() instead of
        // holding a blocked thread until an earlier task releases a slot.
        try { await TaskSemaphore.WaitAsync(ct); }
        catch (OperationCanceledException) { return; }

        try
        {
            if (ct.IsCancellationRequested) return;
            _logger.Info($"[线程启动] {task.taskCode}");
            await ProcessOneAsync(task, ct);
        }
        catch (Exception ex) { _logger.Error($"任务 {task.taskCode} 线程异常: {ex.Message}"); }
        finally { TaskSemaphore.Release(); }
    }

    /// <summary>
    /// Executes one task: sets its status to 2, dispatches its details in ascending objId order,
    /// and sets the task status to 3 once every detail succeeded.
    /// </summary>
    /// <remarks>
    /// NOTE(review): when a detail fails, or cancellation is observed between details, the method
    /// returns with the task still at taskStatus 2. The poll only selects taskStatus 1, so such a
    /// task is not picked up again by this executor — confirm that this is intended.
    /// </remarks>
    private async Task ProcessOneAsync(LiveTaskQueue task, CancellationToken ct)
    {
        _logger.Info($"开始执行 {task.taskCode},共 {task.taskDetails.Count} 条明细");
        lock (DbWriteLock) { task.taskStatus = 2; _taskQueueService.Update(task); }

        foreach (var detail in task.taskDetails.OrderBy(d => d.objId))
        {
            if (ct.IsCancellationRequested) return;

            // Details already at status 2 or 3 are not dispatched again.
            if (detail.taskStatus is 2 or 3)
            {
                _logger.Info($" 明细 {detail.objId} 已处理,跳过");
                continue;
            }

            lock (DbWriteLock) { detail.taskStatus = 2; _taskDetailService.Update(detail); }

            var devName = detail.deviceType switch { 1 => "AGV", 2 => "提升机", _ => "输送线" };
            _logger.Info($" → {devName}下发 {detail.taskCode}-{detail.objId}: {detail.startPoint} → {detail.endPoint}");

            // The dispatch itself runs outside DbWriteLock, so parallel tasks do not block each other here.
            var ok = detail.deviceType switch
            {
                1 => await DispatchAgvAsync(detail),
                2 => await DispatchHoistAsync(detail),
                _ => await DispatchConveyorAsync(detail)
            };

            if (ok)
            {
                lock (DbWriteLock) { detail.taskStatus = 3; _taskDetailService.Update(detail); }
                _logger.Info($" ✓ {detail.objId} 完成");
            }
            else
            {
                // Put the detail back to status 1 and abort the remaining details of this task.
                lock (DbWriteLock) { detail.taskStatus = 1; _taskDetailService.Update(detail); }
                _logger.Error($" ✗ {detail.objId} 失败,中断任务");
                return;
            }
        }

        lock (DbWriteLock) { task.taskStatus = 3; _taskQueueService.Update(task); }
        _logger.Info($"任务 {task.taskCode} 执行完成");
    }

    /// <summary>Dispatches a detail to an AGV. No concurrency limit; currently simulated with a 10 s delay.</summary>
    /// <returns>Always <c>true</c> in this simulated implementation.</returns>
    private async Task<bool> DispatchAgvAsync(LiveTaskDetail detail)
    {
        _logger.Info($" [AGV] 开始 {detail.startPoint} → {detail.endPoint}");
        await Task.Delay(10_000); // simulated execution, 10 s
        _logger.Info($" [AGV] 完成 {detail.startPoint} → {detail.endPoint}");
        return true;
    }

    /// <summary>
    /// Dispatches a detail to a hoist, allowing at most 2 concurrent moves per building.
    /// The building is derived from the detail's start point; currently simulated with a 20 s delay.
    /// </summary>
    /// <returns>Always <c>true</c> in this simulated implementation.</returns>
    private async Task<bool> DispatchHoistAsync(LiveTaskDetail detail)
    {
        var building = ExtractBuilding(detail.startPoint);
        var semaphore = HoistSemaphores.GetOrAdd(building, _ => new SemaphoreSlim(2, 2));
        await semaphore.WaitAsync();
        try
        {
            _logger.Info($" [提升机] {building}#楼 开始 (可用:{semaphore.CurrentCount}) {detail.startPoint} → {detail.endPoint}");
            await Task.Delay(20_000); // simulated execution, 20 s
            _logger.Info($" [提升机] {building}#楼 完成 {detail.startPoint} → {detail.endPoint}");
            return true;
        }
        finally
        {
            semaphore.Release();
        }
    }

    /// <summary>Dispatches a detail to the conveyor line; only logs and reports success immediately.</summary>
    private Task<bool> DispatchConveyorAsync(LiveTaskDetail detail)
    {
        _logger.Info($" [输送线] {detail.startPoint} → {detail.endPoint}");
        return Task.FromResult(true);
    }

    /// <summary>
    /// Extracts the building number from the leading digits of a point code,
    /// e.g. "13#_L1_HOIST" → 13.
    /// </summary>
    /// <returns>
    /// The parsed building number, or 0 when the code is null or empty, does not start with
    /// digits, or the digits do not form a valid <see cref="int"/>.
    /// </returns>
    private static int ExtractBuilding(string pointCode)
    {
        if (string.IsNullOrEmpty(pointCode)) return 0;

        var match = BuildingPrefix.Match(pointCode);
        if (!match.Success) return 0;

        // TryParse instead of Parse: \d also matches non-ASCII digits and the run may be
        // arbitrarily long, either of which would make int.Parse throw.
        return int.TryParse(
            match.Groups[1].Value,
            System.Globalization.NumberStyles.None,
            System.Globalization.CultureInfo.InvariantCulture,
            out var building)
            ? building
            : 0;
    }
}