
"Seata Source Code": TCC Mode

 

Preface

This chapter analyzes how the TM, RM and TC roles work in Seata (1.5.0) TCC mode.

I. Example

1. business-service

The business-service service acts as both a TM and an RM.

In its TM role, @GlobalTransactional manages the global transaction.

```java
@DubboReference
private StorageTccAction storageTccAction;

@Autowired
private LocalTccAction localTccAction;

@GlobalTransactional(timeoutMills = 300000, name = "tcc-demo-commit")
@GetMapping(value = "/seata/tcc/commit", produces = "application/json")
public String tccCommit() {
    // local tcc
    localTccAction.prepare(null, COMMODITY_CODE, ORDER_COUNT);
    // rpc(storage-service) tcc
    storageTccAction.freeze(null, COMMODITY_CODE, ORDER_COUNT);
    return SUCCESS;
}
```

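In its RM role, LocalTccAction is annotated with @LocalTCC and @TwoPhaseBusinessAction. It is registered with the TC as a TCC resource and manages branch transaction registration, commit and rollback. (The LocalTccActionImpl implementation is omitted.)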

```java
@LocalTCC
public interface LocalTccAction {

    @TwoPhaseBusinessAction(name = "local-tcc-action", // resource name
            commitMethod = "commit", // phase-two commit method
            rollbackMethod = "rollback", // phase-two rollback method
            commitArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // parameter types of the commit method
            rollbackArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // parameter types of the rollback method
            useTCCFence = true // enable TCCFence: the Seata TCC framework handles the three classic TCC problems (idempotency, hanging, null rollback)
    )
    void prepare(BusinessActionContext actionContext,
                 @BusinessActionContextParameter("commodityCode") String commodityCode,
                 @BusinessActionContextParameter("count") Integer count);

    void commit(BusinessActionContext actionContext,
                @BusinessActionContextParameter("commodityCode") String commodityCode,
                @BusinessActionContextParameter("count") Integer count);

    void rollback(BusinessActionContext actionContext,
                  @BusinessActionContextParameter("commodityCode") String commodityCode,
                  @BusinessActionContextParameter("count") Integer count);
}
```

StorageTccAction is a Dubbo client ReferenceBean. Its freeze method is annotated with @TwoPhaseBusinessAction, so a branch transaction is registered before the RPC request is sent.

```java
public interface StorageTccAction {

    @TwoPhaseBusinessAction(name = "storage-tcc-action", // resource name
            commitMethod = "deduct", // phase-two commit method
            rollbackMethod = "unFreeze", // phase-two rollback method
            commitArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // parameter types of the commit method
            rollbackArgsClasses = {BusinessActionContext.class, String.class, Integer.class} // parameter types of the rollback method
    )
    void freeze(BusinessActionContext actionContext,
                @BusinessActionContextParameter("commodityCode") String commodityCode,
                @BusinessActionContextParameter("count") Integer count);

    void deduct(BusinessActionContext actionContext,
                @BusinessActionContextParameter("commodityCode") String commodityCode,
                @BusinessActionContextParameter("count") Integer count);

    void unFreeze(BusinessActionContext actionContext,
                  @BusinessActionContextParameter("commodityCode") String commodityCode,
                  @BusinessActionContextParameter("count") Integer count);
}
```

2. storage-service

storage-service is a pure RM. It is responsible for committing and rolling back branch transactions.

Its StorageTccAction interface is identical to the one used by the business-service client (shown above).


StorageTccActionImpl exposes the remote service as a Dubbo ServiceBean.

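At startup, its interface methods carry @TwoPhaseBusinessAction, so it is recognized as a TCC resource and registered with the TC. It is responsible for committing and rolling back the branch transaction.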

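Note that for RPC calls the branch transaction is registered by the client. When business-service calls storageTccAction.freeze, it is business-service that registers the branch transaction.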

```java
@DubboService(protocol = "dubbo")
@Component
public class StorageTccActionImpl implements StorageTccAction {
    // ...
}
```

II. Startup

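During application startup, TCC mode also relies on GlobalTransactionScanner to create proxies. GlobalTransactionScanner extends AbstractAutoProxyCreator and creates the proxy in postProcessAfterInitialization. The proxy logic lives in TccActionInterceptor.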

```java
// GlobalTransactionScanner extends AbstractAutoProxyCreator
// called during postProcessAfterInitialization
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // ...
    // register the RM and decide whether the bean needs a proxy
    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
        // cleanup task for tcc_fence_log
        TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
        // proxy logic: TccActionInterceptor
        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
    }
    // ...
}
```

TCC mode distinguishes three special kinds of Spring beans:

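  • Beans whose interface is annotated with @LocalTCC, such as LocalTccAction in the example;
  • RPC provider ServiceBeans, such as Dubbo service implementations annotated with @DubboService, e.g. StorageTccActionImpl in the example;
  • RPC consumer ReferenceBeans, such as beans injected via Dubbo's @DubboReference, e.g. StorageTccAction in the example.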

For these three kinds of beans, TCCBeanParserUtils.isTccAutoProxy does a lot of work. For example, it registers RMs and decides whether a bean should be intercepted by TccActionInterceptor.

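A LocalTCC bean takes branch 1. A ServiceBean takes branch 2 and returns false. A ReferenceBean is a FactoryBean, so it takes branch 3. A return value of true means the bean is proxied by TccActionInterceptor.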

```java
// TCCBeanParserUtils
public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
    // dubbo:service and LocalTCC beans are registered as RMs
    boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
    RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
    if (isRemotingBean) {
        if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
            // LocalTCC must be proxied by TccActionInterceptor
            return isTccProxyTargetBean(remotingDesc); // 1
        } else {
            // dubbo:service (ServiceBean) is not proxied
            return false; // 2
        }
    } else {
        if (remotingDesc == null) {
            if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
                // dubbo:reference (Dubbo ReferenceBean) must be proxied by TccActionInterceptor
                remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
                return isTccProxyTargetBean(remotingDesc); // 3
            } else {
                return false;
            }
        } else {
            return isTccProxyTargetBean(remotingDesc);
        }
    }
}
```

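isTccProxyTargetBean decides whether a LocalTCC bean or ReferenceBean is actually proxied. It returns true, so that the bean is intercepted by TccActionInterceptor, only if the bean's interface has at least one method annotated with TwoPhaseBusinessAction.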

```java
// TCCBeanParserUtils
public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
    if (remotingDesc == null) {
        return false;
    }
    boolean isTccClazz = false;
    Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
    Method[] methods = tccInterfaceClazz.getMethods();
    TwoPhaseBusinessAction twoPhaseBusinessAction;
    for (Method method : methods) {
        twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
        if (twoPhaseBusinessAction != null) {
            isTccClazz = true;
            break;
        }
    }
    if (!isTccClazz) {
        return false;
    }
    short protocols = remotingDesc.getProtocol();
    if (Protocols.IN_JVM == protocols) {
        return true; // local tcc
    }
    return remotingDesc.isReference(); // dubbo:reference
}
```
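The reflective check in isTccProxyTargetBean can be reproduced in plain Java. The sketch below is a simplified, hypothetical stand-in and not Seata's API. `@TwoPhase` plays the role of `@TwoPhaseBusinessAction`, and the `inJvm`/`isReference` flags stand in for `Protocols.IN_JVM` and `RemotingDesc.isReference()`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @TwoPhaseBusinessAction. RUNTIME retention is required
// so that Method.getAnnotation can see it via reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface TwoPhase {
    String name();
}

interface TccStyleAction {
    @TwoPhase(name = "demo-action")
    void prepare(String code);

    void commit(String code);
}

interface PlainService {
    void doWork();
}

class TccScan {
    // First half of the check: does any public interface method carry the annotation?
    static boolean hasTwoPhaseMethod(Class<?> interfaceClass) {
        for (Method m : interfaceClass.getMethods()) {
            if (m.getAnnotation(TwoPhase.class) != null) {
                return true;
            }
        }
        return false;
    }

    // Second half: only in-JVM (LocalTCC) or reference-side beans are proxied.
    static boolean isProxyTarget(Class<?> interfaceClass, boolean inJvm, boolean isReference) {
        if (!hasTwoPhaseMethod(interfaceClass)) {
            return false;
        }
        return inJvm || isReference;
    }

    public static void main(String[] args) {
        System.out.println(isProxyTarget(TccStyleAction.class, true, false));  // true
        System.out.println(isProxyTarget(TccStyleAction.class, false, false)); // false
        System.out.println(isProxyTarget(PlainService.class, true, false));    // false
    }
}
```

The annotation must have RUNTIME retention. Otherwise `Method.getAnnotation` would return null and no bean would ever be proxied.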

In addition, parserRemotingServiceInfo finds every method annotated with TwoPhaseBusinessAction in LocalTCC beans and ServiceBeans. Each such method is registered with the TC as a TCCResource.

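Note that TCCResource.actionName serves as the unique resource identifier within the current application (spring.application.name). For this reason, the name attribute of TwoPhaseBusinessAction must not be repeated within one application.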

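This step makes phase two possible. The TC can route the request to an RM that registered the resource, and that RM can find the matching commit or rollback method to perform the phase-two commit or rollback.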
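The following minimal sketch shows why actionName has to be unique within one application. It assumes the RM keeps its resources in a map keyed by actionName, as the tccResourceCache lookup by resourceId in the phase-two code suggests. `ResourceSketch` and `ResourceRegistrySketch` are hypothetical names. In this sketch, a second registration under the same name silently replaces the first, so phase two would invoke the wrong commit method.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for TCCResource: just the names phase two needs.
class ResourceSketch {
    final String actionName;
    final String commitMethodName;
    final String rollbackMethodName;

    ResourceSketch(String actionName, String commitMethodName, String rollbackMethodName) {
        this.actionName = actionName;
        this.commitMethodName = commitMethodName;
        this.rollbackMethodName = rollbackMethodName;
    }
}

// Hypothetical per-application registry keyed by actionName (the resourceId).
class ResourceRegistrySketch {
    private final Map<String, ResourceSketch> byActionName = new HashMap<>();

    // Returns the resource that was overwritten, or null if the name was new.
    ResourceSketch register(ResourceSketch resource) {
        return byActionName.put(resource.actionName, resource);
    }

    // Phase two only carries the resourceId, so the method name is resolved from it.
    String commitMethodFor(String actionName) {
        ResourceSketch r = byActionName.get(actionName);
        return r == null ? null : r.commitMethodName;
    }

    String rollbackMethodFor(String actionName) {
        ResourceSketch r = byActionName.get(actionName);
        return r == null ? null : r.rollbackMethodName;
    }
}

class ResourceRegistryDemo {
    public static void main(String[] args) {
        ResourceRegistrySketch registry = new ResourceRegistrySketch();
        registry.register(new ResourceSketch("local-tcc-action", "commit", "rollback"));
        // A second bean reusing the same name replaces the first registration.
        registry.register(new ResourceSketch("local-tcc-action", "deduct", "unFreeze"));
        System.out.println(registry.commitMethodFor("local-tcc-action")); // deduct
    }
}
```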

```java
// TCCBeanParserUtils
protected static boolean parserRemotingServiceInfo(Object bean, String beanName) {
    RemotingParser remotingParser = DefaultRemotingParser.get().isRemoting(bean, beanName);
    if (remotingParser != null) {
        return DefaultRemotingParser.get().parserRemotingServiceInfo(bean, beanName, remotingParser) != null;
    }
    return false;
}

// DefaultRemotingParser
public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
    RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
    if (remotingBeanDesc == null) {
        return null;
    }
    remotingServiceMap.put(beanName, remotingBeanDesc);
    Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
    Method[] methods = interfaceClass.getMethods();
    if (remotingParser.isService(bean, beanName)) { // LocalTCC or ServiceBean
        try {
            Object targetBean = remotingBeanDesc.getTargetBean();
            for (Method m : methods) {
                TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                // every method annotated with TwoPhaseBusinessAction is registered as one Resource
                if (twoPhaseBusinessAction != null) {
                    TCCResource tccResource = new TCCResource();
                    tccResource.setActionName(twoPhaseBusinessAction.name());
                    tccResource.setTargetBean(targetBean);
                    tccResource.setPrepareMethod(m);
                    tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                    tccResource.setCommitMethod(interfaceClass.getMethod(twoPhaseBusinessAction.commitMethod(),
                            twoPhaseBusinessAction.commitArgsClasses()));
                    tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                    tccResource.setRollbackMethod(interfaceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(),
                            twoPhaseBusinessAction.rollbackArgsClasses()));
                    tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses());
                    tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses());
                    tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(),
                            twoPhaseBusinessAction.commitArgsClasses()));
                    tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(),
                            twoPhaseBusinessAction.rollbackArgsClasses()));
                    DefaultResourceManager.get().registerResource(tccResource);
                }
            }
        } catch (Throwable t) {
            throw new FrameworkException(t, "parser remoting service error");
        }
    }
    if (remotingParser.isReference(bean, beanName)) {
        remotingBeanDesc.setReference(true);
    }
    return remotingBeanDesc;
}
```

In summary, for these three kinds of beans, provided at least one method is annotated with TwoPhaseBusinessAction, proxying and RM registration work as follows:

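| Bean type      | Proxied by TccActionInterceptor? | Registered as an RM? |
| -------------- | -------------------------------- | -------------------- |
| LocalTCC Bean  | Yes                              | Yes                  |
| Service Bean   | No                               | Yes                  |
| Reference Bean | Yes                              | No                   |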

III. Phase One (Try)

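In phase one the TM starts the global transaction. This logic was covered in the AT mode chapter and is not repeated here. The global transaction does not care which mode its branches use, so AT and TCC branches can be mixed.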

1. RM

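TccActionInterceptor's invoke method intercepts every method annotated with TwoPhaseBusinessAction and binds the current branch type (RootContext.branchType) to TCC around the call. The actual TCC phase-one try logic is delegated to ActionInterceptorHandler.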

```java
// TccActionInterceptor
private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler();

@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
        return invocation.proceed();
    }
    Method method = getActionInterfaceMethod(invocation);
    TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
    if (businessAction != null) {
        String xid = RootContext.getXID();
        BranchType previousBranchType = RootContext.getBranchType();
        if (BranchType.TCC != previousBranchType) {
            RootContext.bindBranchType(BranchType.TCC);
        }
        try {
            return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction,
                    invocation::proceed);
        } finally {
            if (BranchType.TCC != previousBranchType) {
                RootContext.unbindBranchType();
            }
            MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
        }
    }
    return invocation.proceed();
}
```
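The bind-then-unbind pattern in TccActionInterceptor.invoke can be illustrated with a JDK dynamic proxy. This is a swapped-in technique: Seata uses a Spring AOP MethodInterceptor, while this sketch uses java.lang.reflect.Proxy. `ContextSketch` (with its XID and BRANCH_TYPE thread-locals) and `@TwoPhaseMark` are hypothetical stand-ins for RootContext and @TwoPhaseBusinessAction.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface TwoPhaseMark {
}

// Hypothetical stand-in for RootContext: thread-local xid and branch type.
class ContextSketch {
    static final ThreadLocal<String> XID = new ThreadLocal<>();
    static final ThreadLocal<String> BRANCH_TYPE = new ThreadLocal<>();
}

interface DemoAction {
    @TwoPhaseMark
    String prepare(String code);

    String query(String code);
}

// Reports the branch type visible while each method runs.
class DemoActionImpl implements DemoAction {
    public String prepare(String code) {
        return "prepare:" + ContextSketch.BRANCH_TYPE.get();
    }

    public String query(String code) {
        return "query:" + ContextSketch.BRANCH_TYPE.get();
    }
}

class TccLikeInterceptor implements InvocationHandler {
    private final Object target;

    TccLikeInterceptor(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Outside a global transaction, or on an unannotated method: plain pass-through.
        if (ContextSketch.XID.get() == null || method.getAnnotation(TwoPhaseMark.class) == null) {
            return callTarget(method, args);
        }
        String previous = ContextSketch.BRANCH_TYPE.get();
        boolean bindHere = !"TCC".equals(previous);
        if (bindHere) {
            ContextSketch.BRANCH_TYPE.set("TCC");
        }
        try {
            return callTarget(method, args);
        } finally {
            // Like the original, unbind only if this call did the binding.
            if (bindHere) {
                ContextSketch.BRANCH_TYPE.remove();
            }
        }
    }

    private Object callTarget(Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // surface the business exception, not the reflection wrapper
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> iface, T target) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
                new TccLikeInterceptor(target));
    }
}
```

The unbinding happens in `finally`, so the branch type does not leak to later calls on the same thread, even when the try method throws.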

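ActionInterceptorHandler's logic consists of roughly three steps:

  1. Prepare the BusinessActionContext;
  2. Register the branch transaction with the TC via a BranchRegisterRequest and receive a branchId. This shows that outside LocalTCC, the branch is registered on the service consumer side, while its phase-two commit happens on the service provider side;
  3. Execute the business method. For LocalTCC this is the local try method (optionally wrapped by TCCFence, ignored for now); for a DubboReference it is the remote RPC call.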
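The ordering in these three steps (register first, then run the business call) can be sketched as follows. This is a simplified illustration, not Seata's code. `FakeTcClient` is a hypothetical stand-in for the BranchRegisterRequest round trip, the action context is a plain map, and `Supplier` replaces Seata's `Callback`, which may throw checked exceptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical TC client; stands in for sending a BranchRegisterRequest.
class FakeTcClient {
    private long nextBranchId = 1000;
    final List<String> events = new ArrayList<>();

    long branchRegister(String xid, String resourceId, Map<String, Object> applicationData) {
        events.add("register:" + resourceId);
        return ++nextBranchId;
    }
}

class TryPhaseSketch {
    // Mirrors the three steps: build the context, register the branch, then run the business call.
    static <T> T proceed(FakeTcClient tc, String xid, String actionName,
                         Map<String, Object> params, Supplier<T> businessCall) {
        // Step 1: prepare the action context (a plain map in this sketch)
        Map<String, Object> actionContext = new HashMap<>(params);
        actionContext.put("actionName", actionName);
        // Step 2: register the branch with the TC before any business code runs
        long branchId = tc.branchRegister(xid, actionName, actionContext);
        actionContext.put("branchId", branchId);
        // Step 3: local try method (LocalTCC) or remote RPC (ReferenceBean)
        return businessCall.get();
    }
}
```

Because registration happens before the business call, a branch is recorded even if the try method or the RPC then fails. That registered branch is what phase two later rolls back.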

```java
// ActionInterceptorHandler
public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                      Callback<Object> targetCallback) throws Throwable {
    // take the BusinessActionContext from the arguments, or create a new one
    BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments);
    actionContext.setXid(xid); // global transaction id
    String actionName = businessAction.name();
    actionContext.setActionName(actionName);
    actionContext.setDelayReport(businessAction.isDelayReport());

    // populate actionContext and register the branch transaction
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);

    BusinessActionContext previousActionContext = BusinessActionContextUtil.getContext();
    try {
        BusinessActionContextUtil.setContext(actionContext);
        // TCCFence handling
        if (businessAction.useTCCFence()) {
            try {
                return TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback);
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                Throwable originException = e.getCause();
                if (originException instanceof FrameworkException) {
                    LOGGER.error("[{}] prepare TCC fence error: {}", xid, originException.getMessage());
                }
                throw originException;
            }
        } else {
            // run the business method, i.e. the try method
            return targetCallback.execute();
        }
    } finally {
        // BusinessActionContextUtil restores the previous BusinessActionContext
    }
}
```

BusinessActionContext holds the context shared by both TCC phases. Its actionName is the resourceId.

```java
public class BusinessActionContext {
    // global transaction id
    private String xid;
    // branch transaction id
    private String branchId;
    // @TwoPhaseBusinessAction.name
    private String actionName;
    // whether to delay reporting, used to speed up phase two
    private Boolean isDelayReport;
    // whether actionContext has been modified by business code
    private Boolean isUpdated;
    // actionContext
    private Map<String, Object> actionContext;
}
```

BusinessActionContext.actionContext stores the try method name (sys::prepare), the commit method name (sys::commit), the rollback method name (sys::rollback), the actionName (@TwoPhaseBusinessAction.name), whether the TCC fence is enabled (@TwoPhaseBusinessAction.useTCCFence), and the parameter names and values.
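Here is a sketch of how such a map can be assembled by reflection. It assumes the parameter names come from a parameter annotation, the role @BusinessActionContextParameter plays in the example. `@CtxParam`, `FreezeAction` and `ActionContextSketch` are hypothetical names. The `sys::*` keys are the ones named above, while the `actionName` key is an assumption. The useTCCFence flag is left out, and the map is simply returned rather than serialized.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for @BusinessActionContextParameter.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface CtxParam {
    String value();
}

interface FreezeAction {
    void freeze(Object actionContext,
                @CtxParam("commodityCode") String commodityCode,
                @CtxParam("count") Integer count);
}

class ActionContextSketch {
    static Method find(Class<?> type, String name) {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name)) {
                return m;
            }
        }
        throw new IllegalArgumentException("no method " + name);
    }

    // Builds the context map; in this sketch it is simply returned.
    static Map<String, Object> build(Method tryMethod, Object[] args, String actionName,
                                     String commitMethod, String rollbackMethod) {
        Map<String, Object> ctx = new LinkedHashMap<>();
        ctx.put("sys::prepare", tryMethod.getName());
        ctx.put("sys::commit", commitMethod);
        ctx.put("sys::rollback", rollbackMethod);
        ctx.put("actionName", actionName);
        Annotation[][] paramAnnotations = tryMethod.getParameterAnnotations();
        for (int i = 0; i < paramAnnotations.length; i++) {
            for (Annotation a : paramAnnotations[i]) {
                if (a instanceof CtxParam) {
                    // parameter name taken from the annotation -> argument value
                    ctx.put(((CtxParam) a).value(), args[i]);
                }
            }
        }
        return ctx;
    }
}
```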

2. TC

The TC receives the BranchRegisterRequest and hands it to TccCore. TccCore does not override any method of AbstractCore.

```java
public class TccCore extends AbstractCore {

    public TccCore(RemotingServer remotingServer) {
        super(remotingServer);
    }

    @Override
    public BranchType getHandleBranchType() {
        return BranchType.TCC;
    }
}
```

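The TC logic is therefore the same as in AT mode and lives in AbstractCore. The only difference is that TCC mode does not acquire a global lock; it simply persists all of the branch transaction's information.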

```java
// AbstractCore
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    // Step1: look up global_table by xid to get the GlobalSession
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    // with store mode = file, the GlobalSession lives in memory, so a lock must be taken first;
    // with store mode = db/redis, no lock is needed
    return SessionHolder.lockAndExecute(globalSession, () -> {
        // status check: must be Begin
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        // Step2: acquire the global lock (only AT mode needs it)
        branchSessionLock(globalSession, branchSession);
        try {
            // Step3: save the branch transaction
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch, String
                    .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                            branchSession.getBranchId()), ex);
        }
        return branchSession.getBranchId();
    });
}
```

IV. Phase Two Commit

1. TC

The TM sends a GlobalCommitRequest to the TC, and the TC commits each branch transaction.

DefaultCore.commit(xid) performs the global commit. It differs from the AT mode flow in two ways:

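  1. globalSession.closeAndClean: if the global transaction contains AT branches, the global locks they acquired are released first. For TCC branches this step does nothing;
  2. globalSession.canBeCommittedAsync: the commit can be asynchronous only if all branches are AT; otherwise it is synchronous. In other words, whenever a TCC branch is present, the commit is synchronous.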
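The decision in point 2 boils down to an all-branches check. The following sketch is simplified and uses a hypothetical `SketchBranchType` enum. It follows the rule as stated above and ignores any other conditions the real canBeCommittedAsync may take into account.

```java
import java.util.Arrays;
import java.util.List;

enum SketchBranchType { AT, TCC, SAGA, XA }

class CommitModeSketch {
    // Async commit only when every branch is AT; a single TCC branch forces a synchronous commit.
    static boolean canBeCommittedAsync(List<SketchBranchType> branches) {
        return branches.stream().allMatch(b -> b == SketchBranchType.AT);
    }

    public static void main(String[] args) {
        System.out.println(canBeCommittedAsync(Arrays.asList(SketchBranchType.AT, SketchBranchType.AT)));  // true
        System.out.println(canBeCommittedAsync(Arrays.asList(SketchBranchType.AT, SketchBranchType.TCC))); // false
    }
}
```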

```java
// DefaultCore
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // if there are AT branches, release the global locks first: delete from lock_table where xid = ?
            globalSession.closeAndClean();
            // if all branches are AT, commit asynchronously
            if (globalSession.canBeCommittedAsync()) {
                // async commit: set the global status to AsyncCommitting,
                // update global_table set status = AsyncCommitting where xid = ?
                globalSession.asyncCommit();
                MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                return false;
            } else {
                // TCC
                globalSession.changeGlobalStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });
    if (shouldCommit) {
        // synchronous commit (TCC)
        boolean success = doGlobalCommit(globalSession, false);
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
            return globalSession.getStatus();
        }
    } else {
        // asynchronous commit (AT)
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}
```

DefaultCore.doGlobalCommit holds the core global commit logic. If a phase-two commit fails, it is retried until it succeeds.

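If all branches are AT, this method is invoked asynchronously. Once the global locks are released in AT mode, the remaining work on the global and branch transactions is only data cleanup, such as deleting rows from global_table, branch_table and undo_log.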

If any branch is TCC, this method is invoked synchronously. If some branch fails to commit, it is retried asynchronously until it succeeds.

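If AT and TCC are mixed, the AT branches are committed later, when the asynchronous task calls doGlobalCommit again. The TCC branches are still committed synchronously during the first call to doGlobalCommit.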

```java
// DefaultCore
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    if (globalSession.isSaga()) {
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
            // when AT and TCC branches coexist, skip the AT branches here and commit only TCC branches synchronously
            if (!retrying && branchSession.canBeCommittedAsync()) {
                return CONTINUE;
            }
            try {
                // Step1: send BranchCommitRequest to the RM; an AT RM deletes its undo_log, a TCC RM runs its phase-two commit
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                switch (branchStatus) {
                    case PhaseTwo_Committed:
                        // Step2: delete the branch record from branch_table
                        SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        // not retryable (implemented by XA)
                        return false;
                    default:
                        if (!retrying) {
                            // put the global transaction into the commit-retrying state; it is retried asynchronously until it succeeds
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
                            return CONTINUE;
                        } else {
                            return false;
                        }
                }
            } catch (Exception ex) {
                if (!retrying) {
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
            // a branch failed during a retry; continue with the remaining branches
            return CONTINUE;
        });
        // in a synchronous commit, if any branch failed, return false immediately
        if (result != null) {
            return result;
        }
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
            return false;
        }
        if (!retrying) {
            globalSession.setStatus(GlobalStatus.Committed);
        }
    }
    if (success && globalSession.getBranchSessions().isEmpty()) {
        // Step3: delete the global transaction: delete from global_table where xid = ?
        SessionHelper.endCommitted(globalSession, retrying);
    }
    return success;
}
```
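The mixed AT/TCC behaviour of doGlobalCommit can be modelled with a small, simplified sketch. Assumptions: branches are plain objects with an `at` flag (otherwise TCC), the RM call is a `Predicate`, statuses are strings, and any failure simply stops the pass. The real code has extra cases such as `PhaseTwo_CommitFailed_Unretryable`. `CommitBranch` and `GlobalCommitSketch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical branch record: an id and whether the branch is AT (otherwise TCC).
class CommitBranch {
    final long id;
    final boolean at;

    CommitBranch(long id, boolean at) {
        this.id = id;
        this.at = at;
    }
}

// Simplified model of the synchronous doGlobalCommit pass and its retry.
class GlobalCommitSketch {
    final List<CommitBranch> branches = new ArrayList<>();
    String status = "Committing";
    boolean ended; // stands in for deleting the global_table row

    private boolean allAt() {
        return branches.stream().allMatch(b -> b.at);
    }

    // rmCommit is a hypothetical RM call; true means PhaseTwo_Committed.
    boolean doGlobalCommit(boolean retrying, Predicate<CommitBranch> rmCommit) {
        Iterator<CommitBranch> it = branches.iterator();
        while (it.hasNext()) {
            CommitBranch b = it.next();
            if (!retrying && b.at) {
                continue; // first pass: AT branches are left to the async task
            }
            if (rmCommit.test(b)) {
                it.remove(); // like SessionHelper.removeBranch
            } else {
                if (!retrying) {
                    status = "CommitRetrying"; // like queueToRetryCommit
                }
                return false;
            }
        }
        status = "Committed";
        ended = branches.isEmpty(); // delete the global record only once no branch is left
        return true;
    }

    // Mirrors the synchronous (TCC) path of DefaultCore.commit.
    String commit(Predicate<CommitBranch> rmCommit) {
        boolean ok = doGlobalCommit(false, rmCommit);
        if (ok && !branches.isEmpty() && allAt()) {
            status = "AsyncCommitting"; // leftover AT branches are finished by the async task
        }
        return status;
    }
}
```

With one AT and one TCC branch, only the TCC branch reaches the RM on the first pass, and the global transaction moves to AsyncCommitting for the AT leftover.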

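Consider one more question. If the TCC branch is a resource marked with LocalTCC, the BranchSession is registered and committed in the same service. If the TCC branch is an RPC resource such as Dubbo, the BranchSession is registered by the ReferenceBean client, but the phase-two commit is performed by the ServiceBean provider.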

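The LocalTCC case is easy to understand. The same service instance registers and commits the BranchSession, and at registration time the BranchSession carries that instance's information, such as applicationId, ip and port. The Dubbo case is harder. The provider can only be located through the resourceId (actionName) it registered as a resource, and it is not the application that registered the BranchSession.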

```java
// AbstractCore
protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,
                                        BranchSession branchSession) throws IOException, TimeoutException {
    BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(
            branchSession.getResourceId(), branchSession.getClientId(), request);
    return response.getBranchStatus();
}

// AbstractNettyRemotingServer
public Object sendSyncRequest(String resourceId, String clientId, Object msg) throws TimeoutException {
    // locate the client Channel
    Channel channel = ChannelManager.getChannel(resourceId, clientId);
    if (channel == null) {
        throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
    }
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
    return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}
```

Focus on ChannelManager.getChannel. The resourceId argument is the TCC resource's actionName. The clientId is a string built from applicationId, ip and port, supplied by the RM when it registered the BranchSession.

ChannelManager uses a Map from resource + application + ip + port to an RpcContext. The RpcContext holds the Channel that the RM client established when it registered.

```java
public class ChannelManager {
    /**
     * resourceId -> applicationId -> ip -> port -> RpcContext
     */
    private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String,
            ConcurrentMap<Integer, RpcContext>>>> RM_CHANNELS = new ConcurrentHashMap<>();
}
```

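For LocalTCC or AT mode, the branch is registered and committed by the same service instance, so resourceId + applicationId + ip + port normally identifies the instance for phase-two communication. However, that instance may have crashed or reconnected. In that case the lookup falls back to another port on the same ip, or to another ip:port under the same applicationId.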

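When phase two in TCC mode has to reach a ServiceBean provider, the lookup goes straight to Step2-fallback and looks for an RM that another applicationId registered for the same resourceId. This is how storage-service is found for the phase-two commit. For this reason, the resourceId (actionName) should preferably be globally unique.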

```java
// ChannelManager
public static Channel getChannel(String resourceId, String clientId) {
    Channel resultChannel = null;
    String[] clientIdInfo = readClientId(clientId);
    String targetApplicationId = clientIdInfo[0];
    String targetIP = clientIdInfo[1];
    int targetPort = Integer.parseInt(clientIdInfo[2]);
    ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);
    // Step1: use resourceId to find the channels registered under applicationId-ip-port
    if (targetApplicationId == null || applicationIdMap == null || applicationIdMap.isEmpty()) {
        return null;
    }
    ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);
    // Step2: the applicationId recorded when the BranchSession was registered
    if (ipMap != null && !ipMap.isEmpty()) {
        // Step3: the ip recorded when the BranchSession was registered
        ConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);
        if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {
            // Step4: the port recorded when the BranchSession was registered
            RpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);
            if (exactRpcContext != null) {
                Channel channel = exactRpcContext.getChannel();
                if (channel.isActive()) {
                    resultChannel = channel;
                }
            }
            // Step4-fallback: the original channel may be closed; try other ports on the registered ip (resourceId+applicationId+ip)
            if (resultChannel == null) {
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP.entrySet()) {
                    Channel channel = portMapOnTargetIPEntry.getValue().getChannel();
                    if (channel.isActive()) {
                        resultChannel = channel;
                        break;
                    }
                }
            }
        }
        // Step3-fallback: no channel on the registered ip; look for one under resourceId+applicationId
        if (resultChannel == null) {
            for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap.entrySet()) {
                if (ipMapEntry.getKey().equals(targetIP)) {
                    continue;
                }
                ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();
                if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {
                    continue;
                }
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {
                    Channel channel = portMapOnOtherIPEntry.getValue().getChannel();
                    if (channel.isActive()) {
                        resultChannel = channel;
                        break;
                    }
                }
                if (resultChannel != null) {
                    break;
                }
            }
        }
    }
    // Step2-fallback: no channel under the registered applicationId; pick any channel for this resourceId
    if (resultChannel == null) {
        resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);
    }
    return resultChannel;
}
```
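The fallback order of getChannel can be checked with a simplified model. A Boolean "active" flag replaces the Netty Channel and the RpcContext. Assumptions: clientId is formatted as `applicationId:ip:port` (the text above only says it is built from those three parts), and `ChannelLookupSketch` is a hypothetical class that returns the chosen `app:ip:port` string instead of a channel.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of RM_CHANNELS; a Boolean "active" flag stands in for a Netty Channel.
class ChannelLookupSketch {
    // resourceId -> applicationId -> ip -> port -> active?
    private final Map<String, Map<String, Map<String, Map<Integer, Boolean>>>> rmChannels =
            new ConcurrentHashMap<>();

    void register(String resourceId, String applicationId, String ip, int port, boolean active) {
        rmChannels.computeIfAbsent(resourceId, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(applicationId, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(ip, k -> new ConcurrentHashMap<>())
                .put(port, active);
    }

    // clientId is assumed to be "applicationId:ip:port"; returns the chosen "app:ip:port" or null.
    String find(String resourceId, String clientId) {
        String[] parts = clientId.split(":");
        String app = parts[0];
        String ip = parts[1];
        int port = Integer.parseInt(parts[2]);
        Map<String, Map<String, Map<Integer, Boolean>>> apps = rmChannels.get(resourceId);
        if (apps == null || apps.isEmpty()) {
            return null;
        }
        Map<String, Map<Integer, Boolean>> ips = apps.get(app);
        if (ips != null) {
            Map<Integer, Boolean> ports = ips.get(ip);
            if (ports != null) {
                if (Boolean.TRUE.equals(ports.get(port))) {
                    return app + ":" + ip + ":" + port; // exact match
                }
                Integer other = firstActivePort(ports); // Step4-fallback: same ip, other port
                if (other != null) {
                    return app + ":" + ip + ":" + other;
                }
            }
            for (Map.Entry<String, Map<Integer, Boolean>> e : ips.entrySet()) { // Step3-fallback: other ip
                Integer p = e.getKey().equals(ip) ? null : firstActivePort(e.getValue());
                if (p != null) {
                    return app + ":" + e.getKey() + ":" + p;
                }
            }
        }
        for (Map.Entry<String, Map<String, Map<Integer, Boolean>>> a : apps.entrySet()) { // Step2-fallback
            if (a.getKey().equals(app)) {
                continue;
            }
            for (Map.Entry<String, Map<Integer, Boolean>> e : a.getValue().entrySet()) {
                Integer p = firstActivePort(e.getValue());
                if (p != null) {
                    return a.getKey() + ":" + e.getKey() + ":" + p;
                }
            }
        }
        return null;
    }

    private static Integer firstActivePort(Map<Integer, Boolean> ports) {
        for (Map.Entry<Integer, Boolean> e : ports.entrySet()) {
            if (Boolean.TRUE.equals(e.getValue())) {
                return e.getKey();
            }
        }
        return null;
    }
}
```

For instance, if only storage-service registered storage-tcc-action, a lookup with business-service's clientId falls through to the other-application fallback and returns storage-service's entry.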

2. RM

TCCResourceManager.branchCommit combines the data in the BranchCommitRequest with the locally cached TCCResource and invokes the phase-two commit method via reflection.

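The TCCResource mainly supplies the commit method. The BranchCommitRequest mainly supplies the commit method's arguments: the RM passes them to the TC in phase one, and the TC hands them back to the RM in phase two.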

```java
// TCCResourceManager
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    // Step1: find the resource's local commit method in the local cache tccResourceCache
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    try {
        // Step2: deserialize the BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData);
        // Step3: resolve the commit method's argument list
        Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
        Object ret;
        boolean result;
        // Step4: run the commit method
        if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
            // Step4-1: useTCCFence enabled
            try {
                result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                throw e.getCause();
            }
        } else {
            // Step4-2: useTCCFence disabled
            ret = commitMethod.invoke(targetTCCBean, args);
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            } else {
                result = true;
            }
        }
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        return BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
}
```
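The result handling at Step4-2 can be illustrated with plain reflection. This sketch simplifies things: the BusinessActionContext argument and the TwoPhaseResult return type are left out, statuses are strings, and `StorageBeanSketch` and `PhaseTwoInvokeSketch` are hypothetical names. As in the code above, a void method counts as success, a boolean return is taken at face value, and an exception becomes a retryable failure, which the TC then retries.

```java
import java.lang.reflect.Method;

// Hypothetical provider bean with three flavours of phase-two method.
class StorageBeanSketch {
    int deducted;

    public void deduct(String commodityCode, Integer count) {
        deducted += count;
    }

    public boolean deductRejected(String commodityCode, Integer count) {
        return false;
    }

    public void deductBroken(String commodityCode, Integer count) {
        throw new IllegalStateException("database unavailable");
    }
}

class PhaseTwoInvokeSketch {
    static Method find(Class<?> type, String name) {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name)) {
                return m;
            }
        }
        throw new IllegalArgumentException("no method " + name);
    }

    // void (null) => committed; Boolean.TRUE => committed; anything else or any exception => retryable.
    static String commit(Object bean, Method commitMethod, Object[] callArgs) {
        try {
            Object ret = commitMethod.invoke(bean, callArgs);
            boolean ok = ret == null || Boolean.TRUE.equals(ret);
            return ok ? "PhaseTwo_Committed" : "PhaseTwo_CommitFailed_Retryable";
        } catch (Exception e) {
            return "PhaseTwo_CommitFailed_Retryable";
        }
    }
}
```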

V. Phase Two Rollback

1. TC

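The TC's phase-two rollback logic is essentially the same as in pure AT mode. One detail differs: at Step2-1, TCC branches have no global locks (lock_table) to release. If a TCC branch fails to roll back, it is retried until it succeeds.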

```java
// DefaultCore
public GlobalStatus rollback(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
        globalSession.close();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // set the global lock (lock_table) status to Rollbacking
            // set the global transaction (global_table) status to Rollbacking
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
            return true;
        }
        return false;
    });
    if (!shouldRollBack) {
        return globalSession.getStatus();
    }
    // run the global rollback
    boolean rollbackSuccess = doGlobalRollback(globalSession, false);
    return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
}

// DefaultCore
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
    boolean success = true;
    Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
        BranchStatus currentBranchStatus = branchSession.getStatus();
        if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
            return CONTINUE;
        }
        try {
            // Step1: send BranchRollbackRequest
            BranchStatus branchStatus = branchRollback(globalSession, branchSession);
            switch (branchStatus) {
                case PhaseTwo_Rollbacked:
                    // Step2-1: release the global lock and delete the branch transaction
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                case PhaseTwo_RollbackFailed_Unretryable:
                    // rollback failed and retrying cannot succeed
                    SessionHelper.endRollbackFailed(globalSession, retrying);
                    return false;
                default:
                    // Step2-2: the RM failed to roll back; the global status becomes RollbackRetrying and waits for a retry
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    return false;
            }
        } catch (Exception ex) {
            if (!retrying) {
                // if Step1 or Step2 throws, the global status becomes RollbackRetrying and waits for a retry
                globalSession.queueToRetryRollback();
            }
            throw new TransactionException(ex);
        }
    });
    // if any branch failed to roll back, return false
    if (result != null) {
        return result;
    }
    // Step3
    // file mode: delete the global transaction directly
    // db/redis mode: doGlobalRollback runs again asynchronously, so nothing is done here;
    // this prevents residual rows in lock_table and branch_table, left by branches whose registration
    // succeeded despite network jitter, from keeping the global lock held forever
    if (success) {
        SessionHelper.endRollbacked(globalSession, retrying);
    }
    return success;
}

// SessionHelper
public static void endRollbacked(GlobalSession globalSession, boolean retryGlobal) throws TransactionException {
    // on a retry, or in file mode
    if (retryGlobal || !DELAY_HANDLE_SESSION) {
        long beginTime = System.currentTimeMillis();
        GlobalStatus currentStatus = globalSession.getStatus();
        boolean retryBranch = currentStatus == GlobalStatus.TimeoutRollbackRetrying
                || currentStatus == GlobalStatus.RollbackRetrying;
        if (isTimeoutGlobalStatus(currentStatus)) {
            globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);
        } else {
            globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);
        }
        // delete the global transaction from global_table
        globalSession.end();
    }
}
```
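Here is a simplified sketch of the rollback loop. Branches are visited in reverse order, PhaseOne_Failed branches are simply dropped, and the first failed rollback stops the pass and marks the global transaction for retry. `RollbackBranch` and `GlobalRollbackSketch` are hypothetical names. Reverse registration order stands in for getReverseSortedBranches, and the file versus db/redis difference in endRollbacked is ignored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical branch record for the rollback sketch.
class RollbackBranch {
    final long id;
    final boolean phaseOneFailed;

    RollbackBranch(long id, boolean phaseOneFailed) {
        this.id = id;
        this.phaseOneFailed = phaseOneFailed;
    }
}

class GlobalRollbackSketch {
    final List<RollbackBranch> branches = new ArrayList<>();
    String status = "Rollbacking";

    // rmRollback is a hypothetical RM call; true means PhaseTwo_Rollbacked.
    boolean doGlobalRollback(boolean retrying, Predicate<RollbackBranch> rmRollback) {
        List<RollbackBranch> reversed = new ArrayList<>(branches);
        Collections.reverse(reversed); // last registered, first rolled back
        for (RollbackBranch b : reversed) {
            if (b.phaseOneFailed) {
                branches.remove(b); // nothing to undo, just drop the record
                continue;
            }
            if (rmRollback.test(b)) {
                branches.remove(b); // like SessionHelper.removeBranch
            } else {
                if (!retrying) {
                    status = "RollbackRetrying"; // like queueToRetryRollback
                }
                return false;
            }
        }
        status = "Rollbacked";
        return true;
    }
}
```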

2. RM

The RM handles the BranchRollbackRequest.

TCCResourceManager.branchRollback performs the RM's phase-two rollback. The flow mirrors the phase-two commit.

```java
// TCCResourceManager
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    // Step1: find the resource's local rollback method in the local cache tccResourceCache
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    Object targetTCCBean = tccResource.getTargetBean();
    Method rollbackMethod = tccResource.getRollbackMethod();
    try {
        // Step2: deserialize the BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData);
        // Step3: resolve the rollback method's argument list
        Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
        Object ret;
        boolean result;
        // Step4: run the rollback method
        if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
            try {
                result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
                        args, tccResource.getActionName());
            } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
                throw e.getCause();
            }
        } else {
            ret = rollbackMethod.invoke(targetTCCBean, args);
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            } else {
                result = true;
            }
        }
        return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    } catch (Throwable t) {
        return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }
}
```

VI. useTCCFence

In LocalTCC mode you can set useTCCFence=true. The Seata framework then uses its built-in TCC branch status table to solve the three classic TCC problems:

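  • Idempotency: because of network jitter, the TC does not receive the RM's phase-two response within the timeout and retries, so the RM receives the phase-two rollback or commit request more than once;
  • Hanging (resource suspension): because of network jitter, the RM receives the try request after it has already received the phase-two rollback request;
  • Null rollback: the RM never successfully executed try for some reason, the TM rolls back the global transaction, and the RM is asked to roll back without having executed try.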

```java
@LocalTCC
public interface LocalTccAction {

    @TwoPhaseBusinessAction(name = "local-tcc-action", // resource name
            commitMethod = "commit", // phase-two commit method
            rollbackMethod = "rollback", // phase-two rollback method
            commitArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // parameter types of the commit method
            rollbackArgsClasses = {BusinessActionContext.class, String.class, Integer.class}, // parameter types of the rollback method
            useTCCFence = true // enable TCCFence: the Seata TCC framework handles the three classic TCC problems (idempotency, hanging, null rollback)
    )
    void prepare(BusinessActionContext actionContext,
                 @BusinessActionContextParameter("commodityCode") String commodityCode,
                 @BusinessActionContextParameter("count") Integer count);
}
```

All of this logic lives in TCCFenceHandler. Before the prepare, commit or rollback business method runs, the tcc_fence_log status table is used to reject illegal requests.

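In the try phase, TCCFenceHandler first opens a transaction so that the fence record and the business changes are committed in the same transaction. It then inserts a tcc_fence_log row with status STATUS_TRIED, and finally runs the business try method.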

```java
// TCCFenceHandler
public static Object prepareFence(String xid, Long branchId, String actionName, Callback<Object> targetCallback) {
    // open a local transaction
    return transactionTemplate.execute(status -> {
        try {
            Connection conn = DataSourceUtils.getConnection(dataSource);
            // insert tcc_fence_log with status = STATUS_TRIED
            boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED);
            LOGGER.info("TCC fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId);
            if (result) {
                // business try method
                return targetCallback.execute();
            } else {
                throw new TCCFenceException(String.format("Insert tcc fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId),
                        FrameworkErrorCode.InsertRecordError);
            }
        } catch (TCCFenceException e) {
            if (e.getErrcode() == FrameworkErrorCode.DuplicateKeyException) {
                LOGGER.error("Branch transaction has already rollbacked before,prepare fence failed. xid= {},branchId = {}", xid, branchId);
                addToLogCleanQueue(xid, branchId);
            }
            status.setRollbackOnly();
            throw new SkipCallbackWrapperException(e);
        } catch (Throwable t) {
            status.setRollbackOnly();
            throw new SkipCallbackWrapperException(t);
        }
    });
}
```

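Before the phase-two commit method runs, tcc_fence_log is queried (select for update). If no record exists, an exception is thrown. If a record exists, an idempotency status check is performed, and the business commit method runs only when the status is STATUS_TRIED.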

```java
// TCCFenceHandler
public static boolean commitFence(Method commitMethod, Object targetTCCBean,
                                  String xid, Long branchId, Object[] args) {
    // open a local transaction
    return transactionTemplate.execute(status -> {
        try {
            Connection conn = DataSourceUtils.getConnection(dataSource);
            // select for update
            TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
            if (tccFenceDO == null) {
                throw new TCCFenceException(String.format("TCC fence record not exists, commit fence method failed. xid= %s, branchId= %s", xid, branchId),
                        FrameworkErrorCode.RecordAlreadyExists);
            }
            // (1) idempotency
            if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
                return true;
            }
            // a phase-two rollback request has already been received
            if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
                return false;
            }
            // the idempotency check passed: set STATUS_COMMITTED and invoke the target method
            return updateStatusAndInvokeTargetMethod(conn, commitMethod, targetTCCBean, xid, branchId,
                    TCCFenceConstant.STATUS_COMMITTED, status, args);
        } catch (Throwable t) {
            status.setRollbackOnly();
            throw new SkipCallbackWrapperException(t);
        }
    });
}
```

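Phase-two rollback in TCCFenceHandler is more involved. After opening a transaction, it queries tcc_fence_log (SELECT ... FOR UPDATE) for the branch record.

If there is no record, this is an empty rollback, so it tries to insert a tcc_fence_log record with status=STATUS_SUSPENDED:

  1. If the insert succeeds, the phase-two rollback is treated as complete and returns immediately. If a try request arrives later, prepareFence throws an exception because of the unique key (xid + branchId), and the phase-one try business method is never executed, which prevents suspension (resource hanging).
  2. If the insert fails on the unique key (xid + branchId), a phase-one try request most likely arrived while the rollback was in progress. An exception is thrown and the TC retries the phase-two rollback later; the retry then finds the try's record and releases what it reserved, so suspension is avoided here as well.

If there is a record, the idempotency check is applied and then the business rollback method is executed.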

```java
// TCCFenceHandler
public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
                                    String xid, Long branchId, Object[] args, String actionName) {
    // open a local transaction
    return transactionTemplate.execute(status -> {
        try {
            Connection conn = DataSourceUtils.getConnection(dataSource);
            // SELECT ... FOR UPDATE
            TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
            // (2) suspension: insert tcc_fence_log with status=STATUS_SUSPENDED so that a phase-one try
            //     arriving after the rollback is rejected
            // (3) empty rollback: no tcc_fence_log record means try never ran, so the phase-two
            //     rollback method is not executed
            if (tccFenceDO == null) {
                boolean result = insertTCCFenceLog(conn, xid, branchId, actionName, TCCFenceConstant.STATUS_SUSPENDED);
                if (!result) {
                    throw new TCCFenceException(String.format("Insert tcc fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId),
                            FrameworkErrorCode.InsertRecordError);
                }
                return true;
            } else {
                // (1) idempotency
                if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus()
                        || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
                    return true;
                }
                if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
                    return false;
                }
            }
            // idempotency and empty-rollback checks passed: update the status and invoke the target method
            return updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId,
                    TCCFenceConstant.STATUS_ROLLBACKED, status, args);
        } catch (Throwable t) {
            status.setRollbackOnly();
            throw new SkipCallbackWrapperException(t);
        }
    });
}
```
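The rollback rules can be exercised end to end with the same kind of in-memory model. In this sketch, RollbackFenceSketch and tryInsert are hypothetical names, and putIfAbsent stands in for an INSERT guarded by the (xid, branchId) unique key, so an empty rollback leaves a SUSPENDED marker that a late try cannot overwrite.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of rollbackFence plus a late try; not Seata code.
class RollbackFenceSketch {
    static final int STATUS_TRIED = 1;       // numeric values chosen for this sketch
    static final int STATUS_COMMITTED = 2;
    static final int STATUS_ROLLBACKED = 3;
    static final int STATUS_SUSPENDED = 4;

    static final Map<String, Integer> FENCE_LOG = new ConcurrentHashMap<>();
    static int rollbackCalls = 0; // counts how often the business rollback really runs

    static synchronized boolean rollbackFence(String xid, long branchId) {
        String key = xid + ":" + branchId;
        Integer status = FENCE_LOG.get(key);
        if (status == null) {
            // Empty rollback: record SUSPENDED instead of calling the business rollback.
            Integer raced = FENCE_LOG.putIfAbsent(key, STATUS_SUSPENDED);
            if (raced != null) {
                // A concurrent try inserted first: fail so that the TC retries the rollback.
                throw new IllegalStateException("insert fence record failed, TC will retry");
            }
            return true;
        }
        if (status == STATUS_ROLLBACKED || status == STATUS_SUSPENDED) {
            return true;                          // idempotent
        }
        if (status == STATUS_COMMITTED) {
            return false;                         // already committed, cannot roll back
        }
        FENCE_LOG.put(key, STATUS_ROLLBACKED);    // TRIED -> ROLLBACKED, then run business rollback
        rollbackCalls++;
        return true;
    }

    // The insert a try performs first; false means the unique key rejected it.
    static boolean tryInsert(String xid, long branchId) {
        return FENCE_LOG.putIfAbsent(xid + ":" + branchId, STATUS_TRIED) == null;
    }
}
```

Calling rollbackFence before tryInsert reproduces the "rollback overtakes try" ordering: the rollback succeeds without touching business data, and the late try is refused.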

Summary

1. Three special kinds of bean

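1) A LocalTCC bean annotated with @LocalTCC + @TwoPhaseBusinessAction: registered as an RM at startup. Within a TCC transaction, Seata's proxy registers a branch transaction when it is called, and the bean also commits or rolls back that branch.

2) A ReferenceBean annotated with @DubboReference + @TwoPhaseBusinessAction: involved only at execution time. Within a TCC transaction, Seata's proxy registers the branch transaction before the RPC is sent.

3) A ServiceBean annotated with @DubboService + @TwoPhaseBusinessAction: registered as an RM at startup, and only commits or rolls back branch transactions.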

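2. Try phase

1) The RM call is intercepted by TccActionInterceptor, which wraps the @TwoPhaseBusinessAction method and its arguments into a BranchRegisterRequest and sends it to the TC to register a branch transaction.

2) The TC receives the BranchRegisterRequest, persists the branch transaction data, and returns the branch ID. Unlike AT mode, where the TC must acquire the global lock before persisting the branch, no global lock is involved here.

3) The RM executes the phase-one try logic: for LocalTCC this is a local transaction; for a DubboReference it is a remote RPC call.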
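The interception step can be pictured with a plain JDK dynamic proxy: look for the two-phase annotation, register a branch, then invoke the try method. This is only a sketch of the idea, not TccActionInterceptor itself; the @TwoPhase annotation, the StockAction interface, and the in-process "TC" that hands out branch IDs are all hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical marker standing in for @TwoPhaseBusinessAction (not Seata's annotation).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface TwoPhase {
    String name();
}

interface StockAction {
    @TwoPhase(name = "storage-tcc-action")
    String freeze(String commodityCode, int count);
}

class TryPhaseProxySketch {
    // Fake TC: hands out branch ids and remembers what was registered (single-threaded demo).
    static final AtomicLong NEXT_BRANCH_ID = new AtomicLong(1);
    static final List<String> REGISTERED = new ArrayList<>();

    static long registerBranch(String xid, String resourceId, Object[] args) {
        long branchId = NEXT_BRANCH_ID.getAndIncrement();
        REGISTERED.add(xid + "/" + branchId + "/" + resourceId + Arrays.toString(args));
        return branchId;
    }

    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> iface, T target, String xid) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface},
                (proxy, method, args) -> {
                    TwoPhase tp = method.getAnnotation(TwoPhase.class);
                    if (tp != null && xid != null) {
                        // 1) register the branch (resource name + try arguments) before running try
                        registerBranch(xid, tp.name(), args);
                    }
                    try {
                        // 2) phase-one try: a local call here, an RPC for a Dubbo reference
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause();
                    }
                });
    }
}
```

Registering before invoking matters: the TC must know about the branch even if the try crashes halfway, so that it can later drive a rollback for it.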

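3. Commit phase

1) The TM commits the global transaction by sending a GlobalCommitRequest to the TC.

2) The TC handles the GlobalCommitRequest:

  • If the global transaction contains AT branches, the AT global locks are released first;
  • TCC branches are committed synchronously: a BranchCommitRequest is sent to the RM; if the RM reports failure, the commit is retried asynchronously until it succeeds; on success, the branch is deleted;
  • AT branches are committed asynchronously: a BranchCommitRequest is sent to the RM, which deletes its undo_log asynchronously.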
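The TC behaviour for TCC branches (one synchronous attempt, then retries until success) can be modelled as below. GlobalCommitSketch and its methods are hypothetical; the real TC retries from a scheduled background task, whereas this sketch drains the retry queue in explicit rounds so that it stays deterministic.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of TC-side phase two for TCC branches; not Seata's DefaultCore.
class GlobalCommitSketch {
    // One synchronous pass: successful branches are returned (their records can be deleted),
    // failed ones are handed to the retry queue.
    static List<String> commitOnce(List<String> branches, Predicate<String> rmCommit, Deque<String> retryQueue) {
        List<String> committed = new ArrayList<>();
        for (String branch : branches) {
            if (rmCommit.test(branch)) {
                committed.add(branch);
            } else {
                retryQueue.add(branch);
            }
        }
        return committed;
    }

    // Stand-in for the background retry job: re-sends until every branch succeeds
    // (bounded by maxRounds only so the demo cannot loop forever). Returns rounds used.
    static int drainRetries(Deque<String> retryQueue, Predicate<String> rmCommit, int maxRounds) {
        int rounds = 0;
        while (!retryQueue.isEmpty() && rounds < maxRounds) {
            rounds++;
            int pending = retryQueue.size();
            for (int i = 0; i < pending; i++) {
                String branch = retryQueue.poll();
                if (!rmCommit.test(branch)) {
                    retryQueue.add(branch); // still failing, keep it for the next round
                }
            }
        }
        return rounds;
    }
}
```

Retrying "until success" is what makes the RM-side idempotency check necessary: the same BranchCommitRequest may reach the RM several times.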

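3) The RM handles the BranchCommitRequest

For both LocalTCC and Dubbo, the BranchCommitRequest is received by the service instance that registered the TCC resource at startup.

The RM decodes the data sent by the TC (the commit method name and the arguments of the phase-one try) and invokes the TCC commit method via reflection.

For LocalTCC, useTCCFence can be enabled so that Seata handles the three classic TCC problems (idempotency, suspension, and empty rollback).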
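Invoking phase two "via reflection" amounts to looking up the method by name and parameter types and calling it with the arguments saved from try. A minimal sketch, assuming a hypothetical InventoryTcc bean and leaving out the BusinessActionContext parameter that the real commitArgsClasses declare:

```java
import java.lang.reflect.Method;

// Hypothetical TCC bean; the state pretends that try has already frozen 2 units.
class InventoryTcc {
    int frozen = 2;
    int stock = 100;

    public boolean commit(String commodityCode, Integer count) {
        frozen -= count;   // release the frozen units...
        stock -= count;    // ...and deduct them for real
        return true;
    }

    public boolean rollback(String commodityCode, Integer count) {
        frozen -= count;   // only release the frozen units
        return true;
    }
}

class ReflectiveCommitSketch {
    // What the RM has after decoding the TC's request: method name, parameter types, try arguments.
    static Object invokePhaseTwo(Object bean, String methodName, Class<?>[] argClasses, Object[] args) {
        try {
            Method m = bean.getClass().getMethod(methodName, argClasses);
            return m.invoke(bean, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("phase-two invocation failed: " + methodName, e);
        }
    }
}
```

Declaring the parameter types up front (commitArgsClasses / rollbackArgsClasses in the annotation) is what lets the RM resolve the exact overload without guessing from runtime argument types.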

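4. Rollback phase

This mirrors the commit phase.

1) The TM rolls back the global transaction by sending a GlobalRollbackRequest to the TC.

2) The TC processes the global rollback:

  • The global transaction in global_table is set to Rollbacking; if there are AT branches, their rows in lock_table are also set to Rollbacking
  • If there are AT branches, their global locks in lock_table are released
  • A BranchRollbackRequest is sent to the RM; if it fails, it is retried asynchronously until it succeeds
  • On success, the branch is deleted from branch_table synchronously, and in db/redis store mode the global transaction is deleted from global_table asynchronously

3) The RM handles the BranchRollbackRequest

As with commit, the rollback method is invoked via reflection, and for LocalTCC useTCCFence can be enabled so that Seata handles the three TCC problems.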

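5. useTCCFence

In LocalTCC mode you can set useTCCFence = true, and Seata then uses its built-in TCC branch-state table (tcc_fence_log) to solve the three TCC problems:

Idempotency: in the try phase the RM inserts a branch-state record with status STATUS_TRIED. When a commit or rollback request arrives, the RM reads that record with SELECT ... FOR UPDATE and runs the phase-two method only if the status is still STATUS_TRIED; a repeated request finds a committed or rolled-back status and returns without running the business method again.

Empty rollback: if, on a rollback request, SELECT ... FOR UPDATE finds no branch-state record, try never ran on this RM, so this is an empty rollback. Instead of calling the business rollback method, the RM tries to insert a record with status STATUS_SUSPENDED. A unique-key conflict means a try for the same branch is executing concurrently, so the RM reports failure and the TC retries; without a conflict, the RM reports success.

Suspension (resource hanging): because an empty rollback leaves a STATUS_SUSPENDED record, a try that arrives after the rollback hits a unique-key conflict when inserting its STATUS_TRIED record. The try call then fails before its business logic runs, so no resources are reserved that would never be released.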

Source: https://juejin.cn/post/7162425024732332040
