Skip to main content

· 15 min read

1. 背景

Hadoop 体系虽然在目前应用非常广泛,但架构繁琐、运维复杂度过高、版本升级困难,且由于部门原因,数据中台需求排期较长,我们急需探索敏捷性开发的数据平台模式。在目前云原生架构的普及和湖仓一体化的大背景下,我们已经确定了将 Doris 作为离线数据仓库,将 TiDB(目前已经应用于生产)作为实时数据平台,同时因为 Doris 具有 on MySQL 的 ODBC 能力,所以又可以对外部数据库资源进行整合,统一对外输出报表

(这里借用一下 Doris 官方的架构图)


2. 遇到的问题

在数据引擎上,我们确定使用 Spark 和 Flink

  • 使用 Spark on K8s client 客户端模式做离线数据处理
  • 使用 Flink on K8s Native-Application/Session 模式做实时任务流管理

在这里,实际上有一些问题我们一直没有彻底解决:

用过 Native-Application 模式的朋友都知道,每提交一个任务,都需要打包新的镜像,提交到私有仓库,然后再调用 Flink Run 指令沟通 K8s,去拉取镜像运行 Pod。任务提交之后,还需要去 K8s 查看 log, 但是:

  1. 任务运行监控怎么处理?
  2. 使用 Cluster 模式还是 NodePort 暴露端口访问 Web UI?
  3. 提交任务能否简化打包镜像的流程?
  4. 如何减少开发压力?


3. 解决问题的过程

以上的这些其实都是需要解决的问题,如果单纯地使用命令行去提交每个任务,是不现实的,任务量大了,会变得不可维护。如何解决这些问题变成一个不得不面对的问题。


简化镜像构建

首先,针对 Flink 原生镜像需要二次 build 的问题:我们利用了 MinIO 作为外部存储,并使用 s3-fuse 通过 DaemonSet 的方式直接挂载在了每个宿主节点上,我们所需要提交的 jar 包都可以放到上面统一管理。这样的话,即使扩缩容 Flink 节点,也能实现 S3 挂载自动伸缩。

Flink 从 1.13 版本开始,就支持 Pod Template,我们可以在 Pod Template 中利用数据卷挂载的方式再将宿主机目录挂载到每个 pod 中,从而无需镜像打包而直接在 K8s 上运行 Flink 程序。如上图,我们将 S3 先通过 s3-fuse Pod 挂载在 Node 1、Node 2 的 /mnt/data-s3fs 目录下,然后再将 /mnt/data-s3fs 挂载到 Pod A 中。

但是,因为对象存储随机写入或追加文件需要重写整个对象,导致这种方式仅适合于频繁读。而这刚好满足我们现在的场景。


引入 StreamPark

之前我们写 Flink SQL 基本上都是使用 Java 包装 SQL,打 jar 包,提交到 S3 平台上。通过命令行方式提交代码,但这种方式始终不友好,流程繁琐,开发和运维成本太大。我们希望能够进一步简化流程,将 Flink TableEnvironment 抽象出来,有平台负责初始化、打包运行 Flink 任务,实现 Flink 应用程序的构建、测试和部署自动化。

这是个开源兴起的时代,我们自然而然的将目光投向开源领域中:在一众开源项目中,经过对比各个项目综合评估发现 Zeppelin StreamPark 这两个项目对 Flink 的支持较为完善,都宣称支持 Flink on K8s ,最终进入到我们的目标选择范围中,以下是两者在 K8s 相关支持的简单比较(目前如果有更新,麻烦批评指正)。

功能ZeppelinStreamPark
任务状态监控 稍低 ,不能作为任务状态监控工具 较高
任务资源管理,但目前版本还不是很健全
本地化部署 稍低 ,on K8s 模式只能将 Zeppelin 部署在 K8s 中,否则就需要打通 Pod 和外部网络,但是这在生产环境中很少这样做的 可以本地化部署
多语言支持 较高 ,支持 Python/Scala/Java 多语言 一般 ,目前 K8s 模式和 YARN 模式同时支持 FlinkSQL,并可以根据自身需求,使用 Java/Scala 开发 DataStream
Flink WebUI 代理 目前还支持的不是很完整 ,主开发大佬目前是考虑整合 Ingress 较好 ,目前支持 ClusterIp/NodePort/LoadBalance 模式
学习成本 成本较低 ,需要增加额外的参数学习,这个和原生的 FlinkSQL 在参数上有点区别 无成本 ,K8s 模式下 FlinkSQL 为原生支持的 SQL 格式;同时支持 Custome-Code(用户编写代码开发Datastream/FlinkSQL 任务)
Flink 多版本支持 支持 支持
Flink 原生镜像侵入 有侵入 ,需要在 Flink 镜像中提前部署 jar 包,会同 JobManager 启动在同一个 Pod 中,和 zeppelin-server 通信 无侵入 ,但是会产生较多镜像,需要定时清理
代码多版本管理 支持 支持
(PS: 此处仅从调研用户角度出发,我们对双方开发都保持极大的尊重)

