Kafka 数据管道是流计算系统中最常用的数据源(Source)和数据目的(Sink)。您可以把流数据导入到 Kafka 的某个 Topic 中,通过 Flink 算子进行处理后,输出到相同或不同 Kafka 的另一个 Topic 中。

Kafka 支持同一个 Topic 多分区读写,数据可以从多个分区读入,也可以写入到多个分区,以提供更高的吞吐量,减少数据倾斜和热点。

版本说明

当前仅支持 Flink 1.12 版本。

使用范围

Kafka 支持作为数据源表(Source),也可以作为目的表(Sink)。

DDL 定义

在实际使用中请根据实际情况配置字段名和 WITH 参数。

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

元数据字段

以下元数据可以作为表定义中的元数据字段进行访问。
读/写列定义元数据字段是否可读(R)可写(W)可读写(R/W)
只读列(R)必须声明为 VIRTUAL。

Key 数据类型 说明 读/写

topic

STRING NOT NULL

Kafka 消息所在的 Topic 名称。

R

partition

INT NOT NULL

Kafka 消息所在的分区 ID。

R

headers

MAP<STRING, BYTES> NOT NULL

Kafka 消息的消息头(header)。

R/W

leader-epoch

INT NULL

Kafka 消息的 Leader epoch。

R

offset

BIGINT NOT NULL

分区中 Kafka 消息的偏移量。

R

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL

Kafka 消息的时间戳。

R/W

timestamp-type

STRING NOT NULL

Kafka 消息的时间戳类型。

  • NoTimestampType:消息中没有定义时间戳。

  • CreateTime:消息产生的时间。

  • LogAppendTime:消息被添加到 Kafka Broker 的时间。

R

Kafka 源表 WITH 参数

参数值 是否必填 默认值 数据类型 描述

connector

String

固定值为 kafka

topic

String

Kafka Topic 名称。
多个 Topic 以 ; 分隔,例如 topic-1;topic-2

注意

topic 和 topic-pattern 两个选项只能指定其中一个。

topic-pattern

String

匹配读取 Topic 名称的正则表达式。
所有匹配该正则表达式的 Topic 在作业运行时均会被订阅。

注意

topic 和 topic-pattern 两个选项只能指定其中一个。

properties.bootstrap.servers

String

Kafka Broker 地址列表,以 , 分隔,格式为 host:port,host:port,host:port

properties.group.id

String

Kafka 消费组 ID。

properties.*

String

后缀名称必须是 Kafka 配置文档 中定义的配置项。Flink 会将 properties. 前缀移除,并将剩余的键和值传递给 Kafka 客户端。
例如可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用自动创建 Topic。
但是有一些配置不支持,例如 key.deserializervalue.deserializer,因为 Flink 会覆盖它们。

format

String

在反序列化来自 Kafka 的消息 value 部分时使用的格式。
取值如下:

  • csv

  • json

  • avro

  • avro-confluent

  • debezium-json

  • canal-json

  • Maxwell-json

  • raw

有关更多详细信息,请参考官方文档

value.format

String

与 format 同样含义,只能配置其中一个。

key.format

String

在反序列化来自 Kafka 的消息 value 部分时使用的格式。
取值如下:

  • csv

  • json

  • avro

  • avro-confluent

  • debezium-json

  • canal-json

  • Maxwell-json

  • raw

有关更多详细信息,请参考官方文档

注意

如果指定了 key.format 参数,则也必须指定 key.fields 参数。

key.fields

String

消息键解析出来的数据存放的字段。
多个字段名以;分隔,例如:field1;field2
默认不配置该参数,key 数据将被丢弃。

key.fields-prefix

String

为所有消息键指定自定义前缀,以避免与消息体格式字段重名,默认前缀为空。
如果定义了自定义前缀,表 schema 和配置项 key.fields 请使用带前缀的名称。
当构建消息 key 数据类型时,前缀会被移除,将使用无前缀的名称。

注意

该配置项要求必须将 value.fields-include 配置为 EXCEPT_KEY。

value.fields-include

ALL

String

控制哪些字段应该出现在消息 value 解析出来的数据中。可取值:

  • ALL:消息 value 解析出来的数据将包含 schema 中所有的字段,包括定义为 key.fields 的字段。

  • EXCEPT_KEY:除去 key.fields 定义字段,剩余 schema 定义字段可以用来存放消息 value 解析出来的数据。

scan.startup.mode

group-offset

String

Kafka consumer 的启动模式。包括:lastest-offset、earliest-offset、specific-offset、group-offset、timestamp。详细信息请参考:启动模式

