Flink Sinks

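Once we have the data, we need to deliver it to wherever it is needed, for example a message queue, a database, or standard output. The destination is not restricted.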

In Flink this is the job of a Sink. The class to look at is RichSinkFunction: the custom Sinks we write here are all built on it.

For example, the stream.print() call used in the earlier code is itself a Sink method.

In DataStream.java you will find:

@PublicEvolving
public DataStreamSink<T> print() {
   PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
   return addSink(printFunction).name("Print to Std. Out");
}

The PrintSinkFunction used here extends RichSinkFunction:


@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

   private static final long serialVersionUID = 1L;

   private final PrintSinkOutputWriter<IN> writer;

   /**
    * Instantiates a print sink function that prints to standard out.
    */
   public PrintSinkFunction() {
      writer = new PrintSinkOutputWriter<>(false);
   }

   /**
    * Instantiates a print sink function that prints to standard out.
    *
    * @param stdErr True, if the format should print to standard error instead of standard out.
    */
   public PrintSinkFunction(final boolean stdErr) {
      writer = new PrintSinkOutputWriter<>(stdErr);
   }

   /**
    * Instantiates a print sink function that prints to standard out and gives a sink identifier.
    *
    * @param stdErr True, if the format should print to standard error instead of standard out.
    * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
    */
   public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
      writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
   }

   @Override
   public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
      writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
   }

   @Override
   public void invoke(IN record) {
      writer.write(record);
   }

   @Override
   public String toString() {
      return writer.toString();
   }
}


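As the code shows, the actual work happens in the invoke method: open prepares the writer once, and invoke is then called for each record.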
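
To make that call order concrete, here is a minimal sketch in plain Java. SimpleSink, RecordingSink and SinkLifecycleDemo are hypothetical names invented for illustration and are not part of Flink's API; they only model the idea that open runs once, invoke runs once per record, and close runs once at the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for a rich sink's lifecycle; NOT Flink's real API.
interface SimpleSink<T> {
    void open();           // called once before any record arrives
    void invoke(T value);  // called once per record
    void close();          // called once when the task shuts down
}

// Records every lifecycle call so the ordering can be inspected afterwards.
class RecordingSink implements SimpleSink<String> {
    final List<String> events = new ArrayList<>();

    @Override public void open() { events.add("open"); }
    @Override public void invoke(String value) { events.add("invoke:" + value); }
    @Override public void close() { events.add("close"); }
}

class SinkLifecycleDemo {
    // Drives a sink the way a single parallel subtask conceptually would.
    static <T> void run(SimpleSink<T> sink, List<T> records) {
        sink.open();
        try {
            for (T record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close(); // always release resources, even if invoke throws
        }
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink();
        run(sink, Arrays.asList("a", "b"));
        System.out.println(sink.events); // [open, invoke:a, invoke:b, close]
    }
}
```

In a real job Flink drives these calls itself, once per parallel subtask; the sketch only mirrors the ordering.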


Next, let's implement a Sink of our own that writes the data to MySQL.

First create a flink_article table with two columns, id and title.
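
One possible way to create that table from Java is sketched below. The column types are assumptions: the article does not state them, so BIGINT and VARCHAR(255) are chosen only because the sink code binds id with setLong and title with setString. CreateArticleTable is a hypothetical helper, and the JDBC URL, username and password are expected as command-line arguments.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

class CreateArticleTable {
    // Assumed column types: BIGINT for id (bound via setLong), VARCHAR(255) for title (bound via setString).
    static final String DDL =
        "CREATE TABLE IF NOT EXISTS flink_article ("
        + "id BIGINT, "
        + "title VARCHAR(255)"
        + ")";

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.out.println("usage: CreateArticleTable <jdbc-url> <username> <password>");
            return;
        }
        // try-with-resources closes the statement first, then the connection.
        try (Connection connection = DriverManager.getConnection(args[0], args[1], args[2]);
             Statement statement = connection.createStatement()) {
            statement.executeUpdate(DDL);
        }
    }
}
```

Running the same DDL string in any MySQL client works just as well; the Java wrapper is only a convenience.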


MysqlSkinFunction.java

package com.moensun.demo.flink.start.skin;

import com.moensun.demo.flink.start.model.Article;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Created by Bane.Shi.
 * Date: 2019-07-01
 * Time: 14:09
 */
public class MysqlSkinFunction<T extends Article> extends RichSinkFunction<T> {

    private PreparedStatement ps=null;
    private Connection connection=null;
    String driver = "com.mysql.cj.jdbc.Driver";
    String url = "jdbc:mysql://moensundb.mysql.rds.aliyuncs.com:3306/db_ithere?allowMultiQueries=true&useUnicode=true&characterEncoding=utf8";
    String username = "dbithere";
    String password = "Ithere2018";

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql ="insert into flink_article (id,title) values (?,?)";
        ps = connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close the statement before the connection that created it.
        if (ps != null){
            ps.close();
        }
        if(connection != null){
            connection.close();
        }
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        ps.setLong(1,value.getId());
        ps.setString(2,value.getTitle());
        ps.executeUpdate();
    }


    // Loads the JDBC driver and opens a connection. Failures propagate to open(),
    // so the job fails fast instead of hitting a NullPointerException later.
    public Connection getConnection() throws Exception {
        Class.forName(driver); // load the driver
        return DriverManager.getConnection(url,username,password); // create the connection
    }
}
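
JDBC resources are usually released in the reverse order of acquisition, that is, the statement before the connection. The plain-Java sketch below illustrates that order with try-with-resources. Tracked is a hypothetical stand-in for a JDBC resource, not a real JDBC class, and a sink that keeps its connection open across open and close has to apply the same order by hand in close().

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderDemo {
    // Hypothetical stand-in for a JDBC resource; it only logs when it is closed.
    static class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Declared in acquisition order: connection first, statement second.
        try (Tracked connection = new Tracked("connection", log);
             Tracked statement = new Tracked("statement", log)) {
            log.add("use statement");
        }
        // Closed in reverse order: statement first, connection second.
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [use statement, close statement, close connection]
    }
}
```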


Now go back to the earlier MysqlDataSource and replace the original print call with our own Sink:

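import com.moensun.demo.flink.start.model.Article;
import com.moensun.demo.flink.start.skin.MysqlSkinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// ArticleSourceFunction is the source from the earlier article; import it from wherever it lives in your project.
public class MysqlDataSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        DataStream<Article> stream = env.addSource(new ArticleSourceFunction());
        // stream.print();  // previous sink: print to standard out
        stream.addSink(new MysqlSkinFunction<Article>()); // new sink: write each Article to MySQL
        env.execute("MysqlDataSource Example");
    }
}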

Run the main method.

As you can see, the rows were written to the database successfully.


About the author

落雁沙
Atypical programmer