概述
上图展示的是每个Flink集群包含的模块。
组件(Component) | 作用、目的(Purpose) | 实现(Implementations) |
---|---|---|
Flink Client | 将应用转换成作业图(JobGraph),并将它提交到JobManager | Command Line、REST Endpoint、SQL Clinet、Python REPL |
JobManager | Flink的核心组件,用于协调各个作业的运行,生成作业图( JobGraph) | Standalone、Kubernetes、YARN |
TaskManager | 执行Flink作业。 | |
扩展组件(External Components) | ||
高可用的JobManager | Flink核心组件,用于协调各个作业的资源,针对不同的Resource Provider,有不同的实现,这些实现在高可用性、资源分配行为和支持的作业提交模式上又有所区分。 | Zookeeper、Kubernetes HA |
文件存储和持久化(File Storage and Persistency) | 检查点机制(流作业的恢复机制),依赖外部文件存储系统 | |
Resource Provider | 资源管理器,管理集群资源,针对 YARN、Kubernetes和standalone deployments都有不同的实现。 | |
Metrics Storage | Flink组件的内部指标报告 | |
Application-level data sources and sinks | 通过各种各样的连接器(Connectors)实现 |
部署方式(Deployment Modes)
Flink可以按以下三种方式进行部署。
-
Application Mode
-
Session Mode
-
Per-Job Mode(废弃)
Application Mode
一个专有的JobManager会在提交作业的时候被创建,这个JobManager只负责执行这个Job,待Job执行完成之后退出。Flink应用运行在JobManager上。
应用的main()
方法在JobManager上执行。
此模式下会为每一个提交的应用创建一个集群。
1)生命周期
以Flink应用程序执行作业,提交作业之前无需启动Flink集群,生命周期与Flink应用程序的生命周期相关。
2)资源隔离级别
在 Flink Application 集群中,ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。
3)适用的作业
长期运行、高稳定性要求、对较长的启动时间不敏感的大型作业。
Session Mode
多个作业共享一个JobManager。
应用的main()
方法在Client端执行。
在提交作业之前,必须存在一个已运行中的集群。所有提交的作业都使用这个集群的资源。
如果其中一个作业有问题或导致TaskManager宕机,则该TaskManager上运行的所有作业都将受影响。另外一个集群运行所有的作业,JobManager的负载会很大。
1)生命周期
集群长期启动,直到手动停止session为止,与flink作业的生命周期无关。
2)资源隔离级别
集群资源共享。
3)适用的作业
执行时间短且要求启动时间也要短的小型作业。
Per Job Mode
一个专有的JobManager会在提交作业的时候被创建,这个JobManager只负责执行这个Job,待Job执行完成之后退出。Flink应用运行在客户端。
Per-job mode只支持在YARN上运行,已经在Flink1.15废弃了,可以考虑使用Application Mode。
Resource Providers
单机模式(Standalone)
前提
Flink需要运行在UNIX环境,JDK需要支持Java 1.8.x及以上。
启动一个Session Mode单机集群
1)首先下载flink包(flink-1.15.0-bin-scala_2.12.tgz
)
2)解压缩此压缩包到对应目录,比如/usr/local/
3)启动集群
$ cd /usr/local
$ ./bin/start-cluster.sh
4)浏览器访问
如果想远程访问,需要配置文件./conf/flink-conf.yaml
#此配置项从“localhost”改为“0.0.0.0”即可
rest.bind-address: 0.0.0.0
到此,就可以在浏览器端或命令行提交任务了,不过提交的任务都是Session Mode的任务。
5)停止集群
$ ./bin/stop-cluster.sh
启用一个Application Mode单机集群
1)上传Jar包
$ cd /usr/local/flink-1.15.0
$ mkdir usrlib
$ cp xxx.jar /usr/local/flink-1.15.0/usrlib
2)启动JobManager
$ ./bin/standalone-job.sh start --job-classname com.wy.StreamFromSocketWordCount
现在可以通过http://ip:8081
访问了,但是目前还没有正常工作,因为还需要启用TaskManager。
3)启动TaskManager
$ ./bin/taskmanager.sh start
TaskManager可以启动多个。
4)停止服务
#停止单个服务
$ ./bin/taskmanager.sh stop
$ ./bin/standalone-job.sh stop
#停止所有服务
$ ./bin/taskmanager.sh stop-all
$ ./bin/standalone-job.sh stop-all
集群说明
1)配置文件conf/masters
和conf/workers
脚本bin/start-cluster.sh
和bin/stop-cluster.sh
对于集群实例的管理通过文件conf/masters
和conf/workers
来进行管理,集群中涉及的服务器需要提前配置好SSH免密登录。
2)高可用配置
当前模式下,集群的高可用需要(只能)通过Zookeeper进行配置。
通过Docker启动一个Session Mode集群
1)创建网络(非必要)
#默认是“桥接”模式的:bridge
$ docker network create flink-network
2)启动JobManager
$ docker run -d \
--rm \
--name=jobmanager \
--network flink-network \
-h 172.18.0.2 \
-p 8081:8081 \
--env FLINK_PROPERTIES="jobmanager.rpc.address: 172.18.0.2" \
flink:1.15.0-scala_2.12-java11 jobmanager
3)启动一个或多个TaskManager
$ FLINK_PROPERTIES="jobmanager.rpc.address: 172.18.0.2
taskmanager.numberOfTaskSlots: 10
"
$ docker run -d \
--rm \
--name=taskmanager \
--network flink-network \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
flink:1.15.0-scala_2.12-java11 taskmanager
然后就可以通过http://ip:8081
来进行访问查询作业运行情况了。
通过Docker启动一个Application Mode集群
前提
-
将对应的作业jar文件放在某个目录下,比如
/opt/flink/usrlib
1)上传作业对应的Jar包
$ cd /opt
$ mkdir flink
$ cd flink
$ mkdir usrlib
#上传Jar包到此目录下
2)创建网络(非必要)
$ docker network create flink-network
3)启动JobManager
#命令格式
$ docker run -d \
-v /opt/flink/usrlib/artifacts1:/opt/flink/usrlib/artifacts1 \
--rm \
--env FLINK_PROPERTIES="jobmanager.rpc.address: <host>" \
--name=jobmanager \
--network flink-network \
-h <ContainerHost> \
-p 8081:8081 \
flink:1.15.0-scala_2.12-java11 standalone-job \
--job-classname com.job.ClassName \
[--job-id <job id>] \
[--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
[job arguments]
#示例
$ docker run -d \
-v /opt/flink/usrlib:/opt/flink/usrlib \
--rm \
--env FLINK_PROPERTIES="jobmanager.rpc.address: 172.18.0.2" \
--name=jobmanager \
--network flink-network \
-h 172.18.0.2 \
-p 8081:8081 \
flink:1.15.0-scala_2.12-java11 standalone-job \
--job-classname com.wy.StreamFromSocketWordCount
4)启动TaskManager
#命令格式
$ docker run -d \
--rm \
--name=taskmanager \
--network flink-network \
-v /opt/flink/usrlib/artifacts1:/opt/flink/usrlib/artifacts1 \
--env FLINK_PROPERTIES="jobmanager.rpc.address: <host>" \
flink:1.15.0-scala_2.12-java11 taskmanager
#示例
$ FLINK_PROPERTIES="jobmanager.rpc.address: 172.18.0.2
taskmanager.numberOfTaskSlots: 10
"
$ docker run -d \
--rm \
--name=taskmanager \
--network flink-network \
-v /opt/flink/usrlib:/opt/flink/usrlib \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
flink:1.15.0-scala_2.12-java11 taskmanager
以上如果不能满足条件,可以通过定制Dockerfile
的方式发布自己的镜像。
FROM flink
ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1
ADD /host/path/to/job/artifacts/2 /opt/flink/usrlib/artifacts/2
ADD /host/path/to/flink-conf.yaml /opt/flink/conf/flink-conf.yaml
ADD /host/path/to/log4j.properties /opt/flink/conf/log4j.properties
$ docker build --tag flink_with_job_artifacts .
$ docker run \
flink_with_job_artifacts standalone-job \
--job-classname com.job.ClassName \
[--job-id <job id>] \
[--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
[job arguments]
$ docker run flink_with_job_artifacts taskmanager
通过Kubernetes启动Flink集群
另外也还可以通过Kubernetes启用Flink的Session Mode集群或Application Mode集群。