版本说明

当前仅支持 Flink 1.12 版本。

使用范围

使用 JDBC 驱动程序从 PostgreSQL 中读取数据或将数据写入其中。

支持作为数据源、维表、数据目的。

DDL 定义

CREATE TABLE pg_table (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:postgresql://localhost:5433/demo',
   'table-name' = 'users',
   'username' = 'root',
   'password' = '123456'
);

PostgreSQL 源表 WITH 参数

参数值 是否必填 默认值 数据类型 描述

connector

String

连接器,固定值为 jdbc

url

String

PostgreSQL 数据库 JDBC URL。

table-name

String

PostgreSQL 表名。

driver

String

JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。

username

String

PostgreSQL 数据库用户名。

password

String

PostgreSQL 数据库密码。

scan.partition.column

Integer

指定对输入数据进行分区扫描的列名。该列必须是数值类型、日期类型或时间戳类型。

scan.partition.num

Integer

分区扫描启用后,指定分区数。

scan.partition.lower-bound

Integer

分区扫描启用后,指定第一个分区的最小值。

scan.partition.upper-bound

Integer

分区扫描启用后,指定最后一个分区的最大值。

scan.fetch-size

0

Integer

每次从数据库读取时,批量获取的行数。

scan.auto-commit

true

Boolean

自动提交标志,决定每个语句是否在事务中自动提交。

说明
  1. scan.partition.lower-boundscan.partition.upper-bound 仅用于决定分区步长,而不是用于过滤表中的行。所以表中的所有行都会被分区并返回。

  2. 分区扫描功能可以加速读取数据,每个子任务可以读取自己的分区。使用该功能时,四个 scan.partition 开头的参数都必须指定。

PostgreSQL 维表 WITH 参数

参数值 是否必填 默认值 数据类型 描述

connector

String

连接器,固定值为 jdbc

url

String

PostgreSQL 数据库 JDBC URL。

table-name

String

PostgreSQL 表名。

driver

String

JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。

username

String

PostgreSQL 数据库用户名。

password

String

PostgreSQL 数据库密码。

lookup.cache.max-rows

Integer

Lookup 缓存中最多缓存的数据行数。超过此值,最旧的行将过期。默认情况下禁用查找缓存。

lookup.cache.ttl

Duration

Lookup 缓存中每条记录最长的缓存时间。超过此时间,最旧的行将过期。默认情况下禁用 Lookup 缓存。

lookup.max-retries

3

Integer

数据库查询失败时,最多重试的次数。

说明
  1. Lookup 缓存提升维表读取性能,目前仅支持同步读取模式。

  2. 默认情况下,Lookup 缓存未启用,所有请求需要请求数据库。

  3. 通过设置 lookup.cache.max-rows 和 lookup.cache.ttl 可以启用 Lookup 缓存,这时每个进程(即 TaskManager)都会持有一份缓存。Flink 会先查找缓存,缓存未命中时会向数据库发送请求,并根据返回的值更新缓存。

PostgreSQL 结果表 WITH 参数

参数值 必填 默认值 数据类型 描述

connector

String

连接器,固定值为 jdbc

url

String

PostgreSQL 数据库 JDBC URL。

table-name

String

PostgreSQL 表名。

driver

String

JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。

username

String

PostgreSQL 数据库用户名。

password

String

PostgreSQL 数据库密码。

sink.buffer-flush.max-rows

100

Integer

批量输出时,最多缓存的数据行数。设置为 0 表示禁用输出缓存。

sink.buffer-flush.interval

1s

Duration

每隔多久异步线程自动批量输出数据。设置为 0 表示禁用自动异步输出。

sink.max-retries

3

Integer

数据库写入失败时,最多重试的次数。

说明
  • 如果在 DDL 上定义了主键,则 sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。

  • 对于 PostgreSQL 表,Upsert 功能的实现采用 INSERT .. ON CONFLICT .. DO UPDATE SET .. 语法。

类型映射

PostgreSQL 字段类型 Flink SQL 字段类型

SMALLINT
INT2
SMALLSERIAL
SERIAL2

SMALLINT

INTEGER
SERIAL

INT

BIGINT
BIGSERIAL

BIGINT

BIGINT

BIGINT

REAL
FLOAT4

FLOAT

FLOAT8
DOUBLE PRECISION

DOUBLE

NUMERIC(p, s)
DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]


CHAR(n)
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT

STRING

BYTEA

BYTES

ARRAY

ARRAY

代码示例

-- 利用 datagen 随机生成数据源数据
CREATE TEMPORARY TABLE datagen_source(
    name STRING,
    age INT
) WITH (
    'connector' = 'datagen'
)

-- 用 Flink SQL 注册 PostgreSQL 表 students
CREATE TEMPORARY TABLE pg_sink(
    name VARCHAR,
    age INT
) WITH (
    'connector' = 'jdbc'
    'url' = 'jdbc://postgresql://localhost:5433/mydb',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456'
)

INSERT INTO pg_sink
SELECT * FROM datagen_source;