seata-XA模式
分布式事务seataXA模式

1.什么是XA模式

  • XA是由X/Open组织提出的分布式事务的规范。 XA规范主要定义了(全局)事务管理器(TM)和(局 部)资源管理器(RM)之间的接口。主流的关系型 数据库产品都是实现了XA接口的。
  • XA接口是双向的系统接口,在事务管理器 (TM)以及一个或多个资源管理器(RM)之 间形成通信桥梁。
  • XA之所以需要引入事务管理器是因为,在分布 式系统中,从理论上讲两台机器理论上无法达 到一致的状态,需要引入一个单点进行协调。
  • 由全局事务管理器管理和协调的事务,可以跨 越多个资源(如数据库或JMS队列)和进程。 全局事务管理器一般使用 XA 二阶段提交协议 与数据库进行交互。



2.seata的XA模式的工作机制
XA 模式 运行在 Seata 定义的事务框架内:
TB1uM2OaSslXu8jSZFuXXXg7FXa-1330-958.png

执行阶段(E xecute):XA start/XA end/XA prepare + SQL + 注册分支
完成阶段(F inish):XA commit/XA rollback

3.代码示例
3.1 模块
我们暂时使用http直接调用的方式,进行微服务之间的调用。业务流程是下单的同时去库存服务里扣库存。这里我们主要有 at-common,at-order,at-storage三个主要模块。demo依赖 spring boot,mybatis plus
6.2 表结构
这里需要2张表 ,order_tbl 订单表,storage_tbl 库存表,由于XA模式是基于数据库支持的,所以不需要undo_log表的参与

create table order_tbl
(
    id             int auto_increment
        primary key,
    user_id        varchar(255)  null,
    commodity_code varchar(255)  null,
    count          int default 0 null,
    money          int default 0 null
)
    charset = utf8;

create table storage_tbl
(
    id             int auto_increment
        primary key,
    commodity_code varchar(255)  null,
    count          int default 0 null,
    constraint commodity_code
        unique (commodity_code)
)
    charset = utf8;



3.2 依赖
我们是一个spring boot的工程,所以除了seata-all 之外还要引入seata-spring-boot-starter
注意:如果不用seata-spring-boot-starter 需要自己配置 DataSourceProxy

compile group: 'io.seata', name: 'seata-all', version: '1.3.0'
compile group: 'io.seata', name: 'seata-spring-boot-starter', version: '1.3.0'



3.3 全局公共配置
根据上面的介绍,seata的事务是有一个全局标记的,这个标记是需要整个请求的生命周期直接流转的。我们在过滤器里获得传过来的值并赋给RootContext.

package com.ms.seata.at.filter;

import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;

import org.apache.commons.lang3.StringUtils;

import io.seata.core.context.RootContext;

@WebFilter(filterName = "seataFilter", urlPatterns = "/*")
public class SeataFilter implements Filter {

    @Override public void init(FilterConfig filterConfig) throws ServletException {

    }

    @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) request;
        String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
        boolean isBind = false;
        if (StringUtils.isNotBlank(xid)) {
            RootContext.bind(xid);
            isBind = true;
        }
        try {
            chain.doFilter(request, response);
        } finally {
            if (isBind) {
                RootContext.unbind();
            }
        }
    }

    @Override public void destroy() {

    }
}



在调用其他服务的时候,我们需要把这个标记值传递给下一个服务。示例里使用的是restTemplate来调用其他服务的。所以我们先给restTemplate一个拦截器 SeataRestTemplateInterceptor。

package com.ms.seata.at.interceptor;

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.support.HttpRequestWrapper;

import io.seata.core.context.RootContext;

public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
            throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(request);
        String xid = RootContext.getXID();
        if (StringUtils.isNotEmpty(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }

        return execution.execute(requestWrapper, body);
    }
}



把拦截器加到restTemplate上。

package com.ms.seata.at.conf;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.web.client.RestTemplate;

import com.ms.seata.at.interceptor.SeataRestTemplateInterceptor;

@Configuration
public class SeataRestTemplateAutoConfiguration {

    @Autowired(required = false)
    private Collection<RestTemplate> restTemplates;

    @Autowired
    private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

    @Bean
    public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
        return new SeataRestTemplateInterceptor();
    }

    @PostConstruct
    public void init() {
        if (this.restTemplates != null) {
            Iterator var1 = this.restTemplates.iterator();

            while (var1.hasNext()) {
                RestTemplate restTemplate = (RestTemplate) var1.next();
                List<ClientHttpRequestInterceptor> interceptors = new ArrayList(restTemplate.getInterceptors());
                interceptors.add(this.seataRestTemplateInterceptor);
                restTemplate.setInterceptors(interceptors);
            }
        }

    }
}



由上可见,其实就是把全局事务的ID通过header赋值,在各个服务之间流转。
具体的使用就是在事务开始的地方加一个 @GlobalTransactional

3.4 DataSource的配置
由于seata-spring-boot-starter 默认的是DataSourceProxy,但是 我们需要用的是 DataSourceProxyXA,所以,我们需要把seata-spring-boot-starter默认的开启的数据源代理关闭

seata.enable-auto-data-source-proxy=false

代码配置数据源代理

@Configuration
public class XADataSourceConfiguration {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Primary
    @Bean("dataSource")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxyXA(druidDataSource);
    }

    @Bean
    @ConfigurationProperties(prefix = "mybatis")
    public MybatisSqlSessionFactoryBean sqlSessionFactoryBean( DataSource dataSource) {
        // 这里用 MybatisSqlSessionFactoryBean 代替了 SqlSessionFactoryBean,否则 MyBatisPlus 不会生效
        MybatisSqlSessionFactoryBean mybatisSqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        mybatisSqlSessionFactoryBean.setDataSource(dataSource);
        return mybatisSqlSessionFactoryBean;
    }
}

3.5 测试
首先,我们需要启动seata-server服务(全局事务管理)。然后启动 order,storage服务。
我们的测试流程是 下单->扣库存->插入订单数据。

@GlobalTransactional
public void create(Order order){
    StorageDTO storageDTO = StorageDTO.builder()
            .commodityCode(order.getCommodityCode())
            .count(order.getCount()).build();
    storageClient.deduct(storageDTO);
    orderDAO.save(order);
}



注意:在XA模式下使用了@GlobalTransactional 就不要再去用 @Transactional 了,因为XA模式 就是一个全局大事务。如果两个都用会造成事务叠加就会报如下错误:

Caused by: java.sql.SQLException: XAER_NOTA: Unknown XID



执行结果:达到预期,订单表插入一条数据,库存扣除。

暂无评论