seata-AT模式
分布式事务seata

1.什么是AT模式
seata的AT模式是一个无侵入的分布式事务。

2.AT模式的工作原理
AT模式是基于一个全局的事务管理器来管理的。例如,在全局事务下,当我们存在A,B两个服务的时候,A和B的提交都会生成一个undo_log的记录,会记录下执行前和执行后的镜像,如果一段提交没有错误的话,则会进行二段提交,如果在全局事务中有异常抛出,则根据undo_log进行回滚。

3.全局事务的标记
在seata中,全局事务会有一个TX_ID来标记。这个主要体现在 io.seata.core.context.RootContext#bind() 。在分布式系统中不管分成多少个子系统,我们用请求生命周期的概念来定义一次请求,就是从页面请求发起,到返回页面结果整个过程是一个请求的生命周期。在seata中也是同样是思路,会有一个TX_ID来标记一个全局事务。例如在通过http调用的模式下,我们可以把这个TX_ID放在header中,在每个服务接受到请求的时候,通过过滤器,将这个TX_ID绑定到当前的RootContext中。此处的RootContext也是基于ThreadLocal来实现的,所以在过滤器的结尾要执行一次解绑。

4.写隔离
在seata的框架下,本地事务提交要先拿到全局锁,拿不到全局锁,不能提交本地事务,拿全局锁的尝试会被限制在一定的范围内,超出范围将放弃,并回滚本地事务,释放本地锁。

5.读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

6.代码示例
6.1 模块
我们暂时使用http直接调用的方式,进行微服务之间的调用。业务流程是下单的同时去库存服务里扣库存。这里我们主要有 at-common,at-order,at-storage三个主要模块。demo依赖 spring boot,mybatis plus
6.2 表结构
这里需要3张表 ,order_tbl 订单表,storage_tbl 库存表

create table order_tbl
(
    id             int auto_increment
        primary key,
    user_id        varchar(255)  null,
    commodity_code varchar(255)  null,
    count          int default 0 null,
    money          int default 0 null
)
    charset = utf8;

create table storage_tbl
(
    id             int auto_increment
        primary key,
    commodity_code varchar(255)  null,
    count          int default 0 null,
    constraint commodity_code
        unique (commodity_code)
)
    charset = utf8;

create table undo_log
(
    id            bigint auto_increment
        primary key,
    branch_id     bigint       not null,
    xid           varchar(100) not null,
    context       varchar(128) not null,
    rollback_info longblob     not null,
    log_status    int          not null,
    log_created   datetime     not null,
    log_modified  datetime     not null,
    ext           varchar(100) null,
    constraint ux_undo_log
        unique (xid, branch_id)
)
    charset = utf8;



6.2 依赖
我们是一个spring boot的工程,所以除了seata-all 之外还要引入seata-spring-boot-starter
注意:如果不用seata-spring-boot-starter 需要自己配置 DataSourceProxy

compile group: 'io.seata', name: 'seata-all', version: '1.3.0'
compile group: 'io.seata', name: 'seata-spring-boot-starter', version: '1.3.0'



6.3 全局配置
根据上面的介绍,seata的事务是有一个全局标记的,这个标记是需要整个请求的生命周期直接流转的。我们在过滤器里获得传过来的值并赋给RootContext.

package com.ms.seata.at.filter;

import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;

import org.apache.commons.lang3.StringUtils;

import io.seata.core.context.RootContext;

@WebFilter(filterName = "seataFilter", urlPatterns = "/*")
public class SeataFilter implements Filter {

    @Override public void init(FilterConfig filterConfig) throws ServletException {

    }

    @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) request;
        String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
        boolean isBind = false;
        if (StringUtils.isNotBlank(xid)) {
            RootContext.bind(xid);
            isBind = true;
        }
        try {
            chain.doFilter(request, response);
        } finally {
            if (isBind) {
                RootContext.unbind();
            }
        }
    }

    @Override public void destroy() {

    }
}



在调用其他服务的时候,我们需要把这个标记值传递给下一个服务。示例里使用的是restTemplate来调用其他服务的。所以我们先给restTemplate一个拦截器 SeataRestTemplateInterceptor。

package com.ms.seata.at.interceptor;

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.support.HttpRequestWrapper;

import io.seata.core.context.RootContext;

public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
            throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(request);
        String xid = RootContext.getXID();
        if (StringUtils.isNotEmpty(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }

        return execution.execute(requestWrapper, body);
    }
}



把拦截器加到restTemplate上。

package com.ms.seata.at.conf;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.web.client.RestTemplate;

import com.ms.seata.at.interceptor.SeataRestTemplateInterceptor;

@Configuration
public class SeataRestTemplateAutoConfiguration {

    @Autowired(required = false)
    private Collection<RestTemplate> restTemplates;

    @Autowired
    private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