调研过程中,我们与两者的主开发人员都进行了多次沟通。经过我们反复研究之后,还是决定将 StreamPark 作为我们目前的 Flink 开发工具来使用。

(StreamPark 官网的闪屏)

经过开发同学长时间开发测试,StreamPark 目前已经具备:

  • 完善的SQL 校验功能
  • 实现了自动 build/push 镜像
  • 使用自定义类加载器,通过 Child-first 加载方式 解决了 YARN 和 K8s 两种运行模式支持了自由切换 Flink 多版本
  • 与 Flink-Kubernetes 进行深度整合,提交任务后返回 WebUI,通过 remote rest api + remote K8s,追踪任务执行状态
  • 同时支持了 Flink 1.12、1.13、1.14 等版本

以上基本解决了我们目前开发和运维中存在的大部分问题。

(StreamPark 对 Flink 多版本的支持演示视频)

在目前最新发布的 1.2.0 版本中,StreamPark 较为完善地支持了 K8s-Native-Application 和 K8s-Session-Application 模式。

(StreamPark K8s 部署演示视频)

K8s Native Application 模式

在 StreamPark 中,我们只需要配置相应的参数,并在 Maven POM 中填写相应的依赖,或者上传依赖 jar 包,点击 Apply,相应的依赖就会生成。这就意味着我们也可以将所有使用的 UDF 打成 jar 包,以及各种 connector.jar,直接在 SQL 中使用。如下图:

SQL 校验能力和 Zeppelin 基本一致:

我们也可以指定资源,指定 Flink Run 中的动态参数 Dynamic Option,甚至参数可以整合 Pod Template

程序保存后,点击运行时,也可以指定 savepoint。任务提交成功后,StreamPark 会根据 FlinkPod 网络 Exposed Type(loadBalancer/NodePort/ClusterIp),返回相应的 WebURL,从而自然的实现 WebUI 跳转。但是,目前因为线上私有 K8s 集群出于安全性考虑,尚未打通 Pod 与客户端节点网络(目前也没有这个规划)。所以么,我们只使用 NodePort。如果后续任务数过多,有使用 ClusterIP 的需求的话,我们可能会将 StreamPark 部署在 K8s,或者同 Ingress 做进一步整合。

注意:K8s master 如果使用 vip 做均衡代理的情况下,Flink 1.13 版本会返回 vip 的 ip 地址,在 1.14 版本中已经修复该问题。

下面是 K8s Application 模式下具体提交流程

(以上是依据个人理解绘制的任务提交流程图,如有错误,敬请谅解)

K8s Native Session 模式

StreamPark 还较好地支持了 K8s Native-Sesson 模式,这为我们后续做离线 FlinkSQL 开发或部分资源隔离做了较好的技术支持。

Native-Session 模式需要事先使用 Flink 命令创建一个运行在 K8s 中的 Flink 集群。如下:

./kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-on-k8s-flinkSql-test \
-Dkubernetes.context=XXX \
-Dkubernetes.namespace=XXXX \
-Dkubernetes.service-account=XXXX \
-Dkubernetes.container.image=XXXX \
-Dkubernetes.container.image.pull-policy=Always \
-Dkubernetes.taskmanager.node-selector=XXXX \
-Dkubernetes.rest-service.exposed.type=Nodeport

如上图,使用该 ClusterId 作为 StreamPark 的任务参数 Kubernetes ClusterId。保存提交任务后,任务会很快处于 Running 状态:

我们顺着 application info 的 WebUI 点击跳转:

可以看到,其实 StreamPark 是将 jar 包通过 REST API 上传到 Flink 集群上,并调度执行任务的。


Custom Code 模式

另我们惊喜的是,StreamPark 还支持代码编写 DataStream/FlinkSQL 任务。对于特殊需求,我们可以自己写 Java/Scala 实现。可以根据 StreamPark 推荐的脚手架方式编写任务,也可以编写一个标准普通的 Flink 任务,通过这种方式我们可以将代码管理交由 git 实现,平台可以用来自动化编译打包与部署。当然,如果能用 SQL 实现的功能,我们会尽量避免自定义 DataStream,减少不必要的运维麻烦。



4. 意见和规划

改进意见

当然 StreamPark 还有很多需要改进的地方,就目前测试来看:

  • 资源管理还有待加强:多文件系统jar包等资源管理功能尚未添加,任务版本功能有待加强。
  • 前端 button 功能还不够丰富:比如任务添加后续可以增加复制等功能按钮。
  • 任务提交日志也需要可视化展示:任务提交伴随着加载 class 文件,打 jar 包,build 镜像,提交镜像,提交任务等过程,每一个环节出错,都会导致任务的失败,但是失败日志往往不明确,或者因为某种原因导致异常未正常抛出,没有转换任务状态,用户会无从下手改进。

众所周知,一个新事物的出现一开始总会不是那么完美。尽管有些许问题和需要改进的 point,但是瑕不掩瑜,我们仍然选择 StreamPark 作为我们的 Flink DevOps,我们也将会和主开发人员一道共同完善 StreamPark,也欢迎更多的人来使用,为 StreamPark 带来更多进步。


