添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

SelectObject 接口 支持用户对 BOS 中指定格式(CSV/JSON/Parquet)的 object 内容执行 SQL 语句,通过 SQL 这种结构化查询语言对 object 内容进行筛选、分析、过滤之后再返回用户需要的文件内容。

目前用户想对存储在 BOS 的 object 内容进行筛选过滤,只能先通过 GetObject 接口下载单个 object ,然后再在本地对数据分析过滤;而 SelectObject 接口将把筛选过滤工作集成到 BOS 服务层,从而减少客户从 BOS 下载数据的网络带宽和延迟,同时也能节省客户筛选数据消耗的 CPU 和内存等资源,从而降低客户访问BOS中数据所需的应用程序成本。

SelectObject 典型的应用场景是和大数据产品结合使用,替换原来处理 BOS 数据的 GetObject 接口,用于提取日志文件指定内容,数据分析筛选等工作。

如果想使用 SelectObject 接口筛选 BOS 中的 object,需要满足以下限制条件和细节要求:

支持的文件类型

  • 仅支持 select UTF-8 编码的 RFC 4180 标准 CSV(包括TSV等类CSV文件) 、Json 文件和Parquet文件;
  • 支持的 CSV 文件的最大行及最大列长度都是 512K;
  • 支持 select 的 Json 文件包括 DOCUMENT 和 LINES 两种,DOCUMENT 是指整个文件是单一的 JSON 对象,LINES 表示整个文件由多行的 JSON 对象组成,但整个文件本身并不是一个合法的 JSON 对象,行与行之间以换行分隔符隔开,支持用户指定常见的 \n,\r,\n等行列分隔符;
  • 支持 select 标准、低频、冷存储三种存储类别的文件;
  • 支持 select SSE-BOS、SSE-KMS、SSE-C三种服务端加密方式加密之后的文件;
  • 支持 select 通过 GZIP 方式压缩后的文件,流式解压选取内容返回,GZIP 文件不支持 deflate 格式,支持标准为 RFC1952: Gzip 压缩标准参考 。支持检索经过 GZIP 列压缩或 SNAPPY 列压缩的 Parquet文件。
  • 支持的 SQL 语法

  • 目前只支持 SELECT 语法,SQL语句满足 Select field_list From source Where condition Limit number 形式;
  • 支持 string、int(64bit)、float(64bit), timestamp,boolean,decimal 数据类型;
  • 支持逻辑条件(AND/OR/NOT), 算术表达式(+-*/%), 比较运算符(>,=,<,>=,<=,!=),匹配运算符(LIKE,BETWEEN+AND,IN),空判断(IS NULL/IS NOT NULL);
  • 支持聚合函数(AVG、COUNT、MAX、MIN、SUM),转换函数 CAST ,别名关键词 AS;
  • 仅支持单文件查询,不支持 join、order by、group by、having、offset 等关键词。
  • SQL 语句限制

  • 单个 SQL 语句最大长度 16K,最大的列数是 1000,列名称最长为 1024,聚合操作(count/avg等)最多 100 个;
  • SQL 语句中字符串需要使用单引号包围,标识符使用双引号包围;例如 SELECT * FROM "ident" = 'str' ,其中 ident 表示数据中的标识符,str 表示具体的字符串值;
  • LIMIT 函数优先级高于聚合函数,例如 Select avg(cast(_1 as int)) from BosObject limit 100 表示求前 100 个元素的平均值,与 MySQL 语义不同;
  • COUNT 函数后只能用 * ,即 count(*) ,不允许 count(_1)形式;
  • SQL 语句 FROM 之后的 json path 指定的 JSON 节点数据最长为 512K,最大深度为 10 层;
  • [*] 数组通配符只能出现在 SELECT Json 文件时,而且 select 或者 where 后边的表达式中不能有 [*] 数组通配符,数组通配符只能出现在 from 后的 json path 中;
  • SELECT CSV 文件时,from 关键词之后只能是 BosObject;
  • WHERE 语句里不能包含聚合条件,只允许使用逻辑操作符;
  • LIKE 语句中,支持最多 5 个 % 通配符,表示 0 或多个任意字符, _表示单个字符;IN 语句中最多支持1024个常量项;
  • Select 后的 fields 可以是列名,CSV 列索引(_1, _2等),或者是聚合函数,例如 AVG(CAST _1 as int),但是不能是单独的CAST _1 as int;field不支持二元表达式;
  • select 后边如果有一个 field 是 * ,那就不再允许有其他 field,例如 select *, _1 from s 这种是不合法的;select 的 field 中聚合函数和单独列名不可单独出现;select 的 field 中所有 alias 别名必须都不一样;
  • 如果 json SQL 中存在 key[*]/key[1] 这种形式的field或者source,我们会认为这个field是表示select一个数组元素,键是 key;但是如果 SQL field/source 包括 key[a] 这种形式,会被解析成键为 key[a] ,去获取json中对应的value;
  • Json 文件和 SQL 中 Key 的匹配是大小写敏感的,比如 select s.Age 和select s.age是不同的;
  • BETWEEN AND IN 关键词用于集合和范围匹配操作时,需要确保 value 属于同一数据类型。
  • Select 数据容错机制

    (一) 处理缺失数据

  • csv 文件的某列数据缺失时,如果该列用于 WHERE 之后做条件判断,可以直接认为条件不满足,跳过该行数据;但是如果该缺失列被用于 SELECT 之后做聚合操作时,例如 avg(cast _1 as int),我们认为聚合一个不存在的列是非法的,应该直接结束并返回相应错误信息;
  • json 文件的某个key缺失时,同上;
  • csv 文件的列数据缺失或者 json 文件的某个 key 缺失,默认都当做 NULL 处理,也就是可以通过 IS NULL 判断为 true。
  • 其它情况:
  • 当 json key 或者 csv 列在 WHERE 后边表达式中使用的话,例如 …… where _1 = '' , …… where a.b = 'value' ;如果缺失的话,我们都默认当做NULL值处理
  • 当 json key 或者 csv 列直接作为 field 出现在 select 之后时,例如 select _1 from…… , select a.b from…… ;如果缺失的话,csv 列应该默认返回空字符串,json key 也是返回空字符串
  • (二) 处理类型不匹配数据

  • csv 文件的列数据类型非法,例如 CAST _1 as INT ,然而 _1 值为非数值字符串导致 cast 失败,如果 CAST _1 as INT 用于WHERE后边做条件判断,可以直接认为条件不满足,跳过该行数据;但是如果该列被用于 SELECT 之后做聚合操作时,例如 avg(cast _1 as int) ,我们认为聚合一个不合法列是非法的,应该直接结束并返回相应错误信息;
  • json 文件的某个 key 对应数据类型非法时,同上
  • SelectObject 为 CPU 消耗性请求,扫描数据量以 8MB 为单位计算,计费标准详见 产品定价-数据处理费用-Select扫描费用
  • 单个 bucket 限制 SelectObject 请求 QPS 不超过 500。
  • SQL 语句或者文件内容中包含特殊字符串的情况,请使用\转义符进行转义。例如 select "key }\"[" from BosObject.array[*] 表示选取 json 数组 array 中对应路径为 key }"[ 的值。
  • 如果涉及到浮点数数值计算或者金钱交易等对数据准确性要求非常高的情况,优先使用 decimal 数据类型,decimal 支持的操作包括算术表达式(+-*/), 比较运算符(>,=, <, >=, <=, !=),匹配运算符(BETWEEN+AND,IN);同时允许其它数值类型和 decimal 进行运算等操作,默认会将其他类型(例如int,float)提升成 decimal,以便得到准确的结果。
  • CSV 文件中数据都默认 string 类型,Json 不内置 decimal 数据类型;因此如果希望将 csv 的某一列或者 json 的某个 value 作为 decimal 来处理,都必须使用 CAST 函数,例如 cast(_1 as decimal) cast(1.23 as decimal) cast(key as decimal)
  • CSV Object

    select 一个 csv object 一般是通过列序号或者列名来选取指定列数据,或者对某些列做聚合操作,例如一个 test.csv 文件内容如下,包含了各种数据类型的列, csv 默认每一列数据都是字符串 ,需要做对应的 CAST 转换操作,列分隔符之间不要留空格:

    select header1,header2 from BosObject 返回 object 中名称为 header1,header2 的列 fileHeaderInfo 参数需要是"USE" select _1,_3 from BosObject where cast(_1 as int) <= cast(_3 as int) 返回 object 第 1 列和第 3 列的整数,满足第 1 列小于或等于第 3 列 需要 _1,_3 表示的列是整型才能被 CAST 转换,否则会因为不满足条件而跳过 select count(*) from BosObject 返回 object 总行数 select AVG(cast(_1 AS int)), MAX(cast(_1 AS int)), MIN(cast(_1 AS int)) from BosObject 返回 object 中第一列的平均值,最大值,最小值 每一行的第一列都不能包含非整型字符串,否则会直接失败 select SUM(cast(header1 AS float)) from BosObject WHERE cast(header1 AS float) != 1 返回 object 中所有列名为 header1 且值不等于 1 的和 每一行的 header1 列都不能包含非数值型字符串 select * from BosObject where _1 LIKE '%果_' 返回 object 中_1列形式满足"%果_"的行,例如"苹果树"、"果蔬"满足条件,"苹果"不满足条件 LIKE 操作符后字符串使用单引号 select * from BosObject where cast(_1 AS int) % 3 = 0 返回 object 中 _1 列能被 3 整除的所有行 _1 需要是整形字符串,才能使用 % 操作符 select * from BosObject where cast(_1 AS int) between 1 and 2 返回 object 中 _1 列处于 [1,2] 区间的所有行 _1 需要是整形字符串 select * from BosObject where cast(_1 AS timestamp) NOT IN (cast('2006-01-02 15:04:06' as timestamp), cast('2006-01-03 15:04:06' as timestamp)) 返回 object 中 _1 列不在 IN 区间的所有行 _1 需要是日期字符串形式 select * from BosObject where cast(_1 AS int) * cast(_2 AS int) > cast(_3 AS float) + 1 返回 object 中 _1 列形式满足条件表达式计算结果的所有行 _1,_2,_3 需要是满足 CAST 条件的合法字符串形式 SELECT _1,_2 FROM BosObject WHERE cast(_2 as decimal) IN (cast('5.1824349494011866916128' as decimal),cast('5.00000000000001000000000' as decimal)) 返回 object 中 _2 列处于[5.1824349494011866916128,5.00000000000001000000000]区间的所有行的第一列和第二列数据 _2 列被当做 decimal 数据类型进行比较 SELECT MAX(CAST(_3 AS DECIMAL)) FROM BosObject WHERE CAST(_3 AS DECIMAL) >= cast('559062242.92723' as float)) 返回 object 中 _3 列数据大于等于 559062242.92723 的最大值 _3 列被当做 decimal 数据类型进行比较

    JSON Object

    select 一个 json object 一般是通过 key 来选取对应的数据,json 文件包括 LINES 和 DOCUMENT 两种,json 文件内容需满足 官方标准

    JSON DOCUMENT Object

    { "project_name" : "project1" , "completed" : false } , { "project_name" : "project2" , "completed" : true }

    JSON LINES Object

    { "project_name" : "project1" , "completed" : false } , { "project_name" : "project2" , "completed" : true } { "name" : "charles" , "age" : 17 , "org" : "baidu" , "weight" : 65.5 , "projects" : { "project_name" : "project3" , "completed" : false } , { "project_name" : "project4" , "completed" : true }

    常用 SQL 语句

  • Json path 基础形式 field0.field1[n].property1.attributes[*] 表示查找 JSON 文件根节点下的 field 0 节点下 field 1 节点中数组的第 n 个元素,再查找该元素的 property1 中 attributes 数组的全部内容。
  • JSON object SQL 同样可以使用聚合函数,逻辑运算,数学运算等;JSON 中 value 自带数据类型,不需要 CAST 转换,除非需要强制解析成 decimal 或者其它类型。
  • select * from BosObject s where s.org IS NULL AND weight is null 返回 json 文件中满足 name 和 weight 都为空的记录 weight 节点不存在也视为 null

    Parquet Object

    select 一个 Parquet object 一般是通过 key 来选取对应的数据,Parquet是大数据领域流行的一种自解释的列式存储格式,其select使用可以参考json object方式。

    错误返回码

  • server 端返回的错误码可能以 http status code 的形式返回,也可能在 End Message 中的 error-code 返回,以何种形式返回ErrorCode 取决于发生具体的错误类型
  • public
    
    
    
    
        
     void selectCsv(BosClient client, String bucketName, String csvObject) {
        System.out.println("------ select csv object ------");
        SelectObjectRequest request = new SelectObjectRequest(bucketName, csvObject)
            .withSelectType(Constants.SELECT_TYPE_CSV)
            .withExpression("select * from BosObject limit 3")
            .withExpressionType(SelectObjectRequest.ExpressionType.SQL)
            .withInputSerialization(new InputSerialization()
                    .withCompressionType("NONE")
                    .withFileHeaderInfo("NONE")
                    .withRecordDelimiter("\r\n")
                    .withFieldDelimiter(",")
                    .withQuoteCharacter("\"")
                    .withCommentCharacter("#"))
            .withOutputSerialization(new OutputSerialization()
                    .withOutputHeader(false)
                    .withQuoteFields("ALWAYS")
                    .withRecordDelimiter("\n")
                    .withFieldDelimiter(",")
                    .withQuoteCharacter("\""))
            .withRequestProgress(false);
        SelectObjectResponse response = client.selectObject(request);
        //输出查询结果
        printRecords(reponse.getMessages()); 
    public void selectJson(BosClient client, String bucketName, String jsonObject) {
        System.out.println("------ select json object ------");
        SelectObjectRequest request = new SelectObjectRequest(bucketName, jsonkey)
            .withSelectType(Constants.SELECT_TYPE_JSON)
            .withExpression("select * from BosObject where age > 20")
            .withInputSerialization(new InputSerialization()
                    .withCompressionType("NONE")
                    .withJsonType("LINES"))
            .withOutputSerialization(new OutputSerialization()
                    .withRecordDelimiter("\n"))
            .withRequestProgress(false);
        SelectObjectResponse response = client.selectObject(request);
        //输出查询结果
        printRecords(reponse.getMessages()); 
    public void selectParquet(BosClient client, String bucketName, String parquetObject) {
        System.out.println("------ select parquet object ------");
        SelectObjectRequest request = new SelectObjectRequest(bucketName, parquetObject)
            .withSelectType(Constants.SELECT_TYPE_PARQUET)
            .withExpression("select * from BosObject where age > 20")
            .withInputSerialization(new InputSerialization()
                    .withCompressionType("NONE"))
            .withOutputSerialization(new OutputSerialization()
                    .withRecordDelimiter("\n"))
            .withRequestProgress(false);
        SelectObjectResponse response = client.selectObject(request);
        //输出查询结果
        printRecords(reponse.getMessages());
    public void printRecords(SelectObjectResponse.Messages messages) {
        if (messages == null) {
            return;
        while (messages.hasNext()) {
            SelectObjectResponse.CommonMessage message = messages.next();
            if (message.Type.equals("Records")) {
                for (String record: message.getRecords()) {
                    System.out.println(record);
    

    Golang示例

    import ( "github.com/baidubce/bce-sdk-go/services/bos" "github.com/baidubce/bce-sdk-go/services/bos/api" func main() { selectBosObject() func selectBosObject() { // 初始化BosClient AK, SK := "ak", "sk" ENDPOINT := "bj.bcebos.com" bosClient, _ := bos.NewClient(AK, SK, ENDPOINT) // 先确保bucket,object已经存在,object满足csv/json文件格式要求 bucket := "select-bucket" csvObject := "test.csv" fmt.Println("------ select csv object -------") csvArgs := &api.SelectObjectArgs{ SelectType: "csv", SelectRequest: &api.SelectObjectRequest{ Expression: "c2VsZWN0ICogZnJvbSBCb3NPYmplY3Qgd2hlcmUgY2FzdChfMSBBUyBpbnQpICogY2FzdChfMiBBUyBpbnQpID4gY2FzdChfMyBBUyBmbG9hdCkgKyAx", ExpressionType: "SQL", InputSerialization: &api.SelectObjectInput{ CompressionType: "NONE", CsvParams: map[string]string{ "fileHeaderInfo": "IGNORE", "recordDelimiter": "Cg==", "fieldDelimiter": "LA==", "quoteCharacter": "Ig==", "commentCharacter": "Iw==", OutputSerialization: &api.SelectObjectOutput{ OutputHeader: false, CsvParams: map[string]string{ "quoteFields": "ALWAYS", "recordDelimiter": "Cg==", "fieldDelimiter": "LA==", "quoteCharacter": "Ig==", RequestProgress: &api.SelectObjectProgress{ Enabled: true, csvRes, err := bosClient.SelectObject(bucket, csvObject, csvArgs) if err != nil { fmt.Println(err) return parseMessages(csvRes) fmt.Println("------ select json object -------") jsonObject := "test.json" jsonArgs := &api.SelectObjectArgs{ SelectType: "json", SelectRequest: &api.SelectObjectRequest{ Expression: "c2VsZWN0ICogZnJvbSBCb3NPYmplY3QucHJvamVjdHNbKl0ucHJvamVjdF9uYW1l", ExpressionType: "SQL", InputSerialization: &api.SelectObjectInput{ CompressionType: "NONE", JsonParams: map[string]string{ "type": "LINES", OutputSerialization: &api.SelectObjectOutput{ JsonParams: map[string]string{ "recordDelimiter": "Cg==", RequestProgress: &api.SelectObjectProgress{ Enabled: true, jsonRes, err := bosClient.SelectObject(bucket, jsonObject, jsonArgs) if err != nil { fmt.Println(err) return parseMessages(jsonRes) // 解析所有headers保存到map中 func parseHeaders(headers []byte) map[string]string { hm := make(map[string]string) index := 0 for index < len(headers) { // headers key length keyLen := int(headers[index]) index += 1 // headers key key := headers[index : index+keyLen] index += keyLen // headers value length valLenByte := headers[index : index+2] valLen := int(binary.BigEndian.Uint16(valLenByte)) index += 2 // headers value val := headers[index : index+valLen] index += valLen hm[string(key)] = string(val) return hm func parseMessages(res *api.SelectObjectResult) { defer res.Body.Close() reader := bufio.NewReader(res.Body) for { // total length in prelude, 4 bytes p := make([]byte, 4) l, err := io.ReadFull(reader, p) if err != nil || l < 4 { fmt.Printf("read total length err: %+v, len: %d\n", err, l) break totalLen := binary.BigEndian.Uint32(p) // headers length in prelude, 4 bytes l, err = io.ReadFull(reader, p) if err != nil || l < 4 { fmt.Printf("read headers length err: %+v, len: %d\n", err, l) break headersLen := binary.BigEndian.Uint32(p) // headers part headers := make([]byte, headersLen) l, err = io.ReadFull(reader, headers) if err != nil || uint32(l) < headersLen { fmt.Printf("read headers data err: %+v, len: %d\n", err, l) break // 获取header长度,并解析headers内容,判断具体的msg类型;end msg则结束读取, // cont msg则调用回调函数输出进度信息,record msg则输出记录信息 headersMap := parseHeaders(headers) if headersMap["message-type"] == "Records" { // payload part payloadLen := totalLen - headersLen - 12 payload := make([]byte, payloadLen) if _, err := io.ReadFull(reader, payload); err != nil { fmt.Printf("read payload data err: %+v\n", err) // 设置你使用的OutputSerialization字段中的换行符做分行处理 rs := strings.Split(string(payload), "\n") _, err = io.ReadFull(reader, p) crc := binary.BigEndian.Uint32(p) recordsMsg := &api.RecordsMessage{ CommonMessage: api.CommonMessage{ Prelude: api.Prelude{ TotalLen: totalLen, HeadersLen: headersLen, Headers: headersMap, Crc32: crc, Records: rs, fmt.Printf("RecordsMessage: %+v\n", recordsMsg) continue if headersMap["message-type"] == "Cont" { // payload part, progress bs := make([]byte, 8) _, err = io.ReadFull(reader, bs) bytesScanned := binary.BigEndian.Uint64(bs) br := make([]byte, 8) _, err = io.ReadFull(reader, br) bytesReturned := binary.BigEndian.Uint64(br) _, err = io.ReadFull(reader, p) crc := binary.BigEndian.Uint32(p) contMsg := &api.ContinuationMessage{ CommonMessage: api.CommonMessage{ Prelude: api.Prelude{ TotalLen: totalLen, HeadersLen: headersLen, Headers: headersMap, Crc32: crc, BytesScanned: bytesScanned, BytesReturned: bytesReturned, fmt.Printf("ContinuationMessage: %+v\n", contMsg) continue if headersMap["message-type"] == "End" { _, err = io.ReadFull(reader, p) crc := binary.BigEndian.Uint32(p) endMsg := &api.EndMessage{ CommonMessage: api.CommonMessage{ Prelude: api.Prelude{ TotalLen: totalLen, HeadersLen: headersLen, Headers: headersMap, Crc32: crc, fmt.Printf("EndMessage: %+v\n", endMsg) break