AWS Glue for Spark 中适用于 ETL 的连接类型和选项
在 AWS Glue for Spark 中,各种 PysPark 和 Scala 方法和转换使用
connectionType
参数指定连接类型。它们使用
connectionOptions
或
options
参数指定连接选项。
connectionType
参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的
connectionOptions
(或
options
)参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。
有关展示如何设置和使用连接选项的代码示例,请参阅每种连接类型的主页。
自定义和 AWS Marketplace connectionType 值
这些功能包括:
"connectionType": "marketplace.athena"
:指定与 Amazon Athena 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。
"connectionType": "marketplace.spark"
:指定与 Apache Spark 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。
"connectionType": "marketplace.jdbc"
:指定与 JDBC 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。
"connectionType": "custom.athena"
:指定与 Amazon Athena 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。
"connectionType": "custom.spark"
:指定与 Apache Spark 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。
"connectionType": "custom.jdbc"
:指定与 JDBC 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。
适用于类型 custom.jdbc 或 marketplace.jdbc 的连接选项
connectionName
– 字符串,必需,与连接器关联的连接的名称。
url
– 字符串,必需,用于建立与数据源的连接且带占位符(
$
{
}
)的 JDBC URL。占位符
$
{
secretKey}
替换为 AWS Secrets Manager 中同名的密钥。有关构建 URL 的详细信息,请参阅数据存储文档。
secretId
或
user/password
– 字符串,必需,用于检索 URL 的凭证。
dbTable
或
query
– 字符串,必需,从中获取数据的表或 SQL 查询。您可以指定
dbTable
或
query
,但不能同时指定两者。
partitionColumn
– 字符串,可选,用于分区的整数列的名称。此选项仅在包含
lowerBound
、
upperBound
和
numPartitions
时有效。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息,请参阅《Apache Spark SQL、DataFrame 和 Dataset 指南》中的
JDBC 转换到其他数据库
lowerBound
和
upperBound
值用于确定分区步长,而不是用于筛选表中的行。对表中的所有行进行分区并返回。
注意
使用查询(而不是表名称)时,您应验证查询是否适用于指定的分区条件。例如:
如果您的查询格式为
"SELECT col1 FROM table1"
,则在使用分区列的查询结尾附加
WHERE
子句,以测试查询。
如果您的查询格式为
SELECT col1 FROM table1 WHERE col2=val"
,则通过
AND
和使用分区列的表达式扩展
WHERE
子句,以测试查询。
lowerBound
– 整数,可选,用于确定分区步长的最小
partitionColumn
值。
upperBound
– 整数,可选,用于确定分区步长的最大
partitionColumn
值。
numPartitions
– 整数,可选,分区数。此值以及
lowerBound
(包含)和
upperBound
(排除)为用于拆分
partitionColumn
而生成的
WHERE
子句表达式构成分区步长。
重要
请注意分区的数量,因为分区过多可能会导致外部数据库系统出现问题。
filterPredicate
– 字符串,可选,用于筛选源数据的额外条件子句。例如:
BillingCity='Mountain View'
使用
查询
(而不是
表
名称)时,您应验证查询是否适用于指定的
filterPredicate
。例如:
如果您的查询格式为
"SELECT col1 FROM table1"
,则在使用筛选条件谓词的查询结尾附加
WHERE
子句,以测试查询。
如果您的查询格式为
"SELECT col1 FROM table1 WHERE col2=val"
,则通过
AND
和使用筛选条件谓词的表达式扩展
WHERE
子句,以测试查询。
dataTypeMapping
– 目录,可选,用于构建从
JDBC
数据类型到
Glue
数据类型的映射的自定义数据类型映射。例如,选项
"dataTypeMapping":
{
"FLOAT":"STRING"}
会通过调用驱动程序的
ResultSet.getString()
方法,将 JDBC 类型
FLOAT
的数据字段映射到 Java
String
类型,并将其用于构建 AWS Glue 记录。
ResultSet
对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档,了解驱动程序执行转换的方式。
目前受支持的 AWS Glue 数据类型包括:
DOUBLE -> DOUBLE
如果将自定义数据类型映射与选项
dataTypeMapping
结合使用,则可以覆盖默认数据类型映射。只有
dataTypeMapping
选项中列出的 JDBC 数据类型会受到影响;默认映射适用于所有其他 JDBC 数据类型。如果需要,您可以为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中均未包含 JDBC 数据类型,则数据类型默认转换为 AWS Glue
STRING
数据类型。
以下 Python 代码示例演示了如何使用 AWS Marketplace JDBC 驱动程序从 JDBC 数据库读取数据。它演示了如何从数据库读取数据并将数据写入 S3 位置。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()适用于类型 custom.athena 或 marketplace.athena 的连接选项
className
– 字符串,必需,驱动程序类名称。当您使用 Athena-CloudWatch 连接器时,此参数值是类名称(例如"com.amazonaws.athena.connectors"
)的前缀。Athena-CloudWatch 连接器由两个类组成:元数据处理程序和记录处理程序。如果您在此处提供通用前缀,则 API 会根据该前缀加载正确的类。
tableName
– 字符串,必需,要读取的 CloudWatch 日志流的名称。此代码段使用特别视图名称all_log_streams
,这意味着返回的动态数据框将包含日志组中所有日志流的数据。
schemaName
– 字符串,必需,要从中读取数据的 CloudWatch 日志流的名称。例如,/aws-glue/jobs/output
。
connectionName
– 字符串,必需,与连接器关联的连接的名称。有关此连接器的其他选项,请参阅 GitHub 上的 Amazon Athena CloudWatch 连接器自述
文件。 以下 Python 代码示例演示了如何从使用 AWS Marketplace 连接器的 Athena 数据存储读取数据。它演示了如何从 Athena 读取数据并将数据写入 S3 位置。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()适用于类型 custom.spark 或 marketplace.spark 的连接选项
connectionName
– 字符串,必需,与连接器关联的连接的名称。其他选项取决于数据存储。例如,OpenSearch 配置选项以前缀
es
开头,正如适用于 Apache Hadoop 的 Elasticsearch文档中所述。Spark 与 Snowflake 的连接使用 sfUser
和sfPassword
等连接,正如《连接 Snowflake》指南中的使用 Spark 连接器所述。 以下 Python 代码示例演示了如何从使用
marketplace.spark
连接的 OpenSearch 数据存储读取数据。import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<AWS endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()本节中的选项以
connection_options
形式提供,但不是专门适用于一个连接器。配置书签时通常使用以下参数。它们可能适用于 Amazon S3 或 JDBC 工作流程。有关更多信息,请参阅使用作业书签。
connectionType
|
dynamodb | Amazon DynamoDB 数据库 | kinesis | Amazon Kinesis Data Streams | Amazon S3 | documentdb | Amazon DocumentDB (with MongoDB compatibility) 数据库 | opensearch | Amazon OpenSearch Service 。 | redshift |
Amazon Redshift
|
kafka |
Kafka
|
azurecosmos | Azure Cosmos for NoSQL。 | azuresql | Azure SQL。 | bigquery | Google BigQuery。 | mongodb |
MongoDB
|
sqlserver | Microsoft SQL Server 数据库(请参阅 JDBC 连接 ) | mysql |
MySQL
|
oracle |
Oracle
|
postgresql |
PostgreSQL
|
saphana | SAP HANA。 | snowflake |
Snowflake
|
teradata | Teradata Vantage。 | vertica | Vertica。 | custom.* | Spark、Athena 或 JDBC 数据存储(请参阅 自定义和 AWS Marketplace connectionType 值 ) | marketplace.* | Spark、Athena 或 JDBC 数据存储(请参阅 自定义和 AWS Marketplace connectionType 值 ) |
---|