文章
问答
冒泡
二、Flink部署(Deployment)之Standalone

概述

上图展示的是每个Flink集群包含的模块。

组件(Component) 作用、目的(Purpose) 实现(Implementations)
Flink Client 将应用转换成作业图(JobGraph),并将它提交到JobManager Command Line、REST Endpoint、SQL Clinet、Python REPL
JobManager Flink的核心组件,用于协调各个作业的运行,生成作业图( JobGraph) Standalone、Kubernetes、YARN
TaskManager 执行Flink作业。  
  扩展组件(External Components)  
高可用的JobManager Flink核心组件,用于协调各个作业的资源,针对不同的Resource Provider,有不同的实现,这些实现在高可用性、资源分配行为和支持的作业提交模式上又有所区分。 Zookeeper、Kubernetes HA
文件存储和持久化(File Storage and Persistency) 检查点机制(流作业的恢复机制),依赖外部文件存储系统  
Resource Provider 资源管理器,管理集群资源,针对 YARN、Kubernetes和standalone deployments都有不同的实现。  
Metrics Storage Flink组件的内部指标报告  
Application-level data sources and sinks 通过各种各样的连接器(Connectors)实现  

部署方式(Deployment Modes)

Flink可以按以下三种方式进行部署。

  • Application Mode

  • Session Mode

  • Per-Job Mode(废弃)

Application Mode

一个专有的JobManager会在提交作业的时候被创建,这个JobManager只负责执行这个Job,待Job执行完成之后退出。Flink应用运行在JobManager上。

应用的main()方法在JobManager上执行。

此模式下会为每一个提交的应用创建一个集群。

1)生命周期

以Flink应用程序执行作业,提交作业之前无需启动Flink集群,生命周期与Flink应用程序的生命周期相关。

2)资源隔离级别

在 Flink Application 集群中,ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。

3)适用的作业

长期运行、高稳定性要求、对较长的启动时间不敏感的大型作业。

Session Mode

多个作业共享一个JobManager。

应用的main()方法在Client端执行。

在提交作业之前,必须存在一个已运行中的集群。所有提交的作业都使用这个集群的资源。

如果其中一个作业有问题或导致TaskManager宕机,则该TaskManager上运行的所有作业都将受影响。另外一个集群运行所有的作业,JobManager的负载会很大。

1)生命周期

集群长期启动,直到手动停止session为止,与flink作业的生命周期无关。

2)资源隔离级别

集群资源共享。

3)适用的作业

执行时间短且要求启动时间也要短的小型作业。

Per Job Mode

一个专有的JobManager会在提交作业的时候被创建,这个JobManager只负责执行这个Job,待Job执行完成之后退出。Flink应用运行在客户端。

Per-job mode只支持在YARN上运行,已经在Flink1.15废弃了,可以考虑使用Application Mode。

Resource Providers

单机模式(Standalone)

前提

Flink需要运行在UNIX环境,JDK需要支持Java 1.8.x及以上。

启动一个Session Mode单机集群

1)首先下载flink包(flink-1.15.0-bin-scala_2.12.tgz

2)解压缩此压缩包到对应目录,比如/usr/local/

3)启动集群

$ cd /usr/local

$ ./bin/start-cluster.sh

4)浏览器访问

http://ip:8081

如果想远程访问,需要配置文件./conf/flink-conf.yaml

#此配置项从“localhost”改为“0.0.0.0”即可
rest.bind-address: 0.0.0.0

到此,就可以在浏览器端或命令行提交任务了,不过提交的任务都是Session Mode的任务。

5)停止集群

$ ./bin/stop-cluster.sh

启用一个Application Mode单机集群

1)上传Jar包

$ cd /usr/local/flink-1.15.0
$ mkdir usrlib
$ cp xxx.jar /usr/local/flink-1.15.0/usrlib

2)启动JobManager

$ ./bin/standalone-job.sh start --job-classname com.wy.StreamFromSocketWordCount

现在可以通过http://ip:8081访问了,但是目前还没有正常工作,因为还需要启用TaskManager。

3)启动TaskManager

$ ./bin/taskmanager.sh start

TaskManager可以启动多个。

4)停止服务

#停止单个服务
$ ./bin/taskmanager.sh stop
$ ./bin/standalone-job.sh stop

#停止所有服务
$ ./bin/taskmanager.sh stop-all
$ ./bin/standalone-job.sh stop-all