scan.startup.specific-offsets

Sring

scan.startup.mode 选择 specific-offsets 时填写,指定各个分区 offset 的位置。
例如:partition:0,offset:42;partition:1,offset:300

scan.startup.timestamp-millis

Long

scan.startup.mode 选择 timestamp 时填写,指定启动的时间戳,单位为毫秒。
例如:1639979252461

scan.topic-partition-discovery.interval

Duration

Kafka consumer 定期发现动态创建的 Kafka topic 和分区的时间间隔。
例如:100s

Kafka 结果表 WITH 参数

参数 是否必填 默认值 数据类型 描述

connector

String

固定值为 kafka

topic

String

结果写入的 Topic 名称。

properties.bootstrap.servers

String

Kafka Broker 地址列表,以(,)分隔,格式为 host:port,host:port,host:port

sink.partitioner

Default

String

Flink 分区到 Kafka 分区的映射模式。有效值为:

  • default:使用 kafka 默认 partitioner 对记录进行分区。

  • fixed:每个 Flink 分区对应至多一个 Kafka 分区。

  • round-robin:Flink 分区中的数据将被轮流分配到 Kafka 的各个分区。它仅在未指定 key 时有效。 自定义分区:通过实现 FlinkKafkaPartitioner 来自定义分区,例如org.mycompany.MyPartitioner
    更多详细信息,请参考接收器分区

sink.parallelism

Integer

Kafka sink 算子的并行度,默认情况下,框架使用上游算子相同的平行度。

properties.group.id

String

Kafka 消费组 ID。

sink.semantic

at-least-once

String

Kafka的写入策略。

  • at-least-once:保证不会丢失任何记录(可能重复)。

  • exactly-once:恰好一次。

  • none:Flink 不做任何保证。生成的记录可能会丢失或重复。

value.format

String

与 format 同样含义,只能配置其中一个。

key.fields-prefix

String

为所有消息 key 指定自定义前缀,以避免与消息 value 字段重名,默认前缀为空。
如果定义了自定义前缀,表 schema 和配置项 key.fields 请使用带前缀的名称。
当构建消息 key 数据类型时,前缀会被移除,将使用无前缀的名称。

注意

该配置项要求必须将 value.fields-include 配置为 EXCEPT_KEY。

format

String

序列化 Kafka 消息 value 时使用的格式。

  • csv

  • json

  • avro

  • raw

有关更多详细信息,请参考官方文档

key.format

String

序列化 Kafka 消息 value 部分时使用的格式。取值如下:

  • csv

  • json

  • avro

  • raw

有关更多详细信息,请参考官方文档

注意

如果指定了 key.format 参数,则也必须指定 key.fields 参数。

value.fields-include

ALL

String

控制哪些字段应该出现在消息 value 解析出来的数据中。可取值:

  • ALL:消息 value 解析出来的数据将包含 schema 中所有的字段,包括定义为 key.fields 的字段。

  • EXCEPT_KEY:除去 key.fields 定义字段,剩余 schema 定义字段可以用来存放消息 value 解析出来的数据。

key.fields

String

消息 key 的数据字段。
多个字段名以 ; 分隔,如:field1;field2。
默认不配置该参数,key 数据将被丢弃。

CSV 格式

详情请参见:CSV 格式

CSV 格式 DDL 定义

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = '<yourKafkaBrokers>',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 'csv.ignore-parse-errors' = 'true',
 'csv.allow-comments' = 'true'
)

CSV 格式 WITH 参数

参数 是否必填 默认值 数据类型 描述

format

String

格式,固定值为 csv

csv.field-delimiter

,

String

指定 CSV 字段分隔符,默认是半角逗号。字段分隔符,必须是单个字符。您可以使用反斜杠指定特殊字符,例如表示制表符。您还可以使用 unicode,例如 'csv.field-delimiter' = U&'\0001'表示 0x01 字符。

csv.disable-quote-character

false

Boolean

禁止字段包围引号。如果为 true,则 csv.quote-character 选项不可用。

csv.quote-character

"

String

字段包围引号。默认是双引号。

csv.allow-comments

false

Boolean

忽略 # 开头的注释行(请务必将 csv.ignore-parse-errors 设为 true)。

csv.ignore-parse-errors

false

Boolean

忽略处理错误。对于无法解析的字段,会输出为 null。

csv.array-element-delimiter

;

String

数组元素的分隔符。

csv.escape-character

String

指定转义符,默认禁用转义。

csv.null-literal

String

将指定的字符串看作 null 值。

JSON 格式

详情请参见:JSON 格式

