update seata 1.5.2 => 1.6.0 适配升级

dev
疯狂的狮子li 3 years ago
parent 0bb385f6cf
commit efc4bf5a0a

@ -16,9 +16,8 @@
<properties> <properties>
<spring-cloud-alibaba.version>2021.0.4.0</spring-cloud-alibaba.version> <spring-cloud-alibaba.version>2021.0.4.0</spring-cloud-alibaba.version>
<sentinel.version>1.8.6</sentinel.version> <sentinel.version>1.8.6</sentinel.version>
<seata.version>1.5.2</seata.version> <seata.version>1.6.0</seata.version>
<nacos.client.version>2.1.2</nacos.client.version> <nacos.client.version>2.1.2</nacos.client.version>
<nacos.config.version>2.0.4</nacos.config.version>
<dubbo.version>3.1.3</dubbo.version> <dubbo.version>3.1.3</dubbo.version>
<spring.context.support.version>1.0.11</spring.context.support.version> <spring.context.support.version>1.0.11</spring.context.support.version>
</properties> </properties>
@ -36,11 +35,6 @@
<artifactId>nacos-client</artifactId> <artifactId>nacos-client</artifactId>
<version>${nacos.client.version}</version> <version>${nacos.client.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-config</artifactId>
<version>${nacos.config.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.alibaba.csp</groupId> <groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId> <artifactId>sentinel-core</artifactId>

@ -17,16 +17,16 @@
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<optional>true</optional>
</dependency>
<!-- SpringBoot Seata --> <!-- SpringBoot Seata -->
<dependency> <dependency>
<groupId>com.alibaba.cloud</groupId> <groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
</dependencies> </dependencies>

@ -1,107 +0,0 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.integration.dubbo;
import io.seata.common.util.StringUtils;
import io.seata.core.constants.DubboConstants;
import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The type Transaction propagation filter.
*
* @author sharajava
*/
@Activate(group = {DubboConstants.PROVIDER, DubboConstants.CONSUMER}, order = 100)
public class ApacheDubboTransactionPropagationFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(ApacheDubboTransactionPropagationFilter.class);
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID();
BranchType branchType = RootContext.getBranchType();
String rpcXid = getRpcXid();
String rpcBranchType = RpcContext.getServiceContext().getAttachment(RootContext.KEY_BRANCH_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
}
boolean bind = false;
if (xid != null) {
RpcContext.getServiceContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getServiceContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name());
} else {
if (rpcXid != null) {
RootContext.bind(rpcXid);
if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {
RootContext.bindBranchType(BranchType.TCC);
}
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);
}
}
}
try {
return invoker.invoke(invocation);
} finally {
if (bind) {
BranchType previousBranchType = RootContext.getBranchType();
String unbindXid = RootContext.unbind();
if (BranchType.TCC == previousBranchType) {
RootContext.unbindBranchType();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType);
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid,
rpcBranchType != null ? rpcBranchType : "AT", previousBranchType);
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);
if (BranchType.TCC == previousBranchType) {
RootContext.bindBranchType(BranchType.TCC);
LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType);
}
}
}
}
RpcContext.getServiceContext().removeAttachment(RootContext.KEY_XID);
RpcContext.getServiceContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);
RpcContext.getServerContext().removeAttachment(RootContext.KEY_XID);
RpcContext.getServerContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);
}
}
/**
* get rpc xid
* @return
*/
private String getRpcXid() {
String rpcXid = RpcContext.getServiceContext().getAttachment(RootContext.KEY_XID);
if (rpcXid == null) {
rpcXid = RpcContext.getServiceContext().getAttachment(RootContext.KEY_XID.toLowerCase());
}
return rpcXid;
}
}

@ -27,7 +27,7 @@
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<seata.version>1.5.2</seata.version> <seata.version>1.6.0</seata.version>
<jcommander.version>1.72</jcommander.version> <jcommander.version>1.72</jcommander.version>
<druid.version>1.2.12</druid.version> <druid.version>1.2.12</druid.version>
</properties> </properties>
@ -41,6 +41,13 @@
<type>pom</type> <type>pom</type>
<scope>import</scope> <scope>import</scope>
</dependency> </dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-dependencies</artifactId>
<version>${seata.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
@ -49,6 +56,12 @@
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j-to-slf4j</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.seata</groupId> <groupId>io.seata</groupId>
@ -63,6 +76,12 @@
<dependency> <dependency>
<groupId>io.seata</groupId> <groupId>io.seata</groupId>
<artifactId>seata-config-all</artifactId> <artifactId>seata-config-all</artifactId>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.seata</groupId> <groupId>io.seata</groupId>
@ -150,6 +169,10 @@
<groupId>ch.qos.logback</groupId> <groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId> <artifactId>logback-classic</artifactId>
</dependency> </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<!-- logback appenders --> <!-- logback appenders -->
<dependency> <dependency>
<groupId>net.logstash.logback</groupId> <groupId>net.logstash.logback</groupId>

@ -20,25 +20,7 @@ import io.seata.core.exception.AbstractExceptionHandler;
import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionException;
import io.seata.core.exception.TransactionExceptionCode; import io.seata.core.exception.TransactionExceptionCode;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.core.protocol.transaction.AbstractGlobalEndRequest; import io.seata.core.protocol.transaction.*;
import io.seata.core.protocol.transaction.AbstractGlobalEndResponse;
import io.seata.core.protocol.transaction.BranchRegisterRequest;
import io.seata.core.protocol.transaction.BranchRegisterResponse;
import io.seata.core.protocol.transaction.BranchReportRequest;
import io.seata.core.protocol.transaction.BranchReportResponse;
import io.seata.core.protocol.transaction.GlobalBeginRequest;
import io.seata.core.protocol.transaction.GlobalBeginResponse;
import io.seata.core.protocol.transaction.GlobalCommitRequest;
import io.seata.core.protocol.transaction.GlobalCommitResponse;
import io.seata.core.protocol.transaction.GlobalLockQueryRequest;
import io.seata.core.protocol.transaction.GlobalLockQueryResponse;
import io.seata.core.protocol.transaction.GlobalReportRequest;
import io.seata.core.protocol.transaction.GlobalReportResponse;
import io.seata.core.protocol.transaction.GlobalRollbackRequest;
import io.seata.core.protocol.transaction.GlobalRollbackResponse;
import io.seata.core.protocol.transaction.GlobalStatusRequest;
import io.seata.core.protocol.transaction.GlobalStatusResponse;
import io.seata.core.protocol.transaction.TCInboundHandler;
import io.seata.core.rpc.RpcContext; import io.seata.core.rpc.RpcContext;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionHolder; import io.seata.server.session.SessionHolder;

@ -21,10 +21,9 @@ import com.beust.jcommander.ParameterException;
import io.seata.common.util.StringUtils; import io.seata.common.util.StringUtils;
import io.seata.config.Configuration; import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory; import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.server.env.ContainerHelper; import io.seata.server.env.ContainerHelper;
import io.seata.server.store.StoreConfig;
import static io.seata.common.DefaultValues.SERVER_DEFAULT_STORE_MODE;
import static io.seata.config.ConfigurationFactory.ENV_PROPERTY_KEY; import static io.seata.config.ConfigurationFactory.ENV_PROPERTY_KEY;
/** /**
@ -67,6 +66,10 @@ public class ParameterParser {
this.init(args); this.init(args);
} }
/**
* startup args > docker env
* @param args
*/
private void init(String[] args) { private void init(String[] args) {
try { try {
getCommandParameters(args); getCommandParameters(args);
@ -74,15 +77,7 @@ public class ParameterParser {
if (StringUtils.isNotBlank(seataEnv)) { if (StringUtils.isNotBlank(seataEnv)) {
System.setProperty(ENV_PROPERTY_KEY, seataEnv); System.setProperty(ENV_PROPERTY_KEY, seataEnv);
} }
if (StringUtils.isBlank(storeMode)) { StoreConfig.setStartupParameter(storeMode, sessionStoreMode, lockStoreMode);
storeMode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE);
}
if (StringUtils.isBlank(sessionStoreMode)) {
sessionStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, storeMode);
}
if (StringUtils.isBlank(lockStoreMode)) {
lockStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, storeMode);
}
} catch (ParameterException e) { } catch (ParameterException e) {
printError(e); printError(e);
} }
@ -112,15 +107,6 @@ public class ParameterParser {
if (serverNode == null) { if (serverNode == null) {
serverNode = ContainerHelper.getServerNode(); serverNode = ContainerHelper.getServerNode();
} }
if (StringUtils.isBlank(storeMode)) {
storeMode = ContainerHelper.getStoreMode();
}
if (StringUtils.isBlank(sessionStoreMode)) {
sessionStoreMode = ContainerHelper.getSessionStoreMode();
}
if (StringUtils.isBlank(lockStoreMode)) {
lockStoreMode = ContainerHelper.getLockStoreMode();
}
} }
private void printError(ParameterException e) { private void printError(ParameterException e) {
@ -163,7 +149,7 @@ public class ParameterParser {
* @return the store mode * @return the store mode
*/ */
public String getLockStoreMode() { public String getLockStoreMode() {
return StringUtils.isNotEmpty(lockStoreMode) ? lockStoreMode : storeMode; return lockStoreMode;
} }
/** /**
@ -172,7 +158,7 @@ public class ParameterParser {
* @return the store mode * @return the store mode
*/ */
public String getSessionStoreMode() { public String getSessionStoreMode() {
return StringUtils.isNotEmpty(sessionStoreMode) ? sessionStoreMode : storeMode; return sessionStoreMode;
} }
/** /**

@ -15,24 +15,21 @@
*/ */
package io.seata.server; package io.seata.server;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import io.seata.common.XID; import io.seata.common.XID;
import io.seata.common.thread.NamedThreadFactory; import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.NetUtil; import io.seata.common.util.NetUtil;
import io.seata.common.util.StringUtils; import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationFactory; import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.rpc.netty.NettyRemotingServer; import io.seata.core.rpc.netty.NettyRemotingServer;
import io.seata.core.rpc.netty.NettyServerConfig; import io.seata.core.rpc.netty.NettyServerConfig;
import io.seata.server.coordinator.DefaultCoordinator; import io.seata.server.coordinator.DefaultCoordinator;
import io.seata.server.lock.LockerManagerFactory; import io.seata.server.lock.LockerManagerFactory;
import io.seata.server.metrics.MetricsManager; import io.seata.server.metrics.MetricsManager;
import io.seata.server.session.SessionHolder; import io.seata.server.session.SessionHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static io.seata.spring.boot.autoconfigure.StarterConstants.REGEX_SPLIT_CHAR; import static io.seata.spring.boot.autoconfigure.StarterConstants.REGEX_SPLIT_CHAR;
import static io.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_PREFERED_NETWORKS; import static io.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_PREFERED_NETWORKS;
@ -49,9 +46,6 @@ public class Server {
* @param args the input arguments * @param args the input arguments
*/ */
public static void start(String[] args) { public static void start(String[] args) {
// create logger
final Logger logger = LoggerFactory.getLogger(Server.class);
//initialize the parameter parser //initialize the parameter parser
//Note that the parameter parser should always be the first line to execute. //Note that the parameter parser should always be the first line to execute.
//Because, here we need to parse the parameters needed for startup. //Because, here we need to parse the parameters needed for startup.
@ -60,25 +54,11 @@ public class Server {
//initialize the metrics //initialize the metrics
MetricsManager.get().init(); MetricsManager.get().init();
System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(), ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()), new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy()); new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
UUIDGenerator.init(parameterParser.getServerNode());
//log store mode : file, db, redis
SessionHolder.init(parameterParser.getSessionStoreMode());
LockerManagerFactory.init(parameterParser.getLockStoreMode());
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
ServerRunner.addDisposable(coordinator);
//127.0.0.1 and 0.0.0.0 are not valid here. //127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) { if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost()); XID.setIpAddress(parameterParser.getHost());
@ -90,6 +70,20 @@ public class Server {
XID.setIpAddress(NetUtil.getLocalIp()); XID.setIpAddress(NetUtil.getLocalIp());
} }
} }
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
XID.setPort(nettyRemotingServer.getListenPort());
UUIDGenerator.init(parameterParser.getServerNode());
//log store mode : file, db, redis
SessionHolder.init();
LockerManagerFactory.init();
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
ServerRunner.addDisposable(coordinator);
nettyRemotingServer.init(); nettyRemotingServer.init();
} }
} }

@ -15,11 +15,9 @@
*/ */
package io.seata.server; package io.seata.server;
import java.util.Properties;
import io.seata.common.holder.ObjectHolder; import io.seata.common.holder.ObjectHolder;
import io.seata.common.util.StringUtils; import io.seata.common.util.StringUtils;
import io.seata.config.Configuration; import io.seata.server.store.StoreConfig;
import io.seata.config.ConfigurationFactory;
import io.seata.spring.boot.autoconfigure.SeataCoreEnvironmentPostProcessor; import io.seata.spring.boot.autoconfigure.SeataCoreEnvironmentPostProcessor;
import io.seata.spring.boot.autoconfigure.SeataServerEnvironmentPostProcessor; import io.seata.spring.boot.autoconfigure.SeataServerEnvironmentPostProcessor;
import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent; import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent;
@ -30,14 +28,11 @@ import org.springframework.core.ResolvableType;
import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.PropertiesPropertySource; import org.springframework.core.env.PropertiesPropertySource;
import static io.seata.common.ConfigurationKeys.STORE_LOCK_MODE; import java.util.Properties;
import static io.seata.common.ConfigurationKeys.STORE_MODE;
import static io.seata.common.ConfigurationKeys.STORE_SESSION_MODE;
import static io.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; import static io.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
import static io.seata.common.DefaultValues.SERVICE_OFFSET_SPRING_BOOT; import static io.seata.common.DefaultValues.SERVICE_OFFSET_SPRING_BOOT;
import static io.seata.core.constants.ConfigurationKeys.ENV_SEATA_PORT_KEY; import static io.seata.core.constants.ConfigurationKeys.*;
import static io.seata.core.constants.ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL;
import static io.seata.core.constants.ConfigurationKeys.SERVER_SERVICE_PORT_CONFIG;
/** /**
* @author slievrly * @author slievrly
@ -61,12 +56,9 @@ public class ServerApplicationListener implements GenericApplicationListener {
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT, environment); ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT, environment);
SeataCoreEnvironmentPostProcessor.init(); SeataCoreEnvironmentPostProcessor.init();
SeataServerEnvironmentPostProcessor.init(); SeataServerEnvironmentPostProcessor.init();
Configuration config = ConfigurationFactory.getInstance();
// Load by priority // Load by priority
System.setProperty("sessionMode", System.setProperty("sessionMode", StoreConfig.getSessionMode().getName());
config.getConfig(STORE_SESSION_MODE, config.getConfig(STORE_MODE, "file"))); System.setProperty("lockMode", StoreConfig.getLockMode().getName());
System.setProperty("lockMode",
config.getConfig(STORE_LOCK_MODE, config.getConfig(STORE_MODE, "file")));
String[] args = environmentPreparedEvent.getArgs(); String[] args = environmentPreparedEvent.getArgs();

@ -15,17 +15,16 @@
*/ */
package io.seata.server; package io.seata.server;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import io.seata.core.rpc.Disposable; import io.seata.core.rpc.Disposable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/** /**
* @author spilledyear@outlook.com * @author spilledyear@outlook.com

@ -15,14 +15,15 @@
*/ */
package io.seata.server.console.controller; package io.seata.server.console.controller;
import javax.annotation.Resource;
import io.seata.server.console.service.BranchSessionService; import io.seata.server.console.service.BranchSessionService;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/** /**
* Branch Session Controller * Branch Session Controller
* @author: zhongxiang.wang * @author zhongxiang.wang
*/ */
@RestController @RestController
@RequestMapping("console/branchSession") @RequestMapping("console/branchSession")

