Stream Load
Stream load
Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。
Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。
基本原理
下图展示了 Stream load 的主要流程,省略了一些导入细节。
^ +
| |
| | 1A. User submit load to FE
| |
| +--v-----------+
| | FE |
4. Return result to user | +--+-----------+
| |
| | 2. Redirect to BE
| |
| +--v-----------+
+---+Coordinator BE| 1B. User submit load to BE
+-+-----+----+-+
| | |
+-----+ | +-----+
| | | 3. Distrbute data
| | |
+-v-+ +-v-+ +-v-+
|BE | |BE | |BE |
+---+ +---+ +---+
Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
导入的最终结果由 Coordinator BE 返回给用户。
支持数据格式
目前 Stream Load 支持数据格式:CSV(文本)、JSON
1.2+ 支持PARQUET 和 ORC基本操作
创建导入
Stream Load 通过 HTTP 协议提交和传输数据。这里通过
curl
命令展示如何提交导入。
用户也可以通过其他 HTTP client 进行操作。
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
# Header 中支持属性见下面的 ‘导入任务参数’ 说明
# 格式为: -H "key1:value1"
示例:
curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load
创建导入的详细语法帮助执行
HELP STREAM LOAD
查看, 下面主要介绍创建 Stream Load 的部分参数意义。
签名参数
-
user/passwd
Stream load 由于创建导入的协议使用的是 HTTP 协议,通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。
导入任务参数
Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。下面主要介绍了 Stream Load 导入任务参数的部分参数意义。
-
label
导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。
label 的另一个作用,是防止用户重复导入相同的数据。 强烈推荐用户同一批次数据使用相同的 label。这样同一批次数据的重复请求只会被接受一次,保证了 At-Most-Once
当 label 对应的导入作业状态为 CANCELLED 时,该 label 可以再次被使用。
-
column_separator
用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。
如hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"。
可以使用多个字符的组合作为列分隔符。
-
line_delimiter
用于指定导入文件中的换行符,默认为\n。
可以使用做多个字符的组合作为换行符。
-
max_filter_ratio
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。
如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。
计算公式为:
(dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) ) > max_filter_ratio
dpp.abnorm.ALL
表示数据质量不合格的行数。如类型不匹配,列数不匹配,长度不匹配等等。dpp.norm.ALL
指的是导入过程中正确数据的条数。可以通过SHOW LOAD
命令查询导入任务的正确数据量。原始文件的行数 =
dpp.abnorm.ALL + dpp.norm.ALL
-
where
导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入
num_rows_unselected
。 -
Partitions
待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入
dpp.abnorm.ALL
-
columns
待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
-
format
指定导入数据格式,支持
支持 `csv_with_names` (csv文件行首过滤)、`csv_with_names_and_types`(csv文件前两行过滤)、`parquet`、`orc`。 支持 `arrow`格式。csv
、json
和arrow
,默认是csv
。列顺序变换例子:原始数据有三列(src_c1,src_c2,src_c3), 目前doris表也有三列(dst_c1,dst_c2,dst_c3)
如果原始表的src_c1列对应目标表dst_c1列,原始表的src_c2列对应目标表dst_c2列,原始表的src_c3列对应目标表dst_c3列,则写法如下:
columns: dst_c1, dst_c2, dst_c3
如果原始表的src_c1列对应目标表dst_c2列,原始表的src_c2列对应目标表dst_c3列,原始表的src_c3列对应目标表dst_c1列,则写法如下:
columns: dst_c2, dst_c3, dst_c1
表达式变换例子:原始文件有两列,目标表也有两列(c1,c2)但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法如下:
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
其中 tmp_*是一个占位符,代表的是原始文件中的两个原始列。 -
exec_mem_limit
导入内存限制。默认为 2GB,单位为字节。
-
strict_mode
Stream Load 导入可以开启 strict mode 模式。开启方式为在 HEADER 中声明
strict_mode=true
。默认的 strict mode 为关闭。strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:
- 对于列类型转换来说,如果 strict mode 为true,则错误的数据将被 filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。
- 对于导入的某列由函数变换生成时,strict mode 对其不产生影响。
- 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。
-
merge_type
数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理
-
two_phase_commit
Stream load 导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为
PRECOMMITTED
,用户手动触发commit操作之后,数据才可见。 -
enclose
包围符。当csv数据字段中含有行分隔符或列分隔符时,为防止意外截断,可指定单字节字符作为包围符起到保护作用。例如列分隔符为",",包围符为"'",数据为"a,'b,c'",则"b,c"会被解析为一个字段。
-
escape
转义符。用于转义在csv字段中出现的与包围符相同的字符。例如数据为"a,'b,'c'",包围符为"'",希望"b,'c被作为一个字段解析,则需要指定单字节转义符,例如"\",然后将数据修改为"a,'b,\'c'"。
示例:
- 发起stream load预提交操作
curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
{
"TxnId": 18036,
"Label": "55c8ffc9-1c40-4d51-b75e-f2265b3602ef",
"TwoPhaseCommit": "true",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 100,
"NumberLoadedRows": 100,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 1031,
"LoadTimeMs": 77,
"BeginTxnTimeMs": 1,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 58,
"CommitAndPublishTimeMs": 0
}-
对事务触发commit操作
注意1) 请求发往fe或be均可
注意2) commit 的时候可以省略 url 中的
{table}
使用事务id
curl -X PUT --location-trusted -u user:passwd -H "txn_id:18036" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc
{
"status": "Success",
"msg": "transaction [18036] commit successfully."
}使用label
curl -X PUT --location-trusted -u user:passwd -H "label:55c8ffc9-1c40-4d51-b75e-f2265b3602ef" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/{table}/_stream_load_2pc
{
"status": "Success",
"msg": "label [55c8ffc9-1c40-4d51-b75e-f2265b3602ef] commit successfully."
}-
对事务触发abort操作
注意1) 请求发往fe或be均可
注意2) abort 的时候可以省略 url 中的
{table}