You signed in with another tab or window.
Reload
to refresh your session.
You signed out in another tab or window.
Reload
to refresh your session.
You switched accounts on another tab or window.
Reload
to refresh your session.
By clicking “Sign up for GitHub”, you agree to our
terms of service
and
privacy statement
. We’ll occasionally send you account related emails.
Already on GitHub?
Sign in
to your account
mysql中是一张表的数据,现在是想把这张表的数据组装成对象的集合放在Es中,类似这样的数据结构
{ "_index" : "trade_flink", "_type" : "_doc", "_id" : "T210329133820491", "_score" : 1.0, "_source" : { "virtual" : 0, "shop" : 110, "id" : "T210329133820491", "tradeItems" : [ { "productSale" : "10353454", "purchaseQuantity" : 20, "id" : 128679552, }, { "productSale" : "10352384", "purchaseQuantity" : 30, "id" : 128679553, } ], "status" : "DELIVERING", "customer" : 1702627547 } }
1.将es表中tradeItems 类型为ARRAY<ROW< productSale STRING, purchaseQuantity INT, id INT>>
2.定义一个udf es 传入groupby后的拼接group_concat es(COLLECT( DISTINCT CONCAT(productSale,':',purchaseQuantity,':'id)))
3.udf 中构建出org.apache.flink.types.Row 的数组 每个row是嵌套中的每个元素 ArrayList rows = new ArrayList<>();rows.toArray(new Row[0])
4. udf输出的格式指定
@OverRide
public TypeInformation getResultType(Class[] signature) {
return Types.OBJECT_ARRAY(Types.ROW_NAMED("productSale,purchaseQuantity,id".split(","), Types.STRING, Types.INT,Types.INT));
1.将es表中tradeItems 类型为ARRAY<ROW< productSale STRING, purchaseQuantity INT, id INT>>
2.定义一个udf es 传入groupby后的拼接group_concat es(COLLECT( DISTINCT CONCAT(productSale,':',purchaseQuantity,':'id)))
3.udf 中构建出org.apache.flink.types.Row 的数组 每个row是嵌套中的每个元素 ArrayList rows = new ArrayList<>();rows.toArray(new Row[0])
4. udf输出的格式指定
@OverRide
public TypeInformation getResultType(Class[] signature) {
return Types.OBJECT_ARRAY(Types.ROW_NAMED("productSale,purchaseQuantity,id".split(","), Types.STRING, Types.INT,Types.INT));
thanks 这个问题已经解决了
1.将es表中tradeItems 类型为ARRAY<ROW< productSale STRING, purchaseQuantity INT, id INT>>
2.定义一个udf es 传入groupby后的拼接group_concat es(COLLECT( DISTINCT CONCAT(productSale,':',purchaseQuantity,':'id)))
3.udf 中构建出org.apache.flink.types.Row 的数组 每个row是嵌套中的每个元素 ArrayList rows = new ArrayList<>();rows.toArray(new Row[0])
4. udf输出的格式指定
@OverRide
public TypeInformation getResultType(Class[] signature) {
return Types.OBJECT_ARRAY(Types.ROW_NAMED("productSale,purchaseQuantity,id".split(","), Types.STRING, Types.INT,Types.INT));
thanks 这个问题已经解决了
老哥,请问你那边是怎么解决的,上面的老哥写的没有看懂
1.将es表中tradeItems 类型为ARRAY<ROW< productSale STRING, purchaseQuantity INT, id INT>>
2.定义一个udf es 传入groupby后的拼接group_concat es(COLLECT( DISTINCT CONCAT(productSale,':',purchaseQuantity,':'id)))
3.udf 中构建出org.apache.flink.types.Row 的数组 每个row是嵌套中的每个元素 ArrayList rows = new ArrayList<>();rows.toArray(new Row[0])
4. udf输出的格式指定
@OverRide
public TypeInformation getResultType(Class[] signature) {
return Types.OBJECT_ARRAY(Types.ROW_NAMED("productSale,purchaseQuantity,id".split(","), Types.STRING, Types.INT,Types.INT));
老哥,代码块可以发给我看下不?你这个步骤没看懂
对应订单es模型(orderitems使用nested object),如何把 orderitem 构造成 ARRAY<ROW< id INT, orderid INT, productid INT, price INT, >>写入elasticsearch
#1484