添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
兴奋的玉米  ·  Mapping Attributes✨ | ...·  3 周前    · 
快乐的板凳  ·  CREATE TABLE - Amazon ...·  3 周前    · 
纯真的炒饭  ·  Cannot read ...·  3 周前    · 
深情的绿茶  ·  Real-time data ...·  昨天    · 
想表白的海龟  ·  数据库内核月报·  昨天    · 
正直的饺子  ·  Unittest with ...·  4 月前    · 
好帅的乒乓球  ·  TimeOut – not able to ...·  8 月前    · 
旅途中的硬盘  ·  Error "clear" from ...·  8 月前    · 

本文档中的代码对 PyODPS 0.11.3 及后续版本有效。对早于 0.11.3 版本的 PyODPS,请使用 odps.models.Schema 代替 odps.models.TableSchema ,使用 schema 属性代替 table_schema 属性。

我们可以用 ODPS 入口对象的 list_tables 来列出项目空间下的所有表。

for table in o.list_tables():
    print(table.name)

可以通过 prefix 参数只列举给定前缀的表:

for table in o.list_tables(prefix="table_prefix"):
    print(table.name)

通过该方法获取的 Table 对象不会自动加载表名以外的属性,此时获取这些属性(例如 table_schema 或者 creation_time)可能导致额外的请求并造成额外的时间开销。如果需要在列举表的同时读取这些属性,在 PyODPS 0.11.5 及后续版本中,可以为 list_tables 添加 extended=True 参数:

for table in o.list_tables(extended=True):
    print(table.name, table.creation_time)

如果你需要按类型列举表,可以指定 type 参数。不同类型的表列举方法如下:

managed_tables = list(o.list_tables(type="managed_table"))  # 列举内置表
external_tables = list(o.list_tables(type="external_table"))  # 列举外表
virtual_views = list(o.list_tables(type="virtual_view"))  # 列举视图
materialized_views = list(o.list_tables(type="materialized_view"))  # 列举物化视图

通过调用 exist_table 来判断表是否存在。

o.exist_table('dual')

通过调用 get_table 来获取表。

>>> t = o.get_table('dual')
>>> t.table_schema
odps.Schema {
  c_int_a                 bigint
  c_int_b                 bigint
  c_double_a              double
  c_double_b              double
  c_string_a              string
  c_string_b              string
  c_bool_a                boolean
  c_bool_b                boolean
  c_datetime_a            datetime
  c_datetime_b            datetime
>>> t.lifecycle
>>> print(t.creation_time)
2014-05-15 14:58:43
>>> t.is_virtual_view
False
>>> t.size
>>> t.comment
'Dual Table Comment'
>>> t.table_schema.columns
[<column c_int_a, type bigint>,
 <column c_int_b, type bigint>,
 <column c_double_a, type double>,
 <column c_double_b, type double>,
 <column c_string_a, type string>,
 <column c_string_b, type string>,
 <column c_bool_a, type boolean>,
 <column c_bool_b, type boolean>,
 <column c_datetime_a, type datetime>,
 <column c_datetime_b, type datetime>]
>>> t.table_schema['c_int_a']
<column c_int_a, type bigint>
>>> t.table_schema['c_int_a'].comment
'Comment of column c_int_a'

通过提供 project 参数,来跨project获取表。

>>> t = o.get_table('dual', project='other_project')

创建表的Schema

有两种方法来初始化。第一种方式通过表的列、以及可选的分区来初始化。

>>> from odps.models import TableSchema, Column, Partition
>>> columns = [Column(name='num', type='bigint', comment='the column'),
>>>            Column(name='num2', type='double', comment='the column2')]
>>> partitions = [Partition(name='pt', type='string', comment='the partition')]
>>> schema = TableSchema(columns=columns, partitions=partitions)
>>> schema.columns
[<column num, type bigint>,
 <column num2, type double>,
 <partition pt, type string>]
>>> schema.partitions
[<partition pt, type string>]
>>> schema.names  # 获取非分区字段的字段名
['num', 'num2']
>>> schema.types  # 获取非分区字段的字段类型
[bigint, double]

第二种方法是使用 Schema.from_lists,这种方法更容易调用,但显然无法直接设置列和分区的注释了。

>>> schema = TableSchema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
>>> schema.columns
[<column num, type bigint>,
 <column num2, type double>,
 <partition pt, type string>]

可以使用表 schema 来创建表,方法如下:

>>> table = o.create_table('my_new_table', schema)
>>> table = o.create_table('my_new_table', schema, if_not_exists=True)  # 只有不存在表时才创建
>>> table = o.create_table('my_new_table', schema, lifecycle=7)  # 设置生命周期

更简单的方式是采用“字段名 字段类型”字符串来创建表,方法如下:

>>> table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
>>> # 创建分区表可传入 (表字段列表, 分区字段列表)
>>> table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)