未来规划

  • 我们会继续跟进 Doris,并将业务数据 + 日志数据统一入 Doris,通过 Flink 实现湖仓一体;
  • 我们也会逐步将探索 StreamPark 同 DolphinScheduler 2.x 进行整合,完善DolphinScheduler 离线任务,逐步用 Flink 替换掉 Spark,实现真正的流批一体;
  • 基于我们自身在 S3 上的探索积累,fat-jar 包 build 完成之后不再构建镜像,直接利用 Pod Tempelet 挂载 PVC 到 Flink Pod 中的目录,进一步优化代码提交流程;
  • 将 StreamPark 持续应用到我们生产中,并汇同社区开发人员,共同努力,增强 StreamPark 在 Flink 流上的开发部署能力与运行监控能力,努力把 StreamPark 打造成一个功能完善的流数据 DevOps。

附:

StreamPark GitHub:https://github.com/apache/incubator-streampark
Doris GitHub:https://github.com/apache/doris

· 19 min read

导读:本文主要介绍顺网科技在使用 Flink 计算引擎中遇到的一些挑战,基于 StreamPark 作为实时数据平台如何来解决这些问题,从而大规模支持公司的业务。

  • 公司业务介绍
  • 遇到的挑战
  • 为什么用 StreamPark
  • 落地实践
  • 带来的收益
  • 未来规划

公司业务介绍

杭州顺网科技股份有限公司成立于 2005 年,秉承科技连接快乐的企业使命,是国内具有影响力的泛娱乐技术服务平台之一。多年来公司始终以产品和技术为驱动,致力于以数字化平台服务为人们创造沉浸式的全场景娱乐体验。

自顺网科技成立以来,随着业务快速发展,顺网科技服务了 8 万家线下实体店,拥有超过 5000 万互联网用户,年触达超 1.4 亿网民,每 10 家公共上网服务场所有 7 家使用顺网科技产品。

在拥有庞大的用户群体的情况下,顺网科技为了给用户提供更加优质的产品体验,实现企业的数字化转型,从 2015 年开始大力发展大数据, Flink 在顺网科技的实时计算中一直扮演着重要的角色。在顺网科技,实时计算大概分为 4 个应用场景:

  • 用户画像实时更新:包括网吧画像和网民画像。
  • 实时风控:包括活动防刷、异地登录监测等。
  • 数据同步:包括 Kafka 数据同步到 Hive / Iceberg / ClickHouse 等。
  • 实时数据分析:包括游戏、语音、广告、直播等业务实时大屏。

到目前为止,顺网科技每日需要处理 TB 级别的数据,总共拥有 700+ 个实时任务,其中 FlinkSQL 任务占比为 95% 以上。随着公司业务快速发展和数据时效性要求变高,预计在今年年底 Flink 任务会达到 900+。

遇到的挑战

Flink 作为当下实时计算领域中最流行的技术框架之一,拥有高吞吐、低延迟、有状态计算等强大的特性。在探索中我们发现 Flink 虽然拥有强大的计算能力,但是对于作业开发管理和运维问题,社区并没有提供有效的解决方案。我们对 Flink 作业开发管理上遇到的一些痛点大概总结为 4 个方面,如下:

图片

在面对 Flink 作业管理和运维上的的一系列痛点后,我们一直在寻找合适的解决方案来降低开发同学使用 Flink 门槛,提高工作效率。

在没有遇到 StreamPark 之前,我们调研了部分公司的 Flink 管理解决方案,发现都是通过自研实时作业平台的方式来开发和管理 Flink 作业。于是,我们也决定自研一套实时计算管理平台,来满足了开发同学对于 Flink 作业管理和运维的基础需求,我们这套平台叫 Streaming-Launcher,大体功能如下:

图片

但是后续开发同学在使用过程中,发现 Streaming-Launcher 存在比较多的缺陷:Flink 开发成本依然过高、工作效率低下、问题排查困难。我们总结了 Streaming-Launcher 存在的缺陷,大致如下:

SQL开发流程繁琐

作业务开发需要多个工具完成一个 SQL 作业开发,提高了开发同学的使用门槛。

cc0b1414ed43942e0ef5e9129c2bf817

SQL-Client 存在弊端

Flink 提供的 SQL-Client 目前对作业运行模式支持上,存在一定的弊端。

图片

作业缺少统一管理

Streaming-Launcher 中,没有提供统一的作业管理界面。开发同学无法直观的看到作业运行情况,只能通过告警信息来判断作业运行情况,这对开发同学来说非常不友好。如果因为 Yarn 集群稳定性问题或者网络波动等不确定因素,一下子失败大批量任务,在开发同学手动恢复作业的过程中,很容易漏恢复某个任务而造成生产事故。

问题诊断流程繁琐

一个作业查看日志需要通过多个步骤,一定程度上降低了开发同学工作效率。

图片

为什么用 StreamPark

