From f2c88e0ecf2c4e1112a8261a779b0e038203de67 Mon Sep 17 00:00:00 2001 From: FCD <2453864257@qq.com> Date: Tue, 3 Feb 2026 10:04:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E5=A4=9A=E6=95=B0=E6=8D=AE=E6=BA=90=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/op/job/config/DbIdentityEnum.java | 50 ++++++++++++ .../job/config/MultiDbThreadPoolConfig.java | 40 ++++++++++ .../com/op/job/util/MultiDbFeignExecutor.java | 76 +++++++++++++++++++ 3 files changed, 166 insertions(+) create mode 100644 op-modules/op-job/src/main/java/com/op/job/config/DbIdentityEnum.java create mode 100644 op-modules/op-job/src/main/java/com/op/job/config/MultiDbThreadPoolConfig.java create mode 100644 op-modules/op-job/src/main/java/com/op/job/util/MultiDbFeignExecutor.java diff --git a/op-modules/op-job/src/main/java/com/op/job/config/DbIdentityEnum.java b/op-modules/op-job/src/main/java/com/op/job/config/DbIdentityEnum.java new file mode 100644 index 000000000..53a9bb102 --- /dev/null +++ b/op-modules/op-job/src/main/java/com/op/job/config/DbIdentityEnum.java @@ -0,0 +1,50 @@ +package com.op.job.config; + +import lombok.Getter; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@Getter +public enum DbIdentityEnum { + // 小榄工厂 + DS_1000("ds_1000", buildHeader("ds_1000")), + // 雅黛 + DS_1010("ds_1010", buildHeader("ds_1010")), + // 重庆工厂 + DS_1020("ds_1020", buildHeader("ds_1020")), + // 江西工厂 + DS_1030("ds_1030", buildHeader("ds_1030")), + // 安徽工厂 + DS_1040("ds_1040", buildHeader("ds_1040")); + + // 数据库唯一标识(服务端路由依据) + private final String poolName; + + // 该数据库的专属请求头(Feign调用时传入) + private final Map dbHeader; + + DbIdentityEnum(String poolName, Map dbHeader) { + this.poolName = poolName; + this.dbHeader = dbHeader; + } + + /** + * 构建数据库专属请求头(统一格式,可按需扩展请求头字段) + */ + private static Map buildHeader(String poolName) { + Map header = new HashMap<>(3); + header.put("poolName", poolName); + return header; + } + + /** + * 获取所有数据库的请求头(用于多线程批量执行) + */ + public static Map> getAllDbHeaders() { + return Stream.of(values()) + .collect(Collectors.toMap(DbIdentityEnum::getPoolName, DbIdentityEnum::getDbHeader)); + } +} \ No newline at end of file diff --git a/op-modules/op-job/src/main/java/com/op/job/config/MultiDbThreadPoolConfig.java b/op-modules/op-job/src/main/java/com/op/job/config/MultiDbThreadPoolConfig.java new file mode 100644 index 000000000..cea4650f1 --- /dev/null +++ b/op-modules/op-job/src/main/java/com/op/job/config/MultiDbThreadPoolConfig.java @@ -0,0 +1,40 @@ +package com.op.job.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 多数据库定时任务专用线程池配置 + * 与其他业务线程池隔离,避免多库并行执行占用核心资源 + */ +@Configuration +public class MultiDbThreadPoolConfig { + + /** + * 多数据库任务执行线程池 + * 核心参数:根据数据库数量/CPU核心数动态调整,避免资源浪费 + */ + @Bean("multiDbTaskExecutor") + public ThreadPoolTaskExecutor multiDbTaskExecutor() { + + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + int cpuCore = Runtime.getRuntime().availableProcessors(); + int dbCount = DbIdentityEnum.values().length; + // 核心线程数:数据库数 + executor.setCorePoolSize(Math.max(dbCount, cpuCore)); + // 最大线程数:核心线程数*2 + executor.setMaxPoolSize(Math.max(dbCount, cpuCore) * 2); + // 任务队列:数据库数*3 + executor.setQueueCapacity(dbCount * 3); + // 线程名前缀:便于日志排查 + executor.setThreadNamePrefix("multi-db-"); + // 拒绝策略:调用线程执行 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 初始化线程池 + executor.initialize(); + return executor; + } +} \ No newline at end of file diff --git a/op-modules/op-job/src/main/java/com/op/job/util/MultiDbFeignExecutor.java b/op-modules/op-job/src/main/java/com/op/job/util/MultiDbFeignExecutor.java new file mode 100644 index 000000000..ce0472f9b --- /dev/null +++ b/op-modules/op-job/src/main/java/com/op/job/util/MultiDbFeignExecutor.java @@ -0,0 +1,76 @@ +package com.op.job.util; + +import com.op.job.config.DbIdentityEnum; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * 多数据库Feign调用通用工具类 + * 所有需要多库并行执行的定时任务,直接调用此工具的execute方法 + * 核心:多线程+请求头传递+异常隔离+通用化 + */ +@Component +@Slf4j +public class MultiDbFeignExecutor { + + @Autowired + @Qualifier("multiDbTaskExecutor") + private ThreadPoolTaskExecutor multiDbTaskExecutor; + + /** + * 通用执行方法:多线程并行调用Feign接口,为每个线程传入对应数据库的请求头 + * @param taskName 定时任务名称(用于日志排查) + * @param feignCall 函数式接口:封装Feign调用逻辑(入参=数据库请求头,无返回值) + */ + public void execute(String taskName, FeignDbCall feignCall) { + // 1. 获取所有数据库的专属请求头(一个库一个独立Map,线程安全) + Map> allDbHeaders = DbIdentityEnum.getAllDbHeaders(); + if (allDbHeaders.isEmpty()) { + log.warn("【多库执行-{}】无配置的数据库,直接返回", taskName); + return; + } + + log.info("【多库执行-{}】开始并行执行,数据库数量:{}", taskName, allDbHeaders.size()); + + // 2. 多线程并行执行:每个数据库一个独立线程,异常隔离 + allDbHeaders.entrySet().stream() + .map(entry -> { + String dbId = entry.getKey(); + Map dbHeader = entry.getValue(); + // 提交异步任务,绑定专属线程池 + return CompletableFuture.runAsync(() -> { + try { + log.info("【多库执行-{}】开始执行数据库:{}", taskName, dbId); + // 执行Feign调用,传入当前数据库的专属请求头 + feignCall.call(dbHeader); + log.info("【多库执行-{}】数据库{}执行成功", taskName, dbId); + } catch (Exception e) { + // 单个数据库执行异常,隔离处理,不影响其他库 + log.error("【多库执行-{}】数据库{}执行失败,异常原因:{}", + taskName, dbId, e.getMessage(), e); + } + }, multiDbTaskExecutor); + }) + .collect(Collectors.toList()) + // 3. 等待所有数据库执行完成(保证定时任务执行完整性) + .forEach(CompletableFuture::join); + + log.info("【多库执行-{}】所有数据库执行完毕,总库数:{}", taskName, allDbHeaders.size()); + } + + /** + * 函数式接口:封装「数据库请求头→Feign调用」的逻辑 + * 入参:当前数据库的专属请求头Map,适配所有Feign接口的请求头参数 + */ + @FunctionalInterface + public interface FeignDbCall { + void call(Map dbHeader); + } +} \ No newline at end of file