在未经设置的情况下,创建表时,只允许使用 bigint、double、decimal、string、datetime、boolean、map 和 array 类型。如果你使用的是位于公共云上的服务,或者支持 tinyint、struct 等新类型,可以设置 options.sql.use_odps2_extension = True 打开这些类型的支持,示例如下:

>>> from odps import options
>>> options.sql.use_odps2_extension = True
>>> table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body string>')

行记录 Record

Record表示表的一行记录,我们在 Table 对象上调用 new_record 就可以创建一个新的 Record。

>>> t = o.get_table('mytable')
>>> r = t.new_record(['val0', 'val1'])  # 值的个数必须等于表schema的字段数
>>> r2 = t.new_record()  #  也可以不传入值
>>> r2[0] = 'val0' # 可以通过偏移设置值
>>> r2['field1'] = 'val1'  # 也可以通过字段名设置值
>>> r2.field1 = 'val1'  # 通过属性设置值
>>> print(record[0])  # 取第0个位置的值
>>> 




    
print(record['c_double_a'])  # 通过字段取值
>>> print(record.c_double_a)  # 通过属性取值
>>> print(record[0: 3])  # 切片操作
>>> print(record[0, 2, 3])  # 取多个位置的值
>>> print(record['c_int_a', 'c_double_a'])  # 通过多个字段取值

获取表数据

有若干种方法能够获取表数据。首先,如果只是查看每个表的开始的小于1万条数据,则可以使用 head 方法。

>>> t = o.get_table('dual')
>>> for record in t.head(3):
>>>     # 处理每个Record对象

其次,在 table 实例上可以执行 open_reader 操作来打一个 reader 来读取数据。如果表为分区表,需要引入 partition 参数指定需要读取的分区。

使用 with 表达式的写法,with 表达式会保证离开时关闭 reader:

>>> with t.open_reader(partition='pt=test,pt2=test2') as reader:
>>>     count = reader.count
>>>     for record in reader[5:10]:  # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
>>>         # 处理一条记录

不使用 with 表达式的写法:

>>> reader = t.open_reader(partition='pt=test,pt2=test2')
>>> count = reader.count
>>> for record in reader[5:10]:  # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
>>>     # 处理一条记录
>>> reader.close()

更简单的调用方法是使用 ODPS 对象的 read_table 方法,例如

>>> for record in o.read_table('test_table', partition='pt=test,pt2=test2'):
>>>     # 处理一条记录

从 0.11.2 开始,PyODPS 支持使用 Arrow 格式读写数据,该格式可以以更高效率与 pandas 等格式互相转换。安装 pyarrow 后,在调用 open_reader 时增加 arrow=True 参数,即可按 Arrow RecordBatch 格式读取表内容。

>>> with t.open_reader(partition='pt=test,pt2=test2', arrow=True) as reader:
>>>     count = reader.count
>>>     for batch in reader:  # 可以执行多次,直到将所有 RecordBatch 读完
>>>         # 处理一个 RecordBatch,例如转换为 Pandas
>>>         print(batch.to_pandas())

你也可以直接调用 reader 上的 to_pandas 方法直接从 reader 获取 pandas DataFrame。 读取时,可以指定起始行号(从0开始)和行数。如果不指定,则默认读取所有数据。

>>> with t.open_reader(partition='pt=test,pt2=test2', arrow=True) as reader:
>>>     # 指定起始行号和行数
>>>     pd_df = reader.to_pandas(start=10, count=20)
>>>     # 如不指定,则读取所有数据
>>>     pd_df = reader.to_pandas()

你可以利用多进程加速读取 Pandas DataFrame:

>>> import multiprocessing
>>> n_process = multiprocessing.cpu_count()
>>> with t.open_reader(partition='pt=test,pt2=test2', arrow=True) as reader:
>>>     pd_df = reader.to_pandas(n_process=n_process)