面对自研平台 Streaming-Launcher 存在的缺陷,我们一直在思考如何将 Flink 的使用门槛降到更低,进一步提高工作效率。考虑到人员投入成本和时间成本,我们决定向开源社区求助寻找合适的开源项目来对我们的 Flink 任务进行管理和运维。

很幸运在 2022 年 6 月初,我们在 GitHub 机缘巧合之间认识到了 StreamPark,我们满怀希望地对 StreamPark 进行了初步的探索。发现 StreamPark 具备的能力大概分为三大块:用户权限管理、作业运维管理和开发脚手架。

用户权限管理

在 StreamPark 平台中为了避免用户权限过大,发生一些不必要的误操作,影响作业运行稳定性和环境配置的准确性,提供了相应的一些用户权限管理功能,这对企业级用户来说,非常有必要。

图片

作业运维管理

我们在对 StreamPark 做调研的时候,最关注的是 StreamPark 对于作业的管理的能力。StreamPark 是否有能力管理作业一个完整的生命周期:作业开发、作业部署、作业管理、问题诊断等。很幸运,StreamPark 在这一方面非常优秀,对于开发同学来说只需要关注业务本身,不再需要特别关心 Flink 作业管理和运维上遇到的一系列痛点。在 StreamPark 作业开发管理管理中,大致分为三个模块:作业管理基础功能,Jar 作业管理,FlinkSQL 作业管理。如下:

图片

开发脚手架

通过进一步的研究发现,StreamPark 不仅仅是一个平台,还包含 Flink 作业开发脚手架, 在 StreamPark 中,针对编写代码的 Flink 作业,提供一种更好的解决方案,将程序配置标准化,提供了更为简单的编程模型,同时还提供了一些列 Connectors,降低了 DataStream 开发的门槛。

图片

02 StreamPark 解决自研平台的问题

上面我们简单介绍了 StreamPark 的核心能力。在顺网科技的技术选型过程中,我们发现 StreamPark 所具备强大的功能不仅包含了现有 Streaming-Launcher 的基础功能,还提供了更完整的对应方案解决了 Streaming-Launcher 的诸多不足。在这部分,着重介绍下 StreamPark 针对我们自研平台 Streaming-Launcher 的不足所提供的解决方案。

图片

Flink 作业一站式的开发能力

StreamPark 为了降低 Flink 作业开发门槛,提高开发同学工作效率,提供了 FlinkSQL IDE、参数管理、任务管理、代码管理、一键编译、一键作业上下线等使用的功能。在调研中,我们发现 StreamPark 集成的这些功能可以进一步提升开发同学的工作效率。在某种程度上来说,开发同学不需要去关心 Flink 作业管理和运维的难题,只要专注于业务的开发。同时,这些功能也解决了 Streaming-Launcher 中 SQL 开发流程繁琐的痛点。

图片

支持多种部署模式

在 Streaming-Launcher 中,由于只支持 Yarn Session 模式,对于开发同学来说,其实非常不灵活。StreamPark 对于这一方面也提供了完善的解决方案。StreamPark 完整的支持了Flink 的所有部署模式:Remote、Yarn Per-Job、Yarn Application、Yarn Session、K8s Session、K8s Application**可以让开发同学针对不同的业务场景自由选择合适的运行模式。

作业统一管理中心

对于开发同学来说,作业运行状态是他们最关心的内容之一。在 Streaming-Launcher 中由于缺乏作业统一管理界面,开发同学只能通过告警信息和 Yarn 中Application 的状态信息来判断任务状态,这对开发同学来说非常不友好。StreamPark 针对这一点,提供了作业统一管理界面,可以一目了然查看到每个任务的运行情况。

图片

在 Streaming-Launcher 中,开发同学在作业问题诊断的时候,需要通过多个步骤才能定位作业运行日志。StreamPark 提供了一键跳转功能,能快速定位到作业运行日志。

图片

落 地 实 践

在 StreamPark 引入顺网科技时,由于公司业务的特点和开发同学的一些定制化需求,我们对 StreamPark 的功能做了一些增加和优化,同时也总结了一些在使用过程中遇到的问题和对应的解决方案。

在顺网科技,我们基于 Flink-SQL-Gateway 自研了 ODPS 平台来方便业务开发同学管理 Flink 表的元数据。业务开发同学在 ODPS 上对 Flink 表进行 DDL 操作,然后在 StreamPark 上对创建的 Flink 表进行分析查询操作。在整个业务开发流程上,我们对 Flink 表的创建和分析实现了解耦,让开发流程显得比较清晰。

开发同学如果想在 ODPS 上查询实时数据,我们需要提供一个 Flink SQL 的运行环境。我们使用 StreamPark 运行了一个 Yarn Session 的 Flink 环境提供给 ODPS 做实时查询。

图片

目前 StreamPark 社区为了进一步降低实时作业开发门槛,正在对接 Flink-SQL-Gateway。

https://github.com/apache/streampark/issues/2274

