本文为您介绍如何使用云数据库Tair(Tair企业版)连接器。
背景信息
云数据库 Tair(兼容 Redis) 是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求。
Tair连接器支持的信息如下。
类别 |
详情 |
支持类型 |
结果表 |
运行模式 |
仅支持流模式 |
数据格式 |
String |
特有监控指标 |
说明
指标含义详情,请参见 监控指标说明 。 |
API种类 |
SQL |
是否支持更新或者删除结果表数据 |
是 |
前提条件
使用限制
-
仅Flink计算引擎VVR 6.0.6及以上版本支持云数据库Tair(Tari企业版)连接器。
-
云数据库Tair连接器不支持配置多个host。
语法结构
云数据库Tair在兼容Redis数据结构STRING、LIST、SET、HASHMAP和SORTEDSET的基础上,支持所有Tair自研数据结构。
DDL语句定义示例如下。
CREATE TABLE tair_table (
a STRING,
b STRING,
PRIMARY KEY (a) NOT ENFORCED -- 必填。
) WITH (
'connector'= 'tair',
'host' = '<yourHost>'
);
云数据库Tair兼容Redis数据结构,Redis数据结构的语法示例请参见 云数据库Redis 。
WITH参数
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
connector |
表类型。 |
String |
是 |
无 |
固定值为tair。 |
host |
Tair Server连接地址。 |
String |
是 |
无 |
推荐您使用内网地址。
说明
由于网络延迟和带宽限制等因素,连接公网地址时可能会出现不稳定的情况。 |
mode |
对应Tair数据结构。 |
String |
是 |
无 |
参数取值如下:
Tair支持的数据结构包括Redis原生数据结构和Tair自研数据结构。参数取值详情请参见 Redis结果表数据结构格式 和 Tair自研数据结构格式 。
说明
|
port |
Tair Server连接端口。 |
Int |
否 |
6379 |
无。 |
password |
Tair数据库密码。 |
String |
否 |
空字符串 |
空字符串表示不进行校验。 |
dbNum |
目标数据库编号。 |
Int |
否 |
0 |
无。 |
clusterMode |
是否为集群模式。 |
Boolean |
否 |
false |
参数取值如下:
|
ignoreDelete |
是否忽略Retraction消息。 |
Boolean |
否 |
false |
参数取值如下:
|
expiration |
为写入数据对应的key设置TTL。 |
Long |
否 |
0 |
0表示未设置TTL。如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。 |
expirationAt |
为写入数据对应的key设置绝对过期时间。 |
Long |
否 |
0 |
单位为毫秒,默认值为0,代表不设过期时间。 如果该参数的值大于0且expiration参数为0,则写入数据对应的Key会被设置相应的绝对过期时间。 |
incrMode |
对应Tair的sink模式。 |
String |
否 |
None |
参数取值如下:
|
incrValue |
incrMode下incr的值。 |
String |
否 |
无 |
参数取值如下:
|
fieldExpireMode |
tairhash结构field级别或tairts结构Skey级别的过期模式。 |
String |
否 |
None |
参数取值如下:
|
fieldExpireValue |
tairhash结构 field级别或tairts结构Skey级别过期时间。 |
String |
否 |
无 |
参数取值如下:
|
类型映射
Flink字段类型 |
Tair字段类型 |
VARCHAR |
STRING |
DOUBLE |
DOUBLE |
Tair自研数据结构格式
类型 |
格式 |
操作命令 |
incrMode为None时,DDL为2列:
|
|
|
incrMode为int或float时,DDL为1列,第1列为key,STRING类型。 |
|
|
incrMode为dynamic_int或dynamic_float时,DDL为2列:
|
|
|
incrMode为None时,DDL为3列:
|
|
|
incrMode为int或float时,DDL为2列:
|
|
|
incrMode为dynamic_int或dynamic_float时,DDL为3列:
|
|
|
incrMode为None时,Tairzset支持多维度的排序能力,最大支持256维的 double类型的分值排序,所以DDL介于 3列和258列之间。
|
说明
如果您需实现多维度排序,则各维度的score格式必须相同。 |
|
incrMode为int或float时,DDL为2列:
|
|
|
incrMode为dynamic_int或dynamic_float时,DDL为3列:
|
|
|
只支持incrMode为None模式。 第一次插入时会自动创建一个默认容量(capacity)为100,错误率(error_rate)为0.01的Tairbloom。DDL为2列:
|
|
|
只支持incrMode为None模式,DDL为3列:
|
|
|
incrMode为None时,DDL为4列:
|
说明
插入数据前需要先创建索引并添加映射,命令如下。
|
|
incrMode为int或float时,DDL为4列:
|
文档操作命令如下。
说明
插入数据前需要先创建索引并添加映射,命令如下。
|
|
incrMode为dynamic_int或dynamic_float时,DDL为5列:
|
文档操作命令如下。
说明
插入数据前需要先创建索引并添加映射,命令如下。
|
|
incrMode为只支持为None模式。DDL为2列:
|
|
|
incrMode只支持为None模式。DDL为3列:
|
|
|
incrMode只支持为None模式。DDL为3列:
|
|
|
incrMode只支持为None模式。DDL为6列:
|
说明
插入数据前需要先创建索引并添加映射,命令如下。
|
|
incrMode为None时,DDL为4列:
|
|
|
incrMode为float时,DDL为3列:
|
|
|
incrMode为dynamic_float时,DDL 为4列:
|
|
使用示例
-
普通模式插入结果数据示例
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' CREATE TEMPORARY TABLE tair_output ( index_name STRING, doc_id STRING, doc STRING, mapping STRING, PRIMARY KEY(index_name) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairsearch', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}' INSERT INTO tair_output SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping FROM datagen_stream;
-
incr模式插入结果数据示例
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' CREATE TEMPORARY TABLE tair_output ( key STRING, step STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairstring', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'incrMode' = 'dynamic_float', 'incrValue' = 'step' INSERT INTO tair_output SELECT * FROM datagen_stream;
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING ) WITH ( 'connector' = 'datagen' CREATE TEMPORARY TABLE tair_output ( key STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairstring', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'incrMode' = 'float', 'incrValue' = '11.11' INSERT INTO tair_output SELECT v FROM datagen_stream;
-
fieldExpire模式写入结果表数据示例
CREATE TEMPORARY TABLE datagen_stream ( v STRING, p STRING, s STRING ) WITH ( 'connector' = 'datagen' CREATE TEMPORARY TABLE tair_ouput ( key STRING, field STRING, value STRING, PRIMARY KEY (key) NOT ENFORCED ) WITH ( 'connector' = 'tair', 'mode' = 'tairhash', 'host' = '${tairHost}', 'port' = '${tairPort}', 'password' = '${password}', 'fieldExpireMode' = 'millisecond', 'fieldExpireValue' = '1000' INSERT INTO tair_output SELECT v, p, s FROM datagen_stream;