为方便读取数据为 pandas,从 PyODPS 0.12.0 开始,Table 和 Partition 对象支持直接调用 to_pandas

>>> # 将表读取为 pandas DataFrame
>>> pd_df = table.to_pandas(start=10, count=20)
>>> # 通过2个进程读取所有数据
>>> pd_df = table.to_pandas(n_process=2)
>>> # 将分区读取为 pandas
>>> pd_df = partitioned_table.to_pandas(partition="pt=test", start=10, count=20)

与此同时,从 PyODPS 0.12.0 开始,你也可以使用 iter_pandas 方法从一张表或分区按多个批次读取 pandas DataFrame,并通过 batch_size 参数指定每次读取的 DataFrame 批次大小,该大小默认值为 options.tunnel.read_row_batch_size 指定,默认为 1024。

>>> # 以默认 batch_size 读取所有数据
>>> for batch in table.iter_pandas():
>>>     print(batch)
>>> # 以 batch_size==100 读取前 1000 行数据
>>> for batch in table.iter_pandas(batch_size=100, start=0, count=1000):
>>>     print(batch)

open_readerread_table 以及 to_pandas 方法仅支持读取单个分区。如果需要读取多个分区的值,例如读取所有符合 dt>20230119 这样条件的分区,需要使用 iterate_partitions 方法,详见 遍历表分区 章节。

导出数据是否包含分区列的值由输出格式决定。Record 格式数据默认包含分区列的值,而 Arrow 格式默认不包含。从 PyODPS 0.12.0 开始,你可以通过指定 append_partitions=True 显示引入分区列的值,通过 append_partitions=False 将分区列排除在结果之外。

向表写数据

类似于 open_reader,table对象同样能执行 open_writer 来打开writer,并写数据。如果表为分区表,需要引入 partition 参数指定需要写入的分区。

使用 with 表达式的写法,with 表达式会保证离开时关闭 writer 并提交所有数据:

