版本说明

当前仅支持 Flink 1.12 版本。

使用范围

使用 JDBC 驱动程序从 MySQL 中读取数据或将数据写入其中。
支持作为数据源、维表、数据目的。

DDL 定义

CREATE TABLE mysql_table (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/demo',
   'table-name' = 'users',
   'username' = 'root',
   'password' = '123456'
);

源表 WITH 参数

参数 是否必填 默认值 数据类型 描述

connector

String

连接器,固定值为 jdbc

url

String

MySQL 数据库 JDBC URL。

table-name

String

MySQL 表名。

driver

String

JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。

username

String

MySQL 数据库用户名。

password

String

MySQL 数据库密码。

scan.partition.column

Integer

指定对输入数据进行分区扫描的列名。该列必须是数值类型、日期类型或时间戳类型。

scan.partition.num

Integer

分区扫描启用后,指定分区数。

scan.partition.lower-bound

Integer

分区扫描启用后,指定第一个分区的最小值。

scan.partition.upper-bound

Integer

分区扫描启用后,指定最后一个分区的最大值。

scan.fetch-size

0

Integer

每次从数据库读取时,批量获取的行数。

scan.auto-commit

true

Boolean

自动提交标志,决定每个语句是否在事务中自动提交。

说明
  • scan.partition.lower-boundscan.partition.upper-bound 仅用于决定分区步长,而不是用于过滤表中的行。所以表中的所有行都会被分区并返回。

  • 分区扫描功能可以加速读取数据,每个子任务可以读取自己的分区。使用该功能时,四个 scan.partition 开头的参数都必须指定。

维表 WITH 参数

参数 是否必填 默认值 数据类型 描述

connector

String

连接器,固定值为 jdbc

url

String

MySQL 数据库 JDBC URL。

table-name

String

MySQL 表名。

driver

String

JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。

username

String

MySQL 数据库用户名。

password

String

MySQL 数据库密码。

lookup.cache.max-rows

Integer

Lookup 缓存中最多缓存的数据行数。超过此值,最旧的行将过期。默认情况下禁用查找缓存。

lookup.cache.ttl

Duration

Lookup 缓存中每条记录最长的缓存时间。超过此时间,最旧的行将过期。默认情况下禁用 Lookup 缓存。

lookup.max-retries

3

Integer

数据库查询失败时,最多重试的次数。

说明
  • Lookup 缓存提升维表读取性能,目前仅支持同步读取模式。

  • 默认情况下,Lookup 缓存未启用,所有请求需要请求数据库。

  • 通过设置 lookup.cache.max-rowslookup.cache.ttl 可以启用 Lookup 缓存,这时每个进程(即 TaskManager)都会持有一份缓存。Flink 会先查找缓存,缓存未命中时会向数据库发送请求,并根据返回的值更新缓存。

结果表 WITH 参数

参数 是否必填 默认值 数据类型 描述

connector

String

连接器,固定值为 jdbc

url

String

MySQL 数据库 JDBC URL。

table-name

String

MySQL 表名。

driver

String

JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。

username

String

MySQL 数据库用户名。

password

String

MySQL 数据库密码。

sink.buffer-flush.max-rows

100

Integer

批量输出时,最多缓存的数据行数。设置为 0 表示禁用输出缓存。

sink.buffer-flush.interval

1s

Duration

每隔多久异步线程自动批量输出数据。设置为 0 表示禁用自动异步输出。

sink.max-retries

3

Integer

数据库写入失败时,最多重试的次数。

说明
  • 如果在 DDL 上定义了主键,则 sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。

  • 对于 MySQL 表,Upsert 功能的实现采用 INSERT .. ON DUPLICATE KEY UPDATE .. 语法。

类型映射

MySQL 字段类型 Flink SQL 字段类型

TINYINT

TINYINT

SMALLINT
TINYINT UNSIGNED

SMALLINT

INT
MEDIUMINT
SMALLINT UNSIGNED

INT

BIGINT
INT UNSIGNED

BIGINT

BIGINT
UNSIGNED

DECIMAL(20, 0)

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE
DOUBLE PRECISION

DOUBLE

NUMERIC(p, s)
DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN
TINYINT(1)

BOOLEAN

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)
VARCHAR(n)
TEXT

STRING

BINARY
VARBINARY
BLOB

BYTES

代码示例

示例一

-- 用 Flink SQL 注册 MySQL 表 users
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);

CREATE TABLE MY_DIM (
    key BIGINT,
    desc STRING,
    rt AS PROCTIME(),
    PRIMARY KEY (key) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'user_extr'
);

-- 从另一张表 T 迁移数据到 MyUserTable 表
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;

-- 查看数据
SELECT id, name, age, status FROM MyUserTable;

-- 将 MyUserTable 作为维表与其他流式数据源进行 temporal join
SELECT * FROM MyUserTable LEFT JOIN MY_DIM FOR SYSTEM_TIME AS OF MY_DIM.rt
ON MY_DIM.key = MyUserTable.id;

示例二

-- 利用 datagen 随机生成数据源数据
CREATE TEMPORARY TABLE datagen_source(
    name STRING,
    age INT
) WITH (
    'connector' = 'datagen'
)

-- 用 Flink SQL 注册 MySQL 表 students
CREATE TEMPORARY TABLE mysql_sink(
    name VARCHAR,
    age INT
) WITH (
    'connector' = 'jdbc'
    'url' = 'jdbc://mysql://localhost:3306/mydb',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456'
)

INSERT INTO mysql_sink
SELECT * FROM datagen_source;