记录一个从事件日志表抽取到报表的实现

在业务场景下,我们最好把业务的操作记录汇总成报表,比如某个单据的修改人、修改时间、提交人、提交时间等。
最简单的做法就是直接一张表,每次有事件直接更新进去,例如这样

| 字段 | 类型 |
| --- | --- |
| id | int |
| business_id | int |
| submit_at | timestamp |
| submit_by | int |
| modify_at | timestamp |
| modify_by | int |

但是,直接更新有一个问题,就是历史记录会丢失,并且如果并发修改多个字段,会因为锁表导致修改失败。
那么我们调整下方案,做一个日志记录表,然后通过job把事件记录聚合到报表。

| 字段 | 类型 |
| --- | --- |
| id | bigint |
| business_id | int |
| event_type | varchar |
| event_at | timestamp |
| event_by | int |

这样做最大的好处就是 数据都是新增的 不存在锁表的问题,不用担心并发。
再加一个执行记录表,记录当前的最大的id

| 字段 | 类型 |
| --- | --- |
| id | int |
| event_log_id | bigint |
| job_exec_at | timestamp |



上代码
image.png

BusinessStack

@Data
@SuperBuilder
public class BusinessStack {
    private Integer id;
    private Date submitAt;
    private Integer submitBy;
    private Date modifyAt;
    private Integer modifyBy;

    @Override public boolean equals(Object o) {
        if (this == o)
            return true;

        if (o == null || getClass() != o.getClass())
            return false;

        BusinessStack that = (BusinessStack) o;

        return new EqualsBuilder()
                .append(id, that.id)
                .isEquals();
    }

    @Override public int hashCode() {
        return new HashCodeBuilder(17, 37)
                .append(id)
                .toHashCode();
    }
}



EventLog

/**
 * One row of the append-only event log: a single operation performed on a
 * business document.
 */
@Data
@SuperBuilder
public class EventLog {
    /** Log row id; the sync job pages through the log by this value. */
    private Long id;
    /** Id of the business document the event belongs to. */
    private Integer businessId;
    /** Kind of event; the sync job handles "SUBMIT" and "MODIFY" and ignores other values. */
    private String eventType;
    /** When the event happened. */
    private Date eventAt;
    /** Id of the user who caused the event. */
    private Integer eventBy;
}



EventTrigger

/**
 * Progress marker of the sync job: remembers how far into the event log the
 * job has already aggregated.
 */
@Data
@SuperBuilder
public class EventTrigger {
    /** Row id of the marker. */
    private Integer id;
    /** Id of the last event-log row that has been processed; the next page starts after it. */
    private Long eventLogId;
    /** When the job last advanced this marker. */
    private Date jobExecAt;
}



EventLogService

@AllArgsConstructor
@Service
public class EventLogService {

    private final JdbcTemplate jdbcTemplate;
    /**
     * 分页查询ID之后的数据
     */
    public List<EventLog> queryEventLogs( Long id,int limit){
        String sql = "select * from event_log where id>? order id asc limit ? ";
        RowMapper<EventLog> rowMapper=new BeanPropertyRowMapper<>(EventLog.class);
        return jdbcTemplate.query(sql,new Object[]{id,limit},rowMapper);
    }
}



EventTriggerService

@AllArgsConstructor
@Service
public class EventTriggerService {
    private final JdbcTemplate jdbcTemplate;
    public EventTrigger queryLatestOne(){
        String sql = "select * from event_trigger limit 1";
        RowMapper<EventTrigger> rowMapper=new BeanPropertyRowMapper<>(EventTrigger.class);
        return jdbcTemplate.queryForObject(sql,rowMapper);
    }
}



BusinessStackService

@AllArgsConstructor
@Service
public class BusinessStackService {
    private final JdbcTemplate jdbcTemplate;

    public List<BusinessStack> queryByIds(List<Integer> ids){
        String sql ="select * from business_stack where id in (?)";
        RowMapper<BusinessStack> rowMapper=new BeanPropertyRowMapper<>(BusinessStack.class);
        return jdbcTemplate.query(sql,new Object[]{ids},rowMapper);
    }

    @Transactional
    public void sync(Long id,List<BusinessStack> inserts,List<BusinessStack> updates){
        String sql = "update event_trigger set event_log_id=?,job_exec_at=? where event_log_id<?";
        Date date = (Date) Date.from(Instant.now());
        int res = jdbcTemplate.update(sql,new Object[]{id,date,id});
        if(res==0){
            return;
        }
        if(!CollectionUtils.isEmpty(inserts)){
            insertBatch(inserts);
        }
        if(!CollectionUtils.isEmpty(updates)){
           updateBatch(updates);
        }
    }