在顺网科技,存在大量从 Kafka 数据同步到 Iceberg / PG / Clickhouse / Hive 的作业。这些作业需要的 Yarn 对于资源要求和时效性要求不高,但是如果全部使用 Yarn Application 和 per-job 模式,每个任务都会启动 JobManager,那么会造成 Yarn 资源的浪费。对此,我们决定使用 Yarn Session 模式运行这些大量的数据同步作业。

在实践中我们发现业务开发同学很难直观的知道在每个 Yarn Session 中运行了多少个作业,其中包括作业总数和正在运行中的作业数量。基于这个原因,我们为了方便开发同学可以直观地观察到 Yarn Session 中的作业数量,在 Flink Cluster 界面增加了 All Jobs 和 Running Jobs 来表示在一个 Yarn Session 中总的作业数和正在运行的作业数。

图片

03 增强告警能力

因为每个公司的短信告警平台实现都各不相同,所以 StreamPark 社区并没有抽象出统一的短信告警功能。在此,我们通过 Webhook 的方式,自己实现了短信告警功能。

图片

04 增加阻塞队列解决限流问题

在生产实践中,我们发现在大批量任务同时失败的时候,比如 Yarn Session 集群挂了,飞书 / 微信等平台在多线程同时调用告警接口时会存在限流的问题,那么大量的告警信息因为飞书 / 微信等平台限流问题,StreamPark 只会发送一部分的告警信息,这样非常容易误导开发同学排查问题。在顺网科技,我们增加了一个阻塞队列和一个告警线程,来解决限流问题。

图片

当作业监控调度器检测到作业异常时,会产生一条作业异常的消息发送的阻塞队列中,在告警线程中会一直消费阻塞队列中的消息,在得到作业异常消息后则会根据用户配置的告警信息单线程发送到不同的平台中。虽然这样做可能会让用户延迟收到告警,但是我们在实践中发现同时有 100+ 个 Flink 作业失败,用户接受到告警的延迟时间小于 3s。对于这种延迟时间,我们业务开发同学完全是可以接受的。该改进目前已经记录 ISSUE,正在考虑贡献到社区中。

https://github.com/apache/streampark/issues/2142

带来的收益

我们从 StreamX 1.2.3(StreamPark 前身)开始探索和使用,经过一年多时间的磨合,我们发现 StreamPark 真实解决了 Flink 作业在开发管理和运维上的诸多痛点。

StreamPark 给顺网科技带来的最大的收益就是降低了 Flink 的使用门槛,提升了开发效率。我们业务开发同学在原先的 Streaming-Launcher 中需要使用 vscode、GitLab 和调度平台等多个工具完成一个 FlinkSQL 作业开发,从开发到编译到发布的流程中经过多个工具使用,流程繁琐。StreamPark 提供一站式服务,可以在 StreamPark 上完成作业开发编译发布,简化了整个开发流程。

目前 StreamPark 在顺网科技已经大规模在生产环境投入使用,StreamPark 从最开始管理的 500+ 个 FlinkSQL 作业增加到了近 700 个 FlinkSQL作业,同时管理了 10+ 个 Yarn Sesssion Cluster。

图片

图片

未 来 规 划

顺网科技作为 StreamPark 早期的用户之一,在 1 年期间内一直和社区同学保持交流,参与 StreamPark 的稳定性打磨,我们将生产运维中遇到的 Bug 和新的 Feature 提交给了社区。在未来,我们希望可以在 StreamPark 上管理 Flink 表的元数据信息,基于 Flink 引擎通过多 Catalog 实现跨数据源查询分析功能。目前 StreamPark 正在对接 Flink-SQL-Gateway 能力,这一块在未来对于表元数据的管理和跨数据源查询功能会提供了很大的帮助。

由于在顺网科技多是已 Yarn Session 模式运行的作业,我们希望 StreamPark 可以提供更多对于 Remote集群、Yarn Session 集群和 K8s Session 集群功能支持,比如监控告警,优化操作流程等方面。

考虑到未来,随着业务发展可能会使用 StreamPark 管理更多的 Flink 实时作业,单节点模式下的 StreamPark 可能并不安全。所以我们对于 StreamPark 的 HA 也是非常期待。

对于 StreamPark 对接 Flink-SQL-Gateway 能力、丰富 Flink Cluster 功能和 StreamPark HA,我们后续也会参与建设中。

· 23 min read

摘要:本文整理自联通数科实时计算团队负责人、Apache StreamPark Committer 穆纯进在 Flink Forward Asia 2022 平台建设专场的分享,本篇内容主要分为四个部分:

  • 实时计算平台背景介绍
  • Flink 实时作业运维挑战
  • 基于 StreamPark 一体化管理
  • 未来规划与演进

实时计算平台背景介绍

上图是实时计算平台的整体架构,最底层是数据源,由于一些敏感信息,没有将数据源的详细信息列出,它主要包含三部分,分别是业务数据库、用户行为日志、用户位置,联通的数据源非常多,业务数据库这一项就有几万张表;主要通过 Flink SQL 和 DataStream API 来处理数据 ,数据处理流程包括 Flink 对数据源的实时解析、规则的实时计算以及实时产品;用户在可视化订阅平台上进行实时数据订阅,用户可以在地图上画一个电子围栏,并设置一些规则,如来自于哪里,在围栏里驻留多长时间等,还可以筛选一些特征,符合这些规则的用户信息会实时进行推送,然后是实时安全部分,当某个用户连接了高危基站或是有异常操作行为时,我们会认为可能存在诈骗行为,会对手机号码进行关停等等,还有用户的一些实时特征以及实时大屏。

