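The data lives in a single MySQL table, and I want to assemble it into a collection of nested objects and store it in ES, with a data structure like this: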
{
  "_index": "trade_flink",
  "_type": "_doc",
  "_id": "T210329133820491",
  "_score": 1.0,
  "_source": {
    "virtual": 0,
    "shop": 110,
    "id": "T210329133820491",
    "tradeItems": [
      { "productSale": "10353454", "purchaseQuantity": 20, "id": 128679552 },
      { "productSale": "10352384", "purchaseQuantity": 30, "id": 128679553 }
    ],
    "status": "DELIVERING",
    "customer": 1702627547
  }
}
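
A plain-Java sketch of the reshaping being asked for, assuming (the issue does not say this) that the MySQL table holds one row per trade item with the trade id repeated on each row. Grouping by trade id yields one document per trade whose tradeItems list holds that trade's items, which is the same grouping GROUP BY plus COLLECT performs in Flink SQL. FlatRow and TradeGrouping are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TradeGrouping {

    // Hypothetical flat MySQL row: one row per trade item, trade id repeated.
    static final class FlatRow {
        final String tradeId;
        final String productSale;
        final int purchaseQuantity;
        final int itemId;

        FlatRow(String tradeId, String productSale, int purchaseQuantity, int itemId) {
            this.tradeId = tradeId;
            this.productSale = productSale;
            this.purchaseQuantity = purchaseQuantity;
            this.itemId = itemId;
        }
    }

    // Groups rows by trade id; each map value becomes that document's tradeItems.
    // LinkedHashMap keeps trades (and their items) in first-seen order.
    static Map<String, List<FlatRow>> groupByTrade(List<FlatRow> rows) {
        Map<String, List<FlatRow>> docs = new LinkedHashMap<>();
        for (FlatRow row : rows) {
            docs.computeIfAbsent(row.tradeId, k -> new ArrayList<>()).add(row);
        }
        return docs;
    }

    public static void main(String[] args) {
        List<FlatRow> rows = Arrays.asList(
                new FlatRow("T210329133820491", "10353454", 20, 128679552),
                new FlatRow("T210329133820491", "10352384", 30, 128679553));
        for (Map.Entry<String, List<FlatRow>> doc : groupByTrade(rows).entrySet()) {
            System.out.println(doc.getKey() + " -> " + doc.getValue().size() + " tradeItems");
        }
    }
}
```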

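1. Declare the tradeItems column of the ES sink table as ARRAY<ROW<productSale STRING, purchaseQuantity INT, id INT>>.
2. Define a UDF (here named es) and pass it the per-group concatenation produced after GROUP BY, similar to MySQL's GROUP_CONCAT: es(COLLECT(DISTINCT CONCAT(productSale, ':', CAST(purchaseQuantity AS STRING), ':', CAST(id AS STRING)))). COLLECT returns a MULTISET<STRING>.
3. Inside the UDF, build an array of org.apache.flink.types.Row, one Row per nested element: List<Row> rows = new ArrayList<>(); ... return rows.toArray(new Row[0]);
4. Declare the UDF's output type by overriding getResultType:
   @Override
   public TypeInformation<?> getResultType(Class<?>[] signature) {
       return Types.OBJECT_ARRAY(Types.ROW_NAMED("productSale,purchaseQuantity,id".split(","), Types.STRING, Types.INT, Types.INT));
   }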
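
A minimal sketch of the parsing done in steps 2 and 3, assuming the UDF receives the COLLECT(DISTINCT ...) result as a Map<String, Integer> (element to occurrence count), which is how the legacy type system represents a MULTISET. So that it runs without Flink on the classpath, the hypothetical TradeItem class stands in for org.apache.flink.types.Row; in the real UDF each parsed entry would instead become Row.of(productSale, purchaseQuantity, id), and the method would return a Row[].

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TradeItemParser {

    // Hypothetical stand-in for one nested element; in the actual UDF this
    // would be Row.of(productSale, purchaseQuantity, id).
    static final class TradeItem {
        final String productSale;
        final int purchaseQuantity;
        final int id;

        TradeItem(String productSale, int purchaseQuantity, int id) {
            this.productSale = productSale;
            this.purchaseQuantity = purchaseQuantity;
            this.id = id;
        }
    }

    // Parses the keys of the collected multiset; each key is assumed to be
    // "productSale:purchaseQuantity:id". Counts are ignored because of DISTINCT.
    static TradeItem[] parse(Map<String, Integer> collected) {
        List<TradeItem> items = new ArrayList<>();
        if (collected == null) {
            return new TradeItem[0];
        }
        for (String entry : collected.keySet()) {
            String[] parts = entry.split(":", -1);
            if (parts.length != 3) {
                continue; // skip malformed entries rather than failing the job
            }
            items.add(new TradeItem(parts[0],
                    Integer.parseInt(parts[1]),
                    Integer.parseInt(parts[2])));
        }
        return items.toArray(new TradeItem[0]);
    }

    public static void main(String[] args) {
        Map<String, Integer> collected = new TreeMap<>();
        collected.put("10353454:20:128679552", 1);
        collected.put("10352384:30:128679553", 1);
        for (TradeItem item : parse(collected)) {
            System.out.println(item.productSale + " " + item.purchaseQuantity + " " + item.id);
        }
    }
}
```

The separator ':' only works if productSale never contains a colon; if it might, a different delimiter would be needed in both the CONCAT call and the split.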

Thanks, this issue has been resolved.

Could you explain how you solved it? I didn't understand the answer above.

Could you share the code? I didn't follow your steps.

For an order ES model (orderitems mapped as a nested object), how do you build orderitem as ARRAY<ROW<id INT, orderid INT, productid INT, price INT>> and write it to Elasticsearch? #1484