    public void insertBatch(List<BusinessStack> businessStacks){
        String sql = "insert into business_stack(submit_at,submit_by,modify_at,modify_by) values (?,?,?,?)";
        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override public void setValues(PreparedStatement ps, int i) throws SQLException {
                BusinessStack businessStack = businessStacks.get(i);
                ps.setDate(1, (Date) businessStack.getSubmitAt());
                ps.setInt(2,businessStack.getSubmitBy());
                ps.setDate(3, (Date) businessStack.getModifyAt());
                ps.setInt(4,businessStack.getModifyBy());
            }

            @Override public int getBatchSize() {
                return businessStacks.size();
            }
        });
    }

    public void updateBatch(List<BusinessStack> businessStacks){
        String sql = "update business_stack set submit_at=?,submit_by=?,modify_at=?,modify_by=? where id=?";
        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override public void setValues( PreparedStatement ps, int i) throws SQLException {
                BusinessStack businessStack = businessStacks.get(i);
                ps.setDate(1, (Date) businessStack.getSubmitAt());
                ps.setInt(2,businessStack.getSubmitBy());
                ps.setDate(3, (Date) businessStack.getModifyAt());
                ps.setInt(4,businessStack.getModifyBy());
                ps.setInt(5,businessStack.getId());
            }

            @Override public int getBatchSize() {
                return businessStacks.size();
            }
        });
    }

}



JobSchedule

@AllArgsConstructor
public class JobSchedule {

    private EventLogService eventLogService;
    private EventTriggerService eventTriggerService;
    private BusinessStackService businessStackService;

    public void job() {
        int limit = 100;
        int errorCount = 0;
        boolean hasNext = true;
        while (hasNext) {
            try{
                EventTrigger eventTrigger = eventTriggerService.queryLatestOne();
                if (Objects.isNull(eventTrigger)) {
                    break;
                }
                List<EventLog> eventLogs = eventLogService.queryEventLogs(eventTrigger.getEventLogId(), limit);
                if(CollectionUtils.isEmpty(eventLogs)){
                    hasNext = false;
                    break;
                }
                if(eventLogs.size()<limit){
                    hasNext = false;
                }
                Long maxId = eventLogs.get(eventLogs.size()-1).getId();
                Map<Integer, BusinessStack> map = assemble(eventLogs);
                List<BusinessStack> businessStacks = businessStackService.queryByIds(Lists.newArrayList(map.keySet()));
                List<BusinessStack> inserts = Lists.newArrayList();
                inserts.addAll(map.values());
                List<BusinessStack> updates = Lists.newArrayList();
                updates.addAll(map.values());
                inserts.retainAll(businessStacks) ;//
                updates.removeAll(inserts);
                businessStackService.sync(maxId,inserts,updates);
            } catch (Exception e){
                errorCount++;
                if(errorCount>10){
                    hasNext = false;
                }
            }
        }

    }

    private Map<Integer, BusinessStack> assemble(List<EventLog> eventLogs) {
        if (CollectionUtils.isEmpty(eventLogs)) {
            return new HashMap<>();
        }
        Map<Integer, BusinessStack> map = Maps.newHashMap();
        eventLogs.forEach(eventLog -> {
            BusinessStack businessStack = map.get(eventLog.getBusinessId());
            if (Objects.nonNull(businessStack)) {
                fillValues(businessStack, eventLog);
            } else {
                businessStack = BusinessStack.builder().id(eventLog.getBusinessId()).build();
                fillValues(businessStack, eventLog);
                map.put(eventLog.getBusinessId(), businessStack);
            }
        });
        return map;
    }

    public void fillValues(BusinessStack businessStack, EventLog eventLog) {
        switch (eventLog.getEventType()) {
            case "SUBMIT":
                businessStack.setSubmitAt(eventLog.getEventAt());
                businessStack.setSubmitBy(eventLog.getEventBy());
                break;
            case "MODIFY":
                businessStack.setModifyAt(eventLog.getEventAt());
                businessStack.setModifyBy(eventLog.getEventBy());
                break;
            default:
                break;
        }
    }

}





然后在 job 方法上加个 schedule,定时去执行就可以了。

有人会萌萌地问,为什么不在数据库直接做好聚合,而要去内存中处理?那么问题来了,聚合就要group by,而group by就要先排序,排序就要全表扫描,而这种日志表一般又是很大的,会占用数据库很多资源。另外,这种处理方式的理念有点类似flink的time window,是滚动处理的。每次读取的数据不大,对应用的影响是很小的。而且,现在的应用部署,不管是基于k8s还是用守护进程,即使极端情况下出现了问题,也只是重启而已,用户是无感的;但是你把数据库拖慢了,用户是有感知的。所以说,这就是为什么要了解运维。

还有人萌萌的说,发生并发了怎么办?首先,如果是分布式的应用,可以考虑加个分布式锁,另外,event_trigger表的event_log_id这个字段,在正常情况下是会呈增长趋势的。在更新的时候,可以把这个字段当成乐观锁来用。不是一定要叫version才是乐观锁,水煮蛋会吃,茶叶蛋就不会吃了?

当然,这个方案只是一个简单实现,如果不考虑历史记录,用个消息队列,都解决了。




暂无评论