Apache Kafka Connector
Apache Flink® 官方提供了 Apache Kafka 的连接器,用于从 Kafka 主题中读取或者向其中写入数据,可提供精确一次的处理语义。
Apache StreamPark 中 KafkaSource 和 KafkaSink 基于官网的 Kafka Connector 进一步封装,屏蔽了很多细节,简化开发步骤,让数据的读取和写入更简单。
依赖
Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。有关 Kafka 兼容性的更多细节,请参考 Apache Kafka 的官方文档。
<!--必须要导入的依赖-->
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-core</artifactId>
<version>${project.version}</version>
</dependency>
<!--flink-connector-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.0</version>
</dependency>
同时,在开发阶段,以下依赖也是必要的:
<!--以下 scope 为 provided 的依赖也是必须要导入的-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Kafka Source (Consumer)
先介绍基于官网的标准的 kafka 消费方式,以下代码摘自 Apache Kafka 官网文档
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val stream = env.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
可以看到,一上来定义了一堆 Kafka 的连接信息。这种方式下,各项参数都是硬编码的方式写死的,非常不灵敏。下面,我们来看看如何用 StreamPark 接入 Kafka 的数据。只需要按照规定的格式定义好配置文件然后编写代码即可,配置和代码介绍如下。