文章
问答
冒泡
一、Flink概述

Flink是什么

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。相比于Apache Spark,Flink主要用来处理流式数据,Spark主要用来处理批式数据。

Flink的主要特点

事件驱动型(event-driven)

基于流的处理

分层API

低延迟

高吞吐

结果的准确性和良好的容错性

Flink中的Api

  • Flink API最底层的抽象为有状态的实时流处理,其对应的API抽象实现即为ProcessFunction

  • Flink API第二层的API为核心API,我们会经常使用到这一层的API,这其中包含DataStream API(应用于有界/无界数据流场景)、DataSetAPI(应用于有界数据流场景)。

  • Flink API第三层抽象的Table API是一套内嵌在Java和Scala语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询,Table API提供了类似于关系模型中的操作,比如select、join、group-by等等。Table API可以与DataStream API/DataSet API混合使用。

  • Flink API最顶层抽象的Flink SQL基于实现了SQL标准,可以通过书写纯SQL的方式来进行数据的处理。

简单示例

这里是一个流处理Demo,基于scala语言,演示的是一个最基本的Demo,统计单词数量。

Requirements

  • Maven 3.0.4 (or higher)
  • Java 11

Step

1)创建一个maven项目,pom.xml配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wy</groupId>
    <artifactId>flink-maven-scala-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>

        <flink.version>1.15.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 编译Scala代码为class文件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.5.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2)创建源文件目录src/main/scala

3)创建StreamWordCount.scala

package com.wy.wc

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * 流处理wordCount
 */
object StreamWordCount {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val inputDataStream:DataStream[String] = env.socketTextStream("192.168.10.6", 7777)

        val resultDataStream:DataStream[(String, Int)] = inputDataStream
          .flatMap(_.split(" "))
          .map((_, 1))
          .keyBy(_._1)
          .sum(1)

        resultDataStream.print()

        env.execute("wordCountJob")
    }
}

4)linux环境上启动nc命令

nc -lk 7777

5)运行文件StreamWordCount.scala

6)使用nc进行测试

nc输入:

hello world
hello china
hello jiangsu
hello suzhou

控制台打印:

=======================================

Flink批处理示例

Flink也可以进行数据的批处理。接上面的流处理Demo的环境。

在src/main/resources下创建一个文本文件wordcount.txt。内容如下:

hello world
hello china
hello jiangsu
hello suzhou

然后再创建WordCount.scala

package com.wy.wc

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * 批处理word count
 */
object WordCount {
    def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        val inputPath = "D:\\IdeaWorkspace\\flink-maven-scala-demo\\src\\main\\resources\\wordcount.txt"
        val inputDataSet:DataSet[String] = env.readTextFile(inputPath)

        var resultDataSet:DataSet[(String, Int)] = inputDataSet
          .flatMap(_.split(" "))
          .map((_, 1))
          .groupBy(0)
          .sum(1)

        resultDataSet.print()
    }
}

运行,控制台打印:

(world,1)
(hello,4)
(suzhou,1)
(china,1)
(jiangsu,1)

flink
scala

关于作者

justin
123456
获得点赞
文章被阅读