上图是数据处理的详细的流程。

第一部分是采集解析,我们的数据源来自于业务的数据库,包含 OGG 和 DTS 格式的消息、日志消息、用户行为和用户位置数据,总共 50 多种数据源,后续还会逐渐增加,所有数据源均使用 Flink 做实时解析;并增加了 Metrics 来监控数据源的延迟情况。

第二部分是实时计算, 这个环节处理的数据量很大,数据量在万亿级别,支撑了 10000+的数据实时订阅,有 200 多个 Flink 任务,我们将某一种同类型的业务封装成一种场景,同一个 Flink 作业可以支持相同场景的多个订阅,目前 Flink 作业数还在不停的增长,后续可能会增加到 500 多个;其中面临的一个很大挑战是每天万亿级的数据实时关联电子围栏、用户特征等信息,电子围栏有几万个,用户特征涉及数亿用户,最初我们将电子围栏信息和用户特征放到 HBase, 但这样会导致 HBase 压力很大,经常遇到性能问题造成数据延迟,而且一旦产生数据积压,需要很长的时间去消化,得益于 Flink State 的强大,我们将电子围栏信息和用户特征放到状态里,目前已经很好的支撑了大并发的场景,同时我们也增加了数据处理的性能监控;最后是实时产品和营销触达前端的一些应用。

2018 年采用了三方黑盒的计算引擎,不能支持灵活定制个性化功能,且依赖过多外部系统,导致外部系统负载高,运维复杂;2019 年使用了 Spark Streaming 的微批处理,2020 年开始使用 Flink 的流式计算,从 2021 年开始,几乎所有 Spark Streaming 的微批处理都被 Flink 替代了,同时上线了 Apache StreamPark 对我们的 Flink 作业进行管理。

总结一下平台背景,主要包含以下几部分:

  • 数据量大:日均万亿的数据处理。
  • 数据源多:集成了 50 多种实时数据源。
  • 订阅多:支撑了 10000+的数据服务订阅。
  • 用户多:支撑了 30 多个内部和外部用户使用。

运维背景也可以分为以下几部分:

  • 支撑需求多:50 多种数据源,10000+的数据服务订阅。
  • 实时作业多:现在有 200+Flink 生产作业,并且持续快速增长中, 未来可达 500+。
  • 上线频率高:每天都有新增的或增强的 Flink 作业上线操作。
  • 开发人员多:50+研发人员参与开发 Flink 实时计算任务。
  • 使用用户多:30+内部和外部组织的用户使用。
  • 监控延迟低:一旦发现问题我们要立马进行处理,避免引起用户的投诉。

基于平台和运维背景,尤其是 Flink 作业越来越多的情况下,遇到了很大的挑战,主要有两方面,分别是作业运维困境和业务支撑困境。

在作业运维困境上,首先作业部署流程长、效率低;在联通安全是第一红线下,在服务器上部署程序的时候,要连接 VPN、登录 4A、打包编译、部署、然后再启动,整个流程比较长,最初在开发 Flink 的时候,都是用脚本启动的,导致代码分支是不可控的,部署完之后也难以追溯,再就是脚本很难与 git 上的代码进行同步,因为对于脚本代码,开发人员更喜欢在服务器上直接改,很容易忘记上传 git。

由于作业运维困境上的种种因素,会产生业务支撑困境,如导致上线故障率高、影响数据质量、上线时间长、数据延迟高、告警漏发处理等,引起的投诉,此外,我们的业务影响不明确,一旦出现问题,处理问题会成为第一优先级。

基于 StreamPark 一体化管理

对于以上的两种困境,我们基于 StreamPark 一体化管理解决了很多问题,首先来看一下 StreamPark 的双线演进,分别是 Flink 作业管理和 Flink 作业 DevOps 平台;在作业管理上,StreamPark 支持将 Flink 实时作业部署到不同的集群里去,比如 Flink 原生自带的 Standalone 模式,Flink on Yarn 的 Session、Application、PerJob 模式,在最新的版本中将支持 Kubernetes Native Session 模式;中间层是项目管理、作业管理、集群管理、团队管理、变量管理、告警管理。

  • 项目管理:当部署 Flink 程序的时候,可以在项目管理里填写 git 地址,同时选择要部署的分支。
  • 作业管理:可以指定 Flink 作业的执行模式,比如你要提交到什么类型的集群里去,同时还可以配置一些资源,比如 TaskManager 的数量、TaskManager/JobManager 的内存大小、并行度等等,还可以设置一些容错,比如 Flink 作业失败后,StreamPark 可以支持它自动拉起,同时支持传入一些动态参数。
  • 集群管理:可以在界面上添加和管理大数据集群。
  • 团队管理:在企业的实际生产过程中会有多个团队,团队之间是隔离的。
  • 变量管理:可以把一些变量统一维护在一个地方,比如 Kafka 的 Broker 地址定义成一个变量,在配置 Flink 作业或者 SQL 的时候,就可以以变量的方式来替换 Broker 的 IP,且后续这个 Kafka 要下线的时候,也可以通过这个变量去查看到底哪些作业使用了这个集群,方便我们去做一些后续的流程。
  • 告警管理:支持多种告警模式,如微信、钉钉、短信和邮件。

