Developing a Custom Connector Source for Flink 1.14+
Preface
Since Flink 1.14 there has been a new Source architecture, and the old SourceFunction and InputFormat will be dropped after 2.0. Official docs: https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/sources/
Most material covers the advantages and runtime mechanics of the new architecture, but I have not yet found a walkthrough of how to develop a custom Source against it. The officially maintained connectors will presumably be upgraded to the new architecture, so in general we do not need to write one ourselves; still, in many private or in-house scenarios a custom Source is unavoidable. Here we use a simple custom MySQL source to show how to build one.
 
Official overview
A data source consists of three core components: the Split, the SplitEnumerator, and the SourceReader.
  • A Split is a wrapper around a portion of the source data, such as a file or a log partition. Splits are the granularity at which the source distributes work and reads data in parallel.
  • The SourceReader requests splits and processes them, for example by reading the file or log partition a split represents. SourceReaders run in parallel in the SourceOperators on the TaskManagers and produce the parallel stream of events/records.
  • The SplitEnumerator generates splits and assigns them to the SourceReaders. It runs with parallelism 1 on the JobManager, maintains the backlog of unassigned splits, and assigns them to readers in a balanced way.

Building our JdbcSource implementation
Add dependencies
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.10</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.36</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.36</version>
</dependency>

<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.42</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.32</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.1</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.1</version>
</dependency>
Project structure
File descriptions
Following that outline, let's look at the concrete implementation step by step.
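One class that is referenced throughout but not shown in the article is JdbcConnOptions, which simply carries the connection settings. Based on how it is used below (a builder plus getUrl/getUsername/getPassword/getDatabase/getTable), a minimal Lombok-based sketch could look like the following; the field layout is inferred from usage and is not the original code.
import java.io.Serializable;

import lombok.Builder;
import lombok.Data;

/**
 * Connection options for the JDBC source (inferred sketch, not the original class).
 */
@Data
@Builder
public class JdbcConnOptions implements Serializable {
    private String url;      // JDBC url, e.g. jdbc:mysql://host:3306/db
    private String username; // database user
    private String password; // database password
    private String database; // database name, used when building the table id
    private String table;    // table to read
}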
1. Create a split class, JdbcSourceSplit. A split is the unit of data to read; here we split by table.
/**
 * Defines a split; by default one split corresponds to one table.
 */
@Data
public class JdbcSourceSplit implements SourceSplit, Serializable {
    private  String splitId;

    public JdbcSourceSplit(String splitId) {
        this.splitId = splitId;
    }

    @Override
    public String splitId() {
        return splitId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        JdbcSourceSplit that = (JdbcSourceSplit) o;
        return Objects.equals(splitId, that.splitId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(splitId);
    }
}
2. Create JdbcSourceSplitState, which records a split's state.
/**
 * Records the state of a split.
 */
public class JdbcSourceSplitState {
    private final JdbcSourceSplit split;

    public JdbcSourceSplitState(JdbcSourceSplit split){
        this.split = split;
    }

    public JdbcSourceSplit toJdbcSourceSplit(){
        return split;
    }

}
3. The split serializer, JdbcSourceSplitSerializer. Split information passed around inside the job always has to be serialized and deserialized, so we define a serializer; it is implemented with fastjson2, mainly to handle the conversion to and from byte[].
/**
 * Serializer for splits.
 */
public class JdbcSourceSplitSerializer implements SimpleVersionedSerializer<JdbcSourceSplit> {
    public static final JdbcSourceSplitSerializer INSTANCE = new JdbcSourceSplitSerializer();
    @Override
    public int getVersion() {
        return 0;
    }

    @Override
    public byte[] serialize(JdbcSourceSplit obj) throws IOException {
        return JSON.toJSONBytes(obj);
    }

    @Override
    public JdbcSourceSplit deserialize(int version, byte[] serialized) throws IOException {
        return JSON.parseObject(serialized, JdbcSourceSplit.class);
    }
}
4. Define the deserialization interface JdbcDeserializationSchema. The type read from the database may differ from the type we ultimately emit, so each record is deserialized before being returned downstream.
/**
 * Interface for deserializing raw rows into the output type.
 *
 * @param <T>
 */
public interface JdbcDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    T deserialize(Row record) throws Exception;

}
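For illustration only, a deserialization schema that turns each Row into a comma-separated String could look like this; the class name and formatting are my own (the test section later uses a simpler pass-through implementation instead).
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

/**
 * Hypothetical example: emit each row as a comma-separated string.
 */
public class RowToStringDeserializationSchema implements JdbcDeserializationSchema<String> {

    @Override
    public String deserialize(Row record) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < record.getArity(); i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(record.getField(i)); // append each column value in order
        }
        return sb.toString();
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}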
5. JdbcRecordEmitter, the record emitter.
/**
 * Record emitter.
 * Its main job is deserialization: converting the external system's data format into the format consumed downstream.
 *
 * @param <T>
 */
public class JdbcRecordEmitter<T> implements RecordEmitter<Row, T, JdbcSourceSplitState> {
    private final JdbcDeserializationSchema<T> deserializationSchema;

    public JdbcRecordEmitter(JdbcDeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    /**
     * Emit a record:
     * deserialize the element and add it to the SourceOutput for downstream processing.
     */
    @Override
    public void emitRecord(Row element, SourceOutput<T> output, JdbcSourceSplitState splitState) throws Exception {
        T deserialize = deserializationSchema.deserialize(element);
        if (deserialize != null) {
            output.collect(deserialize);
        }
    }

}
6. The split reader, JdbcSourceSplitReader, which performs the actual read for a split.
/**
 * Split reader: reads the rows of the database table referenced by a split.
 */
@Slf4j
public class JdbcSourceSplitReader implements SplitReader<Row, JdbcSourceSplit> {
    private JdbcConnOptions connectionOptions;
    private boolean closed = false;
    private boolean finished = false;
    private JdbcSourceSplit currentSplit;
    private Connection connection;

    public JdbcSourceSplitReader(
            JdbcConnOptions connectionOptions
    ) {
        this.connectionOptions = connectionOptions;
    }

    /**
     * Read the data; once reading completes, mark the split as finished.
     *
     * @return
     * @throws IOException
     */
    @Override
    public RecordsWithSplitIds<Row> fetch() throws IOException {
        log.info("fetch data");
        if (closed) {
            throw new IllegalStateException("Cannot fetch records from a closed split reader");
        }
        RecordsBySplits.Builder<Row> builder = new RecordsBySplits.Builder<>();
        if (currentSplit == null) {
            return builder.build();
        }
        try {
            connection = createConnection();
            ResultSet resultSet = connection.createStatement().executeQuery("select * from " + connectionOptions.getTable());
            while (resultSet.next()) {
                ResultSetMetaData metaData = resultSet.getMetaData();
                Row row = new Row(metaData.getColumnCount());
                for (int i = 0; i < row.getArity(); i++) {
                    Object value = resultSet.getObject(i + 1); // JDBC column indexes start at 1
                    row.setField(i, value);                    // Row field indexes start at 0
                }
                builder.add(currentSplit, row);
            }

            builder.addFinishedSplit(currentSplit.splitId());
            finished = true;
            return builder.build();
        } catch (Exception e) {
            throw new IOException("Scan records form db failed", e);
        } finally {
            if (finished) {
                closeConnection();
            }
        }

    }

    /**
     * Handle newly added splits.
     *
     * @param splitsChanges
     */
    @Override
    public void handleSplitsChanges(SplitsChange<JdbcSourceSplit> splitsChanges) {
        log.debug("Handle split changes {}", splitsChanges);

        if (!(splitsChanges instanceof SplitsAddition)) {  // only split additions are supported; anything else is rejected
            throw new UnsupportedOperationException(
                    String.format(
                            "The SplitChange type of %s is not supported.",
                            splitsChanges.getClass()));
        }

        JdbcSourceSplit sourceSplit = splitsChanges.splits().get(0);
        if (!(sourceSplit instanceof JdbcSourceSplit)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "The SourceSplit type of %s is not supported.",
                            sourceSplit.getClass()));
        }

        this.currentSplit = (JdbcSourceSplit) sourceSplit;
        this.finished = false;
    }

    @Override
    public void wakeUp() {

    }

    @Override
    public void close() throws Exception {
        this.closed = true;
        closeConnection();
    }

    public Connection createConnection() throws SQLException {
        if (connection != null) {
            return connection;
        }
        return DriverManager.getConnection(connectionOptions.getUrl(), connectionOptions.getUsername(), connectionOptions.getPassword());
    }

    public void closeConnection() {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                log.error("close connection failed", e);
            } finally {
                connection = null; // reset so the next fetch opens a fresh connection
            }
        }
    }

}
7. JdbcSourceReaderContext, a source reader context that wraps SourceReaderContext so we can layer our own bookkeeping on top; here it simply delegates every call.
public class JdbcSourceReaderContext implements SourceReaderContext {
    private final SourceReaderContext readerContext;
    private final AtomicInteger readCount = new AtomicInteger(0);

    public JdbcSourceReaderContext(SourceReaderContext readerContext) {
        this.readerContext = readerContext;
    }
    @Override
    public SourceReaderMetricGroup metricGroup() {
        return readerContext.metricGroup();
    }

    @Override
    public Configuration getConfiguration() {
        return readerContext.getConfiguration();
    }

    @Override
    public String getLocalHostName() {
        return readerContext.getLocalHostName();
    }

    @Override
    public int getIndexOfSubtask() {
        return readerContext.getIndexOfSubtask();
    }

    @Override
    public void sendSplitRequest() {
        readerContext.sendSplitRequest();
    }

    @Override
    public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
        readerContext.sendSourceEventToCoordinator(sourceEvent);
    }

    @Override
    public UserCodeClassLoader getUserCodeClassLoader() {
        return readerContext.getUserCodeClassLoader();
    }
}
8. JdbcSourceReader, the source reader. The official recommendation is to build on SourceReaderBase (here via SingleThreadMultiplexSourceReaderBase), which saves a lot of repetitive work.
@Slf4j
public class JdbcSourceReader<OUT> extends SingleThreadMultiplexSourceReaderBase<Row, OUT, JdbcSourceSplit, JdbcSourceSplitState> {

    public JdbcSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<Row>> elementQueue,
            Supplier<SplitReader<Row, JdbcSourceSplit>> splitReaderSupplier,
            RecordEmitter<Row, OUT, JdbcSourceSplitState> recordEmitter,
            JdbcSourceReaderContext context
    ) {
        super(
                elementQueue,
                new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier),
                recordEmitter,
                context.getConfiguration(),
                context
        );
    }

    @Override
    public void start() {
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            context.sendSplitRequest();
        }
    }

    @Override
    protected void onSplitFinished(Map<String, JdbcSourceSplitState> finishedSplitIds) {
        for (JdbcSourceSplitState splitState : finishedSplitIds.values()) {
            JdbcSourceSplit sourceSplit = splitState.toJdbcSourceSplit();
            log.info("Split {} is finished.", sourceSplit.splitId());
        }
        context.sendSplitRequest();
    }

    /**
     * Initialize the state for a newly added split.
     * @param split a newly added split.
     * @return
     */
    @Override
    protected JdbcSourceSplitState initializedState(JdbcSourceSplit split) {
        return new JdbcSourceSplitState(split);
    }

    @Override
    protected JdbcSourceSplit toSplitType(String splitId, JdbcSourceSplitState splitState) {
        return splitState.toJdbcSourceSplit();
    }
}
9. JdbcSourceEnumState, a state class that records the enumerator's processing progress.
/**
 * State class recording which tables and splits have been processed.
 */
@Data
public class JdbcSourceEnumState implements Serializable {

    private  List<String> remainingTables; //剩余待处理的表

    private  List<String> alreadyProcessedTables; //已经处理过的表

    private  List<JdbcSourceSplit> remainingSplits; //剩余待处理的split
    private  Map<String,JdbcSourceSplit> assignedSplits; //分配的split

    private final boolean initialized;

    public JdbcSourceEnumState(List<String> remainingTables, List<String> alreadyProcessedTables, List<JdbcSourceSplit> remainingSplits, Map<String,JdbcSourceSplit> assignedSplits, boolean initialized) {
        this.remainingTables = remainingTables;
        this.alreadyProcessedTables = alreadyProcessedTables;
        this.remainingSplits = remainingSplits;
        this.assignedSplits = assignedSplits;
        this.initialized = initialized;
    }

    public List<String> getRemainingTables() {
        return remainingTables;
    }
    public List<String> getAlreadyProcessedTables() {
        return alreadyProcessedTables;
    }
    public List<JdbcSourceSplit> getRemainingSplits() {
        return remainingSplits;
    }
    public Map<String,JdbcSourceSplit> getAssignedSplits() {
        return assignedSplits;
    }
    public boolean isInitialized() {
        return initialized;
    }

    public static JdbcSourceEnumState initialState(){
        return new JdbcSourceEnumState(new ArrayList<>(),new ArrayList<>(),new ArrayList<>(),new HashMap<>(),false);
    }
}
10. JdbcSourceEnumStateSerializer, the serializer for the split enumerator's checkpoint state.
/**
 * Serializer for the split enumerator state.
 */
public class JdbcSourceEnumStateSerializer implements SimpleVersionedSerializer<JdbcSourceEnumState> {

    public static final JdbcSourceEnumStateSerializer INSTANCE = new JdbcSourceEnumStateSerializer();

    @Override
    public int getVersion() {
        return 0;
    }

    @Override
    public byte[] serialize(JdbcSourceEnumState obj) throws IOException {
        return JSON.toJSONBytes(obj);
    }

    @Override
    public JdbcSourceEnumState deserialize(int version, byte[] serialized) throws IOException {
        return JSON.parseObject(serialized, JdbcSourceEnumState.class);
    }
}
11. JdbcSplitAssigner, the class that actually creates and hands out splits.
/**
 * Does the actual split bookkeeping and assignment.
 */
@Slf4j
public class JdbcSplitAssigner {
    private final JdbcConnOptions connectionOptions;
    private final LinkedList<String> remainingTables;
    private final List<String> alreadyProcessedTables;
    private final LinkedList<JdbcSourceSplit> remainingSplits;
    private final Map<String, JdbcSourceSplit> assignedSplits;
    private boolean initialized;

    /**
     * The assigner the split enumerator delegates to (does the actual work).
     * @param connectionOptions
     * @param sourceEnumState
     */
    public JdbcSplitAssigner(
            JdbcConnOptions connectionOptions,
            JdbcSourceEnumState sourceEnumState) {
        this.connectionOptions = connectionOptions;
        this.remainingTables = new LinkedList<>(sourceEnumState.getRemainingTables());
        this.remainingSplits = new LinkedList<>(sourceEnumState.getRemainingSplits());
        this.alreadyProcessedTables = sourceEnumState.getAlreadyProcessedTables();
        this.assignedSplits = sourceEnumState.getAssignedSplits();
        this.initialized = sourceEnumState.isInitialized();
    }

    public void open() {
        log.info("jdbc scan split assigner is opening.");
        if (!initialized) {
            String tableId =
                    String.format(
                            "%s.%s",
                            connectionOptions.getDatabase(), connectionOptions.getTable());
            remainingTables.add(tableId);
            this.initialized = true;
        }
    }

    public Optional<JdbcSourceSplit> getNext() {
        if (!remainingSplits.isEmpty()) {
            JdbcSourceSplit split = remainingSplits.poll();
            assignedSplits.put(split.splitId(), split);
            return Optional.of(split);
        } else {
            String nextTable = remainingTables.poll();
            if (nextTable != null) {
                JdbcSourceSplit split = new JdbcSourceSplit(nextTable);
                remainingSplits.add(split);
                alreadyProcessedTables.add(nextTable);
                return getNext();
            } else {
                return Optional.empty();
            }
        }
    }

    public void addSplitsBack(Collection<JdbcSourceSplit> splits) {
        for (JdbcSourceSplit split : splits) {
            remainingSplits.add(split);
            assignedSplits.remove(split.splitId());
        }
    }

    public JdbcSourceEnumState snapshotState(long checkpointId) {
        return new JdbcSourceEnumState(
                remainingTables,
                alreadyProcessedTables,
                remainingSplits,
                assignedSplits,
                initialized);
    }

    public boolean noMoreSplits() {
        checkState(initialized, "The noMoreSplits method was called but not initialized.");
        return remainingTables.isEmpty() && remainingSplits.isEmpty();
    }

    public void close() {

    }
}
12. JdbcSourceEnumerator, the split enumerator.
/**
 * Split enumerator.
 */
@Slf4j
public class JdbcSourceEnumerator implements SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumState> {
    private final Boundedness boundedness;
    private final SplitEnumeratorContext<JdbcSourceSplit> context;
    private final JdbcSplitAssigner splitAssigner;
    private final TreeSet<Integer> readersAwaitingSplit;

    public JdbcSourceEnumerator(
            Boundedness boundedness,
            SplitEnumeratorContext<JdbcSourceSplit> context,
            JdbcSplitAssigner splitAssigner
    ) {
        this.boundedness = boundedness;
        this.context = context;
        this.splitAssigner = splitAssigner;
        this.readersAwaitingSplit = new TreeSet<>();
    }

    @Override
    public void start() {
        splitAssigner.open();
    }

    /**
     * Handle a split request from a reader.
     */
    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        if (!context.registeredReaders().containsKey(subtaskId)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }

        readersAwaitingSplit.add(subtaskId);
        assignSplits();
    }

    @Override
    public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
        log.debug("Jdbc Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplitsBack(splits);
    }

    @Override
    public void addReader(int subtaskId) {
        log.debug("Adding reader {} to JdbcSourceEnumerator.", subtaskId);
    }

    @Override
    public JdbcSourceEnumState snapshotState(long checkpointId) throws Exception {
        return splitAssigner.snapshotState(checkpointId);
    }

    @Override
    public void close() throws IOException {
        splitAssigner.close();
    }
    
    /**
     * Assign splits; here a table is the unit of splitting.
     */
    private void assignSplits() {
        final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
        while (awaitingReader.hasNext()) {
            int nextAwaiting = awaitingReader.next();
            if (!context.registeredReaders().containsKey(nextAwaiting)) {
                awaitingReader.remove();
                continue;
            }
            if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
                context.signalNoMoreSplits(nextAwaiting);
                awaitingReader.remove();
                log.info(
                        "All scan splits have been assigned, closing idle reader {}", nextAwaiting);
                continue;
            }
            Optional<JdbcSourceSplit> split = splitAssigner.getNext();
            if (split.isPresent()) {
                JdbcSourceSplit jdbcSourceSplit = split.get();
                context.assignSplit(jdbcSourceSplit, nextAwaiting);
                awaitingReader.remove();
                log.info("Assign split {} to subtask {}", jdbcSourceSplit, nextAwaiting);
                break;
            } else {
                break;
            }
        }
    }
}
13. JdbcSource, the Source implementation that wires all of the above together.
public class JdbcSource<OUT> implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumState>, ResultTypeQueryable<OUT> {
    private final JdbcConnOptions connectionOptions;
    private final Boundedness boundedness;
    private final JdbcDeserializationSchema<OUT> jdbcDeserializationSchema;

    public JdbcSource(
            JdbcConnOptions connectionOptions,
            JdbcDeserializationSchema<OUT> jdbcDeserializationSchema
    ) {
        this.connectionOptions = connectionOptions;
        this.jdbcDeserializationSchema = jdbcDeserializationSchema;
        this.boundedness = Boundedness.BOUNDED;
    }

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /**
     * Create the split enumerator.
     *
     * @param enumContext The {@link SplitEnumeratorContext context} for the split enumerator.
     * @return
     * @throws Exception
     */
    @Override
    public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumState> createEnumerator(SplitEnumeratorContext<JdbcSourceSplit> enumContext) throws Exception {
        JdbcSourceEnumState sourceEnumState = JdbcSourceEnumState.initialState();
        JdbcSplitAssigner splitAssigner = new JdbcSplitAssigner(connectionOptions, sourceEnumState);
        return new JdbcSourceEnumerator(this.boundedness, enumContext, splitAssigner);
    }

    /**
     * Restore the split enumerator from checkpointed state.
     */
    @Override
    public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumState> restoreEnumerator(SplitEnumeratorContext<JdbcSourceSplit> enumContext, JdbcSourceEnumState checkpoint) throws Exception {
        // Restore from the checkpointed state instead of starting from scratch
        JdbcSplitAssigner splitAssigner = new JdbcSplitAssigner(connectionOptions, checkpoint);
        return new JdbcSourceEnumerator(this.boundedness, enumContext, splitAssigner);
    }

    /**
     * Serializer for splits.
     *
     * @return
     */
    @Override
    public SimpleVersionedSerializer<JdbcSourceSplit> getSplitSerializer() {
        return JdbcSourceSplitSerializer.INSTANCE;
    }

    /**
     * Serializer for the enumerator checkpoint state.
     *
     * @return
     */
    @Override
    public SimpleVersionedSerializer<JdbcSourceEnumState> getEnumeratorCheckpointSerializer() {
        return JdbcSourceEnumStateSerializer.INSTANCE;
    }

    /**
     * Create the source reader.
     *
     */
    @Override
    public SourceReader<OUT, JdbcSourceSplit> createReader(SourceReaderContext readerContext) throws Exception {
        FutureCompletingBlockingQueue<RecordsWithSplitIds<Row>> elementsQueue =
                new FutureCompletingBlockingQueue<>();
        JdbcSourceReaderContext jdbcSourceReaderContext = new JdbcSourceReaderContext(readerContext);
        Supplier<SplitReader<Row, JdbcSourceSplit>> splitReaderSupplier = () -> new JdbcSourceSplitReader(
                connectionOptions
        );
        return new JdbcSourceReader<>(elementsQueue, splitReaderSupplier, new JdbcRecordEmitter<>(jdbcDeserializationSchema), jdbcSourceReaderContext);
    }

    @Override
    public TypeInformation<OUT> getProducedType() {
        return jdbcDeserializationSchema.getProducedType();
    }
}
 
At this point, the Source is complete at the code level.
 
Testing
Let's run a test that migrates data from one table to another.
1. Create the user table
create table user
(
    id varchar(64) null
);
Then insert two arbitrary rows with ids 1 and 2, and create a user_sink table with the same structure; the SQL is sketched below.
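For reference, the corresponding SQL could look like this (it simply mirrors the table definition above):
insert into user (id) values ('1'), ('2');

create table user_sink
(
    id varchar(64) null
);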
2. The deserialization implementation for row data
/**
 * Deserialization implementation that simply passes the Row through unchanged.
 */
private static class JdbcJsonDeserializationSchema implements JdbcDeserializationSchema<Row> {

    @Override
    public TypeInformation<Row> getProducedType() {
        return new RowTypeInfo(Types.STRING);
    }

    @Override
    public Row deserialize(Row record) {
        return record;
    }
}
3. Test code
Add the flink-connector-jdbc dependency for the sink
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.2-1.18</version>
</dependency>

Implementation

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    JdbcSource<Row> jdbcSource = new JdbcSource<>(
            JdbcConnOptions.builder()
                    .url("jdbc:mysql://xxxx:3306/flink-demo")
                    .username("flink_demo")
                    .password("flink_demo")
                    .database("flink-demo")
                    .table("user")
                    .build(),
            new JdbcJsonDeserializationSchema());
    DataStream<Row> dataStream = env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), "jdbc sources");
    dataStream.addSink(JdbcSink.sink(
            "insert into user_sink (id) values (?)",
            (ps, t) -> {
                ps.setString(1,t.getFieldAs(0));
            },
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:mysql://xxxx:3306/flink-demo")
                    .withUsername("flink_demo")
                    .withPassword("flink_demo")
                    .withDriverName("com.mysql.cj.jdbc.Driver")
                    .build()
    ));
    env.execute();
}
4. Result
The data in the user table is synced to the user_sink table as expected.
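A quick check against the sink table (a hypothetical verification query for this test) should show the copied rows:
select id from user_sink; -- expect the two rows ('1' and '2') copied from user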
 
That is a complete custom Source implementation; for the finer details, reading the Flink source code is the best way to dig deeper.
flink
connector