JSON 格式 DDL 定义

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = '<yourKafkaBrokers>',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)

JSON 格式 WITH 参数

参数 是否必填 默认值 数据类型 描述

format

String

格式,固定值为json

json.fail-on-missing-field

false

Boolean

  • 如果为 true,则遇到缺失字段时,会让作业失败。

  • 如果为 false,则只会把缺失字段设置为 null 并继续处理。

json.ignore-parse-errors

false

Boolean

  • 如果为 true,则遇到解析异常时,会把这个字段或行设置为 null 并继续处理。

  • 如果为 false,则会让作业失败。

json.timestamp-format.standard

SQL

String

指定 JSON 时间戳 TIMESTAMP TIMESTAMP WITH LOCAL TIME ZONE 类型字段的格式,可选值为 SQL、ISO-8601。

  • 默认是 SQL,格式是yyyy-MM-dd HH:mm:ss.s{可选精度}

  • 也可以选择 ISO-8601,格式是 yyyy-MM-ddTHH:mm:ss.s{可选精度}

json.map-null-key.mode

FAIL

String

序列化 Map 遇到 null key 时的处理模式:

  • FAIL:抛出异常。

  • DROP:丢弃 null key 记录。

  • LITERAL:用 json.map-null-key.literal 中定义的字符串替换。

json.map-null-key.literal

null

String

json.map-null-key.mode 定义为 LITERAL 时,指定字符串以替换空键。

Avro 格式

详情请参见:Avro 格式

Avro 格式 DDL 定义

CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = '<yourKafkaBrokers>',
 'properties.group.id' = 'testGroup',
 'format' = 'avro'
)

Avro 格式 WITH 参数

参数 是否必填 默认值 数据类型 描述

format

String

格式,固定值为 avro

Confluent Avro 格式

Avro Schema Registry (avro-confluent) 格式支持读取由 io.confluent.kafka.serializers.KafkaAvroSerializer 序列化后的消息

输出的消息响应的也可以被 io.confluent.kafka.serializers.KafkaAvroDeserializer 读取。详情请参见:Confluent Avro 格式

Debezium 格式

Debezium 是一个 CDC(Changelog Data Capture)工具,可以将 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的实时 changelog 流式传输到 Kafka。Debezium 为 changelog 提供统一的 schema 格式,并支持使用 JSON 和 Apache Avro 序列化消息。详情请参见:Debezium 格式

Canal 格式

Canal 是一个 CDC(Changelog Data Capture)工具,可以将 MySQL 中的实时 changelog 流式传输到其他系统。Canal 为 changelog 提供了统一的 schema 格式,并支持使用 JSON 和 protobuf 序列化消息(protobuf 是 Canal 的默认格式)。详情请参见:Canal 格式

Maxwell 格式

Maxwell 是一个 CDC(Changelog Data Capture)工具,可以将 MySQL 的实时 changelog 流式传输到 Kafka 和其他流式 connector。Maxwell 为 changelog 提供统一的 schema 格式,并支持使用 JSON 序列化消息。详情请参见:Maxwell 格式

Raw 格式

Raw 格式允许将原始(基于字节的)值作为单列进行读写。详情请参见:Raw 格式

Raw 格式 DDL 定义

CREATE TABLE nginx_log (
  log STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'nginx_log',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = 'testGroup',
  'format' = 'raw'
)

Raw 格式 WITH 参数

参数 是否必填 默认值 数据类型 描述

format

String

格式,固定值为 raw

raw.charset

UTF-8

String

编码字符集

raw.endianness

big-endian

String

编码字节序。有效值为 big-endianlittle-endian

代码示例

示例一

从 Kafka 中读取数据后插入 Kafka。
从名称为 source 的 Topic 中读取 Kafka 数据,再写入名称为 sink 的 Topic,数据使用 json 格式。

CREATE TEMPORARY TABLE Kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE Kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'value.format' = 'json'
);

INSERT INTO Kafka_sink SELECT id, name, age FROM Kafka_source;

示例二

key 和 value 包含相同名称的字段。

CREATE TABLE KafkaTable (
  `k_version` INT,
  `k_user_id` BIGINT,
  `k_item_id` BIGINT,
  `version` INT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  ...

  'key.format' = 'json',
  'key.fields-prefix' = 'k_',
  'key.fields' = 'k_version;k_user_id;k_item_id',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'
)

示例三

常规 kafka 与 upsert-kafka 做 join 查询,对实时交易数据与实时汇率数据做联合查询,获取实时交易额。

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json',
  'value.fields-include' = 'ALL'
);

SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  t.total,
  c.currency_code,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;