StreamPark 支持 Flink SQL、Flink Jar 的提交,支持资源配置,支持状态跟踪,如状态是运行状态,失败状态等,同时支持指标大屏和各种日志查看。

Flink 作业 DevOps 平台,主要包括以下几部分:

  • 团队:StreamPark 支持多个团队,每个团队都有团队的管理员,他拥有所有权限,同时还有团队的开发者,他只有少量的一部分权限。
  • 编译、打包:在创建 Flink 项目时,可以把 git 地址、分支、打包的命令等配置在项目里,然后一键点击 build 按钮进行编译、打包。
  • 发布、部署:发布和部署的时候会创建 Flink 作业,在 Flink 作业里可以选择执行模式、部署集群、资源设置、容错设置、变量填充,最后通过一键启动停止,启动 Flink 作业。
  • 状态监测:Flink 作业启动完成之后,就是状态的实时跟踪,包括 Flink 的运行状态、运行时长、Checkpoint 信息等,并支持一键跳转到 Flink 的 Web UI。
  • 日志、告警:包含构建的一些日志和启动日志,同时支持钉钉、微信、邮件、短信等告警方式。

企业一般有多个团队同时开发实时作业,在我们公司包含实时采集团队、数据处理团队和实时的营销团队,StreamPark 支持多个团队的资源隔离。

Flink 作业平台化管理面临如下挑战:

  • 脚本数量多:平台有几百个脚本,分散在多个服务器上。
  • 脚本类型多:在启动 Flink 作业时,会有启动脚本、停止脚本和守护脚本,而且操作权限很难控制。
  • 脚本不一致:服务器上的脚本与 git 上的脚本不一致。
  • 脚本确权难:Flink 作业的责任人,用途不明确。
  • 分支不可控:启动作业的时候,需要在脚本里指定 git 分支,导致分支不可追溯的。

基于以上的挑战,StreamPark 通过项目管理来解决了责任人不明确,分支不可追溯的问题,因为在创建项目的时候需要手动指定一些分支,一旦打包成功,这些分支是有记录的;通过作业管理对配置进行了集中化,避免了脚本太过于分散,而且作业启动、停止的权限有严格的控制,避免了脚本化权限不可控的状态,StreamPark 以接口的方式与集群进行交互来获取作业信息,这样做会让作业控制更加精细。

可以看一下上图中下面的图,通过项目管理进行打包,通过作业管理进行配置,然后发布,可以进行一键启停,通过 API 提交作业。

图片

早期我们需要通过 7 步进行部署,包括连接 VPN、登录 4A、执行编译脚本、执行启动脚本、打开 Yarn、搜索作业名、进入 Flink UI 等 7 个步骤,StreamPark 可以支持 4 个一键进行部署,包括一键打包、一键发布、一键启动、一键到 Flink UI。

图片

上图是我们 StreamPark 的作业提交流程,首先 StreamPark 会将作业进行发布,发布的时候会上传一些资源,然后会进行作业的提交,提交的时候会带上配置的一些参数,以 Flink Submit 的方式调用接口发布到集群上;这里会有多个 Flink Submit 对应着不同的执行模式,比如 Yarn Session、Yarn Application、Kubernetes Session、Kubernetes Application 等都是在这里控制的,提交作业之后,如果是 Flink on Yarn 作业,会得到这个 Flink 作业的 Application ID 或者 Job ID,这个 ID 会保存在我们的数据库中,如果是基于 Kubernetes 执行的话,也会得到 Job ID,后面我们在跟踪作业状态的时候,主要就是通过保存的这些 ID 去跟踪作业的状态。

图片

如上所述,如果是 Flink on Yarn 作业,在提交作业的时候会获取两个 ID,Application ID 或者 Job ID,基于这两个 ID 可以获取我们的状态,但当 Flink 作业非常多的时候会遇到一些问题,StreamPark 它是有一个状态获取器,它会通过我们保存的数据库里的 Application ID 或者 Job ID,去向 ResourceManager 做一个请求,会做每五秒钟周期性的轮询,如果作业特别多,每次轮询 ResourceManager 会负责再去调用 Job Manager 的地址访问它的状态,这就会导致 ResourceManager 的连接数压力较大和连接数过高。

上图中 ResourceManager 的连接数阶段性、周期性的持续走高,可以看到 ResourceManager 处于比较红的状态,从主机上去监控的时候,它的连接数确实比较高。

图片

