添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
AWS Glue for Spark 中适用于 ETL 的连接类型和选项 - AWS Glue

AWS Glue for Spark 中适用于 ETL 的连接类型和选项

在 AWS Glue for Spark 中,各种 PysPark 和 Scala 方法和转换使用 connectionType 参数指定连接类型。它们使用 connectionOptions options 参数指定连接选项。

connectionType 参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 connectionOptions (或 options )参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。

有关展示如何设置和使用连接选项的代码示例,请参阅每种连接类型的主页。

自定义和 AWS Marketplace connectionType 值

这些功能包括:

"connectionType": "marketplace.athena" :指定与 Amazon Athena 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。

"connectionType": "marketplace.spark" :指定与 Apache Spark 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。

"connectionType": "marketplace.jdbc" :指定与 JDBC 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。

"connectionType": "custom.athena" :指定与 Amazon Athena 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

"connectionType": "custom.spark" :指定与 Apache Spark 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

"connectionType": "custom.jdbc" :指定与 JDBC 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

适用于类型 custom.jdbc 或 marketplace.jdbc 的连接选项

connectionName – 字符串,必需,与连接器关联的连接的名称。

url – 字符串,必需,用于建立与数据源的连接且带占位符( $ { } )的 JDBC URL。占位符 $ { secretKey} 替换为 AWS Secrets Manager 中同名的密钥。有关构建 URL 的详细信息,请参阅数据存储文档。

secretId user/password – 字符串,必需,用于检索 URL 的凭证。

dbTable query – 字符串,必需,从中获取数据的表或 SQL 查询。您可以指定 dbTable query ,但不能同时指定两者。

partitionColumn – 字符串,可选,用于分区的整数列的名称。此选项仅在包含 lowerBound upperBound numPartitions 时有效。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息,请参阅《Apache Spark SQL、DataFrame 和 Dataset 指南》中的 JDBC 转换到其他数据库

lowerBound upperBound 值用于确定分区步长,而不是用于筛选表中的行。对表中的所有行进行分区并返回。

注意

使用查询(而不是表名称)时,您应验证查询是否适用于指定的分区条件。例如:

如果您的查询格式为 "SELECT col1 FROM table1" ,则在使用分区列的查询结尾附加 WHERE 子句,以测试查询。

如果您的查询格式为 SELECT col1 FROM table1 WHERE col2=val" ,则通过 AND 和使用分区列的表达式扩展 WHERE 子句,以测试查询。

lowerBound – 整数,可选,用于确定分区步长的最小 partitionColumn 值。

upperBound – 整数,可选,用于确定分区步长的最大 partitionColumn 值。

numPartitions – 整数,可选,分区数。此值以及 lowerBound (包含)和 upperBound (排除)为用于拆分 partitionColumn 而生成的 WHERE 子句表达式构成分区步长。

重要

请注意分区的数量,因为分区过多可能会导致外部数据库系统出现问题。

filterPredicate – 字符串,可选,用于筛选源数据的额外条件子句。例如:

BillingCity='Mountain View'

使用 查询 (而不是 名称)时,您应验证查询是否适用于指定的 filterPredicate 。例如:

如果您的查询格式为 "SELECT col1 FROM table1" ,则在使用筛选条件谓词的查询结尾附加 WHERE 子句,以测试查询。

如果您的查询格式为 "SELECT col1 FROM table1 WHERE col2=val" ,则通过 AND 和使用筛选条件谓词的表达式扩展 WHERE 子句,以测试查询。

dataTypeMapping – 目录,可选,用于构建从 JDBC 数据类型到 Glue 数据类型的映射的自定义数据类型映射。例如,选项 "dataTypeMapping": { "FLOAT":"STRING"} 会通过调用驱动程序的 ResultSet.getString() 方法,将 JDBC 类型 FLOAT 的数据字段映射到 Java String 类型,并将其用于构建 AWS Glue 记录。 ResultSet 对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档,了解驱动程序执行转换的方式。