集群说明

1)配置文件conf/mastersconf/workers

脚本bin/start-cluster.shbin/stop-cluster.sh对于集群实例的管理通过文件conf/mastersconf/workers来进行管理,集群中涉及的服务器需要提前配置好SSH免密登录。

2)高可用配置

当前模式下,集群的高可用需要(只能)通过Zookeeper进行配置。

通过Docker启动一个Session Mode集群

1)创建网络(非必要)

#默认是“桥接”模式的:bridge 
$ docker network create flink-network

2)启动JobManager

$ docker run -d \
   --rm \
   --name=jobmanager \
   --network flink-network \
   -h 172.18.0.2 \
   -p 8081:8081 \
   --env FLINK_PROPERTIES="jobmanager.rpc.address: 172.18.0.2" \
  flink:1.15.0-scala_2.12-java11 jobmanager

3)启动一个或多个TaskManager

$ FLINK_PROPERTIES="jobmanager.rpc.address: 172.18.0.2
taskmanager.numberOfTaskSlots: 10
"

$ docker run -d \
   --rm \
   --name=taskmanager \
   --network flink-network \
   --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
  flink:1.15.0-scala_2.12-java11 taskmanager

然后就可以通过http://ip:8081来进行访问查询作业运行情况了。

通过Docker启动一个Application Mode集群

前提
  • 将对应的作业jar文件放在某个目录下,比如/opt/flink/usrlib

1)上传作业对应的Jar包

$ cd /opt
$ mkdir flink
$ cd flink
$ mkdir usrlib
#上传Jar包到此目录下

2)创建网络(非必要)

$ docker network create flink-network

3)启动JobManager

#命令格式
$ docker run -d \
   -v /opt/flink/usrlib/artifacts1:/opt/flink/usrlib/artifacts1 \
   --rm \
   --env FLINK_PROPERTIES="jobmanager.rpc.address: <host>" \
   --name=jobmanager \
   --network flink-network \
   -h <ContainerHost> \
   -p 8081:8081 \
  flink:1.15.0-scala_2.12-java11 standalone-job \
   --job-classname com.job.ClassName \
  [--job-id <job id>] \
  [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
  [job arguments]
   
#示例
$ docker run -d \
   -v /opt/flink/usrlib:/opt/flink/usrlib \
   --rm \
   --env FLINK_PROPERTIES="jobmanager.rpc.address: 172.18.0.2" \
   --name=jobmanager \
   --network flink-network \
   -h 172.18.0.2 \
   -p 8081:8081 \
  flink:1.15.0-scala_2.12-java11 standalone-job \
   --job-classname com.wy.StreamFromSocketWordCount

4)启动TaskManager

#命令格式
$ docker run -d \
   --rm \
   --name=taskmanager \
   --network flink-network \
   -v /opt/flink/usrlib/artifacts1:/opt/flink/usrlib/artifacts1 \
   --env FLINK_PROPERTIES="jobmanager.rpc.address: <host>" \
  flink:1.15.0-scala_2.12-java11 taskmanager

#示例
$ FLINK_PROPERTIES="jobmanager.rpc.address: 172.18.0.2
taskmanager.numberOfTaskSlots: 10
"

$ docker run -d \
   --rm \
   --name=taskmanager \
   --network flink-network \
   -v /opt/flink/usrlib:/opt/flink/usrlib \
   --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
  flink:1.15.0-scala_2.12-java11 taskmanager

以上如果不能满足条件,可以通过定制Dockerfile的方式发布自己的镜像。

FROM flink
ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1
ADD /host/path/to/job/artifacts/2 /opt/flink/usrlib/artifacts/2

ADD /host/path/to/flink-conf.yaml /opt/flink/conf/flink-conf.yaml
ADD /host/path/to/log4j.properties /opt/flink/conf/log4j.properties
$ docker build --tag flink_with_job_artifacts .
$ docker run \
  flink_with_job_artifacts standalone-job \
   --job-classname com.job.ClassName \
  [--job-id <job id>] \
  [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
  [job arguments]

$ docker run flink_with_job_artifacts taskmanager

通过Kubernetes启动Flink集群

另外也还可以通过Kubernetes启用Flink的Session Mode集群或Application Mode集群。

flink
Standalone

关于作者

justin
123456
获得点赞
文章被阅读