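Once we have the data, we need to send it wherever it is needed, for example an MQ, a database, or standard output. The destination is not restricted.
In Flink this is the job of a sink. Look for the RichSinkFunction class: the custom sinks we write are all based on it.
For example, stream.print(), which we used in the earlier code, is really just a sink.
Open DataStream.java and find: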
@PublicEvolving
public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction).name("Print to Std. Out");
}
The PrintSinkFunction used here extends RichSinkFunction:
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    /**
     * Instantiates a print sink function that prints to standard out.
     */
    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    /**
     * Instantiates a print sink function that prints to standard out.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     */
    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    /**
     * Instantiates a print sink function that prints to standard out and gives a sink identifier.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
     */
    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}
As the code shows, the actual work is done in the invoke method: open prepares the writer, and invoke is called for each record.
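To make the open / invoke / close lifecycle concrete, here is a minimal plain-Java sketch. It is not Flink's API: MiniSink, RecordingSink and SinkLifecycleDemo are hypothetical names invented for illustration, and the driver loop only imitates what a runtime does with a rich sink (open once, invoke once per record, close once).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SinkLifecycleDemo {
    /** Drives a sink the way a runtime would: open, one invoke per record, then close. */
    static <T> void run(MiniSink<T> sink, List<T> records) throws Exception {
        sink.open();
        try {
            for (T record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close(); // always release resources, even if invoke fails
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingSink sink = new RecordingSink();
        run(sink, Arrays.asList("a", "b"));
        System.out.println(sink.calls); // [open, invoke:a, invoke:b, close]
    }
}

/** Hypothetical stand-in for a rich sink: NOT Flink's API, just the same three hooks. */
interface MiniSink<T> {
    void open() throws Exception;          // called once, before any record
    void invoke(T value) throws Exception; // called once per record
    void close() throws Exception;         // called once, after the last record
}

/** Records every lifecycle call so the call order can be inspected. */
final class RecordingSink implements MiniSink<String> {
    final List<String> calls = new ArrayList<>();

    @Override public void open() { calls.add("open"); }
    @Override public void invoke(String value) { calls.add("invoke:" + value); }
    @Override public void close() { calls.add("close"); }
}
```

The point of the sketch is that expensive setup (a writer, a database connection) belongs in open, while invoke should only handle the single record it receives.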
Next, let's implement a sink of our own that writes the data to MySQL.
First create a flink_article table with two columns, id and title.
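The table could be created with a statement along these lines. This is only a sketch: the article does not give the column types, so BIGINT and VARCHAR(255) are assumptions (chosen because the sink binds id with setLong and title with setString), and FlinkArticleSchema is a hypothetical helper name.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

final class FlinkArticleSchema {
    // Column types are assumptions: the original only names the columns id and title
    static final String CREATE_TABLE_SQL =
            "CREATE TABLE IF NOT EXISTS flink_article ("
            + "id BIGINT, "
            + "title VARCHAR(255)"
            + ")";

    /** Runs the DDL on an already opened JDBC connection. */
    static void createTable(Connection connection) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            statement.executeUpdate(CREATE_TABLE_SQL);
        }
    }

    public static void main(String[] args) {
        // Print the statement so it can also be pasted into a MySQL client
        System.out.println(CREATE_TABLE_SQL);
    }
}
```

Running the printed statement by hand in a MySQL client works just as well; createTable is only there for the case where the schema should be set up from code.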
MysqlSkinFunction.java
package com.moensun.demo.flink.start.skin;

import com.moensun.demo.flink.start.model.Article;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Created by Bane.Shi.
 * Date: 2019-07-01
 * Time: 14:09
 */
public class MysqlSkinFunction<T extends Article> extends RichSinkFunction<T> {

    // JDBC handles are not serializable, so they are created in open(), not at construction time
    private transient PreparedStatement ps = null;
    private transient Connection connection = null;

    String driver = "com.mysql.cj.jdbc.Driver";
    String url = "jdbc:mysql://moensundb.mysql.rds.aliyuncs.com:3306/db_ithere?allowMultiQueries=true&useUnicode=true&characterEncoding=utf8";
    String username = "dbithere";
    String password = "Ithere2018";

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Open the connection and prepare the statement once per sink instance
        connection = getConnection();
        String sql = "insert into flink_article (id,title) values (?,?)";
        ps = connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close the statement before the connection that owns it
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        // One INSERT per incoming record
        ps.setLong(1, value.getId());
        ps.setString(2, value.getTitle());
        ps.executeUpdate();
    }

    public Connection getConnection() throws Exception {
        // Load the driver
        Class.forName(driver);
        // Create the connection; a failure propagates so that open() fails
        // instead of continuing with a null connection
        connection = DriverManager.getConnection(url, username, password);
        return connection;
    }
}
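One detail worth getting right in close() is the order in which JDBC resources are released: the statement should be closed before the connection it was created from. A minimal sketch of a helper that closes resources in reverse order of opening follows. ReverseOrderClose and closeAll are hypothetical names, not part of Flink or JDBC, and the lambdas in main merely stand in for a real Connection and PreparedStatement.

```java
import java.util.ArrayList;
import java.util.List;

final class ReverseOrderClose {
    /** Closes resources last-opened-first; null entries are skipped. */
    static void closeAll(AutoCloseable... openedInOrder) throws Exception {
        Exception first = null;
        for (int i = openedInOrder.length - 1; i >= 0; i--) {
            AutoCloseable resource = openedInOrder[i];
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            } catch (Exception e) {
                // Keep closing the remaining resources, remember the first failure
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> log = new ArrayList<>();
        AutoCloseable connection = () -> log.add("connection closed");
        AutoCloseable statement = () -> log.add("statement closed");
        closeAll(connection, statement); // pass them in the order they were opened
        System.out.println(log); // [statement closed, connection closed]
    }
}
```

Since both Connection and PreparedStatement implement AutoCloseable, a sink's close() could call closeAll(connection, ps) and get the right order as well as null handling.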
Now let's go back to the MysqlDataSource class from earlier
and replace the original print with our own sink:
public class MysqlDataSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        DataStream<Article> stream = env.addSource(new ArticleSourceFunction());
        // stream.print();
        stream.addSink(new MysqlSkinFunction<Article>());
        env.execute("MysqlDataSource Example");
    }
}
Run the main method.
As you can see, the records were written to the database successfully.