>>> with t.open_writer(partition='pt=test') as writer:
>>>     records = [[111, 'aaa', True],                 # 这里可以是list
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     writer.write(records)  # 这里records可以是可迭代对象
>>>     records = [t.new_record([111, 'aaa', True]),   # 也可以是Record对象
>>>                t.new_record([222, 'bbb', False]),
>>>                t.new_record([333, 'ccc', True]),
>>>                t.new_record([444, '中文', False])]
>>>     writer.write(records)

如果分区不存在,可以使用 create_partition 参数指定创建分区,如

>>> with t.open_writer(partition='pt=test', create_partition=True) as writer:
>>>     records




    
 = [[111, 'aaa', True],                 # 这里可以是list
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     writer.write(records)  # 这里records可以是可迭代对象

更简单的写数据方法是使用 ODPS 对象的 write_table 方法,例如

>>> records = [[111, 'aaa', True],                 # 这里可以是list
>>>            [222, 'bbb', False],
>>>            [333, 'ccc', True],
>>>            [444, '中文', False]]
>>> o.write_table('test_table', records, partition='pt=test', create_partition=True)

注意:每次调用 write_table,MaxCompute 都会在服务端生成一个文件。这一操作需要较大的时间开销,同时过多的文件会降低后续的查询效率。因此,我们建议在使用 write_table 方法时,一次性写入多组数据,或者传入一个 generator 对象。

write_table 写表时会追加到原有数据。如果需要覆盖数据,可以为 write_table 增加一个参数 overwrite=True (仅在 0.11.1 以后支持),或者调用 table.truncate() / 删除分区后再建立分区。

你可以使用多线程写入数据。从 PyODPS 0.11.6 开始,直接将 open_writer 创建的 Writer 对象分发到各个线程中即可完成多线程写入,写入时请注意不要关闭 writer,待所有数据写入完成后再关闭 writer。

import random
# Python 2.7 请从三方库 futures 中 import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
def write_records(writer):
    for i in range(5):
        # 生成数据并写入
        record = table.new_record([random.randint(1, 100), random.random()])
        writer.write(record)
N_THREADS = 3
# 此处省略入口对象 o 的创建过程
table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
with table.open_writer() as writer:
    pool = ThreadPoolExecutor(N_THREADS)
    futures = []
    for i in range(N_THREADS):
        futures.append(pool.submit(write_records, writer))
    # 等待线程中的写入完成
    [f.result() for f in futures]

你也可以使用多进程写入数据,以避免 Python GIL 带来的性能损失。从 PyODPS 0.11.6 开始,只需要将 open_writer 创建的 Writer 对象通过 multiprocessing 标准库传递到需要写入的子进程中即可写入。需要注意的是,与多线程的情形不同,你应当在每个子进程完成写入后关闭 writer,并在所有写入子进程退出后再关闭主进程 writer(或离开 with 语句块),以保证所有数据被提交。

import random
from multiprocessing import Pool
def write_records(writer):
    for i in range(5):
        # 生成数据并写入
        record = table.new_record([random.randint(1, 100), random.random()])
        writer.write(record)
    # 需要手动在每个子进程中关闭连接
    writer.close()
# 如果在独立的 Python 代码文件中,需要判断是否代码按主模块执行
# 以防止下面的代码被 multiprocessing 反复执行
if __name__ == '__main__':
    N_WORKERS = 3
    # 此处省略入口对象 o 的创建过程
    table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
    with table.open_writer() as writer:
        pool = Pool(processes=N_WORKERS)
        futures = []
        for i in range(N_WORKERS):
            futures.append(pool.apply_async(write_records, (writer,)))
        # 等待子进程中的执行完成
        [f.get() for f in futures]

从 0.11.2 开始,PyODPS 支持使用 Arrow 格式读写数据,该格式可以以更高效率与 pandas 等格式互相转换。安装 pyarrow 后,在调用 open_writer 时增加 arrow=True 参数,即可按 Arrow RecordBatch 格式写入表内容。PyODPS 也支持直接写入 pandas DataFrame,支持自动转换为 Arrow RecordBatch。

>>> import pandas as pd
>>> import pyarrow as pa
>>> with t.open_writer(partition='pt=test', create_partition=True, arrow=True) as writer:
>>>     records = [[111, 'aaa', True],
>>>                [222, 'bbb', False],
>>>                [333, 'ccc', True],
>>>                [444, '中文', False]]
>>>     df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
>>>     # 写入 RecordBatch
>>>     batch = pa.RecordBatch.from_pandas(df)
>>>     writer.write(batch)
>>>     # 也可以直接写入 Pandas DataFrame
>>>     writer.write(df)

为方便写入 pandas DataFrame,从 0.12.0 开始,PyODPS 支持直接通过 write_table 方法写入 pandas DataFrame。如果写入数据前对应表不存在,可以增加 create_table=True 参数以自动创建表。

>>> import pandas as pd
>>> df = pd.DataFrame([
>>>     [111, 'aaa', True],
>>>     [222, 'bbb', False],
>>>     [333, 'ccc', True],
>>>     [444, '中文', False]
>>> ], columns=['num_col', 'str_col', 'bool_col'])
>>> # 如果表 test_table 不存在,将会自动创建
>>> o.write_table('test_table', df, partition='pt=test', create_table=True, create_partition=True)

从 PyODPS 0.12.0 开始,write_table 方法也支持动态分区,可通过 partitions 参数传入需要作为分区的列名,并指定 create_partition=True,相应的分区将会自动创建。

>>> import pandas as pd
>>> df = pd.DataFrame([
>>>     [111, 'aaa', True, 'p1'],
>>>     [222, 'bbb', False, 'p1'],
>>>     [333, 'ccc', True, 'p2'],
>>>     [444, '中文', False, 'p2']
>>> 




    
], columns=['num_col', 'str_col', 'bool_col', 'pt'])
>>> # 如果分区 pt=p1 或 pt=p2 不存在,将会自动创建。
>>> o.write_table('test_part_table', df, partitions=['pt'], create_partition=True)

压缩选项

为加快数据上传 / 下载速度,你可以在上传 / 下载数据时设置压缩选项。具体地,可以创建一个 CompressOption 实例,在其中指定需要的压缩算法及压缩等级。目前可用的压缩算法包括 zlib 和 ZSTD,其中 ZSTD 需要额外安装 zstandard 包。

from odps.tunnel import CompressOption
compress_option = CompressOption(
    compress_algo="zlib",  # 算法名称
    level=0,               # 压缩等级,可选
    strategy=0,            # 压缩策略,可选,目前仅适用于 zlib

此后可在 open_reader / open_writer 中设置压缩选项,例如:

with table.open_writer(compress_option=compress_option) as writer:
    # 写入数据,此处从略

如果仅需指定算法名,也可以直接在 open_reader / open_writer 中指定 compress_algo 参数,例如

with table.open_writer(compress_algo="zlib") as writer:
    # 写入数据,此处从略

PyODPS提供了 DataFrame框架 ,支持更方便地方式来查询和操作ODPS数据。 使用 to_df 方法,即可转化为 DataFrame 对象。

>>> table = o.get_table('my_table_name')
>>> df = table.to_df()

判断表是否为分区表:

>>> if table.table_schema.partitions:
>>>     print('Table %s is partitioned.' % table.name)

判断分区是否存在(该方法需要填写所有分区字段值):

>>> table.exist_partition('pt=test,sub=2015')

判断给定前缀的分区是否存在:

>>> # 表 table 的分区字段依次为 pt, sub
>>> table.exist_partitions('pt=test')

获取一个分区的相关信息:

>>> partition = table.get_partition('pt=test')
>>> print(partition.creation_time)
2015-11-18 22:22:27
>>> partition.size

这里的”分区”指的不是分区字段而是所有分区字段均确定的分区定义对应的子表。如果某个分区字段对应多个值, 则相应地有多个子表,即多个分区。而 get_partition 只能获取一个分区的信息。因而,

  • 如果某些分区未指定,那么这个分区定义可能对应多个子表,get_partition 时则不被 PyODPS 支持。此时,需要使用 iterate_partitions 分别处理每个分区。

  • 如果某个分区字段被定义多次,或者使用类似 pt>20210302 这样的非确定逻辑表达式,则无法使用 get_partition 获取分区。在此情况下,可以尝试使用 iterate_partitions 枚举每个分区。

  • 如果要遍历部分分区值确定的分区,可以使用 iterate_partitions 方法。

    >>> for partition in table.iterate_partitions(spec='pt=test'):
    >>>     print(partition.name)
    

    自 PyODPS 0.11.3 开始,支持为 iterate_partitions 指定简单的逻辑表达式及通过逗号连接,每个子表达式均须满足的复合逻辑表达式。或运算符暂不支持。

    >>> for partition in table.iterate_partitions(spec='dt>20230119'):
    >>>     print(partition.name)
    

    在 0.11.3 之前的版本中,iterate_partitions 仅支持枚举前若干个分区等于相应值的情形。例如, 当表的分区字段按顺序分别为 pt1、pt2 和 pt3,那么 iterate_partitions 中的 spec 参数只能指定 pt1=xxx 或者 pt1=xxx,pt2=yyy 这样的形式。自 0.11.3 开始, iterate_partitions 支持更多枚举方式,但仍建议尽可能限定上一级分区以提高枚举的效率。

    删除分区

    下面的操作将删除一个分区:

    >>> t.delete_partition('pt=test', if_exists=True)  # 存在的时候才删除
    >>> partition.drop()  # Partition对象存在的时候直接drop
    开始支持此功能。

    创建分区表并写入一些数据:

    t = o.create_table("test_multi_pt_table", ("col string", "pt1 string, pt2 string"))
    for pt1, pt2 in (("a", "a"), ("a", "b"), ("b", "c"), ("b", "d")):
        o.write_table("test_multi_pt_table", [["value"]], partition="pt1=%s,pt2=%s" % (pt1, pt2))
    

    如果想要获得值最大的分区,可以使用下面的代码:

    >>> part = t.get_max_partition()
    <Partition cupid_test_release.`test_multi_pt_table`(pt1='b',pt2='d')>
    >>> part.partition_spec["pt1"]  # 获取某个分区字段的值
    

    如果只希望获得最新的分区而忽略分区内是否有数据,可以用

    >>> t.get_max_partition(skip_empty=False)
    <Partition cupid_test_release.`test_multi_pt_table`(pt1='b',pt2='d')>
    

    对于多级分区表,可以通过限定上级分区值来获得值最大的子分区,例如

    >>> t.get_max_partition("pt1=a")
    <Partition cupid_test_release.`test_multi_pt_table`(pt1='a',pt2='b')>
    

    不推荐直接使用 Tunnel 接口,该接口较为低级,简单的表写入推荐直接使用 Tunnel 接口上实现的表 接口,可靠性和易用性更高。 只有在分布式写表等复杂场景下有直接使用 Tunnel 接口的需要。

    ODPS Tunnel 是 MaxCompute 的数据通道,用户可以通过 Tunnel 向 MaxCompute 中上传或者下载数据。

    分块上传接口

    直接使用 Tunnel 分块接口上传时,需要首先通过 create_upload_session 方法使用表名和分区创建 Upload Session,此后从 Upload Session 创建 Writer。每个 Upload Session 可多次调用 open_record_writer 方法创建多个 Writer,每个 Writer 拥有一个 block_id 对应一个数据块。完成写入后,需要调用 Upload Session 上的 commit 方法并指定需要提交的数据块列表。如果有某个 block_id 有数据写入但未包括在 commit 的参数中,则该数据块不会出现在最终的表中。

    from odps.tunnel import TableTunnel
    table = o.get_table('my_table')
    tunnel = TableTunnel(o)
    # 为 table 和 pt=test 分区创建 Upload Session
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    # 创建 record writer 并指定需要写入的 block_id 为 0
    with upload_session.open_record_writer(0) as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    # 提交刚才写入的 block 0。多个 block id 需要同时提交
    # 需要在 with 代码块外 commit,否则数据未写入即 commit,会导致报错
    upload_session.commit([0])
    

    如果你需要在多个进程乃至节点中使用相同的 Upload Session,可以先创建 Upload Session,并获取其 id 属性。此后在其他进程中调用 create_upload_session 方法时,将该值作为 upload_id 参数。完成每个进程的上传后,需要收集各进程提交数据所用的 block_id,并在某个进程中完成 commit

    from odps.tunnel import TableTunnel
    ##############
    # 主进程
    ##############
    table = o.get_table('my_table')
    tunnel = TableTunnel(o)
    # 为 table 和 pt=test 分区创建 Upload Session
    upload_session_main = tunnel
    
    
    
    
        
    .create_upload_session(table.name, partition_spec='pt=test')
    # 获取 Session ID
    session_id = upload_session_main.id
    # 分发 Session ID,此处省略分发过程
    ##############
    # 子进程
    ##############
    # 使用分发的 upload_id 创建 upload session
    upload_session_sub = tunnel.create_upload_session(table.name, partition_spec='pt=test', upload_id=session_id)
    # 创建 reader 并写入数据,注意区分不同进程的 block_id
    with upload_session_sub.open_record_writer(local_block_id) as writer:
        # ... 生成数据 ...
        writer.write(record)
    # 回传本进程中使用的所有 block_id,此处省略回传过程
    ##############
    # 主进程
    ##############
    # 收集所有子进程上的 block_id,此处省略收集过程
    # 提交收集到的 block_id
    upload_session_main.commit(collected_block_ids)
    

    需要注意的是,指定 block id 后,所创建的 Writer 为长连接,如果长时间不写入会导致连接关闭,并导致写入失败,该时间通常为 5 分钟。如果你写入数据的间隔较大,建议生成一批数据后再通过 open_record_writer 接口创建 Writer 并按需写入数据。如果你只希望在单个 Writer 上通过 Tunnel 写入数据,可以考虑在调用 open_record_writer 时不指定 block id,此时创建的 Writer 在写入数据时将首先将数据缓存在本地,当 Writer 关闭或者缓存数据大于一定大小(默认为 20MB,可通过 options.tunnel.block_buffer_size 指定)时才会写入数据。写入数据后,需要先通过 Writer 上的 get_blocks_written 方法获得已经写入的 block 列表,再进行提交。

    from odps.tunnel import TableTunnel
    table = o.get_table('my_table')
    tunnel = TableTunnel(o)
    # 为 table 和 pt=test 分区创建 Upload Session
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    # 不指定 block id 以创建带缓存的 record writer
    with upload_session.open_record_writer() as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    # 需要在 with 代码块外 commit,否则数据未写入即 commit,会导致报错
    # 从 writer 获得已经写入的 block id 并提交
    upload_session.commit(writer.get_blocks_written())
    

    使用带缓存的 Writer 时,需要注意不能在同一 Upload Session 上开启多个带缓存 Writer 进行写入,否则可能导致冲突而使数据丢失。

    如果你需要使用 Arrow 格式而不是 Record 格式进行上传,可以将 open_record_writer 替换为 open_arrow_writer,并写入 Arrow RecordBatch / Arrow Table 或者 pandas DataFrame。

    import pandas as pd
    import pyarrow as pa
    from odps.tunnel import TableTunnel
    table = o.get_table('my_table')
    tunnel = TableTunnel(o)
    upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
    # 使用 open_arrow_writer 而不是 open_record_writer
    with upload_session.open_arrow_writer(0) as writer:
        df = pd.DataFrame({"name": ["test1", "test2"], "id": ["id1", "id2"]})
        batch = pa.RecordBatch.from_pandas(df)
        writer.write(batch)
    # 需要在 with 代码块外 commit,否则数据未写入即 commit,会导致报错
    upload_session.commit([0])
    

    本章节中所述所有 Writer 均非线程安全。你需要为每个线程单独创建 Writer。

    流式上传接口

    MaxCompute 提供了流式上传接口用于简化分布式服务开发成本。可以使用 create_stream_upload_session 方法创建专门的 Upload Session。此时,不需要为该 Session 的 open_record_writer 提供 block id。

    from odps.tunnel import TableTunnel
    table = o.get_table('my_table')
    tunnel = TableTunnel(o)
    upload_session = tunnel.create_stream_upload_session(table.name, partition_spec='pt=test')
    with upload_session.open_record_writer() as writer:
        record = table.new_record()
        record[0] = 'test1'
        record[1] = 'id1'
        writer.write(record)
        record = table.new_record(['test2', 'id2'])
        writer.write(record)
    

    直接使用 Tunnel 接口下载数据时,需要首先使用表名和分区创建 Download Session,此后从 Download Session 创建 Reader。每个 Download Session 可多次调用 open_record_reader 方法创建多个 Reader,每个 Reader 需要指定起始行号以及需要的行数。起始行号从 0 开始,行数可指定为 Session 的 count 属性,为表或分区的总行数。

    from odps.tunnel import TableTunnel
    tunnel = TableTunnel(o)
    # 为 table 和 pt=test 分区创建 Download Session
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
    # 创建 record reader 并指定需要读取的行范围
    with download_session.open_record_reader(0, download_session.count) as reader:
        for record in reader:
            # 处理每条记录
    

    如果你需要在多个进程乃至节点中使用相同的 Download Session,可以先创建 Download Session,并获取其 id 属性。此后在其他进程中调用 create_download_session 方法时,将该值作为 download_id 参数。

    from odps.tunnel import TableTunnel
    ##############
    # 主进程
    ##############
    table = o.get_table('my_table')
    tunnel = TableTunnel(o)
    # 为 table 和 pt=test 分区创建 Download Session
    download_session_main = tunnel.create_download_session(table.name, partition_spec='pt=test')
    # 获取 Session ID
    session_id = download_session_main.id
    # 分发 Session ID,此处省略分发过程
    ##############
    # 子进程
    ##############
    # 使用分发的 upload_id 创建 download session
    download_session_sub = tunnel.create_download_session(table.name, partition_spec='pt=test', download_id=session_id)
    # 创建 reader 并读取数据,注意不同的进程可能需要指定不同的 start / count
    with download_session_sub.open_record_reader(start, count) as reader:
        for record in reader:
            # 处理记录
    

    你也可以通过使用 open_arrow_reader 而不是 open_record_reader 使读取的数据为 Arrow 格式而不是 Record 格式。

    from odps.tunnel import TableTunnel
    tunnel = TableTunnel(o)
    download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
    with download_session.open_arrow_reader(0, download_session.count) as reader:
        for batch in reader:
            # 处理每个 Arrow RecordBatch
    

    压缩选项

    为加快数据上传 / 下载速度,你可以在上传 / 下载数据时设置压缩选项。具体地,可以创建一个 CompressOption 实例,在其中指定需要的压缩算法及压缩等级。目前可用的压缩算法包括 zlib 和 ZSTD,其中 ZSTD 需要额外安装 zstandard 包。

    from odps.tunnel import CompressOption
    compress_option = CompressOption(
        compress_algo="zlib",  # 算法名称
        level=0,               # 压缩等级,可选
        strategy=0,            # 压缩策略,可选,目前仅适用于 zlib
    

    此后,在创建 Upload / Download Session 时,可以指定 compress_option 参数,并在 open_xxx_reader / open_xxx_writer 方法中设置 compress=True 即可启用压缩:

    tunnel = TableTunnel(o)
    # 为 table 和 pt=test 分区创建 Download Session
    download_session = tunnel.create_download_session(
        'my_table', partition_spec='pt=test', compress_option=compress_option
    # 创建 record reader 并指定需要读取的行范围
    with download_session.open_record_reader(0, download_session.count, compress=True) as reader:
        for record in reader:
            # 处理每条记录