定时任务新增多数据源并发执行

master
FCD 2 days ago
parent 62226981d0
commit f2c88e0ecf

@ -0,0 +1,50 @@
package com.op.job.config;
import lombok.Getter;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Getter
public enum DbIdentityEnum {
// 小榄工厂
DS_1000("ds_1000", buildHeader("ds_1000")),
// 雅黛
DS_1010("ds_1010", buildHeader("ds_1010")),
// 重庆工厂
DS_1020("ds_1020", buildHeader("ds_1020")),
// 江西工厂
DS_1030("ds_1030", buildHeader("ds_1030")),
// 安徽工厂
DS_1040("ds_1040", buildHeader("ds_1040"));
// 数据库唯一标识(服务端路由依据)
private final String poolName;
// 该数据库的专属请求头Feign调用时传入
private final Map<String, String> dbHeader;
DbIdentityEnum(String poolName, Map<String, String> dbHeader) {
this.poolName = poolName;
this.dbHeader = dbHeader;
}
/**
*
*/
private static Map<String, String> buildHeader(String poolName) {
Map<String, String> header = new HashMap<>(3);
header.put("poolName", poolName);
return header;
}
/**
* 线
*/
public static Map<String, Map<String, String>> getAllDbHeaders() {
return Stream.of(values())
.collect(Collectors.toMap(DbIdentityEnum::getPoolName, DbIdentityEnum::getDbHeader));
}
}

@ -0,0 +1,40 @@
package com.op.job.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线
* 线
*/
@Configuration
public class MultiDbThreadPoolConfig {
/**
* 线
* /CPU
*/
@Bean("multiDbTaskExecutor")
public ThreadPoolTaskExecutor multiDbTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int cpuCore = Runtime.getRuntime().availableProcessors();
int dbCount = DbIdentityEnum.values().length;
// 核心线程数:数据库数
executor.setCorePoolSize(Math.max(dbCount, cpuCore));
// 最大线程数:核心线程数*2
executor.setMaxPoolSize(Math.max(dbCount, cpuCore) * 2);
// 任务队列:数据库数*3
executor.setQueueCapacity(dbCount * 3);
// 线程名前缀:便于日志排查
executor.setThreadNamePrefix("multi-db-");
// 拒绝策略:调用线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化线程池
executor.initialize();
return executor;
}
}

@ -0,0 +1,76 @@
package com.op.job.util;
import com.op.job.config.DbIdentityEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* Feign
* execute
* 线+++
*/
@Component
@Slf4j
public class MultiDbFeignExecutor {
@Autowired
@Qualifier("multiDbTaskExecutor")
private ThreadPoolTaskExecutor multiDbTaskExecutor;
/**
* 线Feign线
* @param taskName
* @param feignCall Feign=
*/
public void execute(String taskName, FeignDbCall feignCall) {
// 1. 获取所有数据库的专属请求头一个库一个独立Map线程安全
Map<String, Map<String, String>> allDbHeaders = DbIdentityEnum.getAllDbHeaders();
if (allDbHeaders.isEmpty()) {
log.warn("【多库执行-{}】无配置的数据库,直接返回", taskName);
return;
}
log.info("【多库执行-{}】开始并行执行,数据库数量:{}", taskName, allDbHeaders.size());
// 2. 多线程并行执行:每个数据库一个独立线程,异常隔离
allDbHeaders.entrySet().stream()
.map(entry -> {
String dbId = entry.getKey();
Map<String, String> dbHeader = entry.getValue();
// 提交异步任务,绑定专属线程池
return CompletableFuture.runAsync(() -> {
try {
log.info("【多库执行-{}】开始执行数据库:{}", taskName, dbId);
// 执行Feign调用传入当前数据库的专属请求头
feignCall.call(dbHeader);
log.info("【多库执行-{}】数据库{}执行成功", taskName, dbId);
} catch (Exception e) {
// 单个数据库执行异常,隔离处理,不影响其他库
log.error("【多库执行-{}】数据库{}执行失败,异常原因:{}",
taskName, dbId, e.getMessage(), e);
}
}, multiDbTaskExecutor);
})
.collect(Collectors.toList())
// 3. 等待所有数据库执行完成(保证定时任务执行完整性)
.forEach(CompletableFuture::join);
log.info("【多库执行-{}】所有数据库执行完毕,总库数:{}", taskName, allDbHeaders.size());
}
/**
* Feign
* MapFeign
*/
@FunctionalInterface
public interface FeignDbCall {
void call(Map<String, String> dbHeader);
}
}
Loading…
Cancel
Save