数据库 PostgreSQL
版本说明
当前仅支持 Flink 1.12 版本。
使用范围
使用 JDBC 驱动程序从 PostgreSQL 中读取数据或将数据写入其中。
支持作为数据源、维表、数据目的。
DDL 定义
CREATE TABLE pg_table (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5433/demo',
'table-name' = 'users',
'username' = 'root',
'password' = '123456'
);
PostgreSQL 源表 WITH 参数
参数值 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector |
是 |
无 |
String |
连接器,固定值为 |
url |
是 |
无 |
String |
PostgreSQL 数据库 JDBC URL。 |
table-name |
是 |
无 |
String |
PostgreSQL 表名。 |
driver |
否 |
无 |
String |
JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。 |
username |
是 |
无 |
String |
PostgreSQL 数据库用户名。 |
password |
是 |
无 |
String |
PostgreSQL 数据库密码。 |
scan.partition.column |
否 |
无 |
Integer |
指定对输入数据进行分区扫描的列名。该列必须是数值类型、日期类型或时间戳类型。 |
scan.partition.num |
否 |
无 |
Integer |
分区扫描启用后,指定分区数。 |
scan.partition.lower-bound |
否 |
无 |
Integer |
分区扫描启用后,指定第一个分区的最小值。 |
scan.partition.upper-bound |
否 |
无 |
Integer |
分区扫描启用后,指定最后一个分区的最大值。 |
scan.fetch-size |
否 |
0 |
Integer |
每次从数据库读取时,批量获取的行数。 |
scan.auto-commit |
否 |
true |
Boolean |
自动提交标志,决定每个语句是否在事务中自动提交。 |
说明 |
---|
|
PostgreSQL 维表 WITH 参数
参数值 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector |
是 |
无 |
String |
连接器,固定值为 |
url |
是 |
无 |
String |
PostgreSQL 数据库 JDBC URL。 |
table-name |
是 |
无 |
String |
PostgreSQL 表名。 |
driver |
否 |
无 |
String |
JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。 |
username |
是 |
无 |
String |
PostgreSQL 数据库用户名。 |
password |
是 |
无 |
String |
PostgreSQL 数据库密码。 |
lookup.cache.max-rows |
否 |
无 |
Integer |
Lookup 缓存中最多缓存的数据行数。超过此值,最旧的行将过期。默认情况下禁用查找缓存。 |
lookup.cache.ttl |
否 |
无 |
Duration |
Lookup 缓存中每条记录最长的缓存时间。超过此时间,最旧的行将过期。默认情况下禁用 Lookup 缓存。 |
lookup.max-retries |
否 |
3 |
Integer |
数据库查询失败时,最多重试的次数。 |
说明 |
---|
|
PostgreSQL 结果表 WITH 参数
参数值 | 必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector |
是 |
无 |
String |
连接器,固定值为 |
url |
是 |
无 |
String |
PostgreSQL 数据库 JDBC URL。 |
table-name |
是 |
无 |
String |
PostgreSQL 表名。 |
driver |
否 |
无 |
String |
JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。 |
username |
是 |
无 |
String |
PostgreSQL 数据库用户名。 |
password |
是 |
无 |
String |
PostgreSQL 数据库密码。 |
sink.buffer-flush.max-rows |
否 |
100 |
Integer |
批量输出时,最多缓存的数据行数。设置为 0 表示禁用输出缓存。 |
sink.buffer-flush.interval |
否 |
1s |
Duration |
每隔多久异步线程自动批量输出数据。设置为 0 表示禁用自动异步输出。 |
sink.max-retries |
否 |
3 |
Integer |
数据库写入失败时,最多重试的次数。 |
说明 |
---|
|
类型映射
PostgreSQL 字段类型 | Flink SQL 字段类型 |
---|---|
SMALLINT |
SMALLINT |
INTEGER |
INT |
BIGINT |
BIGINT |
BIGINT |
BIGINT |
REAL |
FLOAT |
FLOAT8 |
DOUBLE |
NUMERIC(p, s) |
DECIMAL(p, s) |
BOOLEAN |
BOOLEAN |
DATE |
DATE |
TIME [(p)] [WITHOUT TIMEZONE] |
TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
|
STRING |
BYTEA |
BYTES |
ARRAY |
ARRAY |
代码示例
-- 利用 datagen 随机生成数据源数据
CREATE TEMPORARY TABLE datagen_source(
name STRING,
age INT
) WITH (
'connector' = 'datagen'
)
-- 用 Flink SQL 注册 PostgreSQL 表 students
CREATE TEMPORARY TABLE pg_sink(
name VARCHAR,
age INT
) WITH (
'connector' = 'jdbc'
'url' = 'jdbc://postgresql://localhost:5433/mydb',
'table-name' = 'students',
'username' = 'root',
'password' = '123456'
)
INSERT INTO pg_sink
SELECT * FROM datagen_source;