1.什么是XA模式
- XA是由X/Open组织提出的分布式事务的规范。 XA规范主要定义了(全局)事务管理器(TM)和(局 部)资源管理器(RM)之间的接口。主流的关系型 数据库产品都是实现了XA接口的。
- XA接口是双向的系统接口,在事务管理器 (TM)以及一个或多个资源管理器(RM)之 间形成通信桥梁。
- XA之所以需要引入事务管理器是因为,在分布 式系统中,从理论上讲两台机器理论上无法达 到一致的状态,需要引入一个单点进行协调。
- 由全局事务管理器管理和协调的事务,可以跨 越多个资源(如数据库或JMS队列)和进程。 全局事务管理器一般使用 XA 二阶段提交协议 与数据库进行交互。
2.seata的XA模式的工作机制
XA 模式 运行在 Seata 定义的事务框架内:
执行阶段(E xecute):XA start/XA end/XA prepare + SQL + 注册分支
完成阶段(F inish):XA commit/XA rollback
3.代码示例
3.1 模块
我们暂时使用http直接调用的方式,进行微服务之间的调用。业务流程是下单的同时去库存服务里扣库存。这里我们主要有 at-common,at-order,at-storage三个主要模块。demo依赖 spring boot,mybatis plus
6.2 表结构
这里需要2张表 ,order_tbl 订单表,storage_tbl 库存表,由于XA模式是基于数据库支持的,所以不需要undo_log表的参与
create table order_tbl
(
id int auto_increment
primary key,
user_id varchar(255) null,
commodity_code varchar(255) null,
count int default 0 null,
money int default 0 null
)
charset = utf8;
create table storage_tbl
(
id int auto_increment
primary key,
commodity_code varchar(255) null,
count int default 0 null,
constraint commodity_code
unique (commodity_code)
)
charset = utf8;
3.2 依赖
我们是一个spring boot的工程,所以除了seata-all 之外还要引入seata-spring-boot-starter
注意:如果不用seata-spring-boot-starter 需要自己配置 DataSourceProxy
compile group: 'io.seata', name: 'seata-all', version: '1.3.0'
compile group: 'io.seata', name: 'seata-spring-boot-starter', version: '1.3.0'
3.3 全局公共配置
根据上面的介绍,seata的事务是有一个全局标记的,这个标记是需要整个请求的生命周期直接流转的。我们在过滤器里获得传过来的值并赋给RootContext.
package com.ms.seata.at.filter;
import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils;
import io.seata.core.context.RootContext;
@WebFilter(filterName = "seataFilter", urlPatterns = "/*")
public class SeataFilter implements Filter {
@Override public void init(FilterConfig filterConfig) throws ServletException {
}
@Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest req = (HttpServletRequest) request;
String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
boolean isBind = false;
if (StringUtils.isNotBlank(xid)) {
RootContext.bind(xid);
isBind = true;
}
try {
chain.doFilter(request, response);
} finally {
if (isBind) {
RootContext.unbind();
}
}
}
@Override public void destroy() {
}
}
在调用其他服务的时候,我们需要把这个标记值传递给下一个服务。示例里使用的是restTemplate来调用其他服务的。所以我们先给restTemplate一个拦截器 SeataRestTemplateInterceptor。
package com.ms.seata.at.interceptor;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.support.HttpRequestWrapper;
import io.seata.core.context.RootContext;
public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
throws IOException {
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(request);
String xid = RootContext.getXID();
if (StringUtils.isNotEmpty(xid)) {
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}
return execution.execute(requestWrapper, body);
}
}
把拦截器加到restTemplate上。
package com.ms.seata.at.conf;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.web.client.RestTemplate;
import com.ms.seata.at.interceptor.SeataRestTemplateInterceptor;
@Configuration
public class SeataRestTemplateAutoConfiguration {
@Autowired(required = false)
private Collection<RestTemplate> restTemplates;
@Autowired
private SeataRestTemplateInterceptor seataRestTemplateInterceptor;
@Bean
public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
return new SeataRestTemplateInterceptor();
}
@PostConstruct
public void init() {
if (this.restTemplates != null) {
Iterator var1 = this.restTemplates.iterator();
while (var1.hasNext()) {
RestTemplate restTemplate = (RestTemplate) var1.next();
List<ClientHttpRequestInterceptor> interceptors = new ArrayList(restTemplate.getInterceptors());
interceptors.add(this.seataRestTemplateInterceptor);
restTemplate.setInterceptors(interceptors);
}
}
}
}
由上可见,其实就是把全局事务的ID通过header赋值,在各个服务之间流转。
具体的使用就是在事务开始的地方加一个 @GlobalTransactional
3.4 DataSource的配置
由于seata-spring-boot-starter 默认的是DataSourceProxy,但是 我们需要用的是 DataSourceProxyXA,所以,我们需要把seata-spring-boot-starter默认的开启的数据源代理关闭
seata.enable-auto-data-source-proxy=false
代码配置数据源代理
@Configuration
public class XADataSourceConfiguration {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
@Primary
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxyXA(druidDataSource);
}
@Bean
@ConfigurationProperties(prefix = "mybatis")
public MybatisSqlSessionFactoryBean sqlSessionFactoryBean( DataSource dataSource) {
// 这里用 MybatisSqlSessionFactoryBean 代替了 SqlSessionFactoryBean,否则 MyBatisPlus 不会生效
MybatisSqlSessionFactoryBean mybatisSqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
mybatisSqlSessionFactoryBean.setDataSource(dataSource);
return mybatisSqlSessionFactoryBean;
}
}
3.5 测试
首先,我们需要启动seata-server服务(全局事务管理)。然后启动 order,storage服务。
我们的测试流程是 下单->扣库存->插入订单数据。
@GlobalTransactional
public void create(Order order){
StorageDTO storageDTO = StorageDTO.builder()
.commodityCode(order.getCommodityCode())
.count(order.getCount()).build();
storageClient.deduct(storageDTO);
orderDAO.save(order);
}
注意:在XA模式下使用了@GlobalTransactional 就不要再去用 @Transactional 了,因为XA模式 就是一个全局大事务。如果两个都用会造成事务叠加就会报如下错误:
Caused by: java.sql.SQLException: XAER_NOTA: Unknown XID
执行结果:达到预期,订单表插入一条数据,库存扣除。