    @Bean
    public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
        return new SeataRestTemplateInterceptor();
    }

    @PostConstruct
    public void init() {
        if (this.restTemplates != null) {
            Iterator var1 = this.restTemplates.iterator();

            while (var1.hasNext()) {
                RestTemplate restTemplate = (RestTemplate) var1.next();
                List<ClientHttpRequestInterceptor> interceptors = new ArrayList(restTemplate.getInterceptors());
                interceptors.add(this.seataRestTemplateInterceptor);
                restTemplate.setInterceptors(interceptors);
            }
        }

    }
}



由上可见,其实就是把全局事务的ID通过header赋值,在各个服务之间流转。
具体的使用就是在事务开始的地方加一个 @GlobalTransactional

6.4 测试
首先,我们需要启动seata-server服务(全局事务管理)。然后启动 order,storage服务。
我们的测试流程是 下单->扣库存->插入订单数据。

@GlobalTransactional
@Transactional
public void create(Order order){
    StorageDTO storageDTO = StorageDTO.builder()
            .commodityCode(order.getCommodityCode())
            .count(order.getCount()).build();
    storageClient.deduct(storageDTO);
    orderDAO.save(order);
}

我们在生成订单这里加个断点
image.png

去数据库可以看到undo_log 表有了库存的改变前后的镜像。
image1.png


{
    "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
    "xid": "10.133.8.137:8091:52044195980414976",
    "branchId": 52044201600782336,
    "sqlUndoLogs": [
        "java.util.ArrayList",
        [
            {
                "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
                "sqlType": "UPDATE",
                "tableName": "storage_tbl",
                "beforeImage": {
                    "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
                    "tableName": "storage_tbl",
                    "rows": [
                        "java.util.ArrayList",
                        [
                            {
                                "@class": "io.seata.rm.datasource.sql.struct.Row",
                                "fields": [
                                    "java.util.ArrayList",
                                    [
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "id",
                                            "keyType": "PRIMARY_KEY",
                                            "type": 4,
                                            "value": 1
                                        },
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "commodity_code",
                                            "keyType": "NULL",
                                            "type": 12,
                                            "value": "2001"
                                        },
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "count",
                                            "keyType": "NULL",
                                            "type": 4,
                                            "value": 1000
                                        }
                                    ]
                                ]
                            }
                        ]
                    ]
                },
                "afterImage": {
                    "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
                    "tableName": "storage_tbl",
                    "rows": [
                        "java.util.ArrayList",
                        [
                            {
                                "@class": "io.seata.rm.datasource.sql.struct.Row",
                                "fields": [
                                    "java.util.ArrayList",
                                    [
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "id",
                                            "keyType": "PRIMARY_KEY",
                                            "type": 4,
                                            "value": 1
                                        },
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "commodity_code",
                                            "keyType": "NULL",
                                            "type": 12,
                                            "value": "2001"
                                        },
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "count",
                                            "keyType": "NULL",
                                            "type": 4,
                                            "value": 999
                                        }
                                    ]
                                ]
                            }
                        ]
                    ]
                }
            },
            {
                "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
                "sqlType": "UPDATE",
                "tableName": "storage_tbl",
                "beforeImage": {
                    "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
                    "tableName": "storage_tbl",
                    "rows": [
                        "java.util.ArrayList",
                        [
                            {
                                "@class": "io.seata.rm.datasource.sql.struct.Row",
                                "fields": [
                                    "java.util.ArrayList",
                                    [
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "id",
                                            "keyType": "PRIMARY_KEY",
                                            "type": 4,
                                            "value": 1
                                        },
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "commodity_code",
                                            "keyType": "NULL",
                                            "type": 12,
                                            "value": "2001"
                                        },
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "count",
                                            "keyType": "NULL",
                                            "type": 4,
                                            "value": 999
                                        }
                                    ]
                                ]
                            }
                        ]
                    ]
                },
                "afterImage": {
                    "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
                    "tableName": "storage_tbl",
                    "rows": [
                        "java.util.ArrayList",
                        [
                            {
                                "@class": "io.seata.rm.datasource.sql.struct.Row",
                                "fields": [
                                    "java.util.ArrayList",
                                    [
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "id",
                                            "keyType": "PRIMARY_KEY",
                                            "type": 4,
                                            "value": 1
                                        },
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "commodity_code",
                                            "keyType": "NULL",
                                            "type": 12,
                                            "value": "2001"
                                        },
                                        {
                                            "@class": "io.seata.rm.datasource.sql.struct.Field",
                                            "name": "count",
                                            "keyType": "NULL",
                                            "type": 4,
                                            "value": 999
                                        }
                                    ]
                                ]
                            }
                        ]
                    ]
                }
            }
        ]
    ]
}



此时库存库中的值已经改变。
image3.png

我们就一直阻塞这里,等一会超时了。再看库存表数据,已经回滚了。
image4.png

至此,AT模式的分布式事务流程算是通了,细节之处,再根据具体问题分析。

暂无评论