The connector supports reading Google BigQuery tables into Spark's DataFrames, and writing DataFrames back into BigQuery. This is done by using the Spark SQL Data Source API to communicate with BigQuery.
The Storage API streams data in parallel directly from BigQuery via gRPC without using Google Cloud Storage as an intermediary.
It has a number of advantages over using the previous export-based read flow that should generally lead to better read performance:
It does not leave any temporary files in Google Cloud Storage. Rows are read directly from BigQuery servers using the Arrow or Avro wire formats.
The new API allows column and predicate filtering to only read the data you are interested in.
Since BigQuery is backed by a columnar datastore, it can efficiently stream data without reading all columns.
The Storage API supports arbitrary pushdown of predicate filters. Connector versions 0.8.0-beta and above support pushdown of arbitrary filters to BigQuery.
There is a known issue in Spark that does not allow pushdown of filters on nested fields. For example, filters like
address.city = "Sunnyvale"
will not be pushed down to BigQuery.
The API rebalances records between readers until they all complete. This means that all Map phases will finish nearly concurrently. See this blog article on how dynamic sharding is similarly used in Google Cloud Dataflow.
See Configuring Partitioning for more details.
Follow these instructions.
If you do not have an Apache Spark environment you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use
spark-submit
on any cluster.
Any Dataproc cluster using the API needs the 'bigquery' or 'cloud-platform' scopes. Dataproc clusters have the 'bigquery' scope by default, so most clusters in enabled projects should work by default e.g.
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
The latest version of the connector is publicly available in the following links:
version
Scala 2.13
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.38.0.jar
(HTTP link)
Scala 2.12
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.38.0.jar
(HTTP link)
Scala 2.11
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar
(HTTP link)
The first six versions are Java based connectors targeting Spark 2.4/3.1/3.2/3.3/3.4/3.5 of all Scala versions built on the new
Data Source APIs (Data Source API v2) of Spark.
The final two connectors are Scala based connectors, please use the jar relevant to your Spark installation as outlined
below.
Connector \ Spark
The connector is also available from the
Maven Central
repository. It can be added using the --packages
option or the
spark.jars.packages
configuration property. Use the following value
version
Connector Artifact
Dataproc clusters created using image 2.1 and above, or batches using the Dataproc serverless service, come with a built-in Spark BigQuery connector.
Using the standard --jars
or --packages
(or alternatively, the spark.jars
/spark.jars.packages
configuration) won't help in this case as the built-in connector takes precedence.
To use another version than the built-in one, please do one of the following:
For Dataproc clusters, using image 2.1 and above, add the following flag on cluster creation to upgrade the version --metadata SPARK_BQ_CONNECTOR_VERSION=0.38.0
, or --metadata SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.3-bigquery-0.38.0.jar
to create the cluster with a different jar. The URL can point to any valid connector JAR for the cluster's Spark version.
For Dataproc serverless batches, add the following property on batch creation to upgrade the version: --properties dataproc.sparkBqConnector.version=0.38.0
, or --properties dataproc.sparkBqConnector.uri=gs://spark-lib/bigquery/spark-3.3-bigquery-0.38.0.jar
to create the batch with a different jar. The URL can point to any valid connector JAR for the runtime's Spark version.
You can run a simple PySpark wordcount against the API without compilation by running
Dataproc image 1.5 and above
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.38.0.jar \
examples/python/shakespeare.py
Dataproc image 1.4 and below
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.29.0.jar \
examples/python/shakespeare.py
https://codelabs.developers.google.com/codelabs/pyspark-bigquery
The connector uses the cross language Spark SQL Data Source API:
df = spark.read \
.format("bigquery") \
.load("bigquery-public-data.samples.shakespeare")
or the Scala-only implicit API:
import com.google.cloud.spark.bigquery._
val df = spark.read.bigquery("bigquery-public-data.samples.shakespeare")
For more information, see additional code samples in
Python,
Scala and
Java.
The connector allows you to run any
Standard SQL
SELECT query on BigQuery and fetch its results directly to a Spark Dataframe.
This is easily done as described in the following code sample:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2014
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
df = spark.read.format("bigquery").load(sql)
df.show()
Which yields the result
+----------+-------+
| tag| c|
+----------+-------+
|javascript|1643617|
| python|1352904|
| java|1218220|
| android| 913638|
| php| 911806|
| c#| 905331|
| html| 769499|
| jquery| 608071|
| css| 510343|
| c++| 458938|
+----------+-------+
A second option is to use the query
option like this:
df = spark.read.format("bigquery").option("query", sql).load()
Notice that the execution should be faster as only the result is transmitted
over the wire. In a similar fashion the queries can include JOINs more
efficiently than running joins on Spark, or use other BigQuery features such as
subqueries,
BigQuery user defined functions,
wildcard tables,
BigQuery ML
and more.
In order to use this feature the following configurations MUST be set:
viewsEnabled
must be set to true
.
materializationDataset
must be set to a dataset where the GCP user has table
creation permission. materializationProject
is optional.
Note: As mentioned in the BigQuery documentation,
the queried tables must be in the same location as the materializationDataset
.
Also, if the tables in the SQL statement
are from projects other than the
parentProject
then use the fully qualified table name i.e.
[project].[dataset].[table]
.
Important: This feature is implemented by running the query on BigQuery and
saving the result into a temporary table, from which Spark then reads the
results. This may add additional costs on your BigQuery account.
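The settings required for query reads can be gathered in one place before the read is issued. The following is a minimal sketch, not part of the connector: `query_read_options` is a hypothetical helper and `my_dataset` is a placeholder. It only assembles the option names described in this section (`query`, `viewsEnabled`, `materializationDataset`, `materializationProject`) into a dict.

```python
from typing import Dict, Optional


def query_read_options(
    sql: str,
    materialization_dataset: str,
    materialization_project: Optional[str] = None,
) -> Dict[str, str]:
    """Assemble the reader options needed to load the result of a query.

    Hypothetical helper for illustration: it only builds a dict and does not
    talk to Spark or BigQuery.
    """
    if not sql.strip():
        raise ValueError("sql must not be empty")
    if not materialization_dataset:
        raise ValueError("materializationDataset is required for query reads")

    options = {
        "query": sql,
        "viewsEnabled": "true",  # must be set to true for query reads
        "materializationDataset": materialization_dataset,
    }
    if materialization_project:  # optional setting
        options["materializationProject"] = materialization_project
    return options


opts = query_read_options("SELECT 1 AS x", "my_dataset")
# With a live SparkSession, the dict could be passed to the reader, e.g.:
#   df = spark.read.format("bigquery").options(**opts).load()
```

Keeping the options in a dict makes it easy to reuse the same materialization settings across several queries.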
The connector has preliminary support for reading from
BigQuery views. Please
note there are a few caveats:
BigQuery views are not materialized by default, which means that the connector
needs to materialize them before it can read them. This process affects the
read performance, even before running any collect()
or count()
action.
The materialization process can also incur additional costs to your BigQuery
bill.
By default, the materialized views are created in the same project and
dataset. Those can be configured by the optional materializationProject
and materializationDataset
options, respectively. These options can also
be globally set by calling spark.conf.set(...)
before reading the views.
Reading from views is disabled by default. In order to enable it,
either set the viewsEnabled option when reading the specific view
(.option("viewsEnabled", "true")
) or set it globally by calling
spark.conf.set("viewsEnabled", "true")
.
As mentioned in the BigQuery documentation,
the materializationDataset
should be in the same location as the view.
Writing DataFrames to BigQuery can be done using two methods: Direct and Indirect.
In this method the data is written directly to BigQuery using the
BigQuery Storage Write API. In order to enable this option, please
set the writeMethod
option to direct
, as shown below:
df.write \
.format("bigquery") \
.option("writeMethod", "direct") \
.save("dataset.table")
Writing to existing partitioned tables (date partitioned, ingestion time partitioned and range
partitioned) in APPEND save mode and OVERWRITE mode (only date and range partitioned) is fully supported by the connector and the BigQuery Storage Write
API. The use of datePartition
, partitionField
, partitionType
, partitionRangeStart
, partitionRangeEnd
, partitionRangeInterval
described below is not supported at this moment by the direct write method.
Important: Please refer to the data ingestion pricing
page regarding the BigQuery Storage Write API pricing.
Important: Please use version 0.24.2 and above for direct writes, as previous
versions have a bug that may cause a table deletion in certain cases.
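Because the partitioning options listed above are not supported by the direct write method, it can help to check a set of write options before handing them to the writer. The sketch below is a hypothetical pre-flight check (the name `check_direct_write_options` is an assumption, not a connector API). It covers only the six options named in this section.

```python
from typing import Mapping

# Options this section lists as unsupported by the direct write method.
UNSUPPORTED_WITH_DIRECT = (
    "datePartition",
    "partitionField",
    "partitionType",
    "partitionRangeStart",
    "partitionRangeEnd",
    "partitionRangeInterval",
)


def check_direct_write_options(options: Mapping[str, str]) -> None:
    """Raise ValueError if direct write is combined with an unsupported option.

    Hypothetical helper for illustration only.
    """
    if options.get("writeMethod", "indirect").lower() != "direct":
        return  # nothing to check for the indirect method
    conflicts = [name for name in UNSUPPORTED_WITH_DIRECT if name in options]
    if conflicts:
        raise ValueError(
            "not supported with writeMethod=direct: " + ", ".join(conflicts)
        )


check_direct_write_options({"writeMethod": "direct"})  # passes silently
```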
In this method the data is first written to GCS and then loaded into BigQuery. A GCS bucket must be configured
to indicate the temporary data location.
df.write \
.format("bigquery") \
.option("temporaryGcsBucket","some-bucket") \
.save("dataset.table")
The data is temporarily stored using the Apache Parquet,
Apache ORC or Apache Avro formats.
The GCS bucket and the format can also be set globally using Spark's RuntimeConfig like this:
spark.conf.set("temporaryGcsBucket","some-bucket")
df.write \
.format("bigquery") \
.save("dataset.table")
When streaming a DataFrame to BigQuery, each batch is written in the same manner as a non-streaming DataFrame.
Note that an HDFS-compatible
checkpoint location
(e.g. path/to/HDFS/dir
or gs://checkpoint-bucket/checkpointDir
) must be specified.
df.writeStream \
.format("bigquery") \
.option("temporaryGcsBucket","some-bucket") \
.option("checkpointLocation", "some-location") \
.option("table", "dataset.table") \
.start()
Important: The connector does not configure the GCS connector, in order to avoid conflict with another GCS connector, if one exists. In order to use the write capabilities of the connector, please configure the GCS connector on your cluster as explained here.
The API supports a number of options to configure the read:
Property
Meaning
Usage
table
The BigQuery table in the format [[project:]dataset.]table
.
It is recommended to use the path
parameter of
load()
/save()
instead. This option has been
deprecated and will be removed in a future version.
(Deprecated)
Read/Write
dataset
The dataset containing the table. This option should be used with
standard table and views, but not when loading query results.
(Optional unless omitted in table
)
Read/Write
project
The Google Cloud Project ID of the table. This option should be used with
standard table and views, but not when loading query results.
(Optional. Defaults to the project of the Service Account being used)
Read/Write
parentProject
The Google Cloud Project ID of the table to bill for the export.
(Optional. Defaults to the project of the Service Account being used)
Read/Write
maxParallelism
The maximal number of partitions to split the data into. Actual number
may be less if BigQuery deems the data small enough. If there are not
enough executors to schedule a reader per partition, some partitions may
be empty.
Important: The old parameter (parallelism
) is
still supported but deprecated. It will be removed in
version 1.0 of the connector.
(Optional. Defaults to the larger of preferredMinParallelism and 20,000.)
preferredMinParallelism
The preferred minimal number of partitions to split the data into. Actual number
may be less if BigQuery deems the data small enough. If there are not
enough executors to schedule a reader per partition, some partitions may
be empty.
(Optional. Defaults to the smaller of 3 times the application's default parallelism
and maxParallelism.)
viewsEnabled
Enables the connector to read from views and not only tables. Please read
the relevant section before activating
this option.
(Optional. Defaults to false
)
materializationProject
The project id where the materialized view is going to be created
(Optional. Defaults to view's project id)
materializationDataset
The dataset where the materialized view is going to be created. This
dataset should be in same location as the view or the queried tables.
(Optional. Defaults to view's dataset)
materializationExpirationTimeInMinutes
The expiration time of the temporary table holding the materialized data
of a view or a query, in minutes. Notice that the connector may re-use
the temporary table due to the use of local cache and in order to reduce
BigQuery computation, so very low values may cause errors. The value must
be a positive integer.
(Optional. Defaults to 1440, or 24 hours)
readDataFormat
Data Format for reading from BigQuery. Options : ARROW
, AVRO
(Optional. Defaults to ARROW
)
optimizedEmptyProjection
The connector uses an optimized empty projection (select without any
columns) logic, used for count()
execution. This logic takes
the data directly from the table metadata or performs a much more efficient
`SELECT COUNT(*) WHERE...` in case there is a filter. You can cancel the
use of this logic by setting this option to false
.
(Optional, defaults to true
)
pushAllFilters
If set to true
, the connector pushes all the filters Spark can delegate
to BigQuery Storage API. This reduces amount of data that needs to be sent from
BigQuery Storage API servers to Spark clients. This option has been
deprecated and will be removed in a future version.
(Optional, defaults to true
)
(Deprecated)
bigQueryJobLabel
Can be used to add labels to the connector initiated query and load
BigQuery jobs. Multiple labels can be set.
(Optional)
bigQueryTableLabel
Can be used to add labels to the table while writing to a table. Multiple
labels can be set.
(Optional)
Write
traceApplicationName
Application name used to trace BigQuery Storage read and write sessions.
Setting the application name is required to set the trace ID on the
sessions.
(Optional)
traceJobId
Job ID used to trace BigQuery Storage read and write sessions.
(Optional, defaults to the Dataproc job ID if it exists, otherwise uses
the Spark application ID)
createDisposition
Specifies whether the job is allowed to create new tables. The permitted
values are:
CREATE_IF_NEEDED
- Configures the job to create the
table if it does not exist.
CREATE_NEVER
- Configures the job to fail if the
table does not exist.
This option takes place only in case Spark has decided to write data
to the table based on the SaveMode.
(Optional. Defaults to CREATE_IF_NEEDED).
Write
writeMethod
Controls the method
in which the data is written to BigQuery. Available values are direct
to use the BigQuery Storage Write API and indirect
which writes the
data first to GCS and then triggers a BigQuery load operation. See more
(Optional, defaults to indirect
)
Write
writeAtLeastOnce
Guarantees that data is written to BigQuery at least once. This is a lesser
guarantee than exactly once. This is suitable for streaming scenarios
in which data is continuously being written in small batches.
(Optional. Defaults to false
)
Supported only by the `DIRECT` write method, and only when the save mode is NOT `Overwrite`.
Write
temporaryGcsBucket
The GCS bucket that temporarily holds the data before it is loaded to
BigQuery. Required unless set in the Spark configuration
(spark.conf.set(...)
).
Not supported by the `DIRECT` write method.
Write
persistentGcsBucket
The GCS bucket that holds the data before it is loaded to
BigQuery. If set, the data won't be deleted after it is written
into BigQuery.
Not supported by the `DIRECT` write method.
Write
persistentGcsPath
The GCS path that holds the data before it is loaded to
BigQuery. Used only with persistentGcsBucket
.
Not supported by the `DIRECT` write method.
Write
intermediateFormat
The format of the data before it is loaded to BigQuery, values can be
either "parquet","orc" or "avro". In order to use the Avro format, the
spark-avro package must be added at runtime.
(Optional. Defaults to parquet
). On write only. Supported only for the `INDIRECT` write method.
Write
useAvroLogicalTypes
When loading from Avro (`.option("intermediateFormat", "avro")`), BigQuery uses the underlying Avro types instead of the logical types [by default](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types). Supplying this option converts Avro logical types to their corresponding BigQuery data types.
(Optional. Defaults to false
). On write only.
Write
datePartition
The date partition the data is going to be written to. Should be a date string
given in the format YYYYMMDD
. Can be used to overwrite the data of
a single partition, like this:
df.write.format("bigquery")
.option("datePartition", "20220331")
.mode("overwrite")
.save("table")
(Optional). On write only.
Can also be used with different partition types like:
HOUR: YYYYMMDDHH
MONTH: YYYYMM
YEAR: YYYY
Not supported by the `DIRECT` write method.
Write
partitionField
If this field is specified, the table is partitioned by this field.
For Time partitioning, specify together with the option `partitionType`.
For Integer-range partitioning, specify together with the 3 options: `partitionRangeStart`, `partitionRangeEnd`, `partitionRangeInterval`.
The field must be a top-level TIMESTAMP or DATE field for Time partitioning, or INT64 for Integer-range partitioning. Its mode must be NULLABLE
or REQUIRED.
If the option is not set for a Time partitioned table, then the table will be partitioned by pseudo
column, referenced via either'_PARTITIONTIME' as TIMESTAMP
type, or
'_PARTITIONDATE' as DATE
type.
(Optional).
Not supported by the `DIRECT` write method.
Write
partitionExpirationMs
Number of milliseconds for which to keep the storage for partitions in the table.
The storage in a partition will have an expiration time of its partition time plus this value.
(Optional).
Not supported by the `DIRECT` write method.
Write
partitionType
Used to specify Time partitioning.
Supported types are: HOUR, DAY, MONTH, YEAR
This option is mandatory for a target table to be Time partitioned.
(Optional. Defaults to DAY if PartitionField is specified).
Not supported by the `DIRECT` write method.
Write
partitionRangeStart
,
partitionRangeEnd
,
partitionRangeInterval
Used to specify Integer-range partitioning.
These options are mandatory for a target table to be Integer-range partitioned.
All 3 options must be specified.
Not supported by the `DIRECT` write method.
Write
clusteredFields
A string of non-repeated, top-level columns separated by commas.
(Optional).
Write
allowFieldAddition
Adds the ALLOW_FIELD_ADDITION
SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true
and false
.
(Optional. Defaults to false
).
Supported only by the `INDIRECT` write method.
Write
allowFieldRelaxation
Adds the ALLOW_FIELD_RELAXATION
SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true
and false
.
(Optional. Defaults to false
).
Supported only by the `INDIRECT` write method.
Write
proxyAddress
Address of the proxy server. The proxy must be a HTTP proxy and address should be in the `host:port` format.
Can be alternatively set in the Spark configuration (spark.conf.set(...)
) or in Hadoop
Configuration (fs.gs.proxy.address
).
(Optional. Required only if connecting to GCP via proxy.)
Read/Write
proxyUsername
The userName used to connect to the proxy. Can be alternatively set in the Spark configuration
(spark.conf.set(...)
) or in Hadoop Configuration (fs.gs.proxy.username
).
(Optional. Required only if connecting to GCP via proxy with authentication.)
Read/Write
proxyPassword
The password used to connect to the proxy. Can be alternatively set in the Spark configuration
(spark.conf.set(...)
) or in Hadoop Configuration (fs.gs.proxy.password
).
(Optional. Required only if connecting to GCP via proxy with authentication.)
Read/Write
httpMaxRetry
The maximum number of retries for the low-level HTTP requests to BigQuery. Can be alternatively set in the
Spark configuration (spark.conf.set("httpMaxRetry", ...)
) or in Hadoop Configuration
(fs.gs.http.max.retry
).
(Optional. Default is 10)
Read/Write
httpConnectTimeout
The timeout in milliseconds to establish a connection with BigQuery. Can be alternatively set in the
Spark configuration (spark.conf.set("httpConnectTimeout", ...)
) or in Hadoop Configuration
(fs.gs.http.connect-timeout
).
(Optional. Default is 60000 ms. 0 for an infinite timeout, a negative number for 20000)
Read/Write
httpReadTimeout
The timeout in milliseconds to read data from an established connection. Can be alternatively set in the
Spark configuration (spark.conf.set("httpReadTimeout", ...)
) or in Hadoop Configuration
(fs.gs.http.read-timeout
).
(Optional. Default is 60000 ms. 0 for an infinite timeout, a negative number for 20000)
arrowCompressionCodec
Compression codec while reading from a BigQuery table when using Arrow format. Options :
ZSTD (Zstandard compression)
,
LZ4_FRAME (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md)
,
COMPRESSION_UNSPECIFIED
. The recommended compression codec is ZSTD
while using Java.
(Optional. Defaults to COMPRESSION_UNSPECIFIED
which means no compression will be used)
cacheExpirationTimeInMinutes
The expiration time of the in-memory cache storing query information.
To disable caching, set the value to 0.
(Optional. Defaults to 15 minutes)
enableModeCheckForSchemaFields
Checks the mode of every field in destination schema to be equal to the mode in corresponding source field schema, during DIRECT write.
Default value is true i.e., the check is done by default. If set to false the mode check is ignored.
Write
enableListInference
Indicates whether to use schema inference specifically when the mode is Parquet (https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions).
Defaults to false.
Write
bqChannelPoolSize
The (fixed) size of the gRPC channel pool created by the BigQueryReadClient.
For optimal performance, this should be set to at least the number of cores on the cluster executors.
createReadSessionTimeoutInSeconds
The timeout in seconds to create a ReadSession when reading a table.
For extremely large tables this value should be increased.
(Optional. Defaults to 600 seconds)
queryJobPriority
Priority levels set for the job while reading data from BigQuery query. The permitted values are:
BATCH
- Query is queued and started as soon as idle resources are available, usually within a few minutes. If the query hasn't started within 3 hours, its priority is changed to INTERACTIVE
.
INTERACTIVE
- Query is executed as soon as possible and counts towards the concurrent rate limit and the daily rate limit.
For WRITE, this option will be effective when DIRECT write is used with OVERWRITE mode, where the connector overwrites the destination table using MERGE statement.
(Optional. Defaults to INTERACTIVE
)
Read/Write
destinationTableKmsKeyName
Describes the Cloud KMS encryption key that will be used to protect destination BigQuery
table. The BigQuery Service Account associated with your project requires access to this
encryption key. For further information about using CMEK with BigQuery see
[here](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id).
Notice: The table will be encrypted by the key only if it is created by the
connector. A pre-existing unencrypted table won't be encrypted just by setting this option.
(Optional)
Write
allowMapTypeConversion
Boolean config to disable conversion from BigQuery records to Spark MapType
when the record has two subfields with field names as key
and value
.
Default value is true
which allows the conversion.
(Optional)
spark.sql.sources.partitionOverwriteMode
Config to specify the overwrite mode on write when the table is range/time partitioned.
Currently supports two modes: STATIC
and DYNAMIC
. In STATIC
mode,
the entire table is overwritten. In DYNAMIC
mode, the data is overwritten by partitions of the existing table.
The default value is STATIC
.
(Optional)
Write
enableReadSessionCaching
Boolean config to disable read session caching. Caches BigQuery read sessions to allow for faster Spark query planning.
Default value is true
.
(Optional)
readSessionCacheDurationMins
Config to set the read session caching duration in minutes. Only works if enableReadSessionCaching
is true
(default).
Allows specifying the duration to cache read sessions for. Maximum allowed value is 300
.
Default value is 5
.
(Optional)
bigQueryJobTimeoutInMinutes
Config to set the BigQuery job timeout in minutes.
Default value is 360
minutes.
(Optional)
Read/Write
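The `datePartition` formats listed in the options above (YYYYMMDD, and YYYYMMDDHH, YYYYMM and YYYY for the HOUR, MONTH and YEAR partition types) map directly onto `strftime` patterns. The sketch below shows one way to derive the option value from a `datetime`. `date_partition_value` is a hypothetical helper, and using the key `DAY` for the YYYYMMDD format is an assumption based on the `partitionType` values.

```python
from datetime import datetime

# strftime patterns for the datePartition formats, keyed by partition type.
DATE_PARTITION_FORMATS = {
    "HOUR": "%Y%m%d%H",  # YYYYMMDDHH
    "DAY": "%Y%m%d",     # YYYYMMDD
    "MONTH": "%Y%m",     # YYYYMM
    "YEAR": "%Y",        # YYYY
}


def date_partition_value(moment: datetime, partition_type: str = "DAY") -> str:
    """Format a datetime as a datePartition string (hypothetical helper)."""
    try:
        fmt = DATE_PARTITION_FORMATS[partition_type.upper()]
    except KeyError:
        raise ValueError(f"unsupported partition type: {partition_type}") from None
    return moment.strftime(fmt)


value = date_partition_value(datetime(2022, 3, 31, 14), "DAY")
print(value)  # 20220331
# With a live DataFrame the value could be used as:
#   df.write.format("bigquery").option("datePartition", value) ...
```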
Options can also be set outside of the code, using the --conf
parameter of spark-submit
or --properties
parameter
of gcloud dataproc jobs submit spark
. In order to use this, prepend the prefix spark.datasource.bigquery.
to any of
the options, for example spark.conf.set("temporaryGcsBucket", "some-bucket")
can also be set as
--conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
.
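The prefix rule above is mechanical, so it can be applied to a whole set of options at once. The following sketch turns a dict of connector options into `--conf` arguments for `spark-submit`. `to_conf_flags` is a hypothetical helper written for this illustration.

```python
from typing import List, Mapping

# Prefix that turns a connector option into a Spark configuration key.
PREFIX = "spark.datasource.bigquery."


def to_conf_flags(options: Mapping[str, str]) -> List[str]:
    """Turn connector options into spark-submit --conf arguments."""
    flags: List[str] = []
    for name, value in options.items():
        flags += ["--conf", f"{PREFIX}{name}={value}"]
    return flags


print(" ".join(to_conf_flags({"temporaryGcsBucket": "some-bucket"})))
# --conf spark.datasource.bigquery.temporaryGcsBucket=some-bucket
```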
With the exception of DATETIME
and TIME
all BigQuery data types map directly into the corresponding Spark SQL data type. Here are all of the mappings:
BigQuery Standard SQL Data Type
Spark SQL
Data Type
Notes
BooleanType
Spark has no DATETIME type.
Spark string can be written to an existing BQ DATETIME column provided it is in the format for BQ DATETIME literals.
* For Spark 3.4+, BQ DATETIME is read as Spark's TimestampNTZ type i.e. java LocalDateTime
LongType
, StringType
*
Spark has no TIME type. The generated longs, which indicate microseconds since midnight, can be safely cast to TimestampType, but this causes the date to be inferred as the current day. Thus times are left as longs and users can cast them if they like.
When cast to Timestamp, TIME has the same time zone issues as DATETIME.
* Spark string can be written to an existing BQ TIME column provided it is in the format for BQ TIME literals.
StringType
Spark has no JSON type. The values are read as String. In order to write JSON back to BigQuery, the following conditions are REQUIRED:
Use the INDIRECT
write method
Use the AVRO
intermediate format
The DataFrame field MUST be of type String
and have an entry of sqlType=JSON in its metadata
MapType
BigQuery has no MAP type, therefore similar to other conversions like Apache Avro and BigQuery Load jobs, the connector converts a Spark Map to a REPEATED STRUCT<key,value>.
This means that while writing and reading of maps is available, running a SQL on BigQuery that uses map semantics is not supported.
To refer to the map's values using BigQuery SQL, please check the BigQuery documentation.
Due to these incompatibilities, a few restrictions apply:
Keys can be Strings only
Values can be simple types (not structs)
For INDIRECT write, use the AVRO
intermediate format. DIRECT write is supported as well
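The TIME mapping above leaves values as longs holding microseconds since midnight. Once such values have been collected to the Python side, they can be turned into `datetime.time` objects without attaching a date. This is a minimal sketch using only the standard library. `micros_to_time` is a hypothetical helper, not something the connector provides.

```python
from datetime import datetime, time, timedelta

MICROS_PER_DAY = 24 * 60 * 60 * 1_000_000


def micros_to_time(micros_since_midnight: int) -> time:
    """Convert microseconds since midnight into a datetime.time."""
    if not 0 <= micros_since_midnight < MICROS_PER_DAY:
        raise ValueError("value is outside a single day")
    # Add the offset to an arbitrary midnight and keep only the time part.
    return (datetime.min + timedelta(microseconds=micros_since_midnight)).time()


print(micros_to_time(3_723_000_004))  # 01:02:03.000004
```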
The Spark ML Vector and
Matrix are supported,
including their dense and sparse versions. The data is saved as a BigQuery RECORD. Notice that a suffix is added to
the field's description which includes the spark type of the field.
In order to write those types to BigQuery, use the ORC or Avro intermediate format, and have them as column of the
Row (i.e. not a field in a struct).
BigQuery's BigNumeric has a precision of 76.76 (the 77th digit is partial) and a scale of 38. Since
this precision and scale are beyond what Spark's DecimalType supports (38 scale and 38 precision),
BigNumeric fields with precision larger than 38 cannot be used. Once this Spark limitation is
lifted, the connector will be updated accordingly.
The Spark Decimal/BigQuery Numeric conversion tries to preserve the parameterization of the type, i.e.
NUMERIC(10,2)
will be converted to Decimal(10,2)
and vice versa. Notice however that there are
cases where the parameters are lost.
This means that the parameters will be reverted to the defaults - NUMERIC (38,9) and BIGNUMERIC(76,38).
This means that at the moment, BigNumeric read is supported only from a standard table, but not from
BigQuery view or when reading data from a BigQuery query.
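The precision limit described above can be checked up front for a given column definition. The sketch below tests whether a NUMERIC or BIGNUMERIC parameterization fits within Spark's DecimalType bound of 38. `fits_spark_decimal` is a hypothetical helper, and the rule that scale must not exceed precision is an assumption of this sketch.

```python
SPARK_MAX_DECIMAL_PRECISION = 38  # upper bound of Spark's DecimalType


def fits_spark_decimal(precision: int, scale: int) -> bool:
    """Return True if (precision, scale) fits in Spark's DecimalType.

    Assumes scale must be non-negative and no larger than precision.
    """
    return 0 <= scale <= precision <= SPARK_MAX_DECIMAL_PRECISION


print(fits_spark_decimal(10, 2), fits_spark_decimal(76, 38))  # True False
```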
The connector automatically computes column and pushdown filters from the DataFrame's SELECT
statement, e.g.
spark.read.bigquery("bigquery-public-data:samples.shakespeare")
.select("word")
.where("word = 'Hamlet' or word = 'Claudius'")
.collect()
filters to the column word
and pushes down the predicate filter word = 'Hamlet' or word = 'Claudius'
.
If you do not wish to make multiple read requests to BigQuery, you can cache the DataFrame before filtering e.g.:
val cachedDF = spark.read.bigquery("bigquery-public-data:samples.shakespeare").cache()
val rows = cachedDF.select("word")
.where("word = 'Hamlet'")
.collect()
// All of the table was cached and this doesn't require an API call
val otherRows = cachedDF.select("word_count")
.where("word = 'Romeo'")
.collect()
You can also manually specify the filter
option, which will override automatic pushdown and Spark will do the rest of the filtering in the client.
The pseudo columns _PARTITIONDATE and _PARTITIONTIME are not part of the table schema. Therefore in order to query by the partitions of partitioned tables do not use the where() method shown above. Instead, add a filter option in the following manner:
val df = spark.read.format("bigquery")
.option("filter", "_PARTITIONDATE > '2019-01-01'")
.load(TABLE)
By default the connector creates one partition per 400MB in the table being read (before filtering). This should roughly correspond to the maximum number of readers supported by the BigQuery Storage API.
This can be configured explicitly with the maxParallelism
property. BigQuery may limit the number of partitions based on server constraints.
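As a rough illustration of the default described above, the sketch below estimates a partition count from a table's size. It is an approximation and not the connector's actual planning logic. `estimate_partitions` is a hypothetical helper, 400MB is assumed to mean 400 MiB, and BigQuery may return fewer partitions than estimated.

```python
from typing import Optional

BYTES_PER_PARTITION = 400 * 1024 * 1024  # assumption: "400MB" read as 400 MiB


def estimate_partitions(table_bytes: int, max_parallelism: Optional[int] = None) -> int:
    """Estimate read partitions: one per 400MB, capped by max_parallelism."""
    # Integer ceiling division; report at least one partition (assumption).
    estimate = max(1, (table_bytes + BYTES_PER_PARTITION - 1) // BYTES_PER_PARTITION)
    if max_parallelism is not None:
        estimate = min(estimate, max_parallelism)
    return estimate


print(estimate_partitions(10 * BYTES_PER_PARTITION))     # 10
print(estimate_partitions(10 * BYTES_PER_PARTITION, 4))  # 4
```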
In order to support tracking the usage of BigQuery resources the connector
offers the following options to tag BigQuery resources:
The connector can launch BigQuery load and query jobs. Adding labels to the jobs
is done in the following manner:
spark.conf.set("bigQueryJobLabel.cost_center", "analytics")
spark.conf.set("bigQueryJobLabel.usage", "nightly_etl")
This will create labels cost_center
=analytics
and usage
=nightly_etl
.
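When several labels come from a single source, such as a job definition, the configuration keys can be generated rather than written by hand. The sketch below builds the `bigQueryJobLabel.<name>` keys shown above from a plain dict. `job_label_conf` is a hypothetical helper.

```python
from typing import Dict, Mapping


def job_label_conf(labels: Mapping[str, str]) -> Dict[str, str]:
    """Map label names to their bigQueryJobLabel.* configuration keys."""
    return {f"bigQueryJobLabel.{key}": value for key, value in labels.items()}


conf = job_label_conf({"cost_center": "analytics", "usage": "nightly_etl"})
# With a live SparkSession the entries could be applied as:
#   for key, value in conf.items():
#       spark.conf.set(key, value)
```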
Used to annotate the read and write sessions. The trace ID is of the format
Spark:ApplicationName:JobID
. This is an opt-in option, and to use it the user
needs to set the traceApplicationName
property. JobID is auto-generated from the
Dataproc job ID, with a fallback to the Spark application ID (such as
application_1648082975639_0001
). The Job ID can be overridden by setting the
traceJobId
option. Notice that the total length of the trace ID cannot be over
256 characters.
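To see how the trace ID format and its length limit interact, the sketch below composes an ID from an application name and a job ID and rejects results longer than 256 characters. `build_trace_id` is a hypothetical helper that mirrors the format described above. The connector builds the real ID itself.

```python
MAX_TRACE_ID_LENGTH = 256


def build_trace_id(application_name: str, job_id: str) -> str:
    """Compose a trace ID in the form Spark:ApplicationName:JobID."""
    trace_id = f"Spark:{application_name}:{job_id}"
    if len(trace_id) > MAX_TRACE_ID_LENGTH:
        raise ValueError(
            f"trace ID is {len(trace_id)} characters, limit is {MAX_TRACE_ID_LENGTH}"
        )
    return trace_id


print(build_trace_id("nightly_etl", "application_1648082975639_0001"))
# Spark:nightly_etl:application_1648082975639_0001
```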
The connector can be used in Jupyter notebooks even if
it is not installed on the Spark cluster. It can be added as an external jar
using the following code:
Python:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.38.0") \
.getOrCreate()
df = spark.read.format("bigquery") \
.load("dataset.table")
Scala:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
.config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.38.0")
.getOrCreate()
val df = spark.read.format("bigquery")
.load("dataset.table")
If the Spark cluster uses Scala 2.12 (optional for Spark 2.4.x,
mandatory for 3.0.x), then the relevant package is
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.38.0. To
find out which Scala version is used, run the following code:
Python:
spark.sparkContext._jvm.scala.util.Properties.versionString()
Scala:
scala.util.Properties.versionString
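The version string returned by the calls above can be mapped to a package coordinate. The sketch below extracts the Scala binary version (for example `2.12`) and fills it into the artifact name used in this section. `connector_package` is a hypothetical helper, and the sample input `"version 2.12.15"` is an assumed example of the string's shape. The helper does not check that the artifact exists. Per the jar list earlier in this document, the Scala 2.11 build is published at 0.29.0 rather than 0.38.0.

```python
import re


def connector_package(scala_version_string: str, connector_version: str = "0.38.0") -> str:
    """Build the Maven coordinate matching the cluster's Scala binary version."""
    match = re.search(r"(\d+)\.(\d+)", scala_version_string)
    if match is None:
        raise ValueError(f"no Scala version found in {scala_version_string!r}")
    scala_binary = f"{match.group(1)}.{match.group(2)}"  # e.g. "2.12"
    return (
        "com.google.cloud.spark:"
        f"spark-bigquery-with-dependencies_{scala_binary}:{connector_version}"
    )


print(connector_package("version 2.12.15"))
# com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.38.0
```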
Unless you wish to use the implicit Scala API spark.read.bigquery("TABLE_ID")
, there is no need to compile against the connector.
To include the connector in your project:
<dependency>
<groupId>com.google.cloud.spark</groupId>
<artifactId>spark-bigquery-with-dependencies_${scala.version}</artifactId>
<version>0.38.0</version>
</dependency>
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.38.0"
Spark populates many metrics, which the end user can find in the Spark history page. These metrics are all Spark-related and are collected implicitly, without any change from the connector.
A few metrics, however, are populated from BigQuery. These are currently visible in the application logs and can be read in the driver/executor logs.
From Spark 3.2 onwards, Spark provides an API to expose custom metrics in the Spark UI page: https://spark.apache.org/docs/3.2.0/api/java/org/apache/spark/sql/connector/metric/CustomMetric.html
Currently, using this API, the connector exposes the following BigQuery metrics during read:
| Metric Name | Description |
|---|---|
| bytes read | number of BigQuery bytes read |
| rows read | number of BigQuery rows read |
| scan time | the amount of time spent between read rows response requested to obtained across all the executors, in milliseconds. |
| parse time | the amount of time spent for parsing the rows read across all the executors, in milliseconds. |
| spark time | the amount of time spent in spark to process the queries (i.e., apart from scanning and parsing), across all the executors, in milliseconds. |
Note: To use the metrics in the Spark UI page, make sure that spark-bigquery-metrics-0.38.0.jar
is on the class path before starting the history server, and that the connector version is spark-3.2
or above.
See the BigQuery pricing documentation.
You can manually set the number of partitions with the maxParallelism
property. BigQuery may provide fewer partitions than you ask for. See Configuring Partitioning.
You can also always repartition after reading in Spark.
If there are too many partitions, the CreateWriteStream or Throughput quotas
may be exceeded. This occurs because, while the data within each partition is processed serially, independent
partitions may be processed in parallel on different nodes within the Spark cluster. Generally, to ensure maximum
sustained throughput you should file a quota increase request. However, you can also manually reduce the number of
partitions being written by calling coalesce on the DataFrame to mitigate this problem.
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write
A rule of thumb is to have a single partition handle at least 1GB of data.
Also note that a job running with the writeAtLeastOnce
property turned on will not encounter CreateWriteStream
quota errors.
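The rule of thumb above can be turned into a partition count when the approximate size of the data is known. The sketch below is an illustration only: `suggested_partition_count` is a hypothetical helper, and reading "1GB" as 2^30 bytes is an assumption.

```python
GIB = 1024 ** 3  # assumption: "1GB" is read as 2**30 bytes


def suggested_partition_count(total_bytes, min_bytes_per_partition=GIB):
    """Largest partition count that still leaves at least 1GB per partition."""
    return max(1, total_bytes // min_bytes_per_partition)


# Example: roughly 12.5 GiB of data to write.
desired_partition_count = suggested_partition_count(12 * GIB + GIB // 2)

# With a live DataFrame this value would be passed to coalesce:
#   dfNew = df.coalesce(desired_partition_count)
```

Note that coalesce only reduces the number of partitions, so a computed value larger than the DataFrame's current partition count has no effect.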
The connector needs a GoogleCredentials instance in order to connect to the BigQuery APIs. There are multiple
options for providing it:
The default is to load the JSON key from the GOOGLE_APPLICATION_CREDENTIALS
environment variable, as described
here.
If the environment variable cannot be changed, the credentials file can be configured
as a Spark option. The file should reside on the same path on all the nodes of the cluster.
// Globally
spark.conf.set("credentialsFile", "</path/to/key/file>")
// Per read/Write
spark.read.format("bigquery").option("credentialsFile", "</path/to/key/file>")
Credentials can also be provided explicitly, either as a parameter or from Spark runtime configuration.
They should be passed in as a base64-encoded string directly.
// Globally
spark.conf.set("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
// Per read/Write
spark.read.format("bigquery").option("credentials", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
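The base64 string for the credentials option can be produced with the Python standard library. This is a minimal sketch: the helper name `encode_service_account_key` is hypothetical, and the JSON below is a placeholder rather than a real key. In practice the text would be read from the service account key file.

```python
import base64
import json


def encode_service_account_key(key_json):
    """Base64-encode the JSON text of a service account key."""
    return base64.b64encode(key_json.encode("utf-8")).decode("ascii")


# Placeholder content only; a real key file has more fields.
key_json = json.dumps({"type": "service_account", "project_id": "my-project"})
encoded = encode_service_account_key(key_json)

# With a live SparkSession:
#   spark.conf.set("credentials", encoded)
```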
In cases where the user has an internal service providing the Google AccessToken, a custom implementation
can be used that creates only the AccessToken and provides its TTL. Token refresh will generate a new token. To
use this, implement the com.google.cloud.bigquery.connector.common.AccessTokenProvider
interface. The fully qualified class name of the implementation should be provided in the gcpAccessTokenProvider
option. AccessTokenProvider must be implemented in Java or another JVM language such as Scala or Kotlin. It must
have either a no-arg constructor or a constructor accepting a single java.lang.String
argument. This configuration parameter can be supplied using the gcpAccessTokenProviderConfig
option. If it is not provided, the no-arg
constructor will be called. The jar containing the implementation should be on the cluster's classpath.
// Globally
spark.conf.set("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessTokenProvider", "com.example.ExampleAccessTokenProvider")
Service account impersonation can be configured for a specific username and a group name, or
for all users by default, using the properties below:
gcpImpersonationServiceAccountForUser_<USER_NAME> (not set by default): The service account impersonation for a specific user.
gcpImpersonationServiceAccountForGroup_<GROUP_NAME> (not set by default): The service account impersonation for a specific group.
gcpImpersonationServiceAccount (not set by default): Default service account impersonation for all users.
If any of the above properties is set, the specified service account will be impersonated by
generating short-lived credentials when accessing BigQuery.
If more than one property is set then the service account associated with the username will take
precedence over the service account associated with the group name for a matching user and group,
which in turn will take precedence over default service account impersonation.
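The precedence described above (user, then group, then default) can be modeled as a simple lookup. The sketch below illustrates the ordering only and is not the connector's implementation. The function name, the sample accounts, and the handling of users who belong to several groups (first match in the given order) are assumptions.

```python
USER_PREFIX = "gcpImpersonationServiceAccountForUser_"
GROUP_PREFIX = "gcpImpersonationServiceAccountForGroup_"
DEFAULT_KEY = "gcpImpersonationServiceAccount"


def resolve_impersonation(conf, user, groups):
    """Return the service account to impersonate, or None if nothing is configured."""
    # 1. A user-specific property wins over everything else.
    user_account = conf.get(USER_PREFIX + user)
    if user_account:
        return user_account
    # 2. Otherwise use the first group that has a property set
    #    (the ordering between groups is an assumption of this sketch).
    for group in groups:
        group_account = conf.get(GROUP_PREFIX + group)
        if group_account:
            return group_account
    # 3. Fall back to the default for all users, if any.
    return conf.get(DEFAULT_KEY)


conf = {
    USER_PREFIX + "alice": "alice-sa@example.iam.gserviceaccount.com",
    GROUP_PREFIX + "analysts": "analysts-sa@example.iam.gserviceaccount.com",
    DEFAULT_KEY: "default-sa@example.iam.gserviceaccount.com",
}

for_alice = resolve_impersonation(conf, "alice", ["analysts"])
for_bob = resolve_impersonation(conf, "bob", ["analysts"])
for_carol = resolve_impersonation(conf, "carol", ["engineers"])
```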
For a simpler application, where access token refresh is not required, another alternative is to pass the access token
as the gcpAccessToken configuration option. You can get the access token by running
gcloud auth application-default print-access-token.
// Globally
spark.conf.set("gcpAccessToken", "<access-token>")
// Per read/Write
spark.read.format("bigquery").option("gcpAccessToken", "<access-token>")
Important: The CredentialsProvider and AccessTokenProvider need to be implemented in Java or
another JVM language such as Scala or Kotlin. The jar containing the implementation should be on the cluster's classpath.
Notice: Only one of the above options should be provided.
To connect to a forward proxy and to authenticate the user credentials, configure the following options.
proxyAddress: Address of the proxy server. The proxy must be an HTTP proxy and the address should be in the host:port format.
proxyUsername: The userName used to connect to the proxy.
proxyPassword: The password used to connect to the proxy.
val df = spark.read.format("bigquery")
.option("proxyAddress", "http://my-proxy:1234")
.option("proxyUsername", "my-username")
.option("proxyPassword", "my-password")
.load("some-table")
The same proxy parameters can also be set globally using Spark's RuntimeConfig like this:
spark.conf.set("proxyAddress", "http://my-proxy:1234")
spark.conf.set("proxyUsername", "my-username")
spark.conf.set("proxyPassword", "my-password")
val df = spark.read.format("bigquery")
.load("some-table")
You can set the following in the Hadoop configuration as well:
fs.gs.proxy.address (similar to "proxyAddress"), fs.gs.proxy.username (similar to "proxyUsername") and
fs.gs.proxy.password (similar to "proxyPassword").
If the same parameter is set in multiple places, the order of priority is as follows:
option("key", "value") > spark.conf > hadoop configuration
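The priority order can be pictured as a layered lookup in which the first layer that defines a key wins. The sketch below models that with `collections.ChainMap`. It illustrates the ordering only, not how the connector resolves settings internally; the function name and the sample values are hypothetical.

```python
from collections import ChainMap

# Hadoop property names and the connector options they correspond to.
HADOOP_TO_OPTION = {
    "fs.gs.proxy.address": "proxyAddress",
    "fs.gs.proxy.username": "proxyUsername",
    "fs.gs.proxy.password": "proxyPassword",
}


def effective_proxy_settings(options, spark_conf, hadoop_conf):
    """Merge the three sources with priority: option > spark.conf > hadoop configuration."""
    hadoop_as_options = {
        option: hadoop_conf[key]
        for key, option in HADOOP_TO_OPTION.items()
        if key in hadoop_conf
    }
    # ChainMap returns the value from the first mapping that contains the key.
    return dict(ChainMap(options, spark_conf, hadoop_as_options))


settings = effective_proxy_settings(
    options={"proxyAddress": "http://reader-proxy:1234"},
    spark_conf={"proxyAddress": "http://global-proxy:1234", "proxyUsername": "my-username"},
    hadoop_conf={"fs.gs.proxy.username": "hadoop-user", "fs.gs.proxy.password": "my-password"},
)
```

In this example the address comes from the per-read option, the username from spark.conf, and the password from the Hadoop configuration.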