针对上面的问题,我们做了一些优化,首先 StreamPark 保存了提交作业之后的 Application ID 或者 Job ID,同时也会获取 Job Manager 直接访问的地址,并保存在数据库中,每次轮询时不再通过 ResourceManager 获取作业的状态,它可以直接调用各个 Job Manager 的地址实时获取状态,极大的降低了 ResourceManager 的连接数;从上图最后的部分可以看到,基本不会产生太大的连接数,大大减轻了 ResourceManager 的压力,且后续当 Flink 作业越来越多时获取状态也不会遇到瓶颈的问题。

图片

StreamPark 解决的另一个问题是 Flink 从状态恢复的保障,以前我们用脚本做运维的时候,在启动 Flink 的时候,尤其是在业务升级的时候,要从上一个最新的 Checkpoint 来恢复,但经常有开发人员忘记从上一个检查点进行恢复,导致数据质量产生很大的问题,遭到投诉,StreamPark 的流程是在首次启动的时候,每五秒钟轮询一次获取 Checkpoint 的记录,同时保存在数据库之中,在 StreamPark 上手动停止 Flink 作业的时候,可以选择做不做 Savepoint,如果选择了做 Savepoint,会将 Savepoint 的路径保存在数据库中,同时每次的 Checkpoint 记录也保存在数据库中,当下次启动 Flink 作业的时候,默认会选择最新的 Checkpoint 或者 Savepoint 记录,有效避免了无法从上一个检查点去恢复的问题,也避免了导致问题后要进行 offset 回拨重跑作业造成的资源浪费,同时也保证了数据处理的一致性。

图片

StreamPark 还解决了在多环境下多个组件的引用挑战,比如在企业中通常会有多套环境,如开发环境、测试环境、生产环境等,一般来说每套环境下都会有多个组件,比如 Kafka,HBase、Redis 等,而且在同一套环境里还可能会存在多个相同的组件,比如在联通的实时计算平台,从上游的 Kafka 消费数据的时候,将符合要求的数据再写到下游的 Kafka,这个时候同一套环境会涉及到两套 Kafka,单纯从 IP 很难判断是哪个环境哪个组件,所以我们将所有组件的 IP 地址都定义成一个变量,比如 Kafka 集群,开发环境、测试环境、生产环境都有 Kafka.cluster 这个变量,但它们指向的 Broker 的地址是不一样的,这样不管是在哪个环境下配置 Flink 作业,只要引用这个变量就可以了,大大降低了生产上的故障率。

图片

StreamPark 支持 Flink 多执行的模式,包括基于 on Yarn 的 Application/ Perjob / Session 三种部署模式,还支持 Kubernetes 的 Application 和 Session 两种部署模式,还有一些 Remote 的模式。

图片

StreamPark 也支持 Flink 的多版本,比如联通现在用的是 1.14.x,现在 1.16.x 出来后我们也想体验一下,但不可能把所有的作业都升级到 1.16.x,我们可以把新上线的升级到 1.16.x,这样可以很好的满足使用新版本的要求,同时也兼容老版本。

未来规划与演进

图片

未来我们将加大力度参与 StreamPark 建设,以下我们计划要增强的方向。

  • 高可用:StreamPark 目前不支持高可用,这方面还需要做一些加强。
  • 状态的管理:在企业实践中 Flink 作业在上线时,每个算子会有 UID。如果 Flink UID 不做设置,做 Flink 作业的升级的时候,就有可能出现状态无法恢复的情况,目前通过平台还无法解决这个问题,所以我们想在平台上增加这个功能,在 Flink Jar 提交时,增加检测算子是否设置 UID 的功能,如果没有,会发出提醒,这样可以避免每次上线 Flink 作业时,作业无法恢复的问题;之前遇到这种情况的时候,我们需要使用状态处理的 API,从原来的状态里进行反序列化,然后再用状态处理 API 去制作新的状态,供升级后的 Flink 加载状态。
  • 更细致的监控:目前支持 Flink 作业失败之后,StreamPark 发出告警。我们希望 Task 失败之后也可以发出告警,我们需要知道失败的原因;还有作业反压监控告警、Checkpoint 超时、失败告警性能指标采集,也有待加强。
  • 流批一体:结合 Flink 流批一体引擎和数据湖流批一体存储探索流批一体平台。

上图是 StreamPark 的 Roadmap。

  • 数据源:StreamPark 会支持更多数据源的快速接入,达到数据一键入户。
  • 运维中心:获取更多 Flink Metrics 进一步加强监控运维的能力。
  • K8S-operator:现有的 Flink on K8S 还是有点重,经历了打 Jar 包、打镜像、推镜像的过程,后续需要改进优化,积极拥抱上游对接的 K8S-operator。
  • 流式数仓:增强对 Flink SQL 作业能力的支持,简化 Flink SQL 作业的提交,计划对接 Flink SQL Gateway;SQL 数仓方面的能力加强,包括元数据存储、统一建表语法校验、运行测试、交互式查询,积极拥抱 Flink 上游,探索实时数仓和流式数仓。