// wcs_core/Sln.Wcs.Strategy/MaterialInStoreExecutor.cs

using System.Collections.Concurrent;
using System.Net.Http;
using System.Net.Http.Json;
using Sln.Wcs.Model.Domain;
using Sln.Wcs.Repository.service;
using Sln.Wcs.Serilog;
namespace Sln.Wcs.Strategy;
/// <summary>
/// Executor for packaging-material inbound (in-store) tasks (taskCategory=1, taskType=1).
/// Polls for pending tasks; several tasks run in parallel, while the details of a single
/// task are dispatched serially, one after another.
/// </summary>
public class MaterialInStoreExecutor
{
// Logger used for all progress and error messages of this executor.
private readonly SerilogHelper _logger;
// Persistence service for task headers (queried for pending tasks, updated on status changes).
private readonly ILiveTaskQueueService _taskQueueService;
// Persistence service for the individual detail rows of a task.
private readonly ILiveTaskDetailService _taskDetailService;
// Per building, at most 2 hoist operations run concurrently (keyed by building number); AGVs are not limited.
private static readonly ConcurrentDictionary<int, SemaphoreSlim> HoistSemaphores = new();
// Mutex for DB writes (per the original author: the SqlSugar context is shared, so writes must not run concurrently).
private static readonly object DbWriteLock = new();
// Maximum number of tasks that may execute at the same time (10).
private static readonly SemaphoreSlim TaskSemaphore = new(10, 10);
// Guards the start/stop state transition (_isRunning and _cts).
private readonly object _lock = new();
// Cancellation source of the currently running scheduling loop; replaced on every Start().
private CancellationTokenSource? _cts;
// True between a successful Start() and the next Stop(); volatile so IsRunning can be read without the lock.
private volatile bool _isRunning;
/// <summary>Whether the scheduler has been started and not yet stopped.</summary>
public bool IsRunning => _isRunning;
/// <summary>
/// Creates the executor and stores the collaborators it works with.
/// </summary>
/// <param name="logger">Logger that receives progress and error messages.</param>
/// <param name="taskQueueService">Service used to query and update task headers.</param>
/// <param name="taskDetailService">Service used to update task detail rows.</param>
public MaterialInStoreExecutor(
    SerilogHelper logger,
    ILiveTaskQueueService taskQueueService,
    ILiveTaskDetailService taskDetailService)
    => (_logger, _taskQueueService, _taskDetailService) = (logger, taskQueueService, taskDetailService);
/// <summary>
/// Starts the background scheduling loop. Calling it while already running is a no-op.
/// </summary>
public void Start()
{
    CancellationToken token;
    lock (_lock)
    {
        if (_isRunning) return;
        _cts = new CancellationTokenSource();
        // Capture the token while still holding the lock. Reading _cts later from the
        // thread-pool lambda could observe a source created by a subsequent Stop()/Start()
        // pair, leaving this loop bound to the wrong (never-cancelled-for-it) token.
        token = _cts.Token;
        _isRunning = true;
    }
    // Fire-and-forget: RunLoopAsync handles and logs its own exceptions.
    _ = Task.Run(() => RunLoopAsync(token));
    _logger.Info("包材入库调度已启动");
}
/// <summary>
/// Requests the scheduling loop to stop by cancelling its token. Does nothing when not running.
/// </summary>
public void Stop()
{
    CancellationTokenSource? source;
    lock (_lock)
    {
        if (!_isRunning)
        {
            return;
        }

        _isRunning = false;
        source = _cts;
    }

    // Cancel outside the lock so cancellation callbacks never run while the state lock is held.
    if (source is not null)
    {
        source.Cancel();
    }

    _logger.Info("包材入库调度已停止");
}
/// <summary>
/// Scheduling loop: runs one polling pass, waits five seconds, and repeats until cancelled.
/// </summary>
/// <param name="ct">Token that ends the loop when cancelled.</param>
private async Task RunLoopAsync(CancellationToken ct)
{
    const int idleDelayMs = 5000;

    while (true)
    {
        if (ct.IsCancellationRequested)
        {
            return;
        }

        try
        {
            // The pass itself is synchronous and blocking, so it is pushed to the thread pool.
            await Task.Run(() => ExecuteAsync(ct), ct);
        }
        catch (OperationCanceledException)
        {
            return;
        }
        catch (Exception ex)
        {
            // A failed pass is logged and the loop carries on with the next cycle.
            _logger.Error($"调度异常: {ex.Message}");
        }

        try
        {
            await Task.Delay(idleDelayMs, ct);
        }
        catch (OperationCanceledException)
        {
            return;
        }
    }
}
/// <summary>
/// Runs one polling pass: loads all pending tasks (taskType 1, taskCategory 1, taskStatus 1),
/// processes each on its own background thread and blocks until every started thread has finished.
/// </summary>
/// <remarks>
/// Despite the name this method is synchronous. A slot of <c>TaskSemaphore</c> is acquired
/// before a worker thread is created, so the number of live worker threads never exceeds the
/// semaphore capacity regardless of how many tasks are pending.
/// </remarks>
/// <param name="ct">Token that stops further tasks from being launched once cancelled.</param>
private void ExecuteAsync(CancellationToken ct)
{
    var tasks = _taskQueueService.getLiveTaskQueues(x =>
        x.taskType == 1 && x.taskCategory == 1 && x.taskStatus == 1);
    if (tasks.Count == 0) return;
    _logger.Info($"查询到 {tasks.Count} 条待执行包材入库任务");

    var threads = new List<Thread>();
    foreach (var task in tasks)
    {
        try
        {
            // Throttle here rather than inside the worker: otherwise one blocked OS thread
            // would be created per pending task. The wait is cancellable so Stop() does not
            // have to wait for a free slot.
            TaskSemaphore.Wait(ct);
        }
        catch (OperationCanceledException)
        {
            // Cancelled while waiting: no slot was taken; stop launching further tasks.
            break;
        }

        var captured = task;
        var worker = new Thread(() =>
        {
            try
            {
                if (ct.IsCancellationRequested) return;
                _logger.Info($"[线程启动] {captured.taskCode}");
                ProcessOneAsync(captured, ct).GetAwaiter().GetResult();
            }
            catch (Exception ex) { _logger.Error($"任务 {captured.taskCode} 线程异常: {ex.Message}"); }
            // The slot acquired by the launching thread is handed back by the worker.
            finally { TaskSemaphore.Release(); }
        })
        { IsBackground = true };

        try
        {
            worker.Start();
        }
        catch
        {
            // The worker never ran, so its finally block will not release the slot.
            TaskSemaphore.Release();
            throw;
        }
        threads.Add(worker);
    }

    // Keep the original contract: the pass returns only after all started workers are done.
    foreach (var worker in threads)
        worker.Join();
}
/// <summary>
/// Executes a single task: marks it as running (status 2), dispatches its details one by one
/// in ascending objId order, and marks the task as finished (status 3) when every detail succeeded.
/// </summary>
/// <remarks>
/// Details already in status 2 or 3 are skipped. When a detail fails, it is reset to status 1 and
/// processing of the task stops; the task status is not changed in that case.
/// </remarks>
/// <param name="task">Task header including its detail rows.</param>
/// <param name="ct">Checked before each detail; when cancelled the method returns early.</param>
private async Task ProcessOneAsync(LiveTaskQueue task, CancellationToken ct)
{
    _logger.Info($"开始执行 {task.taskCode},共 {task.taskDetails.Count} 条明细");
    lock (DbWriteLock)
    {
        task.taskStatus = 2;
        _taskQueueService.Update(task);
    }

    var ordered = task.taskDetails.OrderBy(d => d.objId).ToList();
    var position = -1;
    foreach (var current in ordered)
    {
        position++;
        if (ct.IsCancellationRequested)
        {
            return;
        }

        if (current.taskStatus is 2 or 3)
        {
            _logger.Info($" 明细 {current.objId}-{current.taskCode} 已处理,跳过");
            continue;
        }

        lock (DbWriteLock)
        {
            current.taskStatus = 2;
            _taskDetailService.Update(current);
        }

        // The AGV step needs to know the following detail so it can pre-book a hoist.
        var upcoming = position + 1 < ordered.Count ? ordered[position + 1] : null;

        string deviceLabel;
        if (current.deviceType is 1)
        {
            deviceLabel = "AGV";
        }
        else if (current.deviceType is 2)
        {
            deviceLabel = "提升机";
        }
        else
        {
            deviceLabel = "输送线";
        }

        _logger.Info($" → {deviceLabel}下发 {current.taskCode}-{current.objId}: {current.startPoint} → {current.endPoint}");

        bool succeeded;
        if (current.deviceType is 1)
        {
            succeeded = await DispatchAgvAsync(current, upcoming);
        }
        else if (current.deviceType is 2)
        {
            succeeded = await DispatchHoistAsync(current);
        }
        else
        {
            succeeded = await DispatchConveyorAsync(current);
        }

        if (!succeeded)
        {
            // Reset the detail to pending and abandon the rest of this task.
            lock (DbWriteLock)
            {
                current.taskStatus = 1;
                _taskDetailService.Update(current);
            }
            _logger.Error($" ✗ {current.taskCode}-{current.objId} 失败,中断任务");
            return;
        }

        lock (DbWriteLock)
        {
            current.taskStatus = 3;
            _taskDetailService.Update(current);
        }
        _logger.Info($" ✓ {current.taskCode}-{current.objId} 完成");
    }

    lock (DbWriteLock)
    {
        task.taskStatus = 3;
        _taskQueueService.Update(task);
    }
    _logger.Info($"任务 {task.taskCode} 执行完成");
}
/// <summary>
/// Submits one AGV detail to the AGV dispatch service and polls its status until it reaches a terminal state.
/// </summary>
/// <remarks>
/// While the AGV reports <c>WAIT</c> and the following detail is a hoist step (deviceType 2), a hoist is
/// dispatched for that next detail and the AGV is then told to continue. The hoist dispatch is retried on
/// each <c>WAIT</c> poll until it succeeds once. The status poll has no timeout and does not observe
/// cancellation; it ends only on FINISHED, MANUALED or CANCELLED.
/// </remarks>
/// <param name="detail">The AGV detail to execute.</param>
/// <param name="nextDetail">The detail that follows <paramref name="detail"/> in the task, or null if it is the last one.</param>
/// <returns>
/// <c>true</c> when the AGV reports FINISHED or MANUALED; <c>false</c> when the task could not be
/// submitted, was rejected, or was CANCELLED.
/// </returns>
private async Task<bool> DispatchAgvAsync(LiveTaskDetail detail, LiveTaskDetail? nextDetail = null)
{
    const string agvBaseUrl = "http://localhost:5200";
    const string hoistBaseUrl = "http://localhost:5100";
    const int pollIntervalMs = 2000;
    using var httpClient = new HttpClient { Timeout = TimeSpan.FromSeconds(30) };
    _logger.Info($" [AGV] 下发 {detail.taskCode}-{detail.objId}: {detail.startPoint} → {detail.endPoint}");
    // 1. Call /task/receive to submit the AGV task.
    try
    {
        // Only the third '_'-separated segment of each point code is sent to the AGV service.
        // A malformed or null point throws here and is reported by the catch below.
        string[] startPointArray = detail.startPoint.Split("_");
        string[] endPointArray = detail.endPoint.Split("_");
        // Dispose the response so its connection/content buffers are released promptly.
        using var res = await httpClient.PostAsJsonAsync($"{agvBaseUrl}/api/task/receive",
            new { taskCode = detail.taskCode, startPoint = startPointArray[2], endPoint = endPointArray[2] });
        res.EnsureSuccessStatusCode();
        var receiveResult = await res.Content.ReadFromJsonAsync<AGVTaskReceiveResponse>();
        if (receiveResult is not { isSuccess: true })
        {
            _logger.Error($" [AGV] {detail.taskCode} 下发失败: AGV 调度中心拒绝任务");
            return false;
        }
    }
    catch (Exception ex)
    {
        _logger.Error($" [AGV] {detail.taskCode} 下发失败: {ex.Message}");
        return false;
    }
    _logger.Info($" [AGV] {detail.taskCode} 下发成功,开始轮询任务状态...");
    // 2. Poll /task/status until the AGV task reaches a terminal state.
    var hoistDispatched = false;
    while (true)
    {
        await Task.Delay(pollIntervalMs);
        try
        {
            var statusRes = await httpClient.GetFromJsonAsync<AGVTaskStatusResponse>(
                $"{agvBaseUrl}/api/task/status?taskCode={Uri.EscapeDataString(detail.taskCode!)}");
            var taskStatus = statusRes?.taskStatus ?? string.Empty;
            _logger.Info($" [AGV] {detail.taskCode} 任务状态: {taskStatus}");
            switch (taskStatus)
            {
                case "FINISHED":
                case "MANUALED":
                    _logger.Info($" [AGV] {detail.taskCode} 完成 {detail.startPoint} → {detail.endPoint}");
                    return true;
                case "CANCELLED":
                    _logger.Error($" [AGV] {detail.taskCode} 任务已取消 {detail.startPoint} → {detail.endPoint}");
                    return false;
                case "WAIT":
                    _logger.Info($" [AGV] {detail.taskCode} 等待 {detail.startPoint} → {detail.endPoint}");
                    // Book the hoist for the next step once, then release the waiting AGV.
                    if (!hoistDispatched && nextDetail is { deviceType: 2 })
                    {
                        hoistDispatched = await TryDispatchHoistForWaitAsync(httpClient, hoistBaseUrl, nextDetail);
                        if (hoistDispatched)
                            await ContinueAgvAsync(httpClient, agvBaseUrl, nextDetail);
                    }
                    break;
            }
        }
        catch (Exception ex)
        {
            // A failed status query is logged and the poll simply tries again on the next tick.
            _logger.Error($" [AGV] 状态查询异常: {ex.Message}");
        }
    }
}
/// <summary>
/// While an AGV is waiting: finds a free hoist for the given hoist detail and dispatches the hoist task to it.
/// </summary>
/// <remarks>
/// Queries <c>/api/hoist/free</c> once per second until a hoist is reported as found (no timeout),
/// then posts the task to <c>/api/task/dispatch</c>. On success the chosen device is stored in
/// <c>detail.execDevice</c> as <c>{hostCode}_{deviceSerialNo}</c> and persisted.
/// </remarks>
/// <param name="httpClient">Client used for the hoist service calls; owned by the caller.</param>
/// <param name="hoistBaseUrl">Base URL of the hoist service.</param>
/// <param name="detail">The hoist detail (the step following the waiting AGV step).</param>
/// <returns><c>true</c> when the hoist accepted the task; <c>false</c> on rejection or any exception.</returns>
private async Task<bool> TryDispatchHoistForWaitAsync(HttpClient httpClient, string hoistBaseUrl, LiveTaskDetail detail)
{
    var hostCode = DetermineHoistCode(detail);
    _logger.Info($" [AGV] 任务等待中,筛选空闲提升机 ({hostCode})...");
    try
    {
        FreeHoistResponse? freeRes;
        while (true)
        {
            freeRes = await httpClient.GetFromJsonAsync<FreeHoistResponse>(
                $"{hoistBaseUrl}/api/hoist/free?hostCode={Uri.EscapeDataString(hostCode)}");
            if (freeRes is { found: true })
                break;
            _logger.Info($" [AGV] 暂无空闲提升机 ({hostCode}),等待重试...");
            await Task.Delay(1000);
        }
        // Dispose the response so its connection/content buffers are released promptly.
        using var dispatchRes = await httpClient.PostAsJsonAsync($"{hoistBaseUrl}/api/task/dispatch",
            new
            {
                hostCode = freeRes.hostCode,
                serialNo = freeRes.deviceSerialNo,
                taskCode = detail.taskCode,
                startPoint = detail.startPoint,
                endPoint = detail.endPoint
            });
        // The status code is deliberately not enforced: the body carries the failure message.
        var result = await dispatchRes.Content.ReadFromJsonAsync<ApiResult>();
        if (result is not { success: true })
        {
            _logger.Error($" [AGV] 提升机下发失败: {result?.msg ?? ""}");
            return false;
        }
        var execDevice = $"{freeRes.hostCode}_{freeRes.deviceSerialNo}";
        _logger.Info($" [AGV] 提升机 {execDevice} 已下发");
        lock (DbWriteLock)
        {
            detail.execDevice = execDevice;
            _taskDetailService.Update(detail);
        }
        return true;
    }
    catch (Exception ex)
    {
        _logger.Error($" [AGV] 提升机调度失败: {ex.Message}");
        return false;
    }
}
/// <summary>
/// Tells the AGV dispatch service to continue a waiting AGV, passing along the hoist detail's
/// task code, points and assigned device.
/// </summary>
/// <remarks>Best effort: a rejection or exception is only logged, never propagated to the caller.</remarks>
/// <param name="httpClient">Client used for the AGV service call; owned by the caller.</param>
/// <param name="agvBaseUrl">Base URL of the AGV dispatch service.</param>
/// <param name="hoistDetail">The hoist detail whose data is sent with the continue request.</param>
private async Task ContinueAgvAsync(HttpClient httpClient, string agvBaseUrl, LiveTaskDetail hoistDetail)
{
    try
    {
        // Dispose the response so its connection/content buffers are released promptly.
        using var res = await httpClient.PostAsJsonAsync($"{agvBaseUrl}/api/task/continue",
            new
            {
                taskCode = hoistDetail.taskCode,
                startPoint = hoistDetail.startPoint,
                endPoint = hoistDetail.endPoint,
                execDevice = hoistDetail.execDevice
            });
        res.EnsureSuccessStatusCode();
        var result = await res.Content.ReadFromJsonAsync<AGVTaskReceiveResponse>();
        if (result is { isSuccess: true })
            _logger.Info($" [AGV] 继续执行已下发 (提升机 {hoistDetail.execDevice})");
        else
            _logger.Error($" [AGV] 继续执行下发失败: AGV 调度中心拒绝");
    }
    catch (Exception ex)
    {
        _logger.Error($" [AGV] 继续执行下发异常: {ex.Message}");
    }
}
/// <summary>
/// Picks the hoist host code from the detail's startPoint. Per the original note: building 15
/// inbound → 1#Host, building 15 outbound → 2#Host, building 14 → 3#Host, building 13 → 4#Host.
/// Anything else falls back to 1#Host.
/// </summary>
/// <param name="detail">Detail whose startPoint, taskType and taskCategory are inspected.</param>
/// <returns>The host code string, e.g. <c>"1#Host"</c>.</returns>
private static string DetermineHoistCode(LiveTaskDetail detail)
{
    var point = detail.startPoint ?? "";

    // taskType 2 + taskCategory 2 starting in building 15 takes precedence over every other rule.
    var usesSecondHost = detail.taskType == 2 && detail.taskCategory == 2 && point.Contains("15#");
    if (usesSecondHost)
    {
        return "2#Host";
    }

    // Building 15 and unrecognised points both resolve to the default host.
    return point.Contains("13#") ? "4#Host"
        : point.Contains("14#") ? "3#Host"
        : "1#Host";
}
/// <summary>
/// Executes one hoist detail: starts the pre-assigned hoist via <c>/api/hoist/receive-pallet</c>
/// and polls <c>/api/hoist/wait-complete</c> every two seconds until it reports completion.
/// </summary>
/// <remarks>
/// Holds one of the two per-building hoist slots for the whole duration. The device must already
/// be recorded in <c>detail.execDevice</c> as <c>{hostCode}_{serialNo}</c>. The completion poll has
/// no timeout; any exception (including one during polling) ends the call with <c>false</c>.
/// </remarks>
/// <param name="detail">The hoist detail to execute.</param>
/// <returns><c>true</c> when the hoist reports completion; otherwise <c>false</c>.</returns>
private async Task<bool> DispatchHoistAsync(LiveTaskDetail detail)
{
    const string hoistBaseUrl = "http://localhost:5100";
    var building = ExtractBuilding(detail.startPoint);
    var semaphore = HoistSemaphores.GetOrAdd(building, _ => new SemaphoreSlim(2, 2));
    await semaphore.WaitAsync();
    try
    {
        if (string.IsNullOrWhiteSpace(detail.execDevice))
        {
            _logger.Error($" [提升机] {building}#楼 未分配执行设备");
            return false;
        }
        var parts = detail.execDevice.Split('_');
        if (parts.Length != 2 || !int.TryParse(parts[1], out var serialNo))
        {
            _logger.Error($" [提升机] {building}#楼 设备编号格式错误: {detail.execDevice}");
            return false;
        }
        var hostCode = parts[0];
        _logger.Info($" [提升机] {building}#楼 启动 {detail.startPoint} → {detail.endPoint} (设备:{detail.execDevice})");
        // One client serves both the start request and the completion poll.
        using var httpClient = new HttpClient { Timeout = TimeSpan.FromSeconds(10) };
        // Dispose the response so its connection/content buffers are released promptly.
        using var res = await httpClient.PostAsJsonAsync($"{hoistBaseUrl}/api/hoist/receive-pallet",
            new
            {
                hostCode = hostCode,
                serialNo = serialNo,
                taskCode = detail.taskCode,
                palletBarcode = detail.palletBarcode ?? "",
                startPoint = detail.startPoint,
                endPoint = detail.endPoint
            });
        var result = await res.Content.ReadFromJsonAsync<ApiResult>();
        if (result is not { success: true })
        {
            _logger.Error($" [提升机] {building}#楼 启动失败: {result?.msg ?? ""}");
            return false;
        }
        _logger.Info($" [提升机] {building}#楼 已启动 {detail.startPoint} → {detail.endPoint}");
        lock (DbWriteLock)
        {
            detail.taskStatus = 2;
            detail.execDevice = $"{hostCode}_{serialNo}";
            _taskDetailService.Update(detail);
        }
        // Poll until the hoist reports that the task is complete.
        _logger.Info($" [提升机] {building}#楼 等待任务完成...");
        while (true)
        {
            var waitRes = await httpClient.GetFromJsonAsync<WaitCompleteResponse>(
                $"{hoistBaseUrl}/api/hoist/wait-complete?hostCode={Uri.EscapeDataString(hostCode)}&deviceSerialNo={serialNo}");
            if (waitRes is { isComplete: true })
            {
                _logger.Info($" [提升机] {building}#楼 任务完成 {detail.startPoint} → {detail.endPoint}");
                return true;
            }
            await Task.Delay(2000);
        }
    }
    catch (Exception ex)
    {
        _logger.Error($" [提升机] {building}#楼 异常: {ex.Message}");
        return false;
    }
    finally
    {
        // Always hand the per-building slot back, whatever the outcome.
        semaphore.Release();
    }
}
/// <summary>
/// Conveyor step: only logs the route and reports success immediately; no device call is made.
/// </summary>
/// <param name="detail">The conveyor detail whose start and end points are logged.</param>
/// <returns>An already-completed task with result <c>true</c>.</returns>
private Task<bool> DispatchConveyorAsync(LiveTaskDetail detail)
{
    var routeMessage = $" [输送线] {detail.startPoint} → {detail.endPoint}";
    _logger.Info(routeMessage);

    var alwaysSucceeded = Task.FromResult(true);
    return alwaysSucceeded;
}
/// <summary>
/// Extracts the building number from a transfer-point code, e.g. <c>"13#_L1_HOIST"</c> → 13.
/// </summary>
/// <param name="pointCode">Point code expected to start with the building number; may be null or empty.</param>
/// <returns>
/// The leading number of <paramref name="pointCode"/>, or 0 when the code is null/empty, does not
/// start with digits, or the digits do not fit into an <see cref="int"/>.
/// </returns>
private static int ExtractBuilding(string? pointCode)
{
    // Regex.Match throws on null; treat a missing point like "no building".
    if (string.IsNullOrEmpty(pointCode))
    {
        return 0;
    }

    var match = System.Text.RegularExpressions.Regex.Match(pointCode, @"^(\d+)");
    if (!match.Success)
    {
        return 0;
    }

    // TryParse instead of Parse: \d also matches non-ASCII digits and arbitrarily long runs,
    // both of which would make int.Parse throw.
    return int.TryParse(
        match.Groups[1].Value,
        System.Globalization.NumberStyles.None,
        System.Globalization.CultureInfo.InvariantCulture,
        out var building)
        ? building
        : 0;
}
// ---- API response DTOs ----
// Property names mirror the JSON fields returned by the AGV / hoist services; do not rename
// them without checking the wire format.

/// <summary>Response body of the AGV <c>/api/task/receive</c> and <c>/api/task/continue</c> calls.</summary>
private class AGVTaskReceiveResponse
{
// True when the AGV dispatch service accepted the request.
public bool isSuccess { get; set; } = false;
}
/// <summary>Response body of the AGV <c>/api/task/status</c> call.</summary>
private class AGVTaskStatusResponse
{
// Status text; this file reacts to "FINISHED", "MANUALED", "CANCELLED" and "WAIT".
public string taskStatus { get; set; } = string.Empty;
}
/// <summary>Response body of the hoist <c>/api/hoist/free</c> call.</summary>
private class FreeHoistResponse
{
// True when a free hoist was found; the caller keeps polling while this is false.
public bool found { get; set; }
public string? deviceCode { get; set; }
public string? deviceName { get; set; }
// hostCode and deviceSerialNo are combined into the detail's execDevice as "{hostCode}_{deviceSerialNo}".
public string? hostCode { get; set; }
public int deviceSerialNo { get; set; }
}
/// <summary>Response body of the hoist <c>/api/hoist/wait-complete</c> call.</summary>
private class WaitCompleteResponse
{
// True once the hoist reports that its task has finished.
public bool isComplete { get; set; }
}
/// <summary>Generic result body of the hoist <c>/api/task/dispatch</c> and <c>/api/hoist/receive-pallet</c> calls.</summary>
private class ApiResult
{
public bool success { get; set; }
// Message logged by the callers when success is false.
public string? msg { get; set; }
}
}