目前受支持的 AWS Glue 数据类型包括:

DOUBLE -> DOUBLE

如果将自定义数据类型映射与选项 dataTypeMapping 结合使用,则可以覆盖默认数据类型映射。只有 dataTypeMapping 选项中列出的 JDBC 数据类型会受到影响;默认映射适用于所有其他 JDBC 数据类型。如果需要,您可以为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中均未包含 JDBC 数据类型,则数据类型默认转换为 AWS Glue STRING 数据类型。

以下 Python 代码示例演示了如何使用 AWS Marketplace JDBC 驱动程序从 JDBC 数据库读取数据。它演示了如何从数据库读取数据并将数据写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

适用于类型 custom.athena 或 marketplace.athena 的连接选项

className – 字符串,必需,驱动程序类名称。当您使用 Athena-CloudWatch 连接器时,此参数值是类名称(例如 "com.amazonaws.athena.connectors")的前缀。Athena-CloudWatch 连接器由两个类组成:元数据处理程序和记录处理程序。如果您在此处提供通用前缀,则 API 会根据该前缀加载正确的类。

tableName – 字符串,必需,要读取的 CloudWatch 日志流的名称。此代码段使用特别视图名称 all_log_streams,这意味着返回的动态数据框将包含日志组中所有日志流的数据。

schemaName – 字符串,必需,要从中读取数据的 CloudWatch 日志流的名称。例如,/aws-glue/jobs/output

connectionName – 字符串,必需,与连接器关联的连接的名称。

有关此连接器的其他选项,请参阅 GitHub 上的 Amazon Athena CloudWatch 连接器自述文件。

以下 Python 代码示例演示了如何从使用 AWS Marketplace 连接器的 Athena 数据存储读取数据。它演示了如何从 Athena 读取数据并将数据写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

适用于类型 custom.spark 或 marketplace.spark 的连接选项

connectionName – 字符串,必需,与连接器关联的连接的名称。

其他选项取决于数据存储。例如,OpenSearch 配置选项以前缀 es 开头,正如适用于 Apache Hadoop 的 Elasticsearch 文档中所述。Spark 与 Snowflake 的连接使用 sfUsersfPassword 等连接,正如《连接 Snowflake》指南中的使用 Spark 连接器所述。

以下 Python 代码示例演示了如何从使用 marketplace.spark 连接的 OpenSearch 数据存储读取数据。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

本节中的选项以 connection_options 形式提供,但不是专门适用于一个连接器。

配置书签时通常使用以下参数。它们可能适用于 Amazon S3 或 JDBC 工作流程。有关更多信息,请参阅使用作业书签

connectionType dynamodb Amazon DynamoDB 数据库 kinesis Amazon Kinesis Data Streams Amazon S3 documentdb Amazon DocumentDB (with MongoDB compatibility) 数据库 opensearch Amazon OpenSearch Service redshift Amazon Redshift 数据库 kafka Kafka Amazon Managed Streaming for Apache Kafka azurecosmos Azure Cosmos for NoSQL。 azuresql Azure SQL。 bigquery Google BigQuery。 mongodb MongoDB 数据库,包括 MongoDB Atlas。 sqlserver Microsoft SQL Server 数据库(请参阅 JDBC 连接 mysql MySQL 数据库(请参阅 JDBC 连接 oracle Oracle 数据库(请参阅 JDBC 连接 postgresql PostgreSQL 数据库(请参阅 JDBC 连接 saphana SAP HANA。 snowflake Snowflake 数据湖 teradata Teradata Vantage。 vertica Vertica。 custom.* Spark、Athena 或 JDBC 数据存储(请参阅 自定义和 AWS Marketplace connectionType 值 marketplace.* Spark、Athena 或 JDBC 数据存储(请参阅 自定义和 AWS Marketplace connectionType 值