```
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path
```
The Hive-based catalog only loads Iceberg tables. To load non-Iceberg tables in the same Hive metastore, use a session catalog.
Catalog configuration
A catalog is created and named by adding a property `spark.sql.catalog.(catalog-name)` with an implementation class for its value.

Iceberg supplies two implementations:

- `org.apache.iceberg.spark.SparkCatalog` supports a Hive Metastore or a Hadoop warehouse as a catalog
- `org.apache.iceberg.spark.SparkSessionCatalog` adds support for Iceberg tables to Spark's built-in catalog, and delegates to the built-in catalog for non-Iceberg tables
Both catalogs are configured using properties nested under the catalog name. Common configuration properties for Hive and Hadoop are:
| Property | Values | Description |
| --- | --- | --- |
| `spark.sql.catalog.(catalog-name).type` | `hive`, `hadoop`, `rest`, `glue`, `jdbc` or `nessie` | The underlying Iceberg catalog implementation: `HiveCatalog`, `HadoopCatalog`, `RESTCatalog`, `GlueCatalog`, `JdbcCatalog`, `NessieCatalog`, or left unset if using a custom catalog |
| `spark.sql.catalog.(catalog-name).catalog-impl` | | The custom Iceberg catalog implementation. If `type` is null, `catalog-impl` must not be null. |
| `spark.sql.catalog.(catalog-name).io-impl` | | The custom `FileIO` implementation. |
| `spark.sql.catalog.(catalog-name).metrics-reporter-impl` | | The custom `MetricsReporter` implementation. |
| `spark.sql.catalog.(catalog-name).default-namespace` | `default` | The default current namespace for the catalog |
| `spark.sql.catalog.(catalog-name).uri` | `thrift://host:port` | Hive metastore URL for a `hive`-typed catalog, REST URL for a `rest`-typed catalog |
| `spark.sql.catalog.(catalog-name).warehouse` | `hdfs://nn:8020/warehouse/path` | Base path for the warehouse directory |
| `spark.sql.catalog.(catalog-name).cache-enabled` | `true` or `false` | Whether to enable catalog caching; the default is `true` |
| `spark.sql.catalog.(catalog-name).cache.expiration-interval-ms` | `30000` (30 seconds) | Duration after which cached catalog entries are expired; only effective if `cache-enabled` is `true`. `-1` disables cache expiration and `0` disables caching entirely, irrespective of `cache-enabled`. Default is `30000` (30 seconds) |
| `spark.sql.catalog.(catalog-name).table-default.propertyKey` | | Default Iceberg table property value for property key `propertyKey`, which will be set on tables created by this catalog if not overridden |
| `spark.sql.catalog.(catalog-name).table-override.propertyKey` | | Enforced Iceberg table property value for property key `propertyKey`, which cannot be overridden by users |
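As an illustration of how several of these properties combine, the sketch below configures a hypothetical REST catalog; the catalog name `rest_prod` and the endpoint are placeholders for your environment:

```
# hypothetical REST catalog; adjust the URI for your deployment
spark.sql.catalog.rest_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest_prod.type = rest
spark.sql.catalog.rest_prod.uri = http://localhost:8181
spark.sql.catalog.rest_prod.cache.expiration-interval-ms = 60000
spark.sql.catalog.rest_prod.table-default.write.format.default = parquet
```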
Additional properties can be found in common catalog configuration.
Using catalogs
Catalog names are used in SQL queries to identify a table. In the examples above, `hive_prod` and `hadoop_prod` can be used to prefix database and table names that will be loaded from those catalogs.

```sql
SELECT * FROM hive_prod.db.table; -- load db.table from catalog hive_prod
```
Spark 3 keeps track of the current catalog and namespace, which can be omitted from table names.

```sql
USE hive_prod.db;
SELECT * FROM table; -- load db.table from catalog hive_prod
```
To see the current catalog and namespace, run `SHOW CURRENT NAMESPACE`.
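For example, after the `USE` statement above, it reports `hive_prod` and `db`:

```sql
SHOW CURRENT NAMESPACE; -- returns the active catalog and namespace
```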
To add Iceberg table support to Spark's built-in catalog, configure `spark_catalog` to use Iceberg's `SparkSessionCatalog`.

```
spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive
```
Spark's built-in catalog supports existing v1 and v2 tables tracked in a Hive Metastore. This configures Spark to use Iceberg's `SparkSessionCatalog` as a wrapper around that session catalog. When a table is not an Iceberg table, the built-in catalog will be used to load it instead. This configuration can use the same Hive Metastore for both Iceberg and non-Iceberg tables.
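With this wrapper in place, queries against both kinds of tables go through the same catalog name; the database and table names below are hypothetical:

```sql
SELECT * FROM db.iceberg_table; -- resolved by Iceberg's SparkSessionCatalog
SELECT * FROM db.hive_table;    -- delegated to Spark's built-in catalog
```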
Similar to configuring Hadoop properties by using `spark.hadoop.*`, it's possible to set per-catalog Hadoop configuration values when using Spark by adding the property for the catalog with the prefix `spark.sql.catalog.(catalog-name).hadoop.*`. These properties will take precedence over values configured globally using `spark.hadoop.*` and will only affect Iceberg tables.

```
spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint = http://aws-local:9000
```
Spark supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property. Here is an example:

```
spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.custom_prod.catalog-impl = com.my.custom.CatalogImpl
spark.sql.catalog.custom_prod.my-additional-catalog-config = my-value
```
Iceberg 0.11.0 and later add an extension module to Spark to add new SQL commands, like `CALL` for stored procedures or `ALTER TABLE ... WRITE ORDERED BY`.

Using those SQL commands requires adding Iceberg extensions to your Spark environment using the following Spark property:

| Spark extensions property | Iceberg extensions implementation |
| --- | --- |
| `spark.sql.extensions` | `org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions` |
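Once the extensions are enabled, commands like the following become available; the catalog name, table, and snapshot ID below are illustrative, and `rollback_to_snapshot` is one of Iceberg's built-in stored procedures:

```sql
-- roll the table back to an earlier snapshot
CALL hive_prod.system.rollback_to_snapshot('db.table', 10963874102873);
-- set a sort order for future writes
ALTER TABLE hive_prod.db.table WRITE ORDERED BY category, id;
```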
Runtime configuration
Read options
Spark read options are passed when configuring the DataFrameReader, like this:

```scala
// time travel
spark.read
    .option("snapshot-id", 10963874102873L)
    .table("catalog.db.table")
```
| Spark option | Default | Description |
| --- | --- | --- |
| `snapshot-id` | (latest) | Snapshot ID of the table snapshot to read |
| `as-of-timestamp` | (latest) | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. |
| `split-size` | As per table property | Overrides this table's `read.split.target-size` and `read.split.metadata-target-size` |
| `lookback` | As per table property | Overrides this table's `read.split.planning-lookback` |
| `file-open-cost` | As per table property | Overrides this table's `read.split.open-file-cost` |
| `vectorization-enabled` | As per table property | Overrides this table's `read.parquet.vectorization.enabled` |
| `batch-size` | As per table property | Overrides this table's `read.parquet.vectorization.batch-size` |
| `stream-from-timestamp` | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
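As a companion to the snapshot ID example above, time travel to a point in time uses `as-of-timestamp`; the timestamp value here is illustrative:

```scala
// time travel to a timestamp, in milliseconds since the epoch
spark.read
    .option("as-of-timestamp", "499162860000")
    .table("catalog.db.table")
```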
Write options

Spark write options are passed when configuring the DataFrameWriter, like this:

```scala
// write with Avro instead of Parquet
df.write
    .option("write-format", "avro")
    .option("snapshot-property.key", "value")
    .insertInto("catalog.db.table")
```
| Spark option | Default | Description |
| --- | --- | --- |
| `write-format` | Table `write.format.default` | File format to use for this write operation; parquet, avro, or orc |
| `target-file-size-bytes` | As per table property | Overrides this table's `write.target-file-size-bytes` |
| `check-nullability` | `true` | Sets the nullable check on fields |
| `snapshot-property.custom-key` | (none) | Adds an entry with `custom-key` and corresponding value in the snapshot summary (the `snapshot-property.` prefix is only required for DSv2) |
| `fanout-enabled` | `false` | Overrides this table's `write.spark.fanout.enabled` |
| `check-ordering` | `true` | Checks if input schema and table schema are the same |
| `isolation-level` | `null` | Desired isolation level for DataFrame overwrite operations. `null` => no checks (for idempotent writes), `serializable` => check for concurrent inserts or deletes in destination partitions, `snapshot` => checks for concurrent deletes in destination partitions. |
| `validate-from-snapshot-id` | `null` | If the isolation level is set, the ID of the base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via the Table API or the snapshots metadata table. If null, the table's oldest known snapshot is used. |
| `compression-codec` | Table `write.(fileformat).compression-codec` | Overrides this table's compression codec for this write |
| `compression-level` | Table `write.(fileformat).compression-level` | Overrides this table's compression level for Parquet and Avro tables for this write |
| `compression-strategy` | Table `write.orc.compression-strategy` | Overrides this table's compression strategy for ORC tables for this write |
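For instance, an overwrite that should fail on conflicting concurrent writes can set `isolation-level` through the DataFrameWriterV2 API; the table name below is illustrative:

```scala
// overwrite matching partitions, validating against concurrent writes
df.writeTo("catalog.db.table")
    .option("isolation-level", "serializable")
    .overwritePartitions()
```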
CommitMetadata provides an interface to add custom metadata to a snapshot summary during a SQL execution, which can be beneficial for purposes such as auditing or change tracking. If properties start with `snapshot-property.`, then that prefix will be removed from each property. Here is an example:
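The original example does not survive in this excerpt; below is a minimal sketch in Scala, assuming `CommitMetadata.withCommitProperties(properties, callable, exceptionClass)` from `org.apache.iceberg.spark`. The property key, table name, and predicate are illustrative:

```scala
import java.util.concurrent.Callable
import org.apache.iceberg.spark.CommitMetadata

// hypothetical audit property; the "snapshot-property." prefix is stripped
// from the key before it is written into the snapshot summary
val properties = new java.util.HashMap[String, String]()
properties.put("snapshot-property.app-id", "nightly-etl")

// run the SQL inside the callable so its commit picks up the properties
CommitMetadata.withCommitProperties(
  properties,
  new Callable[Integer] {
    override def call(): Integer = {
      spark.sql("DELETE FROM catalog.db.table WHERE id = 1")
      Integer.valueOf(0)
    }
  },
  classOf[RuntimeException])
```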