添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

查询计划

句法优化器

初始查询计划

查询最直接的查询计划非常接近其 SQL 语法结构,查询计划是树状的,执行从叶子节点开始,沿着树结构逐步上升。

1
2
3
4
5
6
7
8
9
10
11
12
13
- Limit[5]
- Sort[orders_sum DESC]
- LateralJoin[2]
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey]
- CrossJoin
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
- EnforceSingleRow[region_name := r.name]
- Filter[r.regionkey = n.regionkey]
- TableScan[region]

TableScan 算子从底层存储中访问表,并返回包含表中所有行的结果集;
Filter 算子接收行并在每一行数据上应用过滤条件,只留下满足条件的行;
CrossJoin 算子从两个子节点接收数据集,返回两个数据集中行的所有组合,它可能会将其中一个数据集存放在内存中,从而避免多次访问底层存储;

谓词下推

将过滤条件移动到尽可能接近数据源的位置,使数据量在查询开始后尽可能早地开始缩减。
案例中,将原 Filter 算子的一部分条件保留在新的简化 Filter 算子中,另一部分和下层的 CrossJoin 算子合并为新的 InnerJoin 算子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey] // 原始 Filter 算子
- CrossJoin
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]

...
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey] // 简化后的 Filter 算子
- InnerJoin[o.custkey = c.custkey] // 合并后新产生的 InnerJoin 算子
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
```

#### Cross Join 消除

```txt
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey] // 先过滤 nationkey 列
- InnerJoin[o.custkey = c.custkey] // 然后是 Inner Join custkey
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey] // 重新排列为 custkey 在前面
- InnerJoin[n.nationkey = c.nationkey] // nationkey 在后面
- TableScan[nation]
- TableScan[customer]
- TableScan[orders]
...

局部聚合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey]
- InnerJoin[n.nationkey = c.nationkey]
- TableScan[nation]
- TableScan[customer]
- TableScan[orders]
...
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey]
- InnerJoin[n.nationkey = c.nationkey]
- TableScan[nation]
- TableScan[customer]
- Aggregate[by custkey; totalprice := sum(totalprice)] // 局部预聚合,聚合用户维度的订单金额
- TableScan[orders]
...

Lateral Join 去关联化

Lateral Join 可以通过如下方式实现:使用 for 循环迭代一个数据集中的所有行,并对每一行执行另一次查询。

Presto 将子查询去关联化,将所有的相关条件拉取上来并形成一个标准的 Left Join。

1
2
3
4
SELECT
(SELECT name FROM region r WHERE regionkey = n.regionkey)
AS region_name
FROM nation n
1
2
3
4
SELECT
r.name AS region_name,
n.name AS nation_name
FROM nation n LEFT OUTER JOIN region r ON r.regionkey = n.regionkey

平时写 odps sql ,也是习惯用下面这种标准的 left join ,如曝光表 left join 点击表 left join 行为事件表等等。

Semi-join (IN) 去关联化

子查询不只用于在查询中拉取信息,也常用于配合 IN 谓词过滤行。

下面这个查询,用于找出客户和物品供应商来自同一个国家(地区)的订单,查询这样的订单非常有用,这样可以绕过分发中心直接从供应商发货到消费者,
以此来降低运送成本:

1
2
3
4
5
6
7
8
9
10
11
12
SELECT DISTINCT o.orderkey
FROM lineitem l
JOIN orders o ON o.orderkey = l.orderkey
JOIN customer ON o.custkey = c.custkey
WHERE c.nationkey IN (
-- 多次调用的子查询
SELECT s.nationkey
FROM part p
JOIN partsupp ps ON p.partkey = ps.partkey
JOIN supplier s ON ps.suppkey = s.suppkey
WHERE p.partkey = l.partkey
);

基于代价的优化器 (CBO)

