版本说明

当前仅支持 Flink 1.12 版本。

使用范围

ClickHouse 仅支持作为目的表(Sink)。

DDL 定义

CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    `list` ARRAY<STRING>,
    `map` Map<STRING, BIGINT>,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://{ip}:{port}',
    'database-name' = 'tutorial',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);

ClickHouse 参数

参数值 是否必填 默认值 数据类型 描述

connector

String

连接器,固定值为 clickhouse

url

String

ClickHouse 服务的URL地址,clickhouse://{ip}:{port}。

username

String

ClickHouse 服务的用户名。

password

String

ClickHouse 服务的密码。

database-name

default

String

ClickHouse 数据库名称。

table-name

String

ClickHouse 服务的表名。

sink.batch-size

1000

Integer

刷新频率,数据量超过该大小会刷新写入数据。

sink.flush-interval

1s

Duration

刷新频率,超过这个刷新间隔 异步线程会刷新写入数据。

sink.max-retries

3

Integer

写入失败重试最大次数,超过该次数任务会失败终止。

sink.write-local

false

Boolean

针对 distributed 表,会直接写入当前指定 url 的节点表中。

sink.partition-startegy

balanced

String

写入策略:balanced(轮询),hash(根据分区 key 的 hash 值写入),shuffle(随机写入)。

sink.partition-key

String

hash 策略中的分区 key。

sink.ignore-delete

true

Boolean

是否将 update 语句视为 insert 语句并忽略删除,默认为 true。

类型映射

ClickHouse 类型 Flink SQL 类型

STRING

CHAR

String / IP / UUID

VARCHAR

String / Enum

STRING

UInt8

BOOLEAN

FixedString

BYTES

Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256

DECIMAL

Int8

TINYINT

Int16 / UInt8

SMALLINT

Int32 / UInt16 / Interval

INTEGER

Int64 / UInt32

BIGINT

Float32

FLOAT

Float64

DOUBLE

Date

DATE

DateTime

TIME

DateTime

TIMESTAMP

DateTime

TIMESTAMP_LTZ

Int32

INTERVAL_YEAR_MONTH

Int64

INTERVAL_DAY_TIME

Not supported

ROW

Not supported

MULTISET

Not supported

RAW

代码示例

CREATE TABLE source(
    id INT,
    province STRING
) WITH (
    'connector' = 'faker',
    'fileds.id.expression' = '#{number.numberBetween ''1'',''100''}',
    'fields.province.expression'  = '#{regexify ''(河北省|山西省|辽宁省|吉林省|黑龙江省|江苏省|浙江省|安徽省|福建省|江西省|山东省|河南省|湖北省|湖南省|广东省|海南省|四川省|贵州省|云南省|陕西省|甘肃省|青海省|台湾省){1}''}',
    'rows-per-second'          = '1'
);

CREATE TABLE clickhouse_sink(
    id INT,
    province STRING
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://localhost:6379',
    'database-name' = 'tutorial',
    'username' = 'default',
    'password' = 'default',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);

INSERT INTO clickhouse_sink SELECT * FROM source;