update 导入异步执行导入任务、发送处理结果到回调接口

master
yinq 2 months ago
parent 94709d3a74
commit 789c5060f7

@ -1,5 +1,7 @@
package hw.tagApi.service.constant;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* API
*
@ -8,10 +10,16 @@ package hw.tagApi.service.constant;
*/
public class ApiConstants {
/**
*
*/
public static final String CALLBACK_URL = "https://www.kdocs.cn/chatflow/api/v2/func/webhook/2uZIRSpxLvtfaoCjRoLg4EvwIc5";
/**
*
*/
public static final String DATA_STATUS = "00";
/**
*
*/
@ -21,6 +29,7 @@ public class ApiConstants {
*
*/
public static final String DATA_QUERY = "20";
/**
*
*/

@ -5,16 +5,14 @@ import hw.tagApi.common.core.domain.AjaxResult;
import hw.tagApi.service.constant.ApiConstants;
import hw.tagApi.service.domain.ApiContent;
import hw.tagApi.service.domain.ApiRequest;
import hw.tagApi.service.domain.ApiResponse;
import hw.tagApi.service.service.IKDocsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Controller
@ -24,7 +22,7 @@ import java.util.Map;
*/
@RestController
@RequestMapping("/docs")
public class KDocsApiController extends BaseController {
public class KDocsApiController {
private static final Logger log = LoggerFactory.getLogger(KDocsApiController.class);
@ -38,88 +36,55 @@ public class KDocsApiController extends BaseController {
* @return
*/
@PostMapping("/api")
public AjaxResult handleRequest(@RequestHeader("API-Key") String apiKey, @RequestBody ApiRequest request) {
public ApiResponse handleRequest(@RequestHeader("API-Key") String apiKey, @RequestBody ApiRequest request) {
ApiResponse apiResponse = new ApiResponse();
// 验证API-Key
if (!validateApiKey(apiKey)) {
log.warn("无效的API-Key: {}", apiKey);
return AjaxResult.error("无效的API-Key");
apiResponse.setCaseCode("无效的API-Key:" + apiKey);
return apiResponse;
}
try {
String caseCode = request.getCASE();
List<ApiContent> content = request.getCONTENT();
apiResponse.setCaseCode(caseCode);
// 打印接收到的请求参数
log.info("接收到请求 - CASE: {}, CONTENT: {}", caseCode, content);
switch (caseCode) {
case ApiConstants.DATA_STATUS:
log.info("查询数据库状态");
return AjaxResult.success("数据库状态正常");
return apiResponse;
case ApiConstants.DATA_IMPORT:
// 数据导入
if (content == null || content.isEmpty()) {
log.warn("导入数据为空");
return AjaxResult.error(ApiConstants.ImportStatus.IMPORT_FAILED, "导入数据不能为空");
return apiResponse;
}
log.info("开始导入数据: {}", content);
// TODO: 调用导入服务
return AjaxResult.success(ApiConstants.ImportStatus.IMPORT_SUCCESS);
List<ApiContent> apiContents = ikDocsService.importDataList(content);
apiResponse.setResult(apiContents);
return apiResponse;
case ApiConstants.DATA_QUERY:
// 数据查询
if (content == null || content.isEmpty()) {
log.warn("查询条件为空");
return AjaxResult.error(ApiConstants.SearchStatus.NO_RECORD, "查询条件不能为空");
return apiResponse;
}
log.info("开始查询数据: {}", content);
// TODO: 调用查询服务
return AjaxResult.success(ApiConstants.SearchStatus.SEARCH_SUCCESS);
return apiResponse;
case ApiConstants.DATA_EXPORT:
// 数据导出
log.info("开始导出数据");
Map<String, Object> response = new HashMap<>();
response.put("CASE", ApiConstants.DATA_EXPORT);
List<Map<String, Object>> result = new ArrayList<>();
Map<String, Object> item1 = new HashMap<>();
Map<String, String> fields1 = new HashMap<>();
fields1.put("导出状态", "导出中");
fields1.put("情况说明", "预计需要3分钟");
item1.put("fields", fields1);
item1.put("id", "ci");
result.add(item1);
Map<String, Object> item2 = new HashMap<>();
Map<String, String> fields2 = new HashMap<>();
fields2.put("导出状态", "导出失败");
fields2.put("情况说明", "");
item2.put("fields", fields2);
item2.put("id", "56");
result.add(item2);
Map<String, Object> item3 = new HashMap<>();
Map<String, String> fields3 = new HashMap<>();
fields3.put("导出状态", "导出中");
fields3.put("情况说明", "预计需要10分钟");
item3.put("fields", fields3);
item3.put("id", "9a");
result.add(item3);
response.put("RESULT", result);
log.info("导出数据完成: {}", response);
return AjaxResult.success(response);
return apiResponse;
default:
log.warn("未知的功能代码: {}", caseCode);
return AjaxResult.error("未知的功能代码");
return apiResponse;
}
} catch (Exception e) {
log.error("处理请求时发生错误", e);
return AjaxResult.error("系统错误:" + e.getMessage());
return apiResponse;
}
}

@ -49,7 +49,7 @@ public class HwTagRecord extends BaseEntity
private Long totalQuantity;
/** 标签批次号(预留字段) */
@Excel(name = "标签批次号", readConverterExp = "预=留字段")
@Excel(name = "标签批次号")
private String tagBatch;
/** 型号,导入提供 */
@ -68,16 +68,16 @@ public class HwTagRecord extends BaseEntity
/** 数据状态标记 */
private Integer delFlag;
/** 预字段1 */
@Excel(name = "预览字段1")
/** 预字段1 */
// @Excel(name = "预留字段1")
private String fields1;
/** 预字段2 */
@Excel(name = "预览字段2")
/** 预字段2 */
// @Excel(name = "预留字段2")
private String fields2;
/** 预字段3 */
@Excel(name = "预览字段3")
/** 预字段3 */
// @Excel(name = "预留字段3")
private String fields3;
public void setrId(Long rId)

@ -58,4 +58,18 @@ public interface IHwTagRecordService
* @return
*/
public int deleteHwTagRecordByRId(Long rId);
/**
* TID
*
* @param record
*/
public void updateHwTagRecordByTID(HwTagRecord record);
/**
*
*
* @param batchList
*/
public void batchInsertHwTagRecord(List<HwTagRecord> batchList);
}

@ -96,6 +96,26 @@ public class HwTagRecordServiceImpl implements IHwTagRecordService
return hwTagRecordMapper.deleteHwTagRecordByRId(rId);
}
/**
* TID
*
* @param record
*/
@Override
public void updateHwTagRecordByTID(HwTagRecord record) {
hwTagRecordMapper.updateHwTagRecordByTID(record);
}
/**
*
*
* @param batchList
*/
@Override
public void batchInsertHwTagRecord(List<HwTagRecord> batchList) {
hwTagRecordMapper.batchInsertHwTagRecord(batchList);
}
/**
* hwTagRecord
* @param hwTagRecord hwTagRecord

@ -1,9 +1,13 @@
package hw.tagApi.service.service.impl;
import hw.tagApi.common.utils.uuid.IdGenerator;
import hw.tagApi.service.service.IHwTagRecordService;
import hw.tagApi.service.service.IKDocsService;
import hw.tagApi.service.utils.httpClientUtils;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.*;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
@ -15,6 +19,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import hw.tagApi.service.constant.ApiConstants;
import hw.tagApi.service.mapper.HwTagRecordMapper;
@ -22,11 +27,9 @@ import hw.tagApi.service.domain.HwTagRecord;
import hw.tagApi.common.utils.poi.TagExcelUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import hw.tagApi.service.domain.ApiResponse;
import hw.tagApi.service.domain.ApiContent;
/**
*
@ -35,137 +38,187 @@ import org.apache.http.util.EntityUtils;
* @date 2025-04-15
*/
@Service
public class KDocsServiceImpl implements IKDocsService
{
public class KDocsServiceImpl implements IKDocsService {
private static final Logger log = LoggerFactory.getLogger(KDocsServiceImpl.class);
@Autowired
private HwTagRecordMapper hwTagRecordMapper;
private IHwTagRecordService tagRecordService;
@Autowired
private CloseableHttpClient httpClient;
private static final int BATCH_SIZE = 100;
private static final int BATCH_SIZE = 10;
private List<HwTagRecord> batchList = new ArrayList<>();
// 创建线程池
private static final ExecutorService executorService = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 任务队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:由调用线程处理
);
/**
*
*
* @param contentList ApiContent
* @return ApiContent
*/
@Override
public String importData(String url) {
public List<ApiContent> importDataList(List<ApiContent> contentList) {
List<ApiContent> resApiContentList = getApiContents(contentList);
// 异步执行导入任务
executorService.execute(() -> {
List<ApiContent> resList = new ArrayList<>();
for (ApiContent apiContent : contentList) {
ApiContent resContent = new ApiContent();
resContent.setId(apiContent.getId());
try {
ApiContent result = this.importData(apiContent);
// 更新响应内容
resContent.setFields(result.getFields());
} catch (Exception e) {
log.error("导入任务执行失败: {}", e.getMessage(), e);
Map<String, String> errorFields = new HashMap<>();
errorFields.put(ApiConstants.IMPORT_STATUS, "系统错误");
errorFields.put(ApiConstants.DESCRIPTION, "导入任务执行失败: " + e.getMessage());
resContent.setFields(errorFields);
}
resList.add(resContent);
}
// 发送处理结果到回调接口
httpClientUtils.sendResultToCallback(resList);
});
return resApiContentList;
}
/**
* ApiContent
*
* @param contentList ApiContent
* @return ApiContent
*/
private static List<ApiContent> getApiContents(List<ApiContent> contentList) {
List<ApiContent> resApiContentList = new ArrayList<>();
for (ApiContent apiContent : contentList) {
// 创建初始响应
Map<String, String> resFields = new HashMap<>();
resFields.put(ApiConstants.IMPORT_STATUS, "导入中");
resFields.put(ApiConstants.DESCRIPTION, "后台处理中");
ApiContent responseContent = new ApiContent();
responseContent.setId(apiContent.getId());
responseContent.setFields(resFields);
resApiContentList.add(responseContent);
}
return resApiContentList;
}
/**
*
*
* @param apiContent ApiContent
* @return ApiContent
*/
public ApiContent importData(ApiContent apiContent) {
Map<String, String> fields = apiContent.getFields();
String importMode = fields.get("导入模式");
String model = fields.getOrDefault("型号", "");
String fileUrl = fields.get("文件链接");
List<HwTagRecord> batchList = new ArrayList<>();
Map<String, String> resFields = new HashMap<>();
apiContent.setFields(resFields);
try {
log.info("开始从金山文档下载Excel文件: {}", url);
// 创建HttpGet请求
HttpGet httpGet = new HttpGet(url);
httpGet.setHeader("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36");
httpGet.setHeader("Accept", "*/*");
httpGet.setHeader("Accept-Language", "zh-CN,zh;q=0.9,en;q=0.8");
httpGet.setHeader("Connection", "keep-alive");
// 执行请求
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
// 获取响应状态码
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != 200) {
log.error("从金山文档获取文件失败,响应码: {}", statusCode);
return ApiConstants.SYSTEM_ERROR;
log.info("开始处理文件URL: {}", fileUrl);
// 获取文件字节数组
byte[] fileData = httpClientUtils.getByteArrayFromUrl(fileUrl);
if (fileData == null) {
log.error("获取文件失败");
resFields.put(ApiConstants.IMPORT_STATUS, "系统错误");
resFields.put(ApiConstants.DESCRIPTION, "获取文件失败");
return apiContent;
}
// 解析Excel数据
try (ByteArrayInputStream bis = new ByteArrayInputStream(fileData)) {
Map<String, Object> excelResult = TagExcelUtil.parseTagExcel(bis);
if (!excelResult.containsKey("tagList")) {
log.error("解析Excel文件失败未找到标签数据");
resFields.put(ApiConstants.IMPORT_STATUS, "导入失败");
resFields.put(ApiConstants.DESCRIPTION, "解析Excel文件失败未找到标签数据");
return apiContent;
}
// 获取响应实体
HttpEntity entity = response.getEntity();
if (entity == null) {
log.error("无法从金山文档获取文件或文件为空");
return ApiConstants.SYSTEM_ERROR;
}
// 将响应实体转换为字节数组
byte[] fileData = EntityUtils.toByteArray(entity);
if (fileData.length == 0) {
log.error("无法从金山文档获取文件或文件为空");
return ApiConstants.SYSTEM_ERROR;
}
// 直接使用字节流处理Excel数据
Map<String, Object> excelResult;
try (ByteArrayInputStream bis = new ByteArrayInputStream(fileData)) {
excelResult = TagExcelUtil.parseTagExcel(bis);
}
// 获取订单信息
Map<String, String> orderInfo = (Map<String, String>) excelResult.get("orderInfo");
String orderNo = orderInfo.get("orderNo");
String batchNo = orderInfo.get("batchNo");
String operatorId = orderInfo.get("operatorId");
String processTime = orderInfo.get("processTime");
log.info("解析订单信息 - 订单号: {}, 批次号: {}, 操作员: {}", orderNo, batchNo, operatorId);
// 获取标签数据列表
List<Map<String, String>> tagList = (List<Map<String, String>>) excelResult.get("tagList");
// 批量处理标签数据
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (Map<String, String> tagData : tagList) {
HwTagRecord record = new HwTagRecord();
record.setOrderCode(orderNo);
record.setBatchNumber(batchNo);
record.setOperatorId(operatorId);
record.setProcessingTime(sdf.parse(processTime));
record.setOrderCode(orderInfo.get("orderNo"));
record.setBatchNumber(orderInfo.get("batchNo"));
record.setOperatorId(orderInfo.get("operatorId"));
record.setProcessingTime(sdf.parse(orderInfo.get("processTime")));
// 设置标签数据
record.setEpc(tagData.get("EPC"));
record.settId(tagData.get("TID"));
record.setModelCode(model);
record.setTagSequence(tagData.get("序号"));
// 添加到批处理列表
batchList.add(record);
record.settId(tagData.get("TID"));
record.setEpc(tagData.get("EPC"));
record.setPassword(tagData.get("密码"));
record.setFields1(tagData.get("测试结果"));
record.setFields2(tagData.get("测试值"));
record.setFields3(tagData.get("参考值"));
record.setRemark(tagData.get("测试时间"));
// 达到批量大小时执行插入
if ("更新".equals(importMode)) {
tagRecordService.updateHwTagRecordByTID(record);
continue;
}
// 添加到批处理列表
record.setrId(IdGenerator.nextId());
batchList.add(record);
if (batchList.size() >= BATCH_SIZE) {
hwTagRecordMapper.batchInsertHwTagRecord(batchList);
tagRecordService.batchInsertHwTagRecord(batchList);
log.info("批量插入{}条数据成功", batchList.size());
batchList.clear();
}
}
// 处理剩余的数据
if (!batchList.isEmpty()) {
hwTagRecordMapper.batchInsertHwTagRecord(batchList);
tagRecordService.batchInsertHwTagRecord(batchList);
log.info("批量插入剩余{}条数据成功", batchList.size());
batchList.clear();
}
return getSystemStatus();
resFields.put(ApiConstants.IMPORT_STATUS, "导入完成");
}
} catch (Exception e) {
log.error("导入数据失败: {}", e.getMessage(), e);
return ApiConstants.SYSTEM_ERROR;
log.error("处理文件失败: {}", e.getMessage(), e);
resFields.put(ApiConstants.IMPORT_STATUS, "系统错误");
resFields.put(ApiConstants.DESCRIPTION, e.getMessage());
}
return apiContent;
}
@Override
public String getSystemStatus() {
Map<String, Object> response = new HashMap<>();
response.put("CASE", ApiConstants.DATA_IMPORT);
List<Map<String, Object>> result = new ArrayList<>();
Map<String, Object> statusItem = new HashMap<>();
Map<String, String> statusFields = new HashMap<>();
statusFields.put("导入状态", ApiConstants.IMPORTING);
statusItem.put("fields", statusFields);
statusItem.put("id", "16");
result.add(statusItem);
Map<String, Object> errorItem = new HashMap<>();
Map<String, String> errorFields = new HashMap<>();
errorFields.put("导入模式", ApiConstants.SYSTEM_ERROR);
errorFields.put("情况说明", "");
errorItem.put("fields", errorFields);
errorItem.put("id", "s5");
result.add(errorItem);
response.put("RESULT", result);
return null;
try {
ApiResponse response = new ApiResponse();
response.setCaseCode("10");
// 添加导入模式信息
ApiContent modeContent = new ApiContent();
Map<String, String> modeFields = new HashMap<>();
modeFields.put(ApiConstants.DESCRIPTION, "");
// 使用ObjectMapper将对象转换为JSON字符串
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(response);
} catch (Exception e) {
log.error("生成系统状态响应失败", e);
return null;
}
}
@Override

@ -1,20 +1,40 @@
package hw.tagApi.service.utils;
import hw.tagApi.service.constant.ApiConstants;
import hw.tagApi.service.domain.ApiContent;
import hw.tagApi.service.domain.ApiResponse;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;
public class httpClientUtils {
private static final Logger log = LoggerFactory.getLogger(httpClientUtils.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final CloseableHttpClient httpClient = HttpClients.createDefault();
// 超时配置
private static final int CONNECT_TIMEOUT = 5000;
private static final int SOCKET_TIMEOUT = 10000;
private static final RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(CONNECT_TIMEOUT)
.setSocketTimeout(SOCKET_TIMEOUT)
.build();
/**
* HttpGet
*
* @param url URL
* @return HttpGet
*/
@ -30,15 +50,12 @@ public class httpClientUtils {
/**
* URL
* @param httpClient HTTP
*
* @param url URL
* @return null
*/
public static byte[] getByteArrayFromUrl(CloseableHttpClient httpClient, String url) {
log.info("开始从URL下载文件: {}", url);
public static byte[] getByteArrayFromUrl(String url) {
HttpGet httpGet = createHttpGetRequest(url);
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
// 获取响应状态码
int statusCode = response.getStatusLine().getStatusCode();
@ -46,28 +63,85 @@ public class httpClientUtils {
log.error("获取文件失败,响应码: {}", statusCode);
return null;
}
// 获取响应实体
HttpEntity entity = response.getEntity();
if (entity == null) {
log.error("无法获取响应实体");
return null;
}
// 将响应实体转换为字节数组
byte[] fileData = EntityUtils.toByteArray(entity);
if (fileData.length == 0) {
log.error("获取到的文件为空");
return null;
}
log.info("成功获取文件,大小: {} 字节", fileData.length);
return fileData;
} catch (IOException e) {
log.error("获取文件时发生错误: {}", e.getMessage(), e);
return null;
}
}
/**
*
*
* @param resList
*/
public static void sendResultToCallback(List<ApiContent> resList) {
if (resList == null || resList.isEmpty()) {
log.warn("处理结果列表为空,不发送回调请求");
return;
}
HttpPost httpPost = null;
try {
// 创建POST请求
httpPost = new HttpPost(ApiConstants.CALLBACK_URL);
httpPost.setConfig(requestConfig);
// 设置请求头
httpPost.setHeader("Content-Type", "application/json");
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Connection", "close");
// 创建响应对象
ApiResponse response = new ApiResponse();
response.setCaseCode(ApiConstants.DATA_IMPORT);
response.setResult(resList);
// 将对象转换为JSON字符串
String jsonBody = objectMapper.writeValueAsString(response);
log.debug("准备发送的数据: {}", jsonBody);
// 设置请求体
StringEntity entity = new StringEntity(jsonBody, "UTF-8");
httpPost.setEntity(entity);
// 发送请求
try (CloseableHttpResponse httpResponse = httpClient.execute(httpPost)) {
int statusCode = httpResponse.getStatusLine().getStatusCode();
HttpEntity responseEntity = httpResponse.getEntity();
String responseBody = responseEntity != null ? EntityUtils.toString(responseEntity, "UTF-8") : "";
if (statusCode == 200) {
log.info("发送处理结果到回调接口成功,响应状态码: {}", statusCode);
log.debug("回调接口响应内容: {}", responseBody);
} else {
log.error("发送处理结果到回调接口失败,响应状态码: {}, 响应内容: {}", statusCode, responseBody);
}
}
} catch (Exception e) {
log.error("发送处理结果到回调接口失败: {}", e.getMessage());
} finally {
if (httpPost != null) {
httpPost.releaseConnection();
}
}
}
}

Loading…
Cancel
Save