Cost Based Optimizer ,计划转换时不但基于本身的形状,也将查询数据的形状考虑在內:

  • CPU 时间
  • 基于代价优化的查询案例:

    1
    2
    3
    4
    5
    6
    7
    8
    SELECT
    n.name AS nation_name,
    avg(extendedprice) AS avg_price
    FROM nation n, orders o, customer c, lineitem l
    WHERE n.nationkey = c.nationkey
    AND c.custkey = o.custkey
    AND o.orderkey = l.orderkey
    GROUP BY n.nationkey, n.name;

    如果不基于代价进行决策,查询优化器就会使用规则来优化此查询的初始计划。这个计划完全由 SQL 查询的语法结构所决定:

    1
    2
    3
    4
    5
    6
    7
    8
    - Aggregate[by nationkey...; orders_sum := sum(totalprice)]
    - InnerJoin[o.orderkey = l.orderkey]
    - InnerJoin[c.custkey = o.custkey]
    - InnerJoin[n.nationkey = c.nationkey]
    - TableScan[nation]
    - TableScan[customer]
    - TableScan[orders]
    - TableScan[lineitem]

    下面变换一个 SQL ,仅仅改变一下 WHERE 语句中条件的顺序:

    1
    2
    3
    4
    5
    6
    7
    8
    SELECT
    n.name AS nation_name,
    avg(extendedprice) AS avg_price
    FROM nation n, orders o, customer c, lineitem l
    WHERE c.custkey = o.custkey
    AND o.orderkey = l.orderkey
    AND n.nationkey = c.nationkey
    GROUP BY n.nationkey, n.name;

    就产生了一个具有不同 Join 顺序的查询计划:

    1
    2
    3
    4
    5
    6
    7
    8
    - Aggregate[by nationkey...; orders_sum := sum(totalprice)]
    - InnerJoin[n.nationkey = c.nationkey]
    - InnerJoin[o.orderkey = l.orderkey]
    - InnerJoin[c.custkey = o.custkey]
    - TableScan[customer]
    - TableScan[orders]
    - TableScan[lineitem]
    - TableScan[nation]

    从时间复杂度的角度来看,无论是将 nation 表 Join 到 customer 表,还是反过来将 customer 表 Join 到 nation 表,都无关紧要,
    两个表都要处理,在使用 Hash Join 时,总的运行时间与输出行数成正比。
    然而,时间复杂度并不是唯一的考量标准,通常对于处理数据的程序,尤其是大规模数据库系统来说,内存使用和网络流量也很重要。

    连接器

    blackhole Connector

    它作为任何数据的最终消费者,类似于 UNIX 操作系统中的 null 设备(/dev/null)。可以把它作为从其他 catalog 中读取并插入数据的目标,实际上不写入任何内容,所以可以用它来衡量 catalog
    读取的性能。

    /etc/catalog/blackhole.properties:

    1
    connector.name = blackhole

    JMX Connector

    /etc/catalog/jmx.properties:

    1
    connector.name = jmx

    为最新数据提供了名为 current 的 schema :

    1
    2
    3
    SHOW TABLES FROM jmx.current;

    SELECT vmname, uptime, node FROM jmx.current."java.lang.type=runtime";

    Memory Connector

    可以像使用临时数据库一样使用内存连接器,所有的数据都存储在集群的内存中,停止集群就会销毁数据,可以用作调试。

    /etc/catalog/memory.properties:

    1
    connector.name = memory

    tpch Connector

    TPC-H 是一款面向商品零售业的决策支持系统测试基准,它定义了8张表,22个查询,遵循 SQL92 。TPC-H 基准的数据库模式遵循第三范式,其数据维护功能仅仅限制了潜在的对索引的过度使用,而没有测试 DBMS 执行 ETL 的能力。同时,新兴的数据仓库开始采用新的模型,如星型模型、雪花模型,TPC-H 已经不能精准反应当今数据库系统的真实性能。

    /etc/catalog/tpch.properties:

    1
    2
    connector.name = tpch
    tpch.splits-per-node = 4

    从 TPC 官网下载 TPC-H 的 zip 包 ,需要填写一些个人信息,之后会发送一个下载链接到邮箱中:

    1
    2
    3
    cd ./916c6f4e-1935-4f81-ad6f-04165831ae11-tpc-h-tool/TPC-H_Tools_v3.0.0/dbgen
    cp makefile.suite Makefile
    vi Makefile

    修改 Makefile 的 103 ~ 111 行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    CC      = gcc
    # Current values for DATABASE are: INFORMIX, DB2, TDAT (Teradata)
    # SQLSERVER, SYBASE, ORACLE, VECTORWISE
    # Current values for MACHINE are: ATT, DOS, HP, IBM, ICL, MVS,
    # SGI, SUN, U2200, VMS, LINUX, WIN32
    # Current values for WORKLOAD are: TPCH
    DATABASE= ORACLE
    MACHINE = LINUX
    WORKLOAD = TPCH

    执行 make :

    1
    make

    在 Mac 上编译可能会报如下错误,因为 mac 下的 malloc 头文件移动到了 sys 下:

    1
    2
    3
    4
    5
    6
    gcc -g -DDBNAME=\"dss\" -DLINUX -DORACLE -DTPCH -DRNG_TEST -D_FILE_OFFSET_BITS=64    -c -o bm_utils.o bm_utils.c
    bm_utils.c:71:10: fatal error: 'malloc.h' file not found
    #include <malloc.h>
    ^~~~~~~~~~
    1 error generated.
    make: *** [bm_utils.o] Error 1

    修改 bm_utils.c 文件的第71行, varsub.c 文件的第44行:

    1
    #include <malloc.h>
    1
    #include <sys/malloc.h>

    查看 TPC-H_Tools_v3.0.0/dbgen 目录下生成了两个新的命令 dbgen qgen ,分别用来生成数据与生成 sql:

    1
    2
    3
    ls -lrt
    -rwxr-xr-x 1 staff staff 105842 3 28 11:29 dbgen
    -rwxr-xr-x 1 staff staff 100977 3 28 11:31 qgen

    生成数据:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    # 将 dbgen、qgen 命令拷贝至一个单独的目录,便于测试
    cp dbgen tpch-kit
    cp dists.dss tpch-kit
    cp queries tpch-kit
    cp qgen tpch-kit


    # -s 参数用于控制数据集规模,1GB;-f 参数强制覆盖已有数据;
    cd tpch-kit
    ./dbgen -s 1 -f

    ls -lrt *.tpl
    -rw-r--r-- 1 staff staff 1409184 3 28 11:42 supplier.tbl
    -rw-r--r-- 1 staff staff 24346144 3 28 11:42 customer.tbl
    -rw-r--r-- 1 staff staff 171952161 3 28 11:42 orders.tbl
    -rw-r--r-- 1 staff staff 759863287 3 28 11:42 lineitem.tbl
    -rw-r--r-- 1 staff staff 24135125 3 28 11:42 part.tbl
    -rw-r--r-- 1 staff staff 118984616 3 28 11:42 partsupp.tbl
    -rw-r--r-- 1 staff staff 2224 3 28 11:42 nation.tbl
    -rw-r--r-- 1 staff staff 389 3 28 11:42 region.tbl

    可以将 .tpl 数据转换为 csv :

    1
    for i in `ls *.tbl`; do sed 's/|$//' $i > ${i/tbl/csv}; echo $i; done;

    生成 query sql , DSS_QUERY=./queries 是 sql 模板, -s 表示数据集规模:

    1
    2
    3
    cd tpch-kit
    mkdir q
    for id in `seq 1 22`; do DSS_QUERY=./queries ./qgen -s 1 $id -b ./dists.dss > q/$id.sql; done

    可能会产生如下报错:

    1
    Open failed for ./queries/1.sql at qgen.c:170

    需要设置下环境变量:

    1
    2
    3
    4
    5
    sudo vi /etc/profile
    export DSS_CONFIG=tpch-kit/dbgen
    export DSS_QUERY=$DSS_CONFIG/PATH_TO_QUERIES_FOLDER

    source /etc/profile

    tpch-kit/dbgen/q 目录将会生成22个 query sql 文件:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    cd tpch-kit/q
    ls -lrt
    -rw-r--r--@ 1 staff staff 598 3 28 14:02 1.sql
    -rw-r--r-- 1 staff staff 769 3 28 14:02 2.sql
    -rw-r--r-- 1 staff staff 478 3 28 14:02 3.sql
    -rw-r--r-- 1 staff staff 421 3 28 14:02 4.sql
    -rw-r--r-- 1 staff staff 556 3 28 14:02 5.sql
    -rw-r--r-- 1 staff staff 311 3 28 14:02 6.sql
    -rw-r--r-- 1 staff staff 893 3 28 14:02 7.sql
    -rw-r--r-- 1 staff staff 868 3 28 14:02 8.sql
    -rw-r--r-- 1 staff staff 672 3 28 14:02 9.sql
    -rw-r--r-- 1 staff staff 592 3 28 14:02 10.sql
    -rw-r--r-- 1 staff staff 568 3 28 14:02 11.sql
    -rw-r--r-- 1 staff staff 659 3 28 14:02 12.sql
    -rw-r--r-- 1 staff staff 413 3 28 14:02 13.sql
    -rw-r--r-- 1 staff staff 395 3 28 14:02 14.sql
    -rw-r--r-- 1 staff staff 594 3 28 14:02 15.sql
    -rw-r--r-- 1 staff staff 550 3 28 14:02 16.sql
    -rw-r--r-- 1 staff staff 350 3 28 14:02 17.sql
    -rw-r--r-- 1 staff staff 516 3 28 14:02 18.sql
    -rw-r--r-- 1 staff staff 1041 3 28 14:02 19.sql
    -rw-r--r-- 1 staff staff 670 3 28 14:02 20.sql
    -rw-r--r-- 1 staff staff 727 3 28 14:02 21.sql
    -rw-r--r-- 1 staff staff 726 3 28 14:02 22.sql

    tpcds Connector

    TPC-DS 采用星型、雪花等多维数据模式,包含7张事实表,17张维度表,平均每张表含有18列。其工作负载包含99个 SQL 查询,覆盖 SQL92 和 2003 的核心部分以及 OLAP 。这个测试集包含对大数据集的统计、报表生成、联机查询、数据挖掘等复杂应用,测试用的数据和值是有倾斜的,与真实数据一致。TPC-DS 是与真实场景非常接近的一个测试集。

    /etc/catalog/tpcds.properties:

    1
    2
    connector.name = tpcds
    tpcds.splits-per-node = 4

    安装 TPC-DS 工具

    由于从 TPC 官网下载的 zip 包:

    生成数据时,会报如下错误,暂未找到解决办法:

    因此直接从 下载并进行编译。

    编译 tpcds :

    1
    2
    3
    cd ../tpc-ds-tool/tools
    cp Makefile.suite Makefile
    make

    在 MacOS 上编译,可能会出现如下几个报错:

    错误1 :因为 values.h 是 GNU 的库:

    1
    2
    3
    4
    5
    6
    In file included from mkheader.c:37:
    ./porting.h:46:10: fatal error: 'values.h' file not found
    #include <values.h>
    ^~~~~~~~~~
    1 error generated.
    make: *** [mkheader.o] Error 1

    修改 porting.h 文件的第46行:

    1
    #include <values.h>
    1
    2
    #include <limits.h>
    #include <float.h>

    错误2:malloc.h 头文件位置不对:

    1
    2
    3
    4
    5
    date.c:40:10: fatal error: 'malloc.h' file not found
    #include <malloc.h>
    ^~~~~~~~~~
    1 error generated.
    make: *** [date.o] Error 1

    修改 date.c 文件的第42行, dist.c 文件的第54行, misc.c 文件的第45行, tokenizer.l 的第50行:

    1
    #include <malloc.h>
    1
    #include <sys/malloc.h>

    错误3:缺少宏定义

    1
    2
    3
    4
    genrand.c:87:12: error: use of undeclared identifier 'MAXINT'
    s += MAXINT;
    ^
    ......

    因为 mac 和 linux 的 h 文件差异,部分宏 mac 并没有,因此直接在 genrand.h 中自己添加即可:

    1
    #define MAXINT 4096000

    查看 tpc-ds-tool/tools 目录下生成了两个新的命令 dbgen qgen ,分别用来生成数据与生成 sql:

    1
    2
    3
    ls -lrt *gen
    -rwxr-xr-x 1 staff staff 329603 3 28 16:24 dsdgen
    -rwxr-xr-x 1 staff staff 249219 3 28 16:25 dsqgen

    创建 TPC-DS 测试需要用到的表

    1. 准备好创建表语句
    2. TPC-DS 已经提前准备好了创建表相关的 SQL 文件,位于 tools 目录下:

    3. tpcds.sql:创建25张表
    4. tpcds_ri.sql:创建表与表之间关系的 sql 语句
    5. tpcds_source.sql
    6. 利用数据库连接工具(如:navicat)创建相应的数据库和表
    7. 生成测试数据

      1
      2
      3
      4
      5
      cd tpc-ds-tool
      mkdir data
      cd tools

      ./dsdgen -SCALE 1GB -FORCE -DIR ../data

      之后通常会通过命令将测试数据 load 到具体的数据源存储中,用于后续测试。

      生成99个 query sql

      1
      2
      3
      4
      5
      6
      7
      8
      9
      cd tpc-ds-tool
      mkdir sql
      cd tools

      # for循环命令,生成99个 query sql
      for id in `seq 1 99`; do ./dsqgen -DIRECTORY ../query_templates -TEMPLATE "query${id}.tpl" -DIALECT oracle -FILTER Y > ../sql/query${id}.sql; done

      # 生成单个 query sql 示例
      ./dsqgen -DIRECTORY ../query_templates -TEMPLATE "query8.tpl" -DIALECT oracle -VERBOSE Y > ../sql/query8.sql

      RDBMS Connector

      /etc/catalog/postgresql.properties:

      1
      2
      3
      4
      connector.name = 'postgresql'
      connectot-url = jdbc:postgresql://db.example.com:5432/database
      connector-user = root
      connector-password = secret

      Hive Connector

      /etc/catalog/hive.properties:

      1
      2
      connector.name = hive-hadoop2
      hive.metastore.uri = thrift://example.net:9083

      建表语句:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      CREATE TABLE hive.web.page_views (
      view_time timestamp,
      user_id bigint,
      page_url varchar,
      view_date date,
      country varchar
      ) WITH (
      format = 'ORC',
      external_location = 's3://starburst-external/pageviews',
      partitioned_by = ARRAY['view_date', 'country']
      )

      Presto 自动发现和添加分区的命令:

      1
      2
      3
      4
      5
      CALL system.sync_partition_metadata (
      'web',
      'page_views',
      'FULL'
      )

      Phoenix Connector

      /etc/catalog/bigtables.properties:

      1
      2
      connector.name = phoenix
      phoenix.connection-url = jdbc:phoenix:zookeeper1.zookeeper2:2181:/hbase

      Druid Connector

      /etc/catalog/druid.properties:

      1
      2
      3
      connector.name = druid
      druid.coordinator-url = http://localhost:8081
      druid.broker-url = http://localhost:8082

      Kafka Connector

      /etc/catalog/trafficstream.properties:

      1
      2
      3
      connector.name = kafka
      kafka.table-names = web.pages,web.users
      kafka.nodes = trafficstream.example.com:9092

      kafka 的消息过期配置项:

      1
      2
      3
      log.retention.hours = 168
      log.segment.bytes = 1073741824
      log.clenup.policy = delete

      可以定期将 kafka 的数据迁移至 hdfs:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      -- 创建 hdfs 表
      CREATE TABLE hdfs.web.page_views
      WITH (
      format = 'ORC',
      partitioned_by = ARRAY['view_date']
      )
      AS
      SELECT *
      FROM trafficstream.web.page_views;

      -- 定期查询导入
      INSERT INTO hdfs.web.page_views
      SELECT *
      FROM trafficstream.web.page_views;

      Iceberg Connector

      /etc/catalog/iceberg.properties:

      1
      2
      3
      4
      5
      connector.name = iceberg
      hive.metastore.uri = thrift://example.net:9083
      iceberg.catalog.type = hive
      iceberg.file-format = PARQUET
      iceberg.compression-codec = GZIP

      联邦查询

      1
      2
      3
      4
      5
      6
      7
      SELECT f.uniquecarrier, c.description, count(*) AS ct
      FROM hive.ontime.flights_orc f, -- hive 事实明细表
      postgresql.airline.carrier c -- postgresql 关系维表
      WHERE c.code = f.uniquecarrier
      GROUP BY f.uniquecarrier, c.description
      ORDER BY count(*) DESC
      LIMIT 10;

      参考资料

      参考书籍:《Presto实战》、《Presto技术内幕》
      f8-2019-demo
      官网
      presto 代码库
      presto 论文
      presto 语言 libraries
      jupyter notebook
      TPC 官网
      【大数据之数据仓库】TPCH工具使用指南
      【大数据之数据仓库】基准测试之TPCH
      Compiling TPC-H tools for Mac
      tpch-kit包
      【大数据之数据仓库】TPCDS工具使用指南
      【大数据之数据仓库】基准测试之TPCDS
      tpcds-kit包
      MacOS 下编译 tpcds
      DEFINE 缺失修复记录