**导读:**自如作为一家专注于提供租房产品和服务的 O2O 互联网公司,构建了一个涵盖城市居住生活领域全链条的在线化、数据化、智能化平台,实时计算在自如一直扮演着重要的角色。到目前为止,自如每日需要处理 TB 级别的数据,本文由来自自如的实时计算小伙伴带来,介绍了自如基于 StreamPark 的实时计算平台深度实践。
- 实时计算遇到的挑战
- 需求解决方案之路
- 基于 StreamPark 的深度实践
- 实践经验总结和示例
- 带来的收益
- 未来规划
自如作为提供租房产品及服务的 O2O 互联网品牌,成立于 2011 年 10 月,自如已为近 50 万业主、500 万自如客提供服务,管理房源超过 100 万间。截至 2021 年 3 月,自如已开通北京、上海、深圳、杭州、南京、广州、成都、天津、武汉、苏州 10 大城市。 自如通过打造涵盖 To C 和 To B 的品质居住产品、逐步实现城市居住生活领域全链条的线上化、数据化、智能化的平台能力。自如 APP 装机量累计达 1.4 亿次,日均线上服务调用达 4 亿次,拥有智能化房源万余间。自如现已在 PC、APP、微信全渠道实现租房、服务、社区的 O2O 闭环,省去传统租房模式所有中间冗余环节,通过 O2O 模式重构居住市场格局,并建立了中国最大的 O2O 青年居住社区。
在拥有庞大用户群体情况下,自如为了给用 户提供更加优质的产品体验,实现企业的数字化转型,从 2021 年开始大力发展实时计算,Flink 在自如的实时计算中一直扮演着重要的角色。到目前为止,自如每日需要处理 TB 级别的数据,总共拥有 500+ 个实时作业,并支撑每日超过 1000 万次的数据调用请求。
实时计算遇到的挑战
在自如,实时计算大概分为 2 个应用场景:
-
数据同步:包括 Kafka、Mysql、Mongo 数据同步到 Hive / Paimon / ClickHouse 等。
-
实时数仓:包括出租、收房、家服等业务实时指标。
在实时计算实践过程中遇到了一些挑战,大致如下:
01 作业上线效率低
在自如实时作业的开发上线流程是:数据仓库开发人员将 Flink SQL 代码嵌入到程序中,在本地进行代码调试,然后编译成 FatJar,最后再将作业以工单和 JAR 包形式提交给运维,运维负责作业上线的小伙伴再通过命令行的方式将作业部署到线上 Kubernetes session 环境。可以看到这一过程涉及到诸多环节,每一步都是需要人为介入,效率极其低下,而且非常容易出错,影响工作效率和稳定性。因此,我们亟需构建一套高效、自动化的实时计算平台,以满足日益增涨的实时计算需求。
02 作业归属信息不明确
由于实时计算平台没有对作业进行统一管理,业务代码都是由 GitLab 管理,虽然解决了部分问题,但是我们发现仓库代码与线上部署的 Flink 作业管理之间仍然存在缺陷:缺乏明确归属、缺少分组和有效权限控制,导致作业管理混乱且责任链路难以追溯。为确保代码和上线作业的一致和可控性,亟需建立严格且清晰的作业管理体系,其中包括实行严格的代码版本控制、明确作业归属和负责人、以及建立有效的权限控制。
03 作业维护困难
在自如有多个不同版本的 Flink 作业在运行,由于 Apache Flink® 的 API 在大版本升级中经常会发生变动,且不保证向下兼容性,这直接导致流作业项目代码的升级成本变得很高。因此如何管理这些不同版本的作业成了头痛的问题。
由于没有统一的作业平台,这些作业在提交时,只能通过执行脚本的形式进行提交。不同的作业有不同重要程度和数据量级,作业所需资源和运行参数也都各不相同,都需要相应的修改。我们可以通过修改提交脚本或直接在代码中设置参数来进行修改,但这使得配置信息的获取变得困难,尤其是当作业出现重启或失败时,FlinkUI 无法打开,配置信息变成一个黑盒。因此,亟需建立一个更加高效、支持配置实时 计算平台。
04 作业开发调试困难
在以往的开发流程中,我们通常在本地的 IDEA 环境中,通过将 SQL 代码嵌入程序代码的方式进行作业的开发和调试,以验证程序的正确性。然而,这种方式存在如下弊端:
1.多数据源调试困难。通常,一个需求可能涉及多个不同的数据源。为了在本地环境调试,开发人员需要申请开通数据访问需要的白名单,这一过程既耗时又繁琐。
2.SQL 代码难以阅读和修改。由于 SQL 代码是嵌入在程序代码中的,这使得代码难以阅读,修改起来也相当不便。更为困难的是,当需要通过 SQL 片段的方式进行调试时,由于缺少 SQL 版本管理和语法校验的支持,开发人员很难通过客户端日志定位到具体的 SQL 行号,从而找出执行失败的原因。
因此,急需提高开发、调试的效率
寻求解决方案之路
在平台构建的初期阶段,2022 年初开始我们就全面调查了行业内的几乎所有相关项目,涵盖了商业付费版和开源版。经过调查和对比发现,这些项目都或多或少地存在一定的局限性,可用性和稳定性也无法有效地保障。
综合下来 StreamPark 在我们的评估中表现最优,是唯一一个没有硬伤且扩展性很强的项目:同时支持 SQL 和 JAR 作业,在 Flink 作业的部署模式上也是最为完善和稳定的,特有的架构设计使 得不仅不会锁定 Flink 版本,还支持便捷的版本切换和并行处理,有效解决了作业依赖隔离和冲突的问题。我们重点关注的作业管理 & 运维能力也非常完善,包括监控、告警、SQL 校验、SQL 版本对比、CI 等功能,StreamPark 对 Flink on K8s 的支持也是我们调研的所有开源项目中最为完善的。但 StreamPark 的 K8s 模式提交需在本地构建镜像导致存储资源消耗。
目前在最新的 2.2 版本中社区已经重构了这部分实现
在深入分析了众多开源项目的优缺点后,我们认为可以先去参与了解那些拥有优秀的架构、充满发展潜力,并且核心团队积极努力的项目,投身其中。基于这样的认识,我们做出了如下决策:
1.在作业部署模式上,我们决定采用 On Kubernetes 的模式。实时作业的资源消耗具有动态性,对 Kubernetes 提供的弹性扩缩容有强烈的需求,这有助于我们更好地应对数据产出的波动,确保作业的稳定运行。
2.在开源组件的选择上,我们经过各项指标综合对比评估,最终选择了当时的 StreamX。后续和社区保持密切的沟通,在此过程中深刻感受到创始人认真负责的态度和社区的团结友善的氛围,也见证了项目 2022 年 09 月加入 Apache 孵化器的过程,这让我们对该项目的未来充满希望。
3.在 StreamPark 基础上,我们要推动与公司已有生态的整合,以便更好地满足我们的业务需求。
基于 Apache StreamPark™ 的深度实践
基于上述决策,我们启动了以 “痛点需求” 为导向的实时计算平台演进 工作,基于StremaPark 打造一个稳定、高效、易维护的实时计算平台。从 2022 年初开始我们便参与社区的建设,同时我们内部平台建设也正式提上日程。
首先我们在 StreamPark 的基础上进一步完善相关的功能:
01 LDAP 登录支持
在 StreamPark 的基础上,我们进一步完善了相关的功能,其中包括对 LDAP 的支持,以便我们未来可以完全开放实时能力,让公司的四个业务线所属的分析师能够使用该平台,预计届时人数将达到 170 人左右。随着人数的增加,账号的管理变得越发重要,特别是在人员变动时,账号的注销和申请将成为一项频繁且耗时的操作。所以,接入 LDAP 变得尤为重要。因此我们及时和社区沟通,并且发起讨论,最终我们贡献了该 Feature。现在在 StreamPark 开启 LDAP 已经变得非常简单,只需要简单两步即可:
step1: 填写对应的 LDAP 配置:
编辑 application.yml 文件,设置 LDAP 基础信息,如下:
ldap:
# Is ldap enabled?
enable: false
## AD server IP, default port 389
urls: ldap://99.99.99.99:389
## Login Account
base-dn: dc=streampark,dc=com
username: cn=Manager,dc=streampark,dc=com
password: streampark
user:
identity-attribute: uid
email-attribute: mail
step2: LDAP 登陆
登录界面点击 LDAP 方式登录,然后输入对应的账号密码,点击登录即可
02 提交作业自动生成 Ingress
由于公司的网络安全政策,运维人员在 Kubernetes 的宿主机上仅开放了 80 端口,这导致我们无法直接通过 “域名+随机端口” 的方式访问在 Kubernetes 上的作业 WebUI。为了解决这个问题,我们需要使用Ingress在访问路径上增加一层代理,从而启到访问路由的效果。在 StreamPark 2.0 版本我们贡献了 Ingress 相关的功能[3]。采用了策略模式的实现方式,在初始构建阶段,获取 Kubernetes 的元数据信息来识别其版本,针对不同版本来进行相应的对象构建,确保了在各种 Kubernetes 环境中都能够顺利地使用 Ingress 功能。
具体的配置步骤如下:
step1: 点击进入 Setting-> 选择 Ingress Setting,填写域名
step2: 提交作业
点击进入 K8s 管理平台,可以观察到在提交 Flink 作业的同时会对应提交一个名称相同的 Ingress
step3: 点击进入 Flink 的 WebUI
可以观察到生成的地址将由三部分组成:域名 + 作业提交的命名空间 + 作业名称
03 支持查看作业部署日志
在持续部署作业的过程中,我们逐渐意识到,没有日志就无法进行有效的运维操作,日志的留存归档和查看成为了我们在后期排查问题时非常重要的一环。因此在 StreamPark 2.0 版本我们贡献了 On Kubernetes 模式下启动日志存档、页面查看的能力[4],现在点击作业列表里的日志查看按钮,可以很方便的查看作业的实时日志。
04 集成 Grafana 监控图表链接
在实际的使用过程中,我们发现随着作业数量的增加,使用人数的上升,以及涉及的部门的增多,面临故障自排查困难的问题。我们团队的运维能力实际上是非常有限的。由于专业领域的不同,当我们告诉用户去 Grafana、ELK 上查看图表和日志时,用户通常会感到无从下手,不知道如何去找到与自己作业相关的信息。
为了解决这个问题,我们在社区提出一个需求:希望每个作业都能够通过超链接直接跳转到对应的监控图表和日志归档页面,这样使用者就可以直接查看与自己作业相关的监控信息和日志。无需在复杂的系统界面中进行繁琐的搜索,从而提高故障排查的效率。
我们在社区展开了讨论、并很快得到响应、大家都认为这是一个普遍存在的需求、因此很快有开发小伙伴提交了设计和相关PR,该问题也很快被解决,现在在 StreamPark 中要开启该功能已经变得非常简单:
step1: 创建徽章标签
step2: 将徽章标签和跳转链接进行关联
step3: 点击徽章标签进行链接跳转
05 集成 Flink sql security 权限控制
在我们的系统中,血缘关系管理采用 Apache Atlas,权限管理基于开源项目 Flink-sql-security,这是一个FlinkSQL数据脱敏和行级权限解决方案的开源项目,支持面向用户级别的数据脱敏和行级数据访问控制, 即特定用户只能访问到脱敏后的数据或授权过的行。
这种设计是为了处理一些复杂的继承逻辑。例如,当将加密字段 age 的A 表与 B 表进行 join 操作,得到 C 表时,C 表中的 age 字段应继承 A 表的加密逻辑,以确保数据的加密状态不会因数据处理过程中的转换而失效。这样,我们可以更好地保护数据的安全性,确保数据在整个处理过程中都符合安全标准。
在权限控制方面,我们基于 Flink-sql-security 做了二次开发,实现了 flink-sql-security-streampark 插件。基本的实现思路如下:
1.在检查提交时系统会解析传入的 SQL,获取 InputTable 和 OutputTable 两个数据集。
2.系统通过对远端权限服务的查询,获取到用户所绑定的 RBAC(基于角色的访问控制)权限。
3.根据获取到的 RBAC 权限,系统会得到对应的表的加密规则。
4.系统将通过重写 SQL,将原本 SQL 查询字段包上一层预设的加密算法,进而实现逻辑的重组。
5.最后,系统会根据重组后的逻辑,进行相应的提交。
通过这样的整合和插件的开发,我们实现了对用户查询请求的权限控制,从而确保了数据的安全性。
01行级权限条件
输入 SQL
SELECT * FROM orders;执行SQL
用户 A 的真实执行 SQL:
SELECT * FROM orders WHERE region = 'beijing';
用户 B 的真实执行 SQL:
SELECT * FROM orders WHERE region = 'hangzhou';
02字段脱敏条件
输入 SQL
SELECT name, age, price, phone FROM user;
执行 SQL:
用户 A 的真实执行 SQL:
SELECT Encryption_function(name), age, price, Sensitive_field_functions(phone) FROM user;
用户 B 的真实执行 SQL:
SELECT name, Encryption_function(age), price, Sensitive_field_functions(phone) FROM user;
06 基于 Apache StreamPark™ 的数据同步平台
随着 StreamPark 的技术解决方案在公司的成功落地,我们实现了对 Flink 作业的深度支持,从而为数据处理带来质的飞跃。这促使我们对过往的数据同步逻辑进行彻底的革新,目标是通过技术的优化和整合,最大限度地降低运维成本。因此,我们逐步替换了历史上的 Sqoop 作业、Canal 作业和 Hive JDBC Handler 作业,转而采用 Flink CDC 作业、Flink 流和批作业。在这个过程中,我们也不断优化和强化 StreamPark 的接口能力,新增了状态回调机制,同时实现了与 DolphinScheduler[7] 调度系统的完美集成,进一步提升了我们的数据处理能力。
外部系统集成 StreamPark 步骤如下,只需要简单几个步骤即可:
1.首先创建 API 访问的 Token:
2.查看 Application 外部调用链接:
curl -X POST '/flink/app/start' \
-H 'Authorization: $token' \
-H 'Content-Type: application/x-www-form-urlencoded; charset=UTF-8' \
--data-urlencode 'savePoint=' \
--data-urlencode 'allowNonRestored=false' \
--data-urlencode 'savePointed=false' \
--data-urlencode 'id=100501'
3.DolphinScheduler 中配置 Http 调度
实践经验总结
在深度使用 StreamPark 实践过程中,我们总结了一些常见问题和实践过程中所探索出解决方案,我们把这些汇总成示例,仅供大家参考。
01 构建 Base 镜像
要使用 StreamPark 在 Kubernetes 上部署一个 Flink 作业,首先要准备一个基于 Flink 构建的 Base 镜像。然后,在 Kubernetes 平台上,会使用用户所提供的镜像来启动 Flink 作业。如果是沿用官方所提供的 “裸镜像”,在实际开发中是远远不够的,用户开发的业务逻辑往往会涉及到上下游多个数据源,这就需要相关数据源的 Connector,以及 Hadoop 等关联依赖。因此需要将这部分依赖项打入镜像中,下面我将介绍具体操作步骤。
step1: 首先创建一个文件夹,内部包含两个文件夹和一个 Dockerfile 文件
conf 文件夹:存放 HDFS 配置文件,主要用于配置 Flink 的 Checkpoint 写入和FlinkSQL 中使用 Hive 的元数据
lib 文件夹:存放涉及到的 Jar 依赖包,如下:
Dockerfile 文件用于定义镜像的构建
FROM apache/flink:1.14.5-scala_2.11-java8
ENV TIME_ZONE=Asia/Shanghai
COPY ./conf /opt/hadoop/conf
COPY lib $FLINK_HOME/lib/
step2: 镜像构建命令使用多架构构建模式,如下:
docker buildx build --push --platform linux/amd64 -t ${私有镜像仓库地址}
02 Base 镜像集成 Arthas 示例
随着公司内新发布上线的运行作业越来越多,团队内常常会遇到作业在长时间运行后性能下降的这种问题,如 Kafka 消费能力减弱、内存使用增加和 GC 时间延长等,我们是比较推荐使用Arthas,一 款阿里巴巴开源的 Java 诊断工具。可以通过全局视角实时查看java应用 load、内存、gc、线程的状态信息,并能在不修改应用代码的情况下,完成查看方法调用的出入参、异常,监测方法执行耗时,类加载信息等,可以大大提升我们线上问题排查效率。
因此做了 base 镜像集成 arthas,来方便运行时问题的排查。
FROM apache/flink:1.14.5-scala_2.11-java8
ENV TIME_ZONE=Asia/Shanghai
COPY ./conf /opt/hadoop/conf
COPY lib $FLINK_HOME/lib/
RUN apt-get update --fix-missing && apt-get install -y fontconfig --fix-missing && \
apt-get install -y openjdk-8-jdk && \
apt-get install -y ant && \
apt-get clean;
RUN apt-get install sudo -y
# Fix certificate issues
RUN apt-get update && \
apt-get install ca-certificates-java && \
apt-get clean && \
update-ca-certificates -f;
# Setup JAVA_HOME -- useful for docker commandline
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
RUN export JAVA_HOME
RUN apt-get install -y unzip
RUN curl -Lo arthas-packaging-latest-bin.zip 'https://arthas.aliyun.com/download/latest_version?mirror=aliyun'
RUN unzip -d arthas-latest-bin arthas-packaging-latest-bin.zip
03 镜像中依赖冲突的解决方式
在使用 StreamPark 的过程中,我们常遇到基于 Base 镜像运行的 Flink 作业中出现 NoClassDefFoundError、ClassNotFoundException 和 NoSuchMethodError 这三种依赖冲突异常。排查思路就是,找到报错中所示的冲突类,所在的包路径。例如这个报错的类在 org.apache.orc:orc-core, 就到相应模块的目录下跑 mvn dependency::tree 然后搜 orc-core,看一下是谁带进来的依赖,用 exclusion 去掉就可以了。下面我通过一个 base 镜像中的 flink-shaded-hadoop-3-uber JAR 包引起的依赖冲突示例,来详细介绍通过自定义打包的方法来解决依赖冲突。