February 2024: This post was reviewed and updated to reflect changes in Amazon Athena engine version 3, including cost-based optimization and query result reuse.
Amazon Athena is an interactive analytics service built on open source frameworks that make it straightforward to analyze data stored in open table and file formats in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Athena is easy to use: simply point to your data in Amazon S3, define the schema, and start querying using standard SQL.
In this post, we review the top 10 tips that can improve query performance. We focus on how data is stored in Amazon S3 and on query-specific tuning.
Storage
This section discusses how to structure your data so that you can get the most out of Athena. You can apply the same practices to Amazon EMR data processing applications such as Spark, Trino, Presto, and Hive when your data is stored in Amazon S3. We discuss the following best practices:
Partition your data
Bucket your data
Use compression
Optimize file size
Use columnar file formats
1. Partition your data
Partitioning divides your table into parts and keeps the related data together based on column values such as date, country, and region. Partitions act as virtual columns. You define them at table creation, and they can help reduce the amount of data scanned per query, thereby improving performance. You can restrict the amount of data scanned by a query by specifying filters based on the partition. For more details, see Partitioning data in Athena.
The following example shows a dataset partitioned by year, stored in an S3 bucket:
$ aws s3 ls s3://athena-examples/flight/parquet/
PRE year=1987/
PRE year=1988/
PRE year=1989/
PRE year=1990/
PRE year=1991/
PRE year=1992/
PRE year=1993/
A table for this dataset would have a PARTITIONED BY (year STRING) clause to tell Athena it is partitioned by year. After the table is created, each partition has to be added, for example with ALTER TABLE ADD PARTITION, using an AWS Glue crawler, or by running MSCK REPAIR TABLE. The table could also be configured to use partition projection (see the bonus tips section for how to set that up).
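As an illustration, the following is a minimal sketch of what the DDL for the flights table above could look like, together with a statement that adds one partition manually. The column list is abbreviated to the two columns used in the example query below:

-- Create the partitioned table over the listing shown above
CREATE EXTERNAL TABLE flights (
  dest string,
  origin string
)
PARTITIONED BY (year string)
STORED AS PARQUET
LOCATION 's3://athena-examples/flight/parquet/'

-- Register one partition manually
ALTER TABLE flights ADD IF NOT EXISTS
  PARTITION (year = '1991') LOCATION 's3://athena-examples/flight/parquet/year=1991/'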
When querying a partitioned table, you can restrict the partitions that are scanned by using the partition key in the WHERE clause:
SELECT dest, origin FROM flights WHERE year = '1991'
When this query runs, Athena will see that there is a predicate (filter) on the year partition key, and only load data from matching partitions. For this query, that means only data in s3://athena-examples/flight/parquet/year=1991/ will be read.
Datasets can have multiple partition keys. The following is an example from the NOAA Global Historical Climatology Network dataset from the Registry of Open Data on AWS. The dataset is partitioned by station and element, and the listing is for one particular station:
$ aws s3 ls --no-sign-request s3://noaa-ghcn-pds/parquet/by_station/STATION=ASN00023351/
PRE ELEMENT=DAPR/
PRE ELEMENT=DWPR/
PRE ELEMENT=MDPR/
PRE ELEMENT=PRCP/
A table for this dataset would have a PARTITIONED BY (station STRING, element STRING) clause to tell Athena it is partitioned this way.
When deciding the columns on which to partition, consider the following:
Pick partition keys that support your queries. Work backward from your queries and find fields that are often used to filter the dataset.
Partition keys should have relatively low cardinality. As the number of partitions in your table increases, the overhead of retrieving and processing the partition metadata grows, and your files get smaller. Partition keys with too many values can negate the benefit of partitioning.
If your data is heavily skewed to one partition value, and most queries use that value, the overhead may negate the benefit of partitioning.
Datasets that grow over time should generally be partitioned by date. A common pattern is queries that look at specific windows of time, for example the last week or last month. Partitioning by date ensures that the amount of data read by these queries remains constant even as the size of the full dataset grows over time.
The following table compares query runtimes between a partitioned and a non-partitioned table. The table is from the industry standard benchmark dataset TPC-H. Both versions of the table contain 74 GB of data, uncompressed and stored in text format. The partitioned table is partitioned by the l_shipdate column and has 2,526 partitions.
Query | Non-partitioned table (runtime, data scanned) | Partitioned table (runtime, data scanned) | Savings
WHERE l_shipdate = '1996-09-01'
You can use the EXPLAIN command to verify which partitions a query will read. In the output, look for the SOURCE fragment, and within it the label PARTITION_KEY. This line tells you which partitions will be read by the query:
Fragment 1 [SOURCE]
Output layout: [count_0]
Output partitioning: SINGLE []
Aggregate[type = PARTIAL]
│ Layout: [count_0:bigint]
│ Estimates: {rows: 1 (9B), cpu: 0, memory: 9B, network: 0B}
│ count_0 := count(*)
└─ TableScan[table = awsdatacatalog:tpc_h:lineitem]
Layout: []
Estimates: {rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
l_shipdate:string:PARTITION_KEY
:: [[1996-09-01]]
If a table has multiple partition keys, there will be a line for each. If the query matches multiple values for a partition key, each value will be included in the output. For example, if the query were changed to select a range of values for l_shipdate, the last two lines could instead look like the following:
l_shipdate:string:PARTITION_KEY
:: [[1996-09-01], [1996-09-02], [1996-09-03], [1996-09-04], [1996-09-05]]
Partitioning also has a penalty if a partition filter isn’t used in the query, as shown in the following table. Make sure not to over-partition the data. Over-partitioning leads to a greater number of smaller files, which hurts performance. We cover this in more detail later in this post.
Query | Non-partitioned table (runtime, data scanned) | Partitioned table (runtime, data scanned) | Savings
Another penalty from partitioning is the time it takes to find the partitions that match the query, called partition pruning. A way to mitigate this is to enable partition indexes on the table. This can lead to better performance when a table has tens of thousands of partitions (or more). With partition indexes, only the metadata for the partition value in the query’s filter is retrieved from the catalog, instead of retrieving all the partitions’ metadata. The result is faster queries for such highly partitioned tables. The following table compares query runtimes for a partitioned table with and without partition indexing. The table contains approximately 100,000 partitions and uncompressed text data. The orders table is partitioned by the o_custkey column.
Query | Partition indexing disabled (runtime) | Partition indexing enabled (runtime) | Speed up
To learn more about the benefits of the AWS Glue Data Catalog’s partition indexing in Athena, refer to Improve Amazon Athena query performance using AWS Glue Data Catalog partition indexes.
See the section on creating optimized datasets later in this post for examples on how to partition your data.
2. Bucket your data
Another way to reduce the amount of data a query has to read is to bucket the data within each partition.
Bucketing is a technique for distributing records into separate files based on the value of one of the columns. This ensures that all records with the same value will be in the same file. Bucketing is useful when you have a column with high cardinality (many distinct values) and many of your queries look up specific values of the column. Good candidates for bucketing are columns such as IDs for users or devices.
In Athena, if you have a dataset that is already bucketed, you can specify the bucketed column inside your CREATE TABLE statement with CLUSTERED BY (<bucketed columns>) INTO <number of buckets> BUCKETS. Athena supports datasets bucketed with Hive or Spark, and you can create bucketed datasets with CREATE TABLE AS (CTAS) in Athena.
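For example, the following CTAS statement is a minimal sketch that creates a bucketed copy of the customer table used in the comparison below; the new table name and the choice of Parquet output are illustrative:

-- Create a bucketed copy of the customer table, 32 buckets on c_custkey
CREATE TABLE customer_bucketed
WITH (
  format = 'PARQUET',
  bucketed_by = ARRAY['c_custkey'],
  bucket_count = 32
) AS
SELECT * FROM customer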
The following table shows the difference in a customer table where the c_custkey column is used to create 32 buckets. The customer table is 2.29 GB in size.
Query | Non-bucketed table (runtime, data scanned) | Bucketed table using c_custkey as clustered column (runtime, data scanned) | Savings
Running EXPLAIN ANALYZE on the preceding query shows how bucketing helped read less data from Amazon S3 for the customer table. The following snippets of the EXPLAIN ANALYZE output for the query on the non-bucketed and bucketed tables highlight the input rows and the size of the data read, to show the difference.
The following is the output for the non-bucketed table:
─ ScanFilterProject[table = awsdatacatalog:tpc_h:customer, filterPredicate = ("c_custkey" = 12677856), projectLocality = LOCAL, protectedBarrier = NONE]
Layout: []
Estimates: {rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
CPU: 19.48s (99.94%), Scheduled: 37.43s (99.97%), Blocked: 0.00ns (0.00%), Output: 1 row (0B)
Input avg.: 202702.70 rows, Input std.dev.: 4.83%
c_custkey := c_custkey:int:REGULAR
Input: 15000000 rows (2.29GB), Filtered: 100.00%, Physical input: 2.29GB, Physical input time: 0.00ms
The following is the output for the bucketed table:
─ ScanFilterProject[table = awsdatacatalog:tpc_h:customer buckets=32, filterPredicate = ("c_custkey" = 12677856), projectLocality = LOCAL, protectedBarrier = NONE]
Layout: []
Estimates: {rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
CPU: 654.00ms (100.00%), Scheduled: 1.13s (100.00%), Blocked: 0.00ns (0.00%), Output: 1 row (0B)
Input avg.: 156250.00 rows, Input std.dev.: 22.35%
c_custkey := c_custkey:int:REGULAR
Input: 468750 rows (72.94MB), Filtered: 100.00%, Physical input: 72.94MB, Physical input time: 0.00ns
You can find out more about how to work with bucketed data in Athena by reading the following resources:
Partitioning and bucketing in Athena
Examples of CTAS queries: creating bucketed and partitioned tables
See the section on creating optimized datasets later in this post for examples on how to bucket your data.
3. Use compression
Compressing your data can speed up your queries significantly. The smaller data sizes reduce the amount of data scanned from Amazon S3, resulting in lower query costs. Compression also reduces the network traffic between Amazon S3 and Athena.
Athena supports a variety of compression formats, including common formats like gzip, Snappy, and zstd. For the whole list of supported formats, see Athena compression support.
Querying compressed text data, such as JSON and CSV, requires special consideration. When Athena reads data, it assigns different ranges of files to different nodes to maximize parallel processing of the data. Each range is known as a split, and files that can be read in parallel are called splittable. Most of the common compression formats are not splittable; they require the reader to start from the beginning of the file. This means that if a dataset is a single compressed CSV file, for example, only one node can be used for query processing.
When you create datasets consisting of compressed text files, aim for a balance between the number of files and the file size. We discuss this more in the next section on optimizing file sizes.
Parquet and ORC files are always splittable because these formats compress sections of the files separately, and have metadata that contains the locations within the files for the different sections.
The gzip format provides good compression ratios and has wide support across other tools and services. The zstd (Zstandard) format is a newer compression format with a good balance between performance and compression ratio. The bzip2 and LZO compression formats are splittable, but are not recommended if you want performance and compatibility.
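When you create a table with CTAS in Athena, you can control the compression of the output files with the write_compression table property. The following is a minimal sketch that rewrites a table as zstd-compressed Parquet; the table names are illustrative:

-- Rewrite a table as Parquet compressed with zstd
CREATE TABLE lineitem_zstd
WITH (
  format = 'PARQUET',
  write_compression = 'ZSTD'
) AS
SELECT * FROM lineitem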
See the section on creating optimized datasets later in this post for examples on how to compress your data.
4. Optimize file sizes
Queries run more efficiently when data can be read in parallel, and as much data as possible can be read in a single read request. There is an overhead in reading each file, for example getting metadata, making the request to Amazon S3, and setting up compression dictionaries. This is usually not noticeable, but as the number of files grows, it can add up. To maximize the performance of queries, you should aim for a balance between the number of files and their size.
A general guideline is to aim for splits that are around 128 MB. A split is a part of a file, for example a byte range of an uncompressed text file, or a page of a Parquet file. As discussed in the section on compression, most compressed text files are not splittable, and are therefore processed as a single split. Analytics-optimized formats like Parquet and ORC are always splittable.
One reason why you may end up with many small files is over-partitioning, as discussed in the previous section on partitioning. An indication that your query performance suffers from too many small files is if the planning phase in the query stats is more than a few percent of the total running time. In the worst case, your queries may fail with an Amazon S3 error saying “Please reduce your request rate.” This happens when the number of files is so great that Athena exceeds Amazon S3 service quotas. For more information, see
Best practices design patterns: optimizing Amazon S3 performance
.
One remedy to solve your small file problem is to use the
S3DistCP
utility on Amazon EMR. You can use it to combine smaller files into larger objects. You can also use S3DistCP to move large amounts of data in an optimized fashion from HDFS to Amazon S3, Amazon S3 to Amazon S3, and Amazon S3 to HDFS. We discuss another alternative to reprocessing data using Athena Spark at the end of this section.
Some benefits of having fewer, larger files include faster listing, fewer Amazon S3 requests, and less metadata to manage.
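If you don’t want to run an Amazon EMR cluster, a CTAS statement in Athena SQL can also serve as a simple compaction step, rewriting the selected rows as a smaller number of larger files. The following is a minimal sketch; the source and target table names and the output location are hypothetical, and you can add bucketed_by and bucket_count (as shown earlier) to influence the number of output files:

-- Rewrite a dataset with many small files into larger Parquet files
CREATE TABLE daily_logs_compacted
WITH (
  format = 'PARQUET',
  external_location = 's3://amzn-s3-demo-bucket/compacted/daily_logs/'
) AS
SELECT * FROM daily_logs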
For example, the following table compares a query that has to read 100,000 files with a query against the same dataset stored as a single file. Both sets of files contain the same rows, stored as uncompressed text files. The total amount of data is 74 GB.
Query | Number of files | Runtime
File size, the number of files, and whether the files are compressed can make a big difference to query performance. When the data is not compressed, Athena can split the files and process them in parallel, in optimal sizes. This makes processing a single uncompressed text file more efficient than 100,000 files. When the data is compressed, the number and size of the files make an even bigger difference, but in this case it’s important to have enough files to allow Athena to process the dataset in parallel.
See the section on creating optimized datasets later in this post for examples on how to rewrite your datasets to combine small files.
5. Use columnar file formats
Apache Parquet and Apache ORC are popular file formats for analytics workloads. They are often described as columnar file formats because they store data not by row, but by column. They also have features that allow query engines to reduce the amount of data that needs to be loaded in different ways. For example, by storing and compressing columns separately, you can achieve higher compression ratios, and only the columns referenced in a query need to be read.
Columnar file formats use multiple compression strategies for data. For example, a column with many repeated values can be encoded using run-length encoding, where the value is stored once along with a repetition count, or dictionary encoded, where each value is replaced by a pointer to a lookup table. Textual data can be compressed with standard compression formats like gzip, Snappy, and zstd. For more details, see Athena compression support.
Parquet and ORC can be tuned for different datasets. For example, it can be beneficial to increase the block (Parquet) or stripe (ORC) size in some situations. When a dataset has many columns, we recommend increasing the size from the default 128 MB in Parquet and 64 MB in ORC. This ensures that enough values for each column are stored together and fewer reads are required.
Another way to tune a dataset using a columnar file format is to keep the data sorted by a column that is often included in queries. Parquet and ORC store metadata such as the minimum and maximum value of a column for each block of data. This means that the query engine can skip reading a block of data if it sees that the values it contains don’t match the query. This is called predicate pushdown. For example, data often has some kind of timestamp, and by keeping it sorted within the files by this property, a query that looks for a specific range of time can skip reading data from blocks that are before or after that range.
You can combine sorting by timestamp with partitioning to yield even better performance gains and cost savings. Let’s say that you’re often doing aggregations over a time window of a few hours. Partitioning by hour would be possible, but could risk producing too many and too small files. Instead, you can partition by day and sort the data by timestamp. This way, coarse-grained partitioning is used to reduce the set of files that will be included in the query to only those in matching partitions, and the sorting will be used to skip blocks within the remaining files. Just remember to include filters on both the partition keys and the timestamp column.
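For example, assuming a hypothetical events table that is partitioned by an event_date column and whose files are kept sorted by an event_timestamp column, a query over a few hours of data would include both filters, roughly like this sketch:

SELECT count(*)
FROM events
-- Partition filter: prunes to a single day's partition
WHERE event_date = '2024-02-01'
-- Timestamp filter: lets the engine skip blocks whose min/max metadata
-- falls outside the three-hour window
AND event_timestamp BETWEEN TIMESTAMP '2024-02-01 06:00:00' AND TIMESTAMP '2024-02-01 09:00:00'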
The following table compares runtimes and data scanned for the same dataset in text gzip, Parquet gzip with no sorting, and Parquet gzip with sorting on l_partkey.
Query: SELECT l_orderkey FROM lineitem WHERE l_partkey = 17766770

Text gzip data: runtime 11.9 seconds; data scanned 23.7 GB
Parquet gzip data with no sorting: runtime 2.1 seconds (~82% faster than text); data scanned 2.0 GB, cost $0.009 (~91% cheaper than text)
Parquet gzip data sorted on l_partkey: runtime 1.1 seconds (~90% faster than text); data scanned 38.8 MB, cost $0.0001 (~99.9% cheaper than text)
Creating optimized datasets
In this section, we show you how to use Athena Spark to transform a dataset and apply the optimizations that we discussed in the previous sections. You can also use this code in most other Spark runtimes, for example Amazon EMR Serverless or AWS Glue ETL. You can also use Athena SQL to transform data and apply many of the optimizations described in this post, but Athena Spark provides more configuration options and control over the process.
The following code first makes the tpc_h database the default. The location of this database will be used to determine where data is written. It then creates a new table called customer_optimized by completing the following actions:
Read all rows in the table customer
Reduce the number of files that will be written per bucket per partition to four, using coalesce
Sort records by c_name with sortWithinPartitions
Write the records partitioned by c_mktsegment and c_nationkey with partitionBy, bucketed by c_custkey into 32 buckets with bucketBy, into Parquet files compressed with zstd
See the following code:
spark.sql("use tpc_h")
spark\
.read.table("customer")\
.coalesce(4)\
.sortWithinPartitions("c_name")\
.write\
.partitionBy("c_mktsegment", "c_nationkey")\
.bucketBy(32, "c_custkey")\
.saveAsTable("customer_optimized", format="parquet", compression="zstd")
This example shows all the optimizations at the same time. Depending on your use case, you may only need one or a few of them. Refer back to the earlier sections in this post for when each is of most use.
For more information about how to process data using Amazon EMR, EMR Serverless, AWS Glue ETL, or Athena SQL, see the following resources:
Extract, Transform and Load data into S3 data lake using CTAS and INSERT INTO statements in Amazon Athena
Simplify your ETL and ML pipelines using the Amazon Athena UNLOAD feature
Build a Data Lake Foundation with AWS Glue and Amazon S3
Converting a large dataset to Parquet
Converting to columnar formats
Query tuning
The Athena SQL engine is built on the open source distributed query engines Trino and Presto. Understanding how it works provides insight into how you can optimize queries when running them. This section details the following best practices:
Optimize ORDER BY
Optimize joins
Optimize GROUP BY
Use approximate functions
Only include the columns that you need
6. Optimize ORDER BY
The ORDER BY clause returns the results of a query in sort order. Athena uses distributed sort to run the sort operation in parallel on multiple nodes. If you’re using the ORDER BY clause to look at the top or bottom N values, use a LIMIT clause to reduce the cost of the sort, which results in a faster query runtime.
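For example, a top-N query over the lineitem table used elsewhere in this post could look like the following sketch. With the LIMIT clause, each node only needs to keep track of its top 10 values instead of fully sorting its share of the data:

-- Find the 10 most expensive line items
SELECT l_orderkey, l_extendedprice
FROM lineitem
ORDER BY l_extendedprice DESC
LIMIT 10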
For example, the following table summarizes the runtime for a dataset with a 7.25 GB table, uncompressed in text format, with approximately 60 million rows.
Query | Runtime
7. Optimize joins
Choosing the right join order is critical for better query performance. When you join two tables, specify the larger table on the left side of the join and the smaller table on the right side. For the most common type of joins that use equality conditions, Athena builds a lookup table from the table on the right and distributes it to the worker nodes. It then streams the table on the left, joining rows by looking up matching values in the lookup table. This is called a distributed hash join. Because the lookup table built from the table on the right side is kept in memory, the smaller that table is, the less memory will be used, and the faster the join will run.
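For example, for the lineitem and part tables in the comparison below, a sketch of a join that follows this guidance puts the larger lineitem table on the left and the smaller part table on the right. The p_partkey join key and p_name column are the standard TPC-H names and aren’t shown elsewhere in this post:

-- Larger table (lineitem) on the left, smaller table (part) on the right
SELECT l.l_orderkey, p.p_name
FROM lineitem l
JOIN part p ON l.l_partkey = p.p_partkey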
Because the hash table is distributed across the worker nodes, data skew can affect performance. If many rows have the same values for the columns used in the join condition, one node can end up having to process a large part of the join itself, while other nodes are idle. For best performance, make sure that the columns in your join conditions have as uniform a distribution of values as possible.
The following table shows the runtimes on a dataset with 74 GB total data, uncompressed in text format. The lineitem table has around 600 million rows, and the part table around 20 million.
Query | Runtime
You can use the execution details visualizer on the Athena console to inspect the order in which joins are executed. The visualizer also shows the number of rows that are joined from each table. See Viewing statistics and execution details for completed queries for more information on how to use the visualizer.
When you join three or more tables, consider joining the large table with the smallest table first to reduce the intermediate result and then join with the other tables.
Cost-based join optimizations
If a table has table statistics in the AWS Glue Data Catalog, Athena will use these to perform join reordering and aggregation pushdown using cost-based optimization (“cost” here refers to computational cost). When a table has statistics, the query optimizer is able to see which order of the tables is the most efficient and can perform the optimization automatically. This means you don’t have to manually ensure that the larger table is on the left side of the join.
For more information about using the cost-based optimizer, see Speed up queries with the cost-based optimizer in Amazon Athena and Using the cost-based optimizer.
Joining partitioned tables
When querying partitioned tables, it’s best to include filters on all partition keys, for all tables. This ensures that the query planner can skip as much file listing and reading as possible.
In the following example, the orders and lineitem tables are partitioned by the order date: o_orderdate in orders, and l_orderdate in lineitem. In the first query, there is no predicate on l_orderdate, and the engine must scan all partitions of the lineitem table. When the order date is added to the join condition, the query planner can see that it only needs to load a single partition for both tables, and the amount of data scanned is reduced significantly.
Query | Data scanned | Runtime
JOIN orders ON (l_orderkey = o_orderkey AND l_orderdate = o_orderdate) AND o_orderdate = '1993-07-08' | 35.4 MB | 2 seconds
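The following is a minimal sketch of the second form of the query, with explicit filters on both partition keys; the selected aggregate is illustrative and assumes the TPC-H schemas used throughout this post:

SELECT SUM(l_extendedprice)
FROM lineitem
JOIN orders ON (l_orderkey = o_orderkey AND l_orderdate = o_orderdate)
-- Filters on the partition keys of both tables
WHERE o_orderdate = '1993-07-08'
AND l_orderdate = '1993-07-08'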
As described earlier, you can use EXPLAIN to inspect which partitions will be read by the query. This can be particularly important to do when joining multiple partitioned tables.
Sometimes, the partitions of one or more of the tables involved in a query depend on information discovered only when running the query. In the worst case, this means that all partitions have to be read, because the query planner can’t determine the partitions from analyzing the query. However, in these cases, Athena can often skip reading partitions while the query runs, using a mechanism called dynamic partition pruning. For example, when the engine sees that the join condition involves a partition key, and the number of values on the right side is low, it can broadcast this information between the worker nodes. The worker nodes then use this information to skip reading files in partitions that would otherwise be filtered out later by the join condition.
In the following example, the orders and lineitem tables are partitioned by the order date (o_orderdate in orders, l_orderdate in lineitem). The lineitem table is about 75 GB of CSV in total, and orders is about 16 GB. The query calculates the average price of line items from orders with a specific set of criteria that appear in about 10% of the partitions. Because these partitions are not known in advance, in the worst case this would mean that 90 GB of data would have to be scanned, but in practice the query scans only 26.5 GB:
SELECT AVG(l_extendedprice)
FROM lineitem
JOIN orders ON (l_orderkey = o_orderkey AND l_orderdate = o_orderdate)
WHERE o_clerk = 'Clerk#000094772'
AND o_orderpriority = '1-URGENT'
AND o_orderstatus = 'F'
When the query runs, Athena collects the values of o_orderdate for rows that match the predicates. It broadcasts these across the cluster so that the nodes can skip reading partitions of the lineitem table that don’t match.
You can use the EXPLAIN command to confirm that Athena will perform dynamic partition pruning. Look for dynamicFilterAssignments in the output. For the query in this example, the explain plan looks like the following code:
Fragment 1 [HASH]
Output layout: [avg_4]
Output partitioning: SINGLE []
Aggregate[type = PARTIAL]
│ Layout: [avg_4:row(double, bigint)]
│ Estimates: {rows: 1 (55B), cpu: ?, memory: 55B, network: 0B}
│ avg_4 := avg("l_extendedprice")
└─ InnerJoin[criteria = ("l_orderkey" = "o_orderkey") AND ("l_orderdate" = "o_orderdate"), hash = [$hashvalue, $hashvalue_6], distribution = PARTITIONED]
│ Layout: [l_extendedprice:double]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {o_orderkey -> #df_562, o_orderdate -> #df_563}
├─ RemoteSource[sourceFragmentIds = [2]]
│ Layout: [l_orderkey:integer, l_extendedprice:double, l_orderdate:varchar, $hashvalue:bigint]
└─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_6], arguments = ["o_orderkey", "o_orderdate"]]
│ Layout: [o_orderkey:integer, o_orderdate:varchar, $hashvalue_6:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
└─ RemoteSource[sourceFragmentIds = [3]]
Layout: [o_orderkey:integer, o_orderdate:varchar, $hashvalue_7:bigint]
For more information about dynamic partition pruning, see Dynamic filtering in the Trino documentation.
Beware of cross joins
The join condition can also make a big difference in join performance. If the join condition is complex, for example if it uses LIKE or >, it will be much more computationally demanding. In the worst case, every record from one side of the join must be compared to every record on the other side of the join. This is called a cross join. Whenever possible, use equality conditions.
You can use the EXPLAIN command to see what kind of join Athena will perform. For example, if you run EXPLAIN on a query with a join condition like ON t1.name LIKE (t2.prefix || '%'), you will see something like the following in the output:
Fragment 1 [HASH]
└─ CrossJoin[]
When you see a cross join in a query plan, consider rewriting the query to instead use an equality condition. Unless one of the tables is very small, queries with cross joins run a big risk of exceeding query timeout limits.
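One possible rewrite, using the t1 and t2 tables from the EXPLAIN example above and assuming that every prefix has a known fixed length (here three characters), is to compare the computed prefix with an equality condition so that the engine can use a hash join instead:

-- Equality join on the computed prefix (assumes all prefixes are 3 characters)
SELECT t1.name, t2.prefix
FROM t1
JOIN t2 ON substr(t1.name, 1, 3) = t2.prefix

Whether such a rewrite is possible depends on your data; the point is to move the comparison into an equality condition whenever you can.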
8. Optimize GROUP BY
When performing aggregations, you should include as few columns as possible in the GROUP BY clause to reduce the amount of CPU and memory required. Additionally, make sure that you are grouping by columns that have as uniform a distribution of values as possible.
One cause of performance issues in queries with aggregations is data skew. This can happen when many rows have the same values for the columns in the GROUP BY clause. During aggregation, the rows are distributed across the worker nodes based on a hash of the columns in the GROUP BY clause. If the data is skewed, one node may have to process a large part of the aggregation itself, while other nodes are idle.
Redundant columns are often added to GROUP BY clauses because the SQL language requires that an expression is either in the GROUP BY clause or uses an aggregate function. For example, if you have a table with customer ID and customer name columns (such as c_custkey and c_name), you often end up writing GROUP BY c_custkey, c_name when you want to aggregate over customers, even though there will only be one name for any one customer ID. One way around this that can speed up queries with many redundant columns in the GROUP BY clause is the ARBITRARY function. It’s an aggregate function that, as the name suggests, returns an arbitrary value from the group.
In this example, we want to know the number of orders per customer. When we join the customer table with the orders table, there will be one row per order, and we use GROUP BY c_custkey to aggregate these per customer. We want the customer name in the results, and use ARBITRARY(c_name) to avoid having to add the c_name column to the GROUP BY clause:
SELECT c_custkey,
ARBITRARY(c_name) AS c_name,
COUNT(*) AS order_count
FROM customer
JOIN orders ON (customer.c_custkey = orders.o_custkey)
GROUP BY c_custkey
Whenever you can, you should remove unnecessary columns from the GROUP BY clause. The speedup will not be noticeable when there is only a single column, like in the previous example. However, it can be critical for performance on queries over large datasets where there are many columns in the GROUP BY clause.
9. Use approximate functions
For exploring large datasets, a common use case is to find the count of distinct values for a certain column using COUNT(DISTINCT column). An example is looking at the number of unique users visiting a webpage.
When an exact number may not be required (for instance, if you’re looking for which webpages to deep dive into), consider using approx_distinct(column). This function tries to minimize the memory usage by counting unique hashes of values instead of entire strings. The drawback is that there is a standard error of 2.3%.
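For example, the following sketch compares the exact and approximate ways of counting distinct customers in the orders table used elsewhere in this post:

-- Exact, but more memory-intensive
SELECT COUNT(DISTINCT o_custkey) FROM orders

-- Approximate, with a standard error of about 2.3%
SELECT approx_distinct(o_custkey) FROM orders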
The following table summarizes the speedup on a dataset with a 74 GB table, uncompressed in text format, with approximately 600 million rows.
Query | Runtime
For more information, see Approximate aggregate functions in the Trino documentation.
10. Only include the columns that you need
When running your queries, limit the final SELECT statement to only the columns that you need instead of selecting all columns. Trimming the number of columns reduces the amount of data that needs to be processed through the entire query pipeline, as well as the amount of data written in the final result. This especially helps when you’re querying tables that have a large number of string-based columns, and when you perform multiple joins or aggregations. For columnar formats, it also reduces the data scanned from Amazon S3 because only the specified columns are read.
The following table summarizes the speedup on a dataset with a 7.25 GB table, uncompressed in text format, with approximately 60 million rows.
Query | Runtime
SELECT c_name, l_quantity, o_totalprice FROM lineitem, orders, customer WHERE l_orderkey = o_orderkey AND c_custkey = o_custkey | 5.2 seconds
Speedup | 73% faster
Bonus tips
In this section, we provide additional performance tuning tips, and new performance-oriented features launched since the first version of this post.
Optimize partition processing using partition projection
Processing partition information can be a bottleneck for Athena queries when you have a very large number of partitions and aren’t using AWS Glue partition indexing. You can use partition projection in Athena to speed up query processing of highly partitioned tables and automate partition management. Partition projection helps minimize this overhead by allowing you to query partitions by calculating partition information rather than retrieving it from a metastore. It eliminates the need to add partition metadata to the AWS Glue table.
In partition projection, partition values and locations are calculated from configuration rather than read from a repository like the AWS Glue Data Catalog. Because in-memory operations are usually faster than remote operations, partition projection can reduce the runtime of queries against highly partitioned tables. Depending on the specific characteristics of the query and underlying data, partition projection can significantly reduce query runtime for queries that are constrained by partition metadata retrieval.
Using partition projection is ideal when your partition schemas are the same, or if the table’s schema always accurately describes the partition schemas. It can be used to partition on very high cardinality columns like IDs, or on date ranges at very fine granularity.
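For example, the following is a minimal sketch of enabling partition projection on the flights table from the partitioning section, assuming the year partition key can be projected as an integer range (the range shown is illustrative):

ALTER TABLE flights SET TBLPROPERTIES (
  'projection.enabled' = 'true',
  'projection.year.type' = 'integer',
  'projection.year.range' = '1987,1993',
  'storage.location.template' = 's3://athena-examples/flight/parquet/year=${year}/'
)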
See Partition projection with Amazon Athena for more details.
Speed up queries producing large result sets using UNLOAD
Running a SELECT query in Athena produces a single result file in Amazon S3 in uncompressed CSV format. If your query is expected to produce a large result, significant time is spent writing the result as one single file to Amazon S3. With UNLOAD, you can split the results into multiple files in Amazon S3, which reduces the time spent in the writing phase. You can also specify the result format (ORC, Parquet, Avro, JSON, or TEXTFILE) and compression type (defaults to gzip for Parquet, JSON, and TEXTFILE, and zlib for ORC) for the result set.
The following table shows a comparison between SELECT and UNLOAD statements. The query is expected to output approximately 13 GB of uncompressed data.
Query | SELECT * FROM lineitem LIMIT 85700000 | UNLOAD (SELECT * FROM lineitem LIMIT 85700000) TO <s3-output-location> WITH (format='TEXTFILE') | Savings
Runtime | 362 seconds | 33.2 seconds | ~90% faster
Result set | 13 GB (CSV, uncompressed) | 3.2 GB (CSV, gzip compressed) | ~75% reduced storage
Reuse query results when the data hasn’t changed
Data lake datasets are often updated only daily, or a few times per day, but they are often queried much more frequently. You may have a query that runs to populate a dashboard, or that runs every time a view in your application is accessed. If the data hasn’t changed since the last time the query ran, there is no need to compute the result again; in fact, it will take longer and cost more to do so. In these situations, you can use query result reuse. With this feature, you tell Athena that if the same query was run within, for example, the last 15 minutes, it should return the result from that run instead of computing it again. If such a result exists, Athena returns it immediately, and no data is scanned.
For more information about query result reuse, see Reduce cost and improve query performance with Amazon Athena Query Result Reuse and Reusing query results.
Conclusion
This post covered our top 10 tips for optimizing your interactive analysis on Athena SQL. You can apply these same practices when using Trino on Amazon EMR.
You can also view the Turkic translated version of this post.
About the Authors
Mert Hocanin is a Principal Big Data Architect with AWS Lake Formation.
Pathik Shah is a Sr. Big Data Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.
Theo Tolv is a Senior Analytics Architect based in Stockholm, Sweden. He’s worked with small and big data for most of his career and has built applications running on AWS since 2008. In his spare time, he likes to tinker with electronics and read space opera.
Audit History
Last reviewed and updated in February 2024 by Theo Tolv | Sr. Analytics Architect