@ -15,21 +15,21 @@
*/ */
package io.seata.server.console.controller; package io.seata.server.console.controller;
import javax.annotation.Resource;
import io.seata.server.console.param.GlobalLockParam;
import io.seata.console.result.PageResult; import io.seata.console.result.PageResult;
import io.seata.server.console.vo.GlobalLockVO; import io.seata.server.console.param.GlobalLockParam;
import io.seata.server.console.service.GlobalLockService; import io.seata.server.console.service.GlobalLockService;
import io.seata.server.console.vo.GlobalLockVO;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ModelAttribute; import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/** /**
* Global Lock Controller * Global Lock Controller
* @author: zhongxiang.wang * @author zhongxiang.wang
*/ */
@RestController @RestController
@RequestMapping("/api/v1/console/globalLock") @RequestMapping("/api/v1/console/globalLock")

@ -15,20 +15,20 @@
*/ */
package io.seata.server.console.controller; package io.seata.server.console.controller;
import javax.annotation.Resource;
import io.seata.server.console.param.GlobalSessionParam;
import io.seata.console.result.PageResult; import io.seata.console.result.PageResult;
import io.seata.server.console.vo.GlobalSessionVO; import io.seata.server.console.param.GlobalSessionParam;
import io.seata.server.console.service.GlobalSessionService; import io.seata.server.console.service.GlobalSessionService;
import io.seata.server.console.vo.GlobalSessionVO;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ModelAttribute; import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/** /**
* Global Session Controller * Global Session Controller
* @author: zhongxiang.wang * @author zhongxiang.wang
*/ */
@RestController @RestController
@RequestMapping("/api/v1/console/globalSession") @RequestMapping("/api/v1/console/globalSession")

@ -15,15 +15,6 @@
*/ */
package io.seata.server.console.impl.db; package io.seata.server.console.impl.db;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import io.seata.common.ConfigurationKeys; import io.seata.common.ConfigurationKeys;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.loader.EnhancedServiceLoader;
@ -39,13 +30,21 @@ import io.seata.server.console.vo.BranchSessionVO;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import static io.seata.common.DefaultValues.DEFAULT_STORE_DB_BRANCH_TABLE; import static io.seata.common.DefaultValues.DEFAULT_STORE_DB_BRANCH_TABLE;
/** /**
* Branch Session DataBase ServiceImpl * Branch Session DataBase ServiceImpl
* *
* @author: zhongxiang.wang * @author zhongxiang.wang
* @author: lvekee 734843455@qq.com * @author lvekee 734843455@qq.com
*/ */
@Component @Component
@org.springframework.context.annotation.Configuration @org.springframework.context.annotation.Configuration

@ -15,15 +15,6 @@
*/ */
package io.seata.server.console.impl.db; package io.seata.server.console.impl.db;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import io.seata.common.ConfigurationKeys; import io.seata.common.ConfigurationKeys;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.loader.EnhancedServiceLoader;
@ -41,14 +32,22 @@ import io.seata.server.console.vo.GlobalLockVO;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import static io.seata.common.DefaultValues.DEFAULT_LOCK_DB_TABLE; import static io.seata.common.DefaultValues.DEFAULT_LOCK_DB_TABLE;
/** /**
* Global Lock DB ServiceImpl * Global Lock DB ServiceImpl
* *
* @author: zhongxiang.wang * @author zhongxiang.wang
* @author: lvekee 734843455@qq.com * @author lvekee 734843455@qq.com
*/ */
@Component @Component
@org.springframework.context.annotation.Configuration @org.springframework.context.annotation.Configuration

@ -15,18 +15,6 @@
*/ */
package io.seata.server.console.impl.db; package io.seata.server.console.impl.db;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import javax.annotation.Resource;
import javax.sql.DataSource;
import io.seata.common.ConfigurationKeys; import io.seata.common.ConfigurationKeys;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.loader.EnhancedServiceLoader;
@ -46,13 +34,24 @@ import io.seata.server.console.vo.GlobalSessionVO;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import static io.seata.common.DefaultValues.DEFAULT_STORE_DB_GLOBAL_TABLE; import static io.seata.common.DefaultValues.DEFAULT_STORE_DB_GLOBAL_TABLE;
/** /**
* Global Session DataBase ServiceImpl * Global Session DataBase ServiceImpl
* *
* @author: zhongxiang.wang * @author zhongxiang.wang
* @author: lvekee 734843455@qq.com * @author lvekee 734843455@qq.com
*/ */
@Component @Component
@org.springframework.context.annotation.Configuration @org.springframework.context.annotation.Configuration

@ -16,16 +16,16 @@
package io.seata.server.console.impl.file; package io.seata.server.console.impl.file;
import io.seata.common.exception.NotSupportYetException; import io.seata.common.exception.NotSupportYetException;
import io.seata.server.console.vo.BranchSessionVO;
import io.seata.console.result.PageResult; import io.seata.console.result.PageResult;
import io.seata.server.console.service.BranchSessionService; import io.seata.server.console.service.BranchSessionService;
import io.seata.server.console.vo.BranchSessionVO;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* Branch Session File ServiceImpl * Branch Session File ServiceImpl
* *
* @author: zhongxiang.wang * @author zhongxiang.wang
*/ */
@Component @Component
@org.springframework.context.annotation.Configuration @org.springframework.context.annotation.Configuration

@ -15,6 +15,20 @@
*/ */
package io.seata.server.console.impl.file; package io.seata.server.console.impl.file;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils;
import io.seata.console.result.PageResult;
import io.seata.core.lock.RowLock;
import io.seata.server.console.param.GlobalLockParam;
import io.seata.server.console.service.GlobalLockService;
import io.seata.server.console.vo.GlobalLockVO;
import io.seata.server.lock.LockerManagerFactory;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionHolder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -22,21 +36,6 @@ import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils;
import io.seata.server.console.param.GlobalLockParam;
import io.seata.console.result.PageResult;
import io.seata.server.console.vo.GlobalLockVO;
import io.seata.core.lock.RowLock;
import io.seata.server.console.service.GlobalLockService;
import io.seata.server.lock.LockerManagerFactory;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionHolder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import static io.seata.common.util.StringUtils.isBlank; import static io.seata.common.util.StringUtils.isBlank;
import static io.seata.server.console.vo.GlobalLockVO.convert; import static io.seata.server.console.vo.GlobalLockVO.convert;
import static java.util.Objects.isNull; import static java.util.Objects.isNull;

@ -15,22 +15,22 @@
*/ */
package io.seata.server.console.impl.file; package io.seata.server.console.impl.file;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import io.seata.server.console.param.GlobalSessionParam;
import io.seata.console.result.PageResult; import io.seata.console.result.PageResult;
import io.seata.server.console.vo.GlobalSessionVO; import io.seata.server.console.param.GlobalSessionParam;
import io.seata.server.console.service.GlobalSessionService; import io.seata.server.console.service.GlobalSessionService;
import io.seata.server.console.vo.GlobalSessionVO;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionHolder; import io.seata.server.session.SessionHolder;
import io.seata.server.storage.SessionConverter; import io.seata.server.storage.SessionConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static io.seata.common.util.StringUtils.isBlank; import static io.seata.common.util.StringUtils.isBlank;
import static java.util.Objects.isNull; import static java.util.Objects.isNull;

@ -15,24 +15,25 @@
*/ */
package io.seata.server.console.impl.redis; package io.seata.server.console.impl.redis;
import java.util.ArrayList;
import java.util.List;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils; import io.seata.common.util.StringUtils;
import io.seata.console.result.PageResult; import io.seata.console.result.PageResult;
import io.seata.server.console.vo.BranchSessionVO;
import io.seata.core.store.BranchTransactionDO; import io.seata.core.store.BranchTransactionDO;
import io.seata.server.console.service.BranchSessionService; import io.seata.server.console.service.BranchSessionService;
import io.seata.server.console.vo.BranchSessionVO;
import io.seata.server.storage.redis.store.RedisTransactionStoreManager; import io.seata.server.storage.redis.store.RedisTransactionStoreManager;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/** /**
* Branch Session Redis ServiceImpl * Branch Session Redis ServiceImpl
* *
* @author: zhongxiang.wang * @author zhongxiang.wang
* @author: doubleDimple * @author doubleDimple
*/ */
@Component @Component
@org.springframework.context.annotation.Configuration @org.springframework.context.annotation.Configuration

@ -15,32 +15,32 @@
*/ */
package io.seata.server.console.impl.redis; package io.seata.server.console.impl.redis;
import io.seata.common.util.BeanUtils;
import io.seata.common.util.CollectionUtils;
import io.seata.console.result.PageResult;
import io.seata.server.console.param.GlobalLockParam;
import io.seata.server.console.service.GlobalLockService;
import io.seata.server.console.vo.GlobalLockVO;
import io.seata.server.storage.redis.JedisPooledFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import io.seata.common.util.CollectionUtils;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import io.seata.common.util.BeanUtils;
import io.seata.server.console.param.GlobalLockParam;
import io.seata.console.result.PageResult;
import io.seata.server.console.vo.GlobalLockVO;
import io.seata.server.console.service.GlobalLockService;
import io.seata.server.storage.redis.JedisPooledFactory;
import redis.clients.jedis.Jedis;
import static io.seata.common.Constants.ROW_LOCK_KEY_SPLIT_CHAR; import static io.seata.common.Constants.ROW_LOCK_KEY_SPLIT_CHAR;
import static io.seata.common.exception.FrameworkErrorCode.ParameterRequired; import static io.seata.common.exception.FrameworkErrorCode.ParameterRequired;
import static io.seata.common.util.StringUtils.isNotBlank; import static io.seata.common.util.StringUtils.isNotBlank;
import static io.seata.console.result.PageResult.checkPage; import static io.seata.console.result.PageResult.checkPage;
import static io.seata.core.constants.RedisKeyConstants.DEFAULT_REDIS_SEATA_GLOBAL_LOCK_PREFIX; import static io.seata.core.constants.RedisKeyConstants.*;
import static io.seata.core.constants.RedisKeyConstants.DEFAULT_REDIS_SEATA_ROW_LOCK_PREFIX;
import static io.seata.core.constants.RedisKeyConstants.SPLIT;
/** /**
* Global Lock Redis Service Impl * Global Lock Redis Service Impl
* @author: zhongxiang.wang * @author zhongxiang.wang
* @author: doubleDimple * @author doubleDimple
*/ */
@Component @Component
@org.springframework.context.annotation.Configuration @org.springframework.context.annotation.Configuration

@ -15,15 +15,12 @@
*/ */
package io.seata.server.console.impl.redis; package io.seata.server.console.impl.redis;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
import io.seata.console.result.PageResult; import io.seata.console.result.PageResult;
import io.seata.server.console.param.GlobalSessionParam;
import io.seata.server.console.vo.GlobalSessionVO;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.server.console.param.GlobalSessionParam;
import io.seata.server.console.service.GlobalSessionService; import io.seata.server.console.service.GlobalSessionService;
import io.seata.server.console.vo.GlobalSessionVO;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition; import io.seata.server.session.SessionCondition;
import io.seata.server.storage.redis.store.RedisTransactionStoreManager; import io.seata.server.storage.redis.store.RedisTransactionStoreManager;
@ -31,6 +28,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static io.seata.common.exception.FrameworkErrorCode.ParameterRequired; import static io.seata.common.exception.FrameworkErrorCode.ParameterRequired;
import static io.seata.common.util.StringUtils.isBlank; import static io.seata.common.util.StringUtils.isBlank;
import static io.seata.common.util.StringUtils.isNotBlank; import static io.seata.common.util.StringUtils.isNotBlank;
@ -39,8 +41,8 @@ import static io.seata.server.storage.SessionConverter.convertToGlobalSessionVo;
/** /**
* Global Session Redis ServiceImpl * Global Session Redis ServiceImpl
* @author: zhongxiang.wang * @author zhongxiang.wang
* @author: doubleDimple * @author doubleDimple
*/ */
@Component @Component
@org.springframework.context.annotation.Configuration @org.springframework.context.annotation.Configuration

