目前用户想对存储在 BOS 的 object 内容进行筛选过滤,只能先通过 GetObject 接口下载单个 object ,然后再在本地对数据分析过滤;而 SelectObject 接口将把筛选过滤工作集成到 BOS 服务层,从而减少客户从 BOS 下载数据的网络带宽和延迟,同时也能节省客户筛选数据消耗的 CPU 和内存等资源,从而降低客户访问BOS中数据所需的应用程序成本。
SelectObject 典型的应用场景是和大数据产品结合使用,替换原来处理 BOS 数据的 GetObject 接口,用于提取日志文件指定内容,数据分析筛选等工作。
select 一个 csv object 一般是通过列序号或者列名来选取指定列数据,或者对某些列做聚合操作,例如一个 test.csv 文件内容如下,包含了各种数据类型的列,
csv 默认每一列数据都是字符串,因此需要做对应的 CAST 转换操作;列分隔符之间不要留空格:
select header1,header2 from BosObject
返回 object 中名称为 header1,header2 的列
fileHeaderInfo 参数需要是"USE"
select _1,_3 from BosObject where cast(_1 as int) <= cast(_3 as int)
返回 object 中第 1 列小于或等于第 3 列(均按整数比较)的所有行的第 1 列和第 3 列
需要 _1,_3 表示的列是整型才能被 CAST 转换,否则会因为不满足条件而跳过
select count(*) from BosObject
返回 object 总行数
select AVG(cast(_1 AS int)), MAX(cast(_1 AS int)), MIN(cast(_1 AS int)) from BosObject
返回 object 中第一列的平均值,最大值,最小值
每一行的第一列都不能包含非整型字符串,否则会直接失败
select SUM(cast(header1 AS float)) from BosObject WHERE cast(header1 AS float) != 1
返回 object 中列名为 header1 的列里所有不等于 1 的值之和
每一行的 header1 列都不能包含非数值型字符串
select * from BosObject where _1 LIKE '%果_'
返回 object 中_1列形式满足"%果_"的行,例如"苹果树"、"果蔬"满足条件,"苹果"不满足条件
LIKE 操作符后字符串使用单引号
select * from BosObject where cast(_1 AS int) % 3 = 0
返回 object 中 _1 列能被 3 整除的所有行
_1 需要是整型字符串,才能使用 % 操作符
select * from BosObject where cast(_1 AS int) between 1 and 2
返回 object 中 _1 列处于 [1,2] 区间的所有行
_1 需要是整型字符串
select * from BosObject where cast(_1 AS timestamp) NOT IN (cast('2006-01-02 15:04:06' as timestamp), cast('2006-01-03 15:04:06' as timestamp))
返回 object 中 _1 列的时间戳不等于 NOT IN 列表中任何一个值的所有行
_1 需要是日期字符串形式
select * from BosObject where cast(_1 AS int) * cast(_2 AS int) > cast(_3 AS float) + 1
返回 object 中满足 _1 × _2 > _3 + 1(_1、_2 按整数,_3 按浮点数计算)的所有行
_1,_2,_3 需要是满足 CAST 条件的合法字符串形式
SELECT _1,_2 FROM BosObject WHERE cast(_2 as decimal) IN (cast('5.1824349494011866916128' as decimal),cast('5.00000000000001000000000' as decimal))
返回 object 中 _2 列等于 5.1824349494011866916128 或 5.00000000000001000000000 的所有行的第一列和第二列数据
_2 列被当做 decimal 数据类型进行比较
SELECT MAX(CAST(_3 AS DECIMAL)) FROM BosObject WHERE CAST(_3 AS DECIMAL) >= cast('559062242.92723' as float)
返回 object 中 _3 列数据大于等于 559062242.92723 的最大值
_3 列被当做 decimal 数据类型进行比较
select 一个 json object 一般是通过 key 来选取对应的数据,json 文件包括 LINES 和 DOCUMENT 两种,json 文件内容需满足官方标准。
select * from BosObject s where s.org IS NULL AND weight is null
返回 json 文件中满足 org 和 weight 都为空的记录
weight 节点不存在也视为 null
select 一个 Parquet object 一般是通过 key 来选取对应的数据,Parquet是大数据领域流行的一种自解释的列式存储格式,其select使用可以参考json object方式。
public
void selectCsv ( BosClient client, String bucketName, String csvObject) {
System . out. println ( "------ select csv object ------" ) ;
SelectObjectRequest request = new SelectObjectRequest ( bucketName, csvObject)
. withSelectType ( Constants . SELECT_TYPE_CSV)
. withExpression ( "select * from BosObject limit 3" )
. withExpressionType ( SelectObjectRequest . ExpressionType . SQL)
. withInputSerialization ( new InputSerialization ( )
. withCompressionType ( "NONE" )
. withFileHeaderInfo ( "NONE" )
. withRecordDelimiter ( "\r\n" )
. withFieldDelimiter ( "," )
. withQuoteCharacter ( "\"" )
. withCommentCharacter ( "#" ) )
. withOutputSerialization ( new OutputSerialization ( )
. withOutputHeader ( false )
. withQuoteFields ( "ALWAYS" )
. withRecordDelimiter ( "\n" )
. withFieldDelimiter ( "," )
. withQuoteCharacter ( "\"" ) )
. withRequestProgress ( false ) ;
SelectObjectResponse response = client. selectObject ( request) ;
printRecords ( reponse. getMessages ( ) ) ;
public void selectJson ( BosClient client, String bucketName, String jsonObject) {
System . out. println ( "------ select json object ------" ) ;
SelectObjectRequest request = new SelectObjectRequest ( bucketName, jsonkey)
. withSelectType ( Constants . SELECT_TYPE_JSON)
. withExpression ( "select * from BosObject where age > 20" )
. withInputSerialization ( new InputSerialization ( )
. withCompressionType ( "NONE" )
. withJsonType ( "LINES" ) )
. withOutputSerialization ( new OutputSerialization ( )
. withRecordDelimiter ( "\n" ) )
. withRequestProgress ( false ) ;
SelectObjectResponse response = client. selectObject ( request) ;
printRecords ( reponse. getMessages ( ) ) ;
public void selectParquet ( BosClient client, String bucketName, String parquetObject) {
System . out. println ( "------ select parquet object ------" ) ;
SelectObjectRequest request = new SelectObjectRequest ( bucketName, parquetObject)
. withSelectType ( Constants . SELECT_TYPE_PARQUET)
. withExpression ( "select * from BosObject where age > 20" )
. withInputSerialization ( new InputSerialization ( )
. withCompressionType ( "NONE" ) )
. withOutputSerialization ( new OutputSerialization ( )
. withRecordDelimiter ( "\n" ) )
. withRequestProgress ( false ) ;
SelectObjectResponse response = client. selectObject ( request) ;
printRecords ( reponse. getMessages ( ) ) ;
public void printRecords ( SelectObjectResponse . Messages messages) {
if ( messages == null ) {
return ;
while ( messages. hasNext ( ) ) {
SelectObjectResponse . CommonMessage message = messages. next ( ) ;
if ( message. Type . equals ( "Records" ) ) {
for ( String record: message. getRecords ( ) ) {
System . out. println ( record) ;
Golang示例
import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"
	"strings"

	"github.com/baidubce/bce-sdk-go/services/bos"
	"github.com/baidubce/bce-sdk-go/services/bos/api"
)
func main ( ) {
selectBosObject ( )
func selectBosObject ( ) {
AK, SK := "ak" , "sk"
ENDPOINT := "bj.bcebos.com"
bosClient, _ := bos. NewClient ( AK, SK, ENDPOINT)
bucket := "select-bucket"
csvObject := "test.csv"
fmt. Println ( "------ select csv object -------" )
csvArgs := & api. SelectObjectArgs{
SelectType: "csv" ,
SelectRequest: & api. SelectObjectRequest{
Expression: "c2VsZWN0ICogZnJvbSBCb3NPYmplY3Qgd2hlcmUgY2FzdChfMSBBUyBpbnQpICogY2FzdChfMiBBUyBpbnQpID4gY2FzdChfMyBBUyBmbG9hdCkgKyAx" ,
ExpressionType: "SQL" ,
InputSerialization: & api. SelectObjectInput{
CompressionType: "NONE" ,
CsvParams: map [ string ] string {
"fileHeaderInfo" : "IGNORE" ,
"recordDelimiter" : "Cg==" ,
"fieldDelimiter" : "LA==" ,
"quoteCharacter" : "Ig==" ,
"commentCharacter" : "Iw==" ,
OutputSerialization: & api. SelectObjectOutput{
OutputHeader: false ,
CsvParams: map [ string ] string {
"quoteFields" : "ALWAYS" ,
"recordDelimiter" : "Cg==" ,
"fieldDelimiter" : "LA==" ,
"quoteCharacter" : "Ig==" ,
RequestProgress: & api. SelectObjectProgress{
Enabled: true ,
csvRes, err := bosClient. SelectObject ( bucket, csvObject, csvArgs)
if err != nil {
fmt. Println ( err)
return
parseMessages ( csvRes)
fmt. Println ( "------ select json object -------" )
jsonObject := "test.json"
jsonArgs := & api. SelectObjectArgs{
SelectType: "json" ,
SelectRequest: & api. SelectObjectRequest{
Expression: "c2VsZWN0ICogZnJvbSBCb3NPYmplY3QucHJvamVjdHNbKl0ucHJvamVjdF9uYW1l" ,
ExpressionType: "SQL" ,
InputSerialization: & api. SelectObjectInput{
CompressionType: "NONE" ,
JsonParams: map [ string ] string {
"type" : "LINES" ,
OutputSerialization: & api. SelectObjectOutput{
JsonParams: map [ string ] string {
"recordDelimiter" : "Cg==" ,
RequestProgress: & api. SelectObjectProgress{
Enabled: true ,
jsonRes, err := bosClient. SelectObject ( bucket, jsonObject, jsonArgs)
if err != nil {
fmt. Println ( err)
return
parseMessages ( jsonRes)
// parseHeaders decodes the headers section of a SelectObject response frame
// into a name -> value map. Each header is encoded back to back as
//
//	| name length (1 byte) | name | value length (2 bytes, big-endian) | value |
//
// Decoding stops at the first header that would run past the end of
// headers. A truncated or malformed section therefore yields the headers
// decoded before that point, instead of panicking with an out-of-range slice.
func parseHeaders(headers []byte) map[string]string {
	hm := make(map[string]string)
	for i := 0; i < len(headers); {
		keyLen := int(headers[i])
		i++
		// Need the whole name plus the 2-byte value length.
		if i+keyLen+2 > len(headers) {
			break
		}
		key := string(headers[i : i+keyLen])
		i += keyLen
		valLen := int(binary.BigEndian.Uint16(headers[i : i+2]))
		i += 2
		if i+valLen > len(headers) {
			break
		}
		hm[key] = string(headers[i : i+valLen])
		i += valLen
	}
	return hm
}
func parseMessages ( res * api. SelectObjectResult) {
defer res. Body. Close ( )
reader := bufio. NewReader ( res. Body)
for {
p := make ( [ ] byte , 4 )
l, err := io. ReadFull ( reader, p)
if err != nil || l < 4 {
fmt. Printf ( "read total length err: %+v, len: %d\n" , err, l)
break
totalLen := binary. BigEndian. Uint32 ( p)
l, err = io. ReadFull ( reader, p)
if err != nil || l < 4 {
fmt. Printf ( "read headers length err: %+v, len: %d\n" , err, l)
break
headersLen := binary. BigEndian. Uint32 ( p)
headers := make ( [ ] byte , headersLen)
l, err = io. ReadFull ( reader, headers)
if err != nil || uint32 ( l) < headersLen {
fmt. Printf ( "read headers data err: %+v, len: %d\n" , err, l)
break
headersMap := parseHeaders ( headers)
if headersMap[ "message-type" ] == "Records" {
payloadLen := totalLen - headersLen - 12
payload := make ( [ ] byte , payloadLen)
if _ , err := io. ReadFull ( reader, payload) ; err != nil {
fmt. Printf ( "read payload data err: %+v\n" , err)
rs := strings. Split ( string ( payload) , "\n" )
_ , err = io. ReadFull ( reader, p)
crc := binary. BigEndian. Uint32 ( p)
recordsMsg := & api. RecordsMessage{
CommonMessage: api. CommonMessage{
Prelude: api. Prelude{
TotalLen: totalLen,
HeadersLen: headersLen,
Headers: headersMap,
Crc32: crc,
Records: rs,
fmt. Printf ( "RecordsMessage: %+v\n" , recordsMsg)
continue
if headersMap[ "message-type" ] == "Cont" {
bs := make ( [ ] byte , 8 )
_ , err = io. ReadFull ( reader, bs)
bytesScanned := binary. BigEndian. Uint64 ( bs)
br := make ( [ ] byte , 8 )
_ , err = io. ReadFull ( reader, br)
bytesReturned := binary. BigEndian. Uint64 ( br)
_ , err = io. ReadFull ( reader, p)
crc := binary. BigEndian. Uint32 ( p)
contMsg := & api. ContinuationMessage{
CommonMessage: api. CommonMessage{
Prelude: api. Prelude{
TotalLen: totalLen,
HeadersLen: headersLen,
Headers: headersMap,
Crc32: crc,
BytesScanned: bytesScanned,
BytesReturned: bytesReturned,
fmt. Printf ( "ContinuationMessage: %+v\n" , contMsg)
continue
if headersMap[ "message-type" ] == "End" {
_ , err = io. ReadFull ( reader, p)
crc := binary. BigEndian. Uint32 ( p)
endMsg := & api. EndMessage{
CommonMessage: api. CommonMessage{
Prelude: api. Prelude{
TotalLen: totalLen,
HeadersLen: headersLen,
Headers: headersMap,
Crc32: crc,
fmt. Printf ( "EndMessage: %+v\n" , endMsg)
break