版本说明

当前仅支持 Flink 1.12 版本。

使用范围

支持读取和写入 HBase 集群。

DDL 定义

注意
  • HBase 表所有列簇需要声明为 ROW 类型,内部定义列名和类型,可定义多组。

  • 非 ROW 类型的字段会被识别为 rowkey,rowkey 可以定义为任意名字。

-- 在 Flink SQL 里注册 HBase 表 'mytable'
CREATE TABLE hTable (
 rowkey INT,
 family1 ROW<q1 INT>,
 family2 ROW<q2 STRING, q3 BIGINT>,
 family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'mytable',
 'zookeeper.quorum' = 'localhost:2181'
 'zookeeper.znode.parent' = '/hbase/cl-8wry3tmz'
);

HBase 源表 WITH 参数

参数值 是否必填 默认值 数据类型 描述

connector

String

连接器,目前支持 hbase-2.2

table-name

String

HBase 表名。

zookeeper.quorum

String

HBase 的 zookeeper 地址。

zookeeper.znode.parent

/hbase

String

HBase 在 zookeeper 中的根目录,请根据实际情况进行配置。例如:/hbase/{集群id}

null-string-literal

null

String

HBase 字段类型为 String 时,遇到空值则将该字段赋值为当前参数值。

HBase 维表 WITH 参数

参数值 是否必填 默认值 数据类型 描述

connector

String

连接器,目前支持 hbase-2.2

table-name

String

HBase 表名。

zookeeper.quorum

String

HBase 的 zookeeper 地址。

zookeeper.znode.parent

/hbase

String

HBase 在 zookeeper 中的根目录,请根据实际情况进行配置。例如:/hbase/{集群id}

null-string-literal

null

String

HBase 字段类型为 String 时,遇到空值则将该字段赋值为当前参数值。

HBase 结果表 WITH 参数

参数值 是否必填 默认值 数据类型 描述

connector

String

连接器,目前支持 hbase-2.2

table-name

String

HBase 表名。

zookeeper.quorum

String

HBase 的 zookeeper 地址。

zookeeper.znode.parent

/hbase

String

HBase 在 zookeeper 中的根目录,请根据实际情况进行配置。例如:/hbase/{集群id}

null-string-literal

null

String

HBase 字段类型为 String 时,遇到空值则将该字段赋值为当前参数值。

sink.buffer-flush.max-size

2mb

MemorySize

每次写入请求在内存中缓存的数据量。调大该值有利于提高 HBase 写入性能,但会增加写入延迟。设置为 0 禁用。

sink.buffer-flush.max-rows

1000

Integer

每次写入请求在内存中缓存的数据行数。调大该值有利于提高 HBase 写入性能,但会增加写入延迟。设置为 0 禁用。

sink.buffer-flush.interval

1s

Duration

周期性 flush 缓存数据到 HBase 的时间间隔。可以提高 HBase 写入性能,但会增加写入延迟。设置为 0 禁用。

sink.parallelism

Integer

HBase sink 算子的并行度,默认情况下,框架使用上游算 子相同的平行度。

类型映射

HBase 将所有数据存储为字节数组。

Flink 字段类型 HBase 字段类型

CHAR / VARCHAR / STRING

byte[] toBytes(String s)
String toString(byte[] b)

BOOLEAN

byte[] toBytes(boolean b)
boolean toBoolean(byte[] b)

BINARY / VARBINARY

byte[] 按原样返回。

DECIMAL

byte[] toBytes(BigDecimal v)
BigDecimal toBigDecimal(byte[] b)

TINYINT

new byte[] { val }
bytes[0] // returns first and only byte from bytes

SMALLINT

byte[] toBytes(short val)
short toShort(byte[] bytes)

INT

byte[] toBytes(int val)
int toInt(byte[] bytes)

BIGINT

byte[] toBytes(long val)
long toLong(byte[] bytes)

FLOAT

byte[] toBytes(float val)
float toFloat(byte[] bytes)

DOUBLE

byte[] toBytes(double val)
double toDouble(byte[] bytes)

DATE

将日期转换成自 1970.01.01 以来的天数,用 int 表示。

TIME

将时间转换成自 00:00:00 以来的毫秒数,用 int 表示。

TIMESTAMP

将时间戳转换成自 1970-01-01 00:00:00 以来的毫秒数,用 long 表示。

ARRAY

不支持

MAP / ULTISET

不支持

ROW

不支持

代码示例

-- 使用 datagen 连接器生成随机数据
CREATE TEMPORARY TABLE datagen_source (
  rowkey INT,
  f1q1 INT,
  f2q1 STRING,
  f2q2 BIGINT,
  f3q1 DOUBLE,
  f3q2 BOOLEAN,
  f3q3 STRING
) with (
  'connector'='datagen'
);

-- 在 Flink SQL 里注册 HBase 表 'demo'
CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q1 STRING, q2 BIGINT>,
  family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCE
) with (
  'connector'='hbase-2.2',
  'table-name'='demo',
  'zookeeper.quorum'='localhost:2181'
  'zookeeper.znode.parent' = '/hbase/cl-8wry3tmz'
);

-- 从 datagen_source 中读取数据并写入 hbase_sink 中
INSERT INTO hbase_sink
SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3) FROM datagen_source;