@ -15,13 +15,13 @@
*/ */
package io.seata.server.console.param; package io.seata.server.console.param;
import java.io.Serializable;
import io.seata.console.param.BaseParam; import io.seata.console.param.BaseParam;
import java.io.Serializable;
/** /**
* @description: Global lock param * Global lock param
* @author: zhongxiang.wang * @author zhongxiang.wang
*/ */
public class GlobalLockParam extends BaseParam implements Serializable { public class GlobalLockParam extends BaseParam implements Serializable {

@ -15,13 +15,13 @@
*/ */
package io.seata.server.console.param; package io.seata.server.console.param;
import java.io.Serializable;
import io.seata.console.param.BaseParam; import io.seata.console.param.BaseParam;
import java.io.Serializable;
/** /**
* @description: Global session param * Global session param
* @author: zhongxiang.wang * @author zhongxiang.wang
*/ */
public class GlobalSessionParam extends BaseParam implements Serializable { public class GlobalSessionParam extends BaseParam implements Serializable {

@ -15,8 +15,8 @@
*/ */
package io.seata.server.console.service; package io.seata.server.console.service;
import io.seata.server.console.vo.BranchSessionVO;
import io.seata.console.result.PageResult; import io.seata.console.result.PageResult;
import io.seata.server.console.vo.BranchSessionVO;
/** /**
* Branch session service * Branch session service

@ -15,9 +15,9 @@
*/ */
package io.seata.server.console.service; package io.seata.server.console.service;
import io.seata.console.result.PageResult;
import io.seata.server.console.param.GlobalLockParam; import io.seata.server.console.param.GlobalLockParam;
import io.seata.server.console.vo.GlobalLockVO; import io.seata.server.console.vo.GlobalLockVO;
import io.seata.console.result.PageResult;
/** /**

@ -15,9 +15,9 @@
*/ */
package io.seata.server.console.service; package io.seata.server.console.service;
import io.seata.console.result.PageResult;
import io.seata.server.console.param.GlobalSessionParam; import io.seata.server.console.param.GlobalSessionParam;
import io.seata.server.console.vo.GlobalSessionVO; import io.seata.server.console.vo.GlobalSessionVO;
import io.seata.console.result.PageResult;
/** /**
* Global session service * Global session service

@ -15,16 +15,16 @@
*/ */
package io.seata.server.console.vo; package io.seata.server.console.vo;
import io.seata.core.constants.ServerTableColumnsName;
import java.sql.Date; import java.sql.Date;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Objects; import java.util.Objects;
import io.seata.core.constants.ServerTableColumnsName;
/** /**
* BranchSessionVO * BranchSessionVO
* @author: zhongxiang.wang * @author zhongxiang.wang
*/ */
public class BranchSessionVO { public class BranchSessionVO {

@ -15,6 +15,10 @@
*/ */
package io.seata.server.console.vo; package io.seata.server.console.vo;
import io.seata.common.util.CollectionUtils;
import io.seata.core.constants.ServerTableColumnsName;
import io.seata.core.lock.RowLock;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Timestamp; import java.sql.Timestamp;
@ -22,13 +26,9 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import io.seata.common.util.CollectionUtils;
import io.seata.core.constants.ServerTableColumnsName;
import io.seata.core.lock.RowLock;
/** /**
* GlobalLockVO * GlobalLockVO
* @author: zhongxiang.wang * @author zhongxiang.wang
* @author miaoxueyu * @author miaoxueyu
*/ */
public class GlobalLockVO { public class GlobalLockVO {

@ -15,16 +15,16 @@
*/ */
package io.seata.server.console.vo; package io.seata.server.console.vo;
import io.seata.core.constants.ServerTableColumnsName;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.Set; import java.util.Set;
import io.seata.core.constants.ServerTableColumnsName;
/** /**
* GlobalSessionVO * GlobalSessionVO
* @author: zhongxiang.wang * @author zhongxiang.wang
*/ */
public class GlobalSessionVO { public class GlobalSessionVO {

@ -15,9 +15,6 @@
*/ */
package io.seata.server.coordinator; package io.seata.server.coordinator;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import io.seata.core.context.RootContext; import io.seata.core.context.RootContext;
import io.seata.core.exception.BranchTransactionException; import io.seata.core.exception.BranchTransactionException;
import io.seata.core.exception.GlobalTransactionException; import io.seata.core.exception.GlobalTransactionException;
@ -41,12 +38,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
import static io.seata.core.exception.TransactionExceptionCode.BranchTransactionNotExist; import java.io.IOException;
import static io.seata.core.exception.TransactionExceptionCode.FailedToAddBranch; import java.util.concurrent.TimeoutException;
import static io.seata.core.exception.TransactionExceptionCode.GlobalTransactionNotActive;
import static io.seata.core.exception.TransactionExceptionCode.GlobalTransactionStatusInvalid; import static io.seata.core.exception.TransactionExceptionCode.*;
import static io.seata.core.exception.TransactionExceptionCode.FailedToSendBranchCommitRequest;
import static io.seata.core.exception.TransactionExceptionCode.FailedToSendBranchRollbackRequest;
/** /**
* The type abstract core. * The type abstract core.

@ -15,14 +15,6 @@
*/ */
package io.seata.server.coordinator; package io.seata.server.coordinator;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.seata.common.thread.NamedThreadFactory; import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
@ -34,25 +26,7 @@ import io.seata.core.exception.TransactionException;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.core.protocol.AbstractMessage; import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.AbstractResultMessage; import io.seata.core.protocol.AbstractResultMessage;
import io.seata.core.protocol.transaction.AbstractTransactionRequestToTC; import io.seata.core.protocol.transaction.*;
import io.seata.core.protocol.transaction.AbstractTransactionResponse;
import io.seata.core.protocol.transaction.BranchRegisterRequest;
import io.seata.core.protocol.transaction.BranchRegisterResponse;
import io.seata.core.protocol.transaction.BranchReportRequest;
import io.seata.core.protocol.transaction.BranchReportResponse;
import io.seata.core.protocol.transaction.GlobalBeginRequest;
import io.seata.core.protocol.transaction.GlobalBeginResponse;
import io.seata.core.protocol.transaction.GlobalCommitRequest;
import io.seata.core.protocol.transaction.GlobalCommitResponse;
import io.seata.core.protocol.transaction.GlobalLockQueryRequest;
import io.seata.core.protocol.transaction.GlobalLockQueryResponse;
import io.seata.core.protocol.transaction.GlobalReportRequest;
import io.seata.core.protocol.transaction.GlobalReportResponse;
import io.seata.core.protocol.transaction.GlobalRollbackRequest;
import io.seata.core.protocol.transaction.GlobalRollbackResponse;
import io.seata.core.protocol.transaction.GlobalStatusRequest;
import io.seata.core.protocol.transaction.GlobalStatusResponse;
import io.seata.core.protocol.transaction.UndoLogDeleteRequest;
import io.seata.core.rpc.Disposable; import io.seata.core.rpc.Disposable;
import io.seata.core.rpc.RemotingServer; import io.seata.core.rpc.RemotingServer;
import io.seata.core.rpc.RpcContext; import io.seata.core.rpc.RpcContext;
@ -61,20 +35,21 @@ import io.seata.core.rpc.netty.ChannelManager;
import io.seata.core.rpc.netty.NettyRemotingServer; import io.seata.core.rpc.netty.NettyRemotingServer;
import io.seata.server.AbstractTCInboundHandler; import io.seata.server.AbstractTCInboundHandler;
import io.seata.server.metrics.MetricsPublisher; import io.seata.server.metrics.MetricsPublisher;
import io.seata.server.session.BranchSession; import io.seata.server.session.*;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;
import io.seata.server.session.SessionHelper;
import io.seata.server.session.SessionHolder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
import static io.seata.common.Constants.ASYNC_COMMITTING; import java.time.Duration;
import static io.seata.common.Constants.RETRY_COMMITTING; import java.util.Collection;
import static io.seata.common.Constants.RETRY_ROLLBACKING; import java.util.Map;
import static io.seata.common.Constants.TX_TIMEOUT_CHECK; import java.util.concurrent.ArrayBlockingQueue;
import static io.seata.common.Constants.UNDOLOG_DELETE; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static io.seata.common.Constants.*;
import static io.seata.common.DefaultValues.*;
/** /**
* The type Default coordinator. * The type Default coordinator.
@ -89,30 +64,31 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
* The constant COMMITTING_RETRY_PERIOD. * The constant COMMITTING_RETRY_PERIOD.
*/ */
protected static final long COMMITTING_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.COMMITING_RETRY_PERIOD, protected static final long COMMITTING_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.COMMITING_RETRY_PERIOD,
1000L); DEFAULT_COMMITING_RETRY_PERIOD);
/** /**
* The constant ASYNC_COMMITTING_RETRY_PERIOD. * The constant ASYNC_COMMITTING_RETRY_PERIOD.
*/ */
protected static final long ASYNC_COMMITTING_RETRY_PERIOD = CONFIG.getLong( protected static final long ASYNC_COMMITTING_RETRY_PERIOD = CONFIG.getLong(
ConfigurationKeys.ASYN_COMMITING_RETRY_PERIOD, 1000L); ConfigurationKeys.ASYNC_COMMITING_RETRY_PERIOD, DEFAULT_ASYNC_COMMITTING_RETRY_PERIOD);
/** /**
* The constant ROLLBACKING_RETRY_PERIOD. * The constant ROLLBACKING_RETRY_PERIOD.
*/ */
protected static final long ROLLBACKING_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.ROLLBACKING_RETRY_PERIOD, protected static final long ROLLBACKING_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.ROLLBACKING_RETRY_PERIOD,
1000L); DEFAULT_ROLLBACKING_RETRY_PERIOD);
/** /**
* The constant TIMEOUT_RETRY_PERIOD. * The constant TIMEOUT_RETRY_PERIOD.
*/ */
protected static final long TIMEOUT_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.TIMEOUT_RETRY_PERIOD, 1000L); protected static final long TIMEOUT_RETRY_PERIOD = CONFIG.getLong(ConfigurationKeys.TIMEOUT_RETRY_PERIOD,
DEFAULT_TIMEOUT_RETRY_PERIOD);
/** /**
* The Transaction undo log delete period. * The Transaction undo log delete period.
*/ */
protected static final long UNDO_LOG_DELETE_PERIOD = CONFIG.getLong( protected static final long UNDO_LOG_DELETE_PERIOD = CONFIG.getLong(
ConfigurationKeys.TRANSACTION_UNDO_LOG_DELETE_PERIOD, 24 * 60 * 60 * 1000); ConfigurationKeys.TRANSACTION_UNDO_LOG_DELETE_PERIOD, DEFAULT_UNDO_LOG_DELETE_PERIOD);
/** /**
* The Transaction undo log delay delete period * The Transaction undo log delay delete period
@ -132,13 +108,13 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
private static final int BRANCH_ASYNC_POOL_SIZE = Runtime.getRuntime().availableProcessors(); private static final int BRANCH_ASYNC_POOL_SIZE = Runtime.getRuntime().availableProcessors();
private static final Duration MAX_COMMIT_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getDuration( private static final Duration MAX_COMMIT_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getDuration(
ConfigurationKeys.MAX_COMMIT_RETRY_TIMEOUT, DurationUtil.DEFAULT_DURATION, 100); ConfigurationKeys.MAX_COMMIT_RETRY_TIMEOUT, DurationUtil.DEFAULT_DURATION, DEFAULT_MAX_COMMIT_RETRY_TIMEOUT);
private static final Duration MAX_ROLLBACK_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getDuration( private static final Duration MAX_ROLLBACK_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getDuration(
ConfigurationKeys.MAX_ROLLBACK_RETRY_TIMEOUT, DurationUtil.DEFAULT_DURATION, 100); ConfigurationKeys.MAX_ROLLBACK_RETRY_TIMEOUT, DurationUtil.DEFAULT_DURATION, DEFAULT_MAX_ROLLBACK_RETRY_TIMEOUT);
private static final boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE = ConfigurationFactory.getInstance().getBoolean( private static final boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE, false); ConfigurationKeys.ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE, DEFAULT_ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE);
private final ScheduledThreadPoolExecutor retryRollbacking = private final ScheduledThreadPoolExecutor retryRollbacking =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_ROLLBACKING, 1)); new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_ROLLBACKING, 1));
@ -357,7 +333,7 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
SessionHelper.forEach(rollbackingSessions, rollbackingSession -> { SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
try { try {
// prevent repeated rollback // prevent repeated rollback
if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking) if (rollbackingSession.getStatus() == GlobalStatus.Rollbacking
&& !rollbackingSession.isDeadSession()) { && !rollbackingSession.isDeadSession()) {
// The function of this 'return' is 'continue'. // The function of this 'return' is 'continue'.
return; return;
@ -370,10 +346,7 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession); SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid()); LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());
SessionHelper.endRollbackFailed(rollbackingSession, true); SessionHelper.endRollbackFailed(rollbackingSession, true, true);
// rollback retry timeout event
MetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);
//The function of this 'return' is 'continue'. //The function of this 'return' is 'continue'.
return; return;
@ -401,7 +374,7 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
SessionHelper.forEach(committingSessions, committingSession -> { SessionHelper.forEach(committingSessions, committingSession -> {
try { try {
// prevent repeated commit // prevent repeated commit
if (committingSession.getStatus().equals(GlobalStatus.Committing) if (committingSession.getStatus() == GlobalStatus.Committing
&& !committingSession.isDeadSession()) { && !committingSession.isDeadSession()) {
// The function of this 'return' is 'continue'. // The function of this 'return' is 'continue'.
return; return;
@ -412,7 +385,7 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
LOGGER.error("Global transaction commit retry timeout and has removed [{}]", committingSession.getXid()); LOGGER.error("Global transaction commit retry timeout and has removed [{}]", committingSession.getXid());
// commit retry timeout event // commit retry timeout event
MetricsPublisher.postSessionDoneEvent(committingSession, GlobalStatus.CommitRetryTimeout, true, false); SessionHelper.endCommitFailed(committingSession, true, true);
//The function of this 'return' is 'continue'. //The function of this 'return' is 'continue'.
return; return;

@ -15,10 +15,6 @@
*/ */
package io.seata.server.coordinator; package io.seata.server.coordinator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.seata.common.DefaultValues; import io.seata.common.DefaultValues;
import io.seata.common.exception.NotSupportYetException; import io.seata.common.exception.NotSupportYetException;
import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.loader.EnhancedServiceLoader;
@ -40,6 +36,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static io.seata.core.constants.ConfigurationKeys.XAER_NOTA_RETRY_TIMEOUT; import static io.seata.core.constants.ConfigurationKeys.XAER_NOTA_RETRY_TIMEOUT;
import static io.seata.server.session.BranchSessionHandler.CONTINUE; import static io.seata.server.session.BranchSessionHandler.CONTINUE;
@ -128,8 +128,7 @@ public class DefaultCore implements Core {
@Override @Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException { throws TransactionException {
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid()); MDC.put(RootContext.MDC_KEY_XID, session.getXid());
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
@ -141,12 +140,20 @@ public class DefaultCore implements Core {
return session.getXid(); return session.getXid();
} }
@Override @Override
public GlobalStatus commit(String xid) throws TransactionException { public GlobalStatus commit(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid); GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) { if (globalSession == null) {
return GlobalStatus.Finished; return GlobalStatus.Finished;
} }
if (globalSession.isTimeout()) {
LOGGER.info("TC detected timeout, xid = {}", globalSession.getXid());
return GlobalStatus.TimeoutRollbacking;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus // just lock changeStatus
@ -251,14 +258,14 @@ public class DefaultCore implements Core {
LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid()); LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
return false; return false;
} }
if (!retrying) {
//contains not AT branch
globalSession.setStatus(GlobalStatus.Committed);
}
} }
// if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
// executed to improve concurrency performance, and the global transaction ends.. // executed to improve concurrency performance, and the global transaction ends..
if (success && globalSession.getBranchSessions().isEmpty()) { if (success && globalSession.getBranchSessions().isEmpty()) {
if (!retrying) {
//contains not AT branch
globalSession.setStatus(GlobalStatus.Committed);
}
SessionHelper.endCommitted(globalSession, retrying); SessionHelper.endCommitted(globalSession, retrying);
LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid()); LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
} }
@ -388,5 +395,4 @@ public class DefaultCore implements Core {
return false; return false;
} }
} }
} }

@ -15,14 +15,6 @@
*/ */
package io.seata.server.env; package io.seata.server.env;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
import io.seata.common.util.MapUtil; import io.seata.common.util.MapUtil;
import io.seata.common.util.NumberUtils; import io.seata.common.util.NumberUtils;
@ -30,6 +22,10 @@ import io.seata.common.util.StringUtils;
import org.springframework.util.ResourceUtils; import org.springframework.util.ResourceUtils;
import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.Yaml;
import java.io.*;
import java.util.Map;
import java.util.Properties;
/** /**
* @author wang.liang * @author wang.liang
*/ */
@ -54,8 +50,8 @@ public class PortHelper {
* get config from configFile * get config from configFile
* -Dspring.config.location > classpath:application.properties > classpath:application.yml * -Dspring.config.location > classpath:application.properties > classpath:application.yml
* *
* @return * @return the port
* @throws IOException * @throws IOException the io exception
*/ */
public static int getPortFromConfigFile() throws IOException { public static int getPortFromConfigFile() throws IOException {

@ -15,9 +15,6 @@
*/ */
package io.seata.server.lock; package io.seata.server.lock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import io.seata.common.XID; import io.seata.common.XID;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils; import io.seata.common.util.StringUtils;
@ -29,6 +26,10 @@ import io.seata.server.session.BranchSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/** /**
* The type Abstract lock manager. * The type Abstract lock manager.
* *

@ -15,14 +15,14 @@
*/ */
package io.seata.server.lock; package io.seata.server.lock;
import java.util.List;
import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionException;
import io.seata.core.lock.RowLock; import io.seata.core.lock.RowLock;
import io.seata.core.model.LockStatus; import io.seata.core.model.LockStatus;
import io.seata.server.session.BranchSession; import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import java.util.List;
/** /**
* The interface Lock manager. * The interface Lock manager.
* *

@ -16,13 +16,10 @@
package io.seata.server.lock; package io.seata.server.lock;
import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration; import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory; import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys; import io.seata.server.store.StoreConfig;
import io.seata.core.store.StoreMode; import io.seata.server.store.StoreConfig.LockMode;
import static io.seata.common.DefaultValues.SERVER_DEFAULT_STORE_MODE;
/** /**
* The type Lock manager factory. * The type Lock manager factory.
@ -54,16 +51,16 @@ public class LockerManagerFactory {
init(null); init(null);
} }
public static void init(String lockMode) { public static void init(LockMode lockMode) {
if (LOCK_MANAGER == null) { if (LOCK_MANAGER == null) {
synchronized (LockerManagerFactory.class) { synchronized (LockerManagerFactory.class) {
if (LOCK_MANAGER == null) { if (LOCK_MANAGER == null) {
if (StringUtils.isBlank(lockMode)) { if (null == lockMode) {
lockMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, lockMode = StoreConfig.getLockMode();
CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
} }
if (StoreMode.contains(lockMode)) { //if not exist the lock mode, throw exception
LOCK_MANAGER = EnhancedServiceLoader.load(LockManager.class, lockMode); if (null != StoreConfig.StoreMode.get(lockMode.name())) {
LOCK_MANAGER = EnhancedServiceLoader.load(LockManager.class, lockMode.getName());
} }
} }
} }

@ -23,8 +23,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* Distributed locker factory
* @author zhongxiang.wang * @author zhongxiang.wang
* @description Distributed locker factory
*/ */
public class DistributedLockerFactory { public class DistributedLockerFactory {

@ -22,9 +22,8 @@ import ch.qos.logback.core.CoreConstants;
/** /**
* {@link ExtendedThrowableProxyConverter} that adds some additional whitespace around the * {@link ExtendedThrowableProxyConverter} that adds some additional whitespace around the
* stack trace. * stack trace.
* * Copied from spring-boot-xxx.jar by wang.liang
* @author Phillip Webb * @author Phillip Webb
* @origin Copied from spring-boot-xxx.jar by wang.liang
*/ */
public class ExtendedWhitespaceThrowableProxyConverter extends ExtendedThrowableProxyConverter { public class ExtendedWhitespaceThrowableProxyConverter extends ExtendedThrowableProxyConverter {

@ -15,12 +15,12 @@
*/ */
package io.seata.server.logging.logback.appender; package io.seata.server.logging.logback.appender;
import java.util.ArrayList;
import net.logstash.logback.composite.JsonProvider; import net.logstash.logback.composite.JsonProvider;
import net.logstash.logback.composite.JsonProviders; import net.logstash.logback.composite.JsonProviders;
import net.logstash.logback.encoder.LogstashEncoder; import net.logstash.logback.encoder.LogstashEncoder;
import java.util.ArrayList;
/** /**
* The type Enhanced logstash encoder * The type Enhanced logstash encoder
* *

@ -15,8 +15,8 @@
*/ */
package io.seata.server.metrics; package io.seata.server.metrics;
import io.seata.metrics.IdConstants;
import io.seata.metrics.Id; import io.seata.metrics.Id;
import io.seata.metrics.IdConstants;
/** /**
* Constants for meter id in tc * Constants for meter id in tc

@ -25,6 +25,8 @@ import io.seata.server.event.EventBusManager;
import java.util.List; import java.util.List;
import static io.seata.common.DefaultValues.DEFAULT_METRICS_ENABLED;
/** /**
* Metrics manager for init * Metrics manager for init
* *
@ -47,7 +49,7 @@ public class MetricsManager {
public void init() { public void init() {
boolean enabled = ConfigurationFactory.getInstance().getBoolean( boolean enabled = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.METRICS_PREFIX + ConfigurationKeys.METRICS_ENABLED, false); ConfigurationKeys.METRICS_PREFIX + ConfigurationKeys.METRICS_ENABLED, DEFAULT_METRICS_ENABLED);
if (enabled) { if (enabled) {
registry = RegistryFactory.getInstance(); registry = RegistryFactory.getInstance();
if (registry != null) { if (registry != null) {

@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package io.seata.server.metrics; package io.seata.server.metrics;
import io.seata.core.event.EventBus; import io.seata.core.event.EventBus;

@ -15,11 +15,6 @@
*/ */
package io.seata.server.metrics; package io.seata.server.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import io.seata.core.event.GlobalTransactionEvent; import io.seata.core.event.GlobalTransactionEvent;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
@ -28,10 +23,12 @@ import io.seata.server.event.EventBusManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static io.seata.metrics.IdConstants.APP_ID_KEY; import java.util.HashMap;
import static io.seata.metrics.IdConstants.GROUP_KEY; import java.util.Map;
import static io.seata.metrics.IdConstants.STATUS_VALUE_AFTER_COMMITTED_KEY; import java.util.concurrent.TimeUnit;
import static io.seata.metrics.IdConstants.STATUS_VALUE_AFTER_ROLLBACKED_KEY; import java.util.function.Consumer;
import static io.seata.metrics.IdConstants.*;
/** /**
* Event subscriber for metrics * Event subscriber for metrics
@ -208,7 +205,7 @@ public class MetricsSubscriber {
/** /**
* PMD check * PMD check
* SuppressWarnings("checkstyle:EqualsHashCode") * SuppressWarnings("checkstyle:EqualsHashCode")
* @return * @return the hash code
*/ */
@Override @Override
public int hashCode() { public int hashCode() {

@ -15,11 +15,6 @@
*/ */
package io.seata.server.session; package io.seata.server.session;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.seata.common.util.CompressUtil; import io.seata.common.util.CompressUtil;
import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchStatus; import io.seata.core.model.BranchStatus;
@ -32,6 +27,11 @@ import io.seata.server.store.StoreConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static io.seata.core.model.LockStatus.Locked; import static io.seata.core.model.LockStatus.Locked;

@ -15,17 +15,6 @@
*/ */
package io.seata.server.session; package io.seata.server.session;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import io.seata.common.Constants; import io.seata.common.Constants;
import io.seata.common.DefaultValues; import io.seata.common.DefaultValues;
import io.seata.common.XID; import io.seata.common.XID;
@ -46,9 +35,13 @@ import io.seata.server.store.StoreConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static io.seata.core.model.GlobalStatus.AsyncCommitting; import java.nio.ByteBuffer;
import static io.seata.core.model.GlobalStatus.CommitRetrying; import java.util.*;
import static io.seata.core.model.GlobalStatus.Committing; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static io.seata.core.model.GlobalStatus.*;
/** /**
* The type Global session. * The type Global session.
@ -756,7 +749,7 @@ public class GlobalSession implements SessionLifecycle, SessionStorable {
public void queueToRetryRollback() throws TransactionException { public void queueToRetryRollback() throws TransactionException {
this.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager()); this.addSessionLifecycleListener(SessionHolder.getRetryRollbackingSessionManager());
GlobalStatus currentStatus = this.getStatus(); GlobalStatus currentStatus = this.getStatus();
if (SessionHelper.isTimeoutGlobalStatus(currentStatus)) { if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
this.setStatus(GlobalStatus.TimeoutRollbackRetrying); this.setStatus(GlobalStatus.TimeoutRollbackRetrying);
} else { } else {
this.setStatus(GlobalStatus.RollbackRetrying); this.setStatus(GlobalStatus.RollbackRetrying);

@ -89,6 +89,7 @@ public class SessionCondition {
*/ */
public void setStatus(GlobalStatus status) { public void setStatus(GlobalStatus status) {
this.status = status; this.status = status;
this.statuses = new GlobalStatus[] {status};
} }
/** /**

@ -15,12 +15,7 @@
*/ */
package io.seata.server.session; package io.seata.server.session;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration; import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory; import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys; import io.seata.core.constants.ConfigurationKeys;
@ -28,15 +23,22 @@ import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchType; import io.seata.core.model.BranchType;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.core.store.StoreMode;
import io.seata.metrics.IdConstants; import io.seata.metrics.IdConstants;
import io.seata.server.UUIDGenerator; import io.seata.server.UUIDGenerator;
import io.seata.server.coordinator.DefaultCoordinator; import io.seata.server.coordinator.DefaultCoordinator;
import io.seata.server.metrics.MetricsPublisher; import io.seata.server.metrics.MetricsPublisher;
import io.seata.server.store.StoreConfig;
import io.seata.server.store.StoreConfig.SessionMode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import static io.seata.common.DefaultValues.DEFAULT_ENABLE_BRANCH_ASYNC_REMOVE;
/** /**
* The type Session helper. * The type Session helper.
* *
@ -51,16 +53,14 @@ public class SessionHelper {
private static final Configuration CONFIG = ConfigurationFactory.getInstance(); private static final Configuration CONFIG = ConfigurationFactory.getInstance();
private static final Boolean ENABLE_BRANCH_ASYNC_REMOVE = CONFIG.getBoolean( private static final Boolean ENABLE_BRANCH_ASYNC_REMOVE = CONFIG.getBoolean(
ConfigurationKeys.ENABLE_BRANCH_ASYNC_REMOVE, false); ConfigurationKeys.ENABLE_BRANCH_ASYNC_REMOVE, DEFAULT_ENABLE_BRANCH_ASYNC_REMOVE);
/** /**
* The instance of DefaultCoordinator * The instance of DefaultCoordinator
*/ */
private static final DefaultCoordinator COORDINATOR = DefaultCoordinator.getInstance(); private static final DefaultCoordinator COORDINATOR = DefaultCoordinator.getInstance();
private static final boolean DELAY_HANDLE_SESSION = private static final boolean DELAY_HANDLE_SESSION = StoreConfig.getSessionMode() != SessionMode.FILE;
!StringUtils.equalsIgnoreCase(ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_SESSION_MODE,
ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_MODE)), StoreMode.FILE.getName());
private SessionHelper() { private SessionHelper() {
} }
@ -146,7 +146,24 @@ public class SessionHelper {
* @throws TransactionException the transaction exception * @throws TransactionException the transaction exception
*/ */
public static void endCommitFailed(GlobalSession globalSession, boolean retryGlobal) throws TransactionException { public static void endCommitFailed(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {
endCommitFailed(globalSession, retryGlobal, false);
}
/**
* End commit failed.
*
* @param globalSession the global session
* @param retryGlobal the retry global
* @param isRetryTimeout is retry timeout
* @throws TransactionException the transaction exception
*/
public static void endCommitFailed(GlobalSession globalSession, boolean retryGlobal, boolean isRetryTimeout)
throws TransactionException {
if (isRetryTimeout) {
globalSession.changeGlobalStatus(GlobalStatus.CommitRetryTimeout);
} else {
globalSession.changeGlobalStatus(GlobalStatus.CommitFailed); globalSession.changeGlobalStatus(GlobalStatus.CommitFailed);
}
LOGGER.error("The Global session {} has changed the status to {}, need to be handled it manually.", LOGGER.error("The Global session {} has changed the status to {}, need to be handled it manually.",
globalSession.getXid(), globalSession.getStatus()); globalSession.getXid(), globalSession.getStatus());
@ -164,22 +181,27 @@ public class SessionHelper {
public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException { public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {
if (retryGlobal || !DELAY_HANDLE_SESSION) { if (retryGlobal || !DELAY_HANDLE_SESSION) {
long beginTime = System.currentTimeMillis(); long beginTime = System.currentTimeMillis();
boolean timeoutDone = false;
GlobalStatus currentStatus = globalSession.getStatus(); GlobalStatus currentStatus = globalSession.getStatus();
if (currentStatus == GlobalStatus.TimeoutRollbacking) {
MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.TimeoutRollbacked, false, false);
timeoutDone = true;
}
boolean retryBranch = boolean retryBranch =
currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying; currentStatus == GlobalStatus.TimeoutRollbackRetrying || currentStatus == GlobalStatus.RollbackRetrying;
if (isTimeoutGlobalStatus(currentStatus)) { if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked); globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);
} else { } else {
globalSession.changeGlobalStatus(GlobalStatus.Rollbacked); globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);
} }
globalSession.end(); globalSession.end();
if (!DELAY_HANDLE_SESSION) { if (!DELAY_HANDLE_SESSION && !timeoutDone) {
MetricsPublisher.postSessionDoneEvent(globalSession, false, false); MetricsPublisher.postSessionDoneEvent(globalSession, false, false);
} }
MetricsPublisher.postSessionDoneEvent(globalSession, IdConstants.STATUS_VALUE_AFTER_ROLLBACKED_KEY, true, MetricsPublisher.postSessionDoneEvent(globalSession, IdConstants.STATUS_VALUE_AFTER_ROLLBACKED_KEY, true,
beginTime, retryBranch); beginTime, retryBranch);
} else { } else {
MetricsPublisher.postSessionDoneEvent(globalSession, false, false); MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Rollbacked, false, false);
} }
} }
@ -191,8 +213,22 @@ public class SessionHelper {
* @throws TransactionException the transaction exception * @throws TransactionException the transaction exception
*/ */
public static void endRollbackFailed(GlobalSession globalSession, boolean retryGlobal) throws TransactionException { public static void endRollbackFailed(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {
endRollbackFailed(globalSession, retryGlobal, false);
}
/**
* End rollback failed.
*
* @param globalSession the global session
* @param retryGlobal the retry global
* @param isRetryTimeout is retry timeout
* @throws TransactionException the transaction exception
*/
public static void endRollbackFailed(GlobalSession globalSession, boolean retryGlobal, boolean isRetryTimeout) throws TransactionException {
GlobalStatus currentStatus = globalSession.getStatus(); GlobalStatus currentStatus = globalSession.getStatus();
if (isTimeoutGlobalStatus(currentStatus)) { if (isRetryTimeout) {
globalSession.changeGlobalStatus(GlobalStatus.RollbackRetryTimeout);
} else if (SessionStatusValidator.isTimeoutGlobalStatus(currentStatus)) {
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbackFailed); globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbackFailed);
} else { } else {
globalSession.changeGlobalStatus(GlobalStatus.RollbackFailed); globalSession.changeGlobalStatus(GlobalStatus.RollbackFailed);
@ -202,13 +238,6 @@ public class SessionHelper {
MetricsPublisher.postSessionDoneEvent(globalSession, retryGlobal, false); MetricsPublisher.postSessionDoneEvent(globalSession, retryGlobal, false);
} }
public static boolean isTimeoutGlobalStatus(GlobalStatus status) {
return status == GlobalStatus.TimeoutRollbacked
|| status == GlobalStatus.TimeoutRollbackFailed
|| status == GlobalStatus.TimeoutRollbacking
|| status == GlobalStatus.TimeoutRollbackRetrying;
}
/** /**
* Foreach global sessions. * Foreach global sessions.
* *

@ -15,16 +15,7 @@
*/ */
package io.seata.server.session; package io.seata.server.session;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import io.seata.common.ConfigurationKeys; import io.seata.common.ConfigurationKeys;
import io.seata.core.model.LockStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.seata.common.XID; import io.seata.common.XID;
import io.seata.common.exception.ShouldNeverHappenException; import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
@ -35,12 +26,21 @@ import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory; import io.seata.config.ConfigurationFactory;
import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionException;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.core.model.LockStatus;
import io.seata.core.store.DistributedLockDO; import io.seata.core.store.DistributedLockDO;
import io.seata.core.store.DistributedLocker; import io.seata.core.store.DistributedLocker;
import io.seata.server.lock.distributed.DistributedLockerFactory; import io.seata.server.lock.distributed.DistributedLockerFactory;
import io.seata.core.store.StoreMode; import io.seata.server.store.StoreConfig;
import io.seata.server.store.StoreConfig.SessionMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.seata.common.DefaultValues.SERVER_DEFAULT_STORE_MODE; import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static io.seata.common.DefaultValues.DEFAULT_DISTRIBUTED_LOCK_EXPIRE_TIME;
/** /**
* The type Session holder. * The type Session holder.
@ -80,7 +80,7 @@ public class SessionHolder {
/** /**
* The redis distributed lock expire time * The redis distributed lock expire time
*/ */
private static long DISTRIBUTED_LOCK_EXPIRE_TIME = CONFIG.getLong(ConfigurationKeys.DISTRIBUTED_LOCK_EXPIRE_TIME, 10000); private static long DISTRIBUTED_LOCK_EXPIRE_TIME = CONFIG.getLong(ConfigurationKeys.DISTRIBUTED_LOCK_EXPIRE_TIME, DEFAULT_DISTRIBUTED_LOCK_EXPIRE_TIME);
private static SessionManager ROOT_SESSION_MANAGER; private static SessionManager ROOT_SESSION_MANAGER;
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER; private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
@ -89,56 +89,57 @@ public class SessionHolder {
private static DistributedLocker DISTRIBUTED_LOCKER; private static DistributedLocker DISTRIBUTED_LOCKER;
public static void init() {
init(null);
}
/** /**
* Init. * Init.
* *
* @param mode the store mode: file, db, redis * @param sessionMode the store mode: file, db, redis
* @throws IOException the io exception * @throws IOException the io exception
*/ */
public static void init(String mode) { public static void init(SessionMode sessionMode) {
if (StringUtils.isBlank(mode)) { if (null == sessionMode) {
mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, sessionMode = StoreConfig.getSessionMode();
CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
} }
StoreMode storeMode = StoreMode.get(mode); if (SessionMode.DB.equals(sessionMode)) {
if (StoreMode.DB.equals(storeMode)) { ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.DB.getName());
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName()); ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.DB.getName(),
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME}); new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.DB.getName(),
new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME}); new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.DB.getName(),
new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME}); new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName()); DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(SessionMode.DB.getName());
} else if (StoreMode.FILE.equals(storeMode)) { } else if (SessionMode.FILE.equals(sessionMode)) {
String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR, String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
DEFAULT_SESSION_STORE_FILE_DIR); DEFAULT_SESSION_STORE_FILE_DIR);
if (StringUtils.isBlank(sessionStorePath)) { if (StringUtils.isBlank(sessionStorePath)) {
throw new StoreException("the {store.file.dir} is empty."); throw new StoreException("the {store.file.dir} is empty.");
} }
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(), ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.FILE.getName(),
new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath}); new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER; ASYNC_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER; RETRY_COMMITTING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER; RETRY_ROLLBACKING_SESSION_MANAGER = ROOT_SESSION_MANAGER;
DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName()); DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(SessionMode.FILE.getName());
} else if (StoreMode.REDIS.equals(storeMode)) { } else if (SessionMode.REDIS.equals(sessionMode)) {
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName()); ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.REDIS.getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME}); SessionMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME}); SessionMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME}); SessionMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName()); DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(SessionMode.REDIS.getName());
} else { } else {
// unknown store // unknown store
throw new IllegalArgumentException("unknown store mode:" + mode); throw new IllegalArgumentException("unknown store mode:" + sessionMode.getName());
} }
reload(storeMode); reload(sessionMode);
} }
//region reload //region reload
@ -146,15 +147,15 @@ public class SessionHolder {
/** /**
* Reload. * Reload.
* *
* @param storeMode the mode of store * @param sessionMode the mode of store
*/ */
protected static void reload(StoreMode storeMode) { protected static void reload(SessionMode sessionMode) {
if (ROOT_SESSION_MANAGER instanceof Reloadable) { if (ROOT_SESSION_MANAGER instanceof Reloadable) {
((Reloadable) ROOT_SESSION_MANAGER).reload(); ((Reloadable) ROOT_SESSION_MANAGER).reload();
} }
if (storeMode == StoreMode.FILE) { if (SessionMode.FILE.equals(sessionMode)) {
Collection<GlobalSession> allSessions = ROOT_SESSION_MANAGER.allSessions(); Collection<GlobalSession> allSessions = ROOT_SESSION_MANAGER.allSessions();
if (CollectionUtils.isNotEmpty(allSessions)) { if (CollectionUtils.isNotEmpty(allSessions)) {
for (GlobalSession globalSession : allSessions) { for (GlobalSession globalSession : allSessions) {

@ -15,13 +15,14 @@
*/ */
package io.seata.server.session; package io.seata.server.session;
import java.util.Collection;
import java.util.List;
import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchStatus; import io.seata.core.model.BranchStatus;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.core.rpc.Disposable; import io.seata.core.rpc.Disposable;
import java.util.Collection;
import java.util.List;
/** /**
* The interface Session manager. * The interface Session manager.
* *
@ -119,6 +120,7 @@ public interface SessionManager extends SessionLifecycleListener, Disposable {
* @param globalSession the global session * @param globalSession the global session
* @param lockCallable the lock Callable * @param lockCallable the lock Callable
* @return the value * @return the value
* @throws TransactionException the transaction exception
*/ */
<T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable) <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable)
throws TransactionException; throws TransactionException;

@ -0,0 +1,87 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.server.session;
import io.seata.core.model.GlobalStatus;
/**
* The type change status validator.
*
* @author Bughue
*/
public class SessionStatusValidator {
/**
* is timeout global status
*
* @param status the global session
*/
public static boolean isTimeoutGlobalStatus(GlobalStatus status) {
return status == GlobalStatus.TimeoutRollbacked
|| status == GlobalStatus.TimeoutRollbackFailed
|| status == GlobalStatus.TimeoutRollbacking
|| status == GlobalStatus.TimeoutRollbackRetrying;
}
/**
* is rollback global status
*
* @param status the global session
*/
public static boolean isRollbackGlobalStatus(GlobalStatus status) {
return status == GlobalStatus.Rollbacking
|| status == GlobalStatus.RollbackRetrying
|| status == GlobalStatus.Rollbacked
|| status == GlobalStatus.RollbackFailed
|| status == GlobalStatus.RollbackRetryTimeout;
}
/**
* is commit global status
*
* @param status the global session
*/
public static boolean isCommitGlobalStatus(GlobalStatus status) {
return status == GlobalStatus.Committing
|| status == GlobalStatus.AsyncCommitting
|| status == GlobalStatus.CommitRetrying
|| status == GlobalStatus.Committed
|| status == GlobalStatus.CommitFailed
|| status == GlobalStatus.CommitRetryTimeout;
}
/**
* check the relation of before status and after status
*
* @param before the global session
* @param after the global session
*/
public static boolean validateUpdateStatus(GlobalStatus before, GlobalStatus after) {
if (isTimeoutGlobalStatus(before) && isCommitGlobalStatus(after)) {
return false;
}
if (isCommitGlobalStatus(before) && isTimeoutGlobalStatus(after)) {
return false;
}
if (isRollbackGlobalStatus(before) && isCommitGlobalStatus(after)) {
return false;
}
if (isCommitGlobalStatus(before) && isRollbackGlobalStatus(after)) {
return false;
}
return true;
}
}

@ -15,25 +15,22 @@
*/ */
package io.seata.server.storage; package io.seata.server.storage;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Collections;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils; import io.seata.common.util.StringUtils;
import io.seata.server.console.vo.BranchSessionVO;
import io.seata.server.console.vo.GlobalSessionVO;
import io.seata.core.model.BranchStatus; import io.seata.core.model.BranchStatus;
import io.seata.core.model.BranchType; import io.seata.core.model.BranchType;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.core.store.BranchTransactionDO; import io.seata.core.store.BranchTransactionDO;
import io.seata.core.store.GlobalTransactionDO; import io.seata.core.store.GlobalTransactionDO;
import io.seata.server.console.vo.BranchSessionVO;
import io.seata.server.console.vo.GlobalSessionVO;
import io.seata.server.session.BranchSession; import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import io.seata.server.store.SessionStorable; import io.seata.server.store.SessionStorable;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import java.util.*;
/** /**
* The session converter * The session converter
* *

@ -16,23 +16,13 @@
package io.seata.server.storage.db.lock; package io.seata.server.storage.db.lock;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;
import javax.sql.DataSource;
import io.seata.common.exception.ShouldNeverHappenException; import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.loader.LoadLevel; import io.seata.common.loader.LoadLevel;
import io.seata.common.loader.Scope; import io.seata.common.loader.Scope;
import io.seata.common.util.IOUtil; import io.seata.common.util.IOUtil;
import io.seata.common.util.StringUtils; import io.seata.common.util.StringUtils;
import io.seata.config.Configuration; import io.seata.config.*;
import io.seata.config.ConfigurationCache;
import io.seata.config.ConfigurationChangeEvent;
import io.seata.config.ConfigurationChangeListener;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys; import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.constants.ServerTableColumnsName; import io.seata.core.constants.ServerTableColumnsName;
import io.seata.core.store.DistributedLockDO; import io.seata.core.store.DistributedLockDO;
@ -42,6 +32,13 @@ import io.seata.core.store.db.sql.distributed.lock.DistributedLockSqlFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Objects;
import static io.seata.core.constants.ConfigurationKeys.DISTRIBUTED_LOCK_DB_TABLE; import static io.seata.core.constants.ConfigurationKeys.DISTRIBUTED_LOCK_DB_TABLE;
/** /**

@ -15,7 +15,6 @@
*/ */
package io.seata.server.storage.db.lock; package io.seata.server.storage.db.lock;
import javax.sql.DataSource;
import io.seata.common.executor.Initialize; import io.seata.common.executor.Initialize;
import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.loader.LoadLevel; import io.seata.common.loader.LoadLevel;
@ -28,6 +27,8 @@ import io.seata.server.lock.AbstractLockManager;
import io.seata.server.session.BranchSession; import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import javax.sql.DataSource;
/** /**
* The type db lock manager. * The type db lock manager.
* *

@ -15,8 +15,6 @@
*/ */
package io.seata.server.storage.db.lock; package io.seata.server.storage.db.lock;
import java.util.List;
import javax.sql.DataSource;
import io.seata.common.exception.DataAccessException; import io.seata.common.exception.DataAccessException;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
@ -25,6 +23,9 @@ import io.seata.core.lock.RowLock;
import io.seata.core.model.LockStatus; import io.seata.core.model.LockStatus;
import io.seata.core.store.LockStore; import io.seata.core.store.LockStore;
import javax.sql.DataSource;
import java.util.List;
/** /**
* The type Data base locker. * The type Data base locker.
* *
@ -89,7 +90,7 @@ public class DataBaseLocker extends AbstractLocker {
@Override @Override
public boolean releaseLock(String xid, Long branchId) { public boolean releaseLock(String xid, Long branchId) {
try { try {
return lockStore.unLock(xid, branchId); return lockStore.unLock(branchId);
} catch (StoreException e) { } catch (StoreException e) {
throw e; throw e;
} catch (Exception t) { } catch (Exception t) {

@ -15,16 +15,6 @@
*/ */
package io.seata.server.storage.db.lock; package io.seata.server.storage.db.lock;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import io.seata.common.exception.DataAccessException; import io.seata.common.exception.DataAccessException;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
@ -43,6 +33,13 @@ import io.seata.core.store.db.sql.lock.LockStoreSqlFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static io.seata.common.DefaultValues.DEFAULT_LOCK_DB_TABLE; import static io.seata.common.DefaultValues.DEFAULT_LOCK_DB_TABLE;
import static io.seata.core.exception.TransactionExceptionCode.LockKeyConflictFailFast; import static io.seata.core.exception.TransactionExceptionCode.LockKeyConflictFailFast;
@ -239,27 +236,6 @@ public class LockStoreDataBaseDAO implements LockStore {
return true; return true;
} }
@Override
public boolean unLock(String xid, Long branchId) {
Connection conn = null;
PreparedStatement ps = null;
try {
conn = lockStoreDataSource.getConnection();
conn.setAutoCommit(true);
//batch release lock by branch
String batchDeleteSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getBatchDeleteLockSqlByBranch(lockTable);
ps = conn.prepareStatement(batchDeleteSQL);
ps.setString(1, xid);
ps.setLong(2, branchId);
ps.executeUpdate();
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
return true;
}
@Override @Override
public boolean unLock(String xid) { public boolean unLock(String xid) {
Connection conn = null; Connection conn = null;
@ -280,6 +256,26 @@ public class LockStoreDataBaseDAO implements LockStore {
return true; return true;
} }
@Override
public boolean unLock(Long branchId) {
Connection conn = null;
PreparedStatement ps = null;
try {
conn = lockStoreDataSource.getConnection();
conn.setAutoCommit(true);
//batch release lock by branchId
String batchDeleteSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getBatchDeleteLockSqlByBranchId(lockTable);
ps = conn.prepareStatement(batchDeleteSQL);
ps.setLong(1, branchId);
ps.executeUpdate();
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
return true;
}
@Override @Override
public boolean isLockable(List<LockDO> lockDOs) { public boolean isLockable(List<LockDO> lockDOs) {
Connection conn = null; Connection conn = null;
@ -335,6 +331,9 @@ public class LockStoreDataBaseDAO implements LockStore {
ps.setInt(8, LockStatus.Locked.getCode()); ps.setInt(8, LockStatus.Locked.getCode());
return ps.executeUpdate() > 0; return ps.executeUpdate() > 0;
} catch (SQLException e) { } catch (SQLException e) {
if (e instanceof SQLIntegrityConstraintViolationException) {
return false;
}
throw new StoreException(e); throw new StoreException(e);
} finally { } finally {
IOUtil.close(ps); IOUtil.close(ps);
@ -348,7 +347,7 @@ public class LockStoreDataBaseDAO implements LockStore {
* @param lockDOs the lock do list * @param lockDOs the lock do list
* @return the boolean * @return the boolean
*/ */
protected boolean doAcquireLocks(Connection conn, List<LockDO> lockDOs) { protected boolean doAcquireLocks(Connection conn, List<LockDO> lockDOs) throws SQLException {
PreparedStatement ps = null; PreparedStatement ps = null;
try { try {
//insert //insert
@ -366,10 +365,12 @@ public class LockStoreDataBaseDAO implements LockStore {
ps.addBatch(); ps.addBatch();
} }
return ps.executeBatch().length == lockDOs.size(); return ps.executeBatch().length == lockDOs.size();
} catch (SQLException e) { } catch (SQLIntegrityConstraintViolationException e) {
LOGGER.error("Global lock batch acquire error: {}", e.getMessage(), e); LOGGER.error("Global lock batch acquire error: {}", e.getMessage(), e);
//return false,let the caller go to conn.rollabck() //return false,let the caller go to conn.rollabck()
return false; return false;
} catch (SQLException e) {
throw e;
} finally { } finally {
IOUtil.close(ps); IOUtil.close(ps);
} }

@ -15,8 +15,6 @@
*/ */
package io.seata.server.storage.db.session; package io.seata.server.storage.db.session;
import java.util.Collection;
import java.util.List;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.executor.Initialize; import io.seata.common.executor.Initialize;
import io.seata.common.loader.LoadLevel; import io.seata.common.loader.LoadLevel;
@ -25,16 +23,15 @@ import io.seata.common.util.StringUtils;
import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchStatus; import io.seata.core.model.BranchStatus;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.server.session.AbstractSessionManager; import io.seata.server.session.*;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;
import io.seata.server.session.SessionHolder;
import io.seata.server.storage.db.store.DataBaseTransactionStoreManager; import io.seata.server.storage.db.store.DataBaseTransactionStoreManager;
import io.seata.server.store.TransactionStoreManager.LogOperation; import io.seata.server.store.TransactionStoreManager.LogOperation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
/** /**
* The Data base session manager. * The Data base session manager.
* *
@ -108,7 +105,7 @@ public class DataBaseSessionManager extends AbstractSessionManager
* 1. rootSessionManager remove normal globalSession * 1. rootSessionManager remove normal globalSession
* 2. retryCommitSessionManager and retryRollbackSessionManager remove retry expired globalSession * 2. retryCommitSessionManager and retryRollbackSessionManager remove retry expired globalSession
* @param session the session * @param session the session
* @throws TransactionException * @throws TransactionException the transaction exception
*/ */
@Override @Override
public void removeGlobalSession(GlobalSession session) throws TransactionException { public void removeGlobalSession(GlobalSession session) throws TransactionException {

@ -15,14 +15,6 @@
*/ */
package io.seata.server.storage.db.store; package io.seata.server.storage.db.store;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
@ -37,10 +29,14 @@ import io.seata.core.store.LogStore;
import io.seata.core.store.db.DataSourceProvider; import io.seata.core.store.db.DataSourceProvider;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition; import io.seata.server.session.SessionCondition;
import io.seata.server.storage.SessionConverter;
import io.seata.server.store.AbstractTransactionStoreManager; import io.seata.server.store.AbstractTransactionStoreManager;
import io.seata.server.store.SessionStorable; import io.seata.server.store.SessionStorable;
import io.seata.server.store.TransactionStoreManager; import io.seata.server.store.TransactionStoreManager;
import io.seata.server.storage.SessionConverter;
import javax.sql.DataSource;
import java.util.*;
import java.util.stream.Collectors;
import static io.seata.core.constants.RedisKeyConstants.DEFAULT_LOG_QUERY_LIMIT; import static io.seata.core.constants.RedisKeyConstants.DEFAULT_LOG_QUERY_LIMIT;
@ -165,6 +161,11 @@ public class DataBaseTransactionStoreManager extends AbstractTransactionStoreMan
return getGlobalSession(globalTransactionDO, branchTransactionDOs); return getGlobalSession(globalTransactionDO, branchTransactionDOs);
} }
@Override
public List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions) {
return readSession(new GlobalStatus[] {GlobalStatus.Begin}, withBranchSessions);
}
/** /**
* Read session list. * Read session list.
* *

@ -15,15 +15,6 @@
*/ */
package io.seata.server.storage.db.store; package io.seata.server.storage.db.store;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import io.seata.common.exception.DataAccessException; import io.seata.common.exception.DataAccessException;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.util.IOUtil; import io.seata.common.util.IOUtil;
@ -39,6 +30,11 @@ import io.seata.core.store.db.sql.log.LogStoreSqlsFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import static io.seata.common.DefaultValues.DEFAULT_STORE_DB_BRANCH_TABLE; import static io.seata.common.DefaultValues.DEFAULT_STORE_DB_BRANCH_TABLE;
import static io.seata.common.DefaultValues.DEFAULT_STORE_DB_GLOBAL_TABLE; import static io.seata.common.DefaultValues.DEFAULT_STORE_DB_GLOBAL_TABLE;

@ -15,14 +15,14 @@
*/ */
package io.seata.server.storage.file; package io.seata.server.storage.file;
import java.nio.ByteBuffer;
import io.seata.common.exception.ShouldNeverHappenException; import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.server.session.BranchSession; import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import io.seata.server.store.SessionStorable; import io.seata.server.store.SessionStorable;
import io.seata.server.store.TransactionStoreManager.LogOperation; import io.seata.server.store.TransactionStoreManager.LogOperation;
import java.nio.ByteBuffer;
/** /**
* The type Transaction write store. * The type Transaction write store.
* *

@ -15,8 +15,6 @@
*/ */
package io.seata.server.storage.file.lock; package io.seata.server.storage.file.lock;
import java.util.List;
import io.seata.common.loader.LoadLevel; import io.seata.common.loader.LoadLevel;
import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionException;
import io.seata.core.lock.Locker; import io.seata.core.lock.Locker;
@ -25,6 +23,8 @@ import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import org.slf4j.MDC; import org.slf4j.MDC;
import java.util.List;
import static io.seata.core.context.RootContext.MDC_KEY_BRANCH_ID; import static io.seata.core.context.RootContext.MDC_KEY_BRANCH_ID;
/** /**

@ -15,12 +15,6 @@
*/ */
package io.seata.server.storage.file.lock; package io.seata.server.storage.file.lock;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import io.seata.common.exception.FrameworkException; import io.seata.common.exception.FrameworkException;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
@ -31,6 +25,11 @@ import io.seata.core.lock.RowLock;
import io.seata.core.model.LockStatus; import io.seata.core.model.LockStatus;
import io.seata.server.session.BranchSession; import io.seata.server.session.BranchSession;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static io.seata.core.exception.TransactionExceptionCode.LockKeyConflictFailFast; import static io.seata.core.exception.TransactionExceptionCode.LockKeyConflictFailFast;
@ -196,7 +195,7 @@ public class FileLocker extends AbstractLocker {
/** /**
* Because bucket lock map will be key of HashMap(lockHolder), however {@link ConcurrentHashMap} overwrites * Because bucket lock map will be key of HashMap(lockHolder), however {@link ConcurrentHashMap} overwrites
* {@link Object##hashCode()} and {@link Object##equals(Object)}, that leads to hash key conflict in lockHolder. * {@link Object#hashCode()} and {@link Object#equals(Object)}, that leads to hash key conflict in lockHolder.
* We define a {@link BucketLockMap} to hold the ConcurrentHashMap(bucketLockMap) and replace it as key of * We define a {@link BucketLockMap} to hold the ConcurrentHashMap(bucketLockMap) and replace it as key of
* HashMap(lockHolder). * HashMap(lockHolder).
*/ */

@ -15,19 +15,6 @@
*/ */
package io.seata.server.storage.file.session; package io.seata.server.storage.file.session;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import io.seata.common.exception.ShouldNeverHappenException; import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.loader.LoadLevel; import io.seata.common.loader.LoadLevel;
import io.seata.common.loader.Scope; import io.seata.common.loader.Scope;
@ -37,11 +24,7 @@ import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys; import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionException;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.server.session.AbstractSessionManager; import io.seata.server.session.*;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.Reloadable;
import io.seata.server.session.SessionCondition;
import io.seata.server.storage.file.ReloadableStore; import io.seata.server.storage.file.ReloadableStore;
import io.seata.server.storage.file.TransactionWriteStore; import io.seata.server.storage.file.TransactionWriteStore;
import io.seata.server.storage.file.store.FileTransactionStoreManager; import io.seata.server.storage.file.store.FileTransactionStoreManager;
@ -49,6 +32,13 @@ import io.seata.server.store.AbstractTransactionStoreManager;
import io.seata.server.store.SessionStorable; import io.seata.server.store.SessionStorable;
import io.seata.server.store.TransactionStoreManager; import io.seata.server.store.TransactionStoreManager;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static io.seata.common.DefaultValues.DEFAULT_SERVICE_SESSION_RELOAD_READ_SIZE;
/** /**
* The type File based session manager. * The type File based session manager.
@ -59,7 +49,7 @@ import io.seata.server.store.TransactionStoreManager;
public class FileSessionManager extends AbstractSessionManager implements Reloadable { public class FileSessionManager extends AbstractSessionManager implements Reloadable {
private static final int READ_SIZE = ConfigurationFactory.getInstance().getInt( private static final int READ_SIZE = ConfigurationFactory.getInstance().getInt(
ConfigurationKeys.SERVICE_SESSION_RELOAD_READ_SIZE, 100); ConfigurationKeys.SERVICE_SESSION_RELOAD_READ_SIZE, DEFAULT_SERVICE_SESSION_RELOAD_READ_SIZE);
/** /**
* The Session map. * The Session map.
*/ */

@ -15,6 +15,24 @@
*/ */
package io.seata.server.storage.file.store; package io.seata.server.storage.file.store;
import io.seata.common.exception.StoreException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;
import io.seata.server.session.SessionManager;
import io.seata.server.storage.file.FlushDiskMode;
import io.seata.server.storage.file.ReloadableStore;
import io.seata.server.storage.file.TransactionWriteStore;
import io.seata.server.store.AbstractTransactionStoreManager;
import io.seata.server.store.SessionStorable;
import io.seata.server.store.StoreConfig;
import io.seata.server.store.TransactionStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
@ -24,32 +42,10 @@ import java.nio.file.Files;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import io.seata.common.exception.StoreException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;
import io.seata.server.session.SessionManager;
import io.seata.server.store.AbstractTransactionStoreManager;
import io.seata.server.storage.file.FlushDiskMode;
import io.seata.server.storage.file.ReloadableStore;
import io.seata.server.store.SessionStorable;
import io.seata.server.store.StoreConfig;
import io.seata.server.store.TransactionStoreManager;
import io.seata.server.storage.file.TransactionWriteStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import static io.seata.core.context.RootContext.MDC_KEY_BRANCH_ID; import static io.seata.core.context.RootContext.MDC_KEY_BRANCH_ID;
/** /**
@ -335,7 +331,9 @@ public class FileTransactionStoreManager extends AbstractTransactionStoreManager
} }
} }
try { try {
if (currFileChannel.isOpen()) {
currFileChannel.force(true); currFileChannel.force(true);
}
} catch (IOException e) { } catch (IOException e) {
LOGGER.error("fileChannel force error: {}", e.getMessage(), e); LOGGER.error("fileChannel force error: {}", e.getMessage(), e);
} }

@ -15,10 +15,6 @@
*/ */
package io.seata.server.storage.redis; package io.seata.server.storage.redis;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import io.seata.common.exception.RedisException; import io.seata.common.exception.RedisException;
import io.seata.common.util.ConfigTools; import io.seata.common.util.ConfigTools;
import io.seata.common.util.StringUtils; import io.seata.common.util.StringUtils;
@ -27,11 +23,11 @@ import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys; import io.seata.core.constants.ConfigurationKeys;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis; import redis.clients.jedis.*;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolAbstract; import java.util.Arrays;
import redis.clients.jedis.JedisPoolConfig; import java.util.HashSet;
import redis.clients.jedis.JedisSentinelPool; import java.util.Set;
/** /**
* @author funkye * @author funkye

@ -27,7 +27,7 @@ import redis.clients.jedis.Transaction;
import redis.clients.jedis.params.SetParams; import redis.clients.jedis.params.SetParams;
/** /**
* @description Redis distributed lock * Redis distributed lock
* @author zhongxiang.wang * @author zhongxiang.wang
*/ */
@LoadLevel(name = "redis", scope = Scope.SINGLETON) @LoadLevel(name = "redis", scope = Scope.SINGLETON)
@ -39,18 +39,15 @@ public class RedisDistributedLocker implements DistributedLocker {
/** /**
* Acquire the distributed lock * Acquire the distributed lock
* *
* @param distributedLockDO * @param distributedLockDO the distributed lock info
* @return * @return the distributed lock info
*/ */
@Override @Override
public boolean acquireLock(DistributedLockDO distributedLockDO) { public boolean acquireLock(DistributedLockDO distributedLockDO) {
try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
//Don't need retry,if can't acquire the lock,let the other get the lock //Don't need retry,if can't acquire the lock,let the other get the lock
String result = jedis.set(distributedLockDO.getLockKey(), distributedLockDO.getLockValue(), SetParams.setParams().nx().px(distributedLockDO.getExpireTime())); String result = jedis.set(distributedLockDO.getLockKey(), distributedLockDO.getLockValue(), SetParams.setParams().nx().px(distributedLockDO.getExpireTime()));
if (SUCCESS.equalsIgnoreCase(result)) { return SUCCESS.equalsIgnoreCase(result);
return true;
}
return false;
} catch (Exception ex) { } catch (Exception ex) {
LOGGER.error("The {} acquired the {} distributed lock failed.", distributedLockDO.getLockValue(), distributedLockDO.getLockKey(), ex); LOGGER.error("The {} acquired the {} distributed lock failed.", distributedLockDO.getLockValue(), distributedLockDO.getLockKey(), ex);
return false; return false;
@ -61,8 +58,8 @@ public class RedisDistributedLocker implements DistributedLocker {
/** /**
* Release the distributed lock * Release the distributed lock
* *
* @param distributedLockDO * @param distributedLockDO the distributed lock info
* @return * @return the boolean
*/ */
@Override @Override
public boolean releaseLock(DistributedLockDO distributedLockDO) { public boolean releaseLock(DistributedLockDO distributedLockDO) {

@ -15,20 +15,6 @@
*/ */
package io.seata.server.storage.redis.lock; package io.seata.server.storage.redis.lock;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.io.FileLoader; import io.seata.common.io.FileLoader;
@ -46,6 +32,10 @@ import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline; import redis.clients.jedis.Pipeline;
import java.io.*;
import java.util.*;
import java.util.stream.Collectors;
import static io.seata.common.Constants.ROW_LOCK_KEY_SPLIT_CHAR; import static io.seata.common.Constants.ROW_LOCK_KEY_SPLIT_CHAR;
import static io.seata.core.constants.RedisKeyConstants.DEFAULT_REDIS_SEATA_GLOBAL_LOCK_PREFIX; import static io.seata.core.constants.RedisKeyConstants.DEFAULT_REDIS_SEATA_GLOBAL_LOCK_PREFIX;
import static io.seata.core.constants.RedisKeyConstants.DEFAULT_REDIS_SEATA_ROW_LOCK_PREFIX; import static io.seata.core.constants.RedisKeyConstants.DEFAULT_REDIS_SEATA_ROW_LOCK_PREFIX;
@ -359,7 +349,7 @@ public class RedisLocker extends AbstractLocker {
Map<String, String> rowKeyMap = jedis.hgetAll(xidLockKey); Map<String, String> rowKeyMap = jedis.hgetAll(xidLockKey);
rowKeyMap.forEach((branch, rowKey) -> rowKeys.add(rowKey)); rowKeyMap.forEach((branch, rowKey) -> rowKeys.add(rowKey));
} else { } else {
rowKeys.addAll(jedis.hmget(xidLockKey, branchId.toString())); rowKeys.add(jedis.hget(xidLockKey, branchId.toString()));
} }
if (CollectionUtils.isNotEmpty(rowKeys)) { if (CollectionUtils.isNotEmpty(rowKeys)) {
Pipeline pipelined = jedis.pipelined(); Pipeline pipelined = jedis.pipelined();

@ -15,9 +15,6 @@
*/ */
package io.seata.server.storage.redis.session; package io.seata.server.storage.redis.session;
import java.util.Collection;
import java.util.List;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.executor.Initialize; import io.seata.common.executor.Initialize;
import io.seata.common.loader.LoadLevel; import io.seata.common.loader.LoadLevel;
@ -26,16 +23,15 @@ import io.seata.common.util.StringUtils;
import io.seata.core.exception.TransactionException; import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchStatus; import io.seata.core.model.BranchStatus;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.server.session.AbstractSessionManager; import io.seata.server.session.*;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;
import io.seata.server.session.SessionHolder;
import io.seata.server.storage.redis.store.RedisTransactionStoreManager; import io.seata.server.storage.redis.store.RedisTransactionStoreManager;
import io.seata.server.store.TransactionStoreManager.LogOperation; import io.seata.server.store.TransactionStoreManager.LogOperation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
/** /**
* @author funkye * @author funkye
*/ */
@ -167,16 +163,15 @@ public class RedisSessionManager extends AbstractSessionManager
if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) { if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting)); return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
} else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) { } else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying, GlobalStatus.Committing})); return findGlobalSessions(new SessionCondition(GlobalStatus.CommitRetrying, GlobalStatus.Committing));
} else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) { } else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying, return findGlobalSessions(new SessionCondition(GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking,
GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying})); GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying));
} else { } else {
// all data // all data
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.UnKnown, GlobalStatus.Begin, return findGlobalSessions(new SessionCondition(GlobalStatus.UnKnown, GlobalStatus.Begin, GlobalStatus.Committing,
GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying, GlobalStatus.TimeoutRollbacking,
GlobalStatus.RollbackRetrying, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.AsyncCommitting));
GlobalStatus.AsyncCommitting}));
} }
} }

@ -15,21 +15,6 @@
*/ */
package io.seata.server.storage.redis.store; package io.seata.server.storage.redis.store;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Date;
import java.util.Optional;
import java.util.Collections;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.seata.common.XID; import io.seata.common.XID;
import io.seata.common.exception.RedisException; import io.seata.common.exception.RedisException;
@ -37,29 +22,32 @@ import io.seata.common.exception.StoreException;
import io.seata.common.util.BeanUtils; import io.seata.common.util.BeanUtils;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils; import io.seata.common.util.StringUtils;
import io.seata.server.console.param.GlobalSessionParam; import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.core.model.GlobalStatus; import io.seata.core.model.GlobalStatus;
import io.seata.core.store.BranchTransactionDO; import io.seata.core.store.BranchTransactionDO;
import io.seata.core.store.GlobalTransactionDO; import io.seata.core.store.GlobalTransactionDO;
import io.seata.server.console.param.GlobalSessionParam;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition; import io.seata.server.session.SessionCondition;
import io.seata.server.session.SessionStatusValidator;
import io.seata.server.storage.SessionConverter; import io.seata.server.storage.SessionConverter;
import io.seata.server.storage.redis.JedisPooledFactory; import io.seata.server.storage.redis.JedisPooledFactory;
import io.seata.server.store.AbstractTransactionStoreManager; import io.seata.server.store.AbstractTransactionStoreManager;
import io.seata.server.store.SessionStorable; import io.seata.server.store.SessionStorable;
import io.seata.server.store.TransactionStoreManager; import io.seata.server.store.TransactionStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline; import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Transaction; import redis.clients.jedis.Transaction;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import static io.seata.common.ConfigurationKeys.STORE_REDIS_QUERY_LIMIT; import static io.seata.common.ConfigurationKeys.STORE_REDIS_QUERY_LIMIT;
import static io.seata.core.constants.RedisKeyConstants.REDIS_KEY_BRANCH_XID; import static io.seata.core.constants.RedisKeyConstants.*;
import static io.seata.core.constants.RedisKeyConstants.REDIS_KEY_GLOBAL_XID;
import static io.seata.core.constants.RedisKeyConstants.DEFAULT_LOG_QUERY_LIMIT;
import static io.seata.core.constants.RedisKeyConstants.REDIS_KEY_BRANCH_STATUS;
import static io.seata.core.constants.RedisKeyConstants.REDIS_KEY_GLOBAL_STATUS;
import static io.seata.core.constants.RedisKeyConstants.REDIS_KEY_GLOBAL_GMT_MODIFIED;
import static io.seata.core.constants.RedisKeyConstants.REDIS_KEY_BRANCH_GMT_MODIFIED;
import static io.seata.core.constants.RedisKeyConstants.REDIS_KEY_BRANCH_APPLICATION_DATA;
/** /**
* The redis transaction store manager * The redis transaction store manager
@ -84,6 +72,9 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
/**the prefix of the global transaction status*/ /**the prefix of the global transaction status*/
private static final String REDIS_SEATA_STATUS_PREFIX = "SEATA_STATUS_"; private static final String REDIS_SEATA_STATUS_PREFIX = "SEATA_STATUS_";
/**the key of global transaction status for begin*/
private static final String REDIS_SEATA_BEGIN_TRANSACTIONS_KEY = "SEATA_BEGIN_TRANSACTIONS";
private static volatile RedisTransactionStoreManager instance; private static volatile RedisTransactionStoreManager instance;
private static final String OK = "OK"; private static final String OK = "OK";
@ -120,13 +111,6 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
initGlobalMap(); initGlobalMap();
initBranchMap(); initBranchMap();
logQueryLimit = CONFIG.getInt(STORE_REDIS_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT); logQueryLimit = CONFIG.getInt(STORE_REDIS_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);
/**
* redis mode: if DEFAULT_LOG_QUERY_LIMIT < STORE_REDIS_QUERY_LIMIT get DEFAULT_LOG_QUERY_LIMIT if
* DEFAULT_LOG_QUERY_LIMIT >= STORE_REDIS_QUERY_LIMIT get STORE_REDIS_QUERY_LIMIT
*/
if (logQueryLimit > DEFAULT_LOG_QUERY_LIMIT) {
logQueryLimit = DEFAULT_LOG_QUERY_LIMIT;
}
} }
/** /**
@ -143,7 +127,6 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
/** /**
* init globalMap * init globalMap
* *
* @return void
*/ */
public void initGlobalMap() { public void initGlobalMap() {
if (CollectionUtils.isEmpty(branchMap)) { if (CollectionUtils.isEmpty(branchMap)) {
@ -158,7 +141,6 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
/** /**
* init branchMap * init branchMap
* *
* @return void
*/ */
public void initBranchMap() { public void initBranchMap() {
if (CollectionUtils.isEmpty(branchMap)) { if (CollectionUtils.isEmpty(branchMap)) {
@ -264,7 +246,10 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
globalTransactionDO.setGmtCreate(now); globalTransactionDO.setGmtCreate(now);
globalTransactionDO.setGmtModified(now); globalTransactionDO.setGmtModified(now);
pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO)); pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO));
pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), globalTransactionDO.getXid()); String xid = globalTransactionDO.getXid();
pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid);
pipelined.zadd(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY,
globalTransactionDO.getBeginTime() + globalTransactionDO.getTimeout(), globalKey);
pipelined.sync(); pipelined.sync();
return true; return true;
} catch (Exception ex) { } catch (Exception ex) {
@ -293,6 +278,10 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
try (Pipeline pipelined = jedis.pipelined()) { try (Pipeline pipelined = jedis.pipelined()) {
pipelined.lrem(buildGlobalStatus(globalTransactionDO.getStatus()), 0, globalTransactionDO.getXid()); pipelined.lrem(buildGlobalStatus(globalTransactionDO.getStatus()), 0, globalTransactionDO.getXid());
pipelined.del(globalKey); pipelined.del(globalKey);
if (GlobalStatus.Begin.getCode() == globalTransactionDO.getStatus()
|| GlobalStatus.UnKnown.getCode() == globalTransactionDO.getStatus()) {
pipelined.zrem(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, globalKey);
}
pipelined.sync(); pipelined.sync();
} }
return true; return true;
@ -326,6 +315,12 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
jedis.unwatch(); jedis.unwatch();
return true; return true;
} }
GlobalStatus before = GlobalStatus.get(Integer.parseInt(previousStatus));
GlobalStatus after = GlobalStatus.get(globalTransactionDO.getStatus());
if (!SessionStatusValidator.validateUpdateStatus(before, after)) {
throw new StoreException("Illegal changing of global status, update global transaction failed."
+ " beforeStatus[" + before.name() + "] cannot be changed to afterStatus[" + after.name() + "]");
}
String previousGmtModified = statusAndGmtModified.get(1); String previousGmtModified = statusAndGmtModified.get(1);
Transaction multi = jedis.multi(); Transaction multi = jedis.multi();
@ -335,6 +330,7 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
multi.hmset(globalKey, map); multi.hmset(globalKey, map);
multi.lrem(buildGlobalStatus(Integer.valueOf(previousStatus)), 0, xid); multi.lrem(buildGlobalStatus(Integer.valueOf(previousStatus)), 0, xid);
multi.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid); multi.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid);
multi.zrem(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, globalKey);
List<Object> exec = multi.exec(); List<Object> exec = multi.exec();
if (CollectionUtils.isEmpty(exec)) { if (CollectionUtils.isEmpty(exec)) {
//The data has changed by another tc, so we still think the modification is successful. //The data has changed by another tc, so we still think the modification is successful.
@ -420,10 +416,8 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
*/ */
@Override @Override
public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBranchSessions) { public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBranchSessions) {
List<GlobalSession> globalSessions = Collections.synchronizedList(new ArrayList<>()); List<GlobalSession> globalSessions = Collections.synchronizedList(new ArrayList<>());
List<String> statusKeys = convertStatusKeys(statuses); List<String> statusKeys = convertStatusKeys(statuses);
Map<String, Integer> targetMap = calculateStatuskeysHasData(statusKeys); Map<String, Integer> targetMap = calculateStatuskeysHasData(statusKeys);
if (targetMap.size() == 0 || logQueryLimit <= 0) { if (targetMap.size() == 0 || logQueryLimit <= 0) {
return globalSessions; return globalSessions;
@ -446,6 +440,45 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
return globalSessions; return globalSessions;
} }
@Override
public List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions) {
List<GlobalSession> list = Collections.emptyList();
List<String> statusKeys = convertStatusKeys(GlobalStatus.Begin);
Map<String, Integer> targetMap = calculateStatuskeysHasData(statusKeys);
if (targetMap.size() == 0 || logQueryLimit <= 0) {
return list;
}
final long countGlobalSessions = targetMap.values().stream().collect(Collectors.summarizingInt(Integer::intValue)).getSum();
// queryCount
final long queryCount = Math.min(logQueryLimit, countGlobalSessions);
try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
Set<String> values =
jedis.zrangeByScore(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, 0, System.currentTimeMillis(), 0,
(int) queryCount);
List<Map<String, String>> rep;
try (Pipeline pipeline = jedis.pipelined()) {
for (String value : values) {
pipeline.hgetAll(value);
}
rep = (List<Map<String, String>>) (List) pipeline.syncAndReturnAll();
}
list = rep.stream().map(map -> {
GlobalTransactionDO globalTransactionDO = (GlobalTransactionDO) BeanUtils.mapToObject(map,
GlobalTransactionDO.class);
if (globalTransactionDO != null) {
String xid = globalTransactionDO.getXid();
List<BranchTransactionDO> branchTransactionDOs = new ArrayList<>();
if (withBranchSessions) {
branchTransactionDOs = this.readBranchSessionByXid(jedis, xid);
}
return getGlobalSession(globalTransactionDO, branchTransactionDOs, withBranchSessions);
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
}
return list;
}
/** /**
* get everyone keys limit * get everyone keys limit
* *
@ -483,9 +516,11 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
} }
return globalSessions; return globalSessions;
} else if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) { } else if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) {
if (sessionCondition.getStatuses().length == 1 && sessionCondition.getStatuses()[0] == GlobalStatus.Begin) {
return this.readSortByTimeoutBeginSessions(!sessionCondition.isLazyLoadBranch());
} else {
return readSession(sessionCondition.getStatuses(), !sessionCondition.isLazyLoadBranch()); return readSession(sessionCondition.getStatuses(), !sessionCondition.isLazyLoadBranch());
} else if (sessionCondition.getStatus() != null) { }
return readSession(new GlobalStatus[] {sessionCondition.getStatus()}, !sessionCondition.isLazyLoadBranch());
} }
return null; return null;
} }
@ -657,7 +692,7 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
* @return * @return
*/ */
private Map<String, Integer> calculateStatuskeysHasData(List<String> statusKeys) { private Map<String, Integer> calculateStatuskeysHasData(List<String> statusKeys) {
Map<String, Integer> resultMap = Collections.synchronizedMap(new LinkedHashMap<>()); Map<String, Integer> resultMap = new LinkedHashMap<>();
Map<String, Integer> keysMap = new HashMap<>(statusKeys.size()); Map<String, Integer> keysMap = new HashMap<>(statusKeys.size());
try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) { try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {
statusKeys.forEach(key -> pipelined.llen(key)); statusKeys.forEach(key -> pipelined.llen(key));
@ -699,7 +734,7 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
} }
} }
private List<String> convertStatusKeys(GlobalStatus[] statuses) { private List<String> convertStatusKeys(GlobalStatus... statuses) {
List<String> statusKeys = new ArrayList<>(); List<String> statusKeys = new ArrayList<>();
for (int i = 0; i < statuses.length; i++) { for (int i = 0; i < statuses.length; i++) {
statusKeys.add(buildGlobalStatus(statuses[i].getCode())); statusKeys.add(buildGlobalStatus(statuses[i].getCode()));
@ -725,7 +760,9 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
} }
try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
for (String key : targetMap.keySet()) { Iterator<Map.Entry<String, Integer>> iterator = targetMap.entrySet().iterator();
while (iterator.hasNext()) {
String key = iterator.next().getKey();
final long sum = listList.stream().mapToLong(List::size).sum(); final long sum = listList.stream().mapToLong(List::size).sum();
final long diffCount = queryCount - sum; final long diffCount = queryCount - sum;
if (diffCount <= 0) { if (diffCount <= 0) {
@ -739,12 +776,10 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
list = jedis.lrange(key, start, end); list = jedis.lrange(key, start, end);
} }
if (list.size() > 0 && sum < queryCount) { if (list.size() > 0) {
listList.add(list); listList.add(list);
} else { } else {
if (list.size() == 0) { iterator.remove();
targetMap.remove(key);
}
} }
} }
} }

@ -19,6 +19,7 @@ import io.seata.core.model.GlobalStatus;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition; import io.seata.server.session.SessionCondition;
import java.util.Collections;
import java.util.List; import java.util.List;
/** /**
@ -38,14 +39,19 @@ public abstract class AbstractTransactionStoreManager implements TransactionStor
return null; return null;
} }
@Override
public List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions) {
return Collections.emptyList();
}
@Override @Override
public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBranchSessions) { public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBranchSessions) {
return null; return Collections.emptyList();
} }
@Override @Override
public List<GlobalSession> readSession(SessionCondition sessionCondition) { public List<GlobalSession> readSession(SessionCondition sessionCondition) {
return null; return Collections.emptyList();
} }
@Override @Override

@ -50,6 +50,8 @@ public class DruidDataSourceProvider extends AbstractDataSourceProvider {
ds.setMaxPoolPreparedStatementPerConnectionSize(20); ds.setMaxPoolPreparedStatementPerConnectionSize(20);
ds.setValidationQuery(getValidationQuery(getDBType())); ds.setValidationQuery(getValidationQuery(getDBType()));
ds.setDefaultAutoCommit(true); ds.setDefaultAutoCommit(true);
// fix issue 5030
ds.setUseOracleImplicitCache(false);
return ds; return ds;
} }
} }

@ -15,20 +15,44 @@
*/ */
package io.seata.server.store; package io.seata.server.store;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration; import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory; import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.server.env.ContainerHelper;
import io.seata.server.storage.file.FlushDiskMode; import io.seata.server.storage.file.FlushDiskMode;
import static io.seata.common.DefaultValues.SERVER_DEFAULT_STORE_MODE;
import static io.seata.core.constants.ConfigurationKeys.STORE_FILE_PREFIX; import static io.seata.core.constants.ConfigurationKeys.STORE_FILE_PREFIX;
/** /**
* @author lizhao * @author lizhao
*/ */
public class StoreConfig { public class StoreConfig {
private static final Configuration CONFIGURATION = ConfigurationFactory.getInstance(); private static final Configuration CONFIGURATION = ConfigurationFactory.getInstance();
private static StoreMode storeMode;
private static SessionMode sessionMode;
private static LockMode lockMode;
/**
* set storeMode sessionMode lockMode from StartupParameter
*
* @param storeMode storeMode
* @param sessionMode sessionMode
* @param lockMode lockMode
*/
public static void setStartupParameter(String storeMode, String sessionMode, String lockMode) {
if (StringUtils.isNotBlank(storeMode)) {
StoreConfig.storeMode = StoreMode.get(storeMode);
}
if (StringUtils.isNotBlank(sessionMode)) {
StoreConfig.sessionMode = SessionMode.get(sessionMode);
}
if (StringUtils.isNotBlank(lockMode)) {
StoreConfig.lockMode = LockMode.get(lockMode);
}
}
/** /**
* Default 16kb. * Default 16kb.
@ -60,4 +84,165 @@ public class StoreConfig {
public static FlushDiskMode getFlushDiskMode() { public static FlushDiskMode getFlushDiskMode() {
return FlushDiskMode.findDiskMode(CONFIGURATION.getConfig(STORE_FILE_PREFIX + "flushDiskMode")); return FlushDiskMode.findDiskMode(CONFIGURATION.getConfig(STORE_FILE_PREFIX + "flushDiskMode"));
} }
/**
* only for inner call
*
* @return
*/
private static StoreMode getStoreMode() {
//startup
if (null != storeMode) {
return storeMode;
}
//env
String storeModeEnv = ContainerHelper.getStoreMode();
if (StringUtils.isNotBlank(storeModeEnv)) {
return StoreMode.get(storeModeEnv);
}
//config
String storeModeConfig = CONFIGURATION.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE);
return StoreMode.get(storeModeConfig);
}
public static SessionMode getSessionMode() {
//startup
if (null != sessionMode) {
return sessionMode;
}
//env
String sessionModeEnv = ContainerHelper.getSessionStoreMode();
if (StringUtils.isNotBlank(sessionModeEnv)) {
return SessionMode.get(sessionModeEnv);
}
//config
String sessionModeConfig = CONFIGURATION.getConfig(ConfigurationKeys.STORE_SESSION_MODE);
if (StringUtils.isNotBlank(sessionModeConfig)) {
return SessionMode.get(sessionModeConfig);
}
// complication old config
return SessionMode.get(getStoreMode().name());
}
public static LockMode getLockMode() {
//startup
if (null != lockMode) {
return lockMode;
}
//env
String lockModeEnv = ContainerHelper.getLockStoreMode();
if (StringUtils.isNotBlank(lockModeEnv)) {
return LockMode.get(lockModeEnv);
}
//config
String lockModeConfig = CONFIGURATION.getConfig(ConfigurationKeys.STORE_LOCK_MODE);
if (StringUtils.isNotBlank(lockModeConfig)) {
return LockMode.get(lockModeConfig);
}
// complication old config
return LockMode.get(getStoreMode().name());
}
public enum StoreMode {
/**
* The File store mode.
*/
FILE("file"),
/**
* The Db store mode.
*/
DB("db"),
/**
* The Redis store mode.
*/
REDIS("redis");
private String name;
StoreMode(String name) {
this.name = name;
}
public String getName() {
return name;
}
public static StoreMode get(String name) {
for (StoreMode mode : StoreMode.values()) {
if (mode.getName().equalsIgnoreCase(name)) {
return mode;
}
}
throw new IllegalArgumentException("unknown store mode:" + name);
}
}
public enum SessionMode {
/**
* The File store mode.
*/
FILE("file"),
/**
* The Db store mode.
*/
DB("db"),
/**
* The Redis store mode.
*/
REDIS("redis");
private String name;
SessionMode(String name) {
this.name = name;
}
public String getName() {
return name;
}
public static SessionMode get(String name) {
for (SessionMode mode : SessionMode.values()) {
if (mode.getName().equalsIgnoreCase(name)) {
return mode;
}
}
throw new IllegalArgumentException("unknown session mode:" + name);
}
}
public enum LockMode {
/**
* The File store mode.
*/
FILE("file"),
/**
* The Db store mode.
*/
DB("db"),
/**
* The Redis store mode.
*/
REDIS("redis");
private String name;
LockMode(String name) {
this.name = name;
}
public String getName() {
return name;
}
public static LockMode get(String name) {
for (LockMode mode : LockMode.values()) {
if (mode.getName().equalsIgnoreCase(name)) {
return mode;
}
}
throw new IllegalArgumentException("unknown lock mode:" + name);
}
}
} }

@ -55,6 +55,13 @@ public interface TransactionStoreManager {
*/ */
GlobalSession readSession(String xid, boolean withBranchSessions); GlobalSession readSession(String xid, boolean withBranchSessions);
/**
* Read session global session by sort by timeout begin status.
*
* @param withBranchSessions the withBranchSessions
* @return the global session
*/
List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions);
/** /**
* Read session global session. * Read session global session.
* *

@ -15,9 +15,6 @@
*/ */
package io.seata.server.transaction.at; package io.seata.server.transaction.at;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import io.seata.common.exception.StoreException; import io.seata.common.exception.StoreException;
import io.seata.common.util.StringUtils; import io.seata.common.util.StringUtils;
@ -29,6 +26,9 @@ import io.seata.server.coordinator.AbstractCore;
import io.seata.server.session.BranchSession; import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession; import io.seata.server.session.GlobalSession;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static io.seata.common.Constants.AUTO_COMMIT; import static io.seata.common.Constants.AUTO_COMMIT;
import static io.seata.common.Constants.SKIP_CHECK_LOCK; import static io.seata.common.Constants.SKIP_CHECK_LOCK;
@ -41,7 +41,7 @@ import static io.seata.core.exception.TransactionExceptionCode.LockKeyConflict;
*/ */
public class ATCore extends AbstractCore { public class ATCore extends AbstractCore {
private ObjectMapper objectMapper; private final ObjectMapper objectMapper = new ObjectMapper();
public ATCore(RemotingServer remotingServer) { public ATCore(RemotingServer remotingServer) {
super(remotingServer); super(remotingServer);
@ -59,9 +59,6 @@ public class ATCore extends AbstractCore {
boolean autoCommit = true; boolean autoCommit = true;
boolean skipCheckLock = false; boolean skipCheckLock = false;
if (StringUtils.isNotBlank(applicationData)) { if (StringUtils.isNotBlank(applicationData)) {
if (objectMapper == null) {
objectMapper = new ObjectMapper();
}
try { try {
Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class); Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);
Object clientAutoCommit = data.get(AUTO_COMMIT); Object clientAutoCommit = data.get(AUTO_COMMIT);

@ -15,9 +15,6 @@
*/ */
package io.seata.server.transaction.saga; package io.seata.server.transaction.saga;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.seata.common.util.CollectionUtils; import io.seata.common.util.CollectionUtils;
import io.seata.core.exception.GlobalTransactionException; import io.seata.core.exception.GlobalTransactionException;
@ -37,6 +34,10 @@ import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionHelper; import io.seata.server.session.SessionHelper;
import io.seata.server.session.SessionHolder; import io.seata.server.session.SessionHolder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/** /**
* The type saga core. * The type saga core.
* *

Loading…
Cancel
Save