适用于:
Databricks SQL
Databricks Runtime 13.1 及更高版本
从 Apache Kafka 群集读取数据,并以表格形式返回数据。
可从一个或多个 Kafka 主题中读取数据。 支持批处理查询和流式处理引入。
read_kafka([option_key => option_value ] [, ...])
此函数需要命名参数调用。
option_key
:要配置的选项的名称。 对于包含点 (.
) 的选项必须使用反引号。
option_value
:用于设置选项的常数表达式。 接受文本和标量函数。
记录从使用以下架构的 Apache Kafka 群集中读取的内容:
key BINARY
:Kafka 记录的密钥。
value BINARY NOT NULL
:Kafka 记录的值。
topic STRING NOT NULL
:从其中读取记录的 Kafka 主题的名称。
partition INT NOT NULL
:从其中读取记录的 Kafka 分区的 ID。
offset BIGINT NOT NULL
:Kafka“TopicPartition
”中记录的偏移量。
timestamp TIMESTAMP NOT NULL
:记录的时间戳值。 “timestampType
”列定义此时间戳对应的内容。
timestampType INTEGER NOT NULL
:“timestamp
”列中指定的时间戳的类型。
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
:作为记录一部分而提供的标头值(如已启用)。
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events',
startingOffsets => 'earliest',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
可以在 Apache Spark 文档中查看详细的选项列表。
提供以下用于连接到 Kafka 群集的选项。
assign
类型:String
一个 JSON 字符串,其中包含供使用的特定主题分区。 例如,对于 '{"topicA":[0,1],"topicB":[2,4]}'
,将会从 topicA 的第 0 个和第 1 个分区使用数据。
默认值: 无
subscribe
类型:String
要从中读取内容的 Kafka 主题的逗号分隔列表。
默认值: 无
subscribePattern
类型:String
与要订阅的主题匹配的正则表达式。
默认值: 无
read_kafka
可以在批处理查询和流式处理查询中使用。 以下选项指定它们适用于的查询类型。
endingOffsets
类型:String
查询类型:仅限批处理
进行批处理查询前要读取的偏移,要么 "latest"
来指定最新记录,要么 JSON 字符串指定每个 TopicPartition 的结束偏移。 在 JSON 中,可使用 -1
作为偏移量来表示最新。 不允许将 -2
(最早)作为偏移。
默认值:30"latest"
endingOffsetsByTimestamp
类型:String
查询类型:仅限批处理
一个 JSON 字符串,指定要为每个 TopicPartition 读取到的结束时间戳。 需要以自 1970-01-01 00:00:00 UTC
起的时间戳的长值提供时间戳(以毫秒为单位),例如
1686444353000
。 请参阅下方备注,详细了解时间戳的行为方式。
endingOffsetsByTimestamp
优先于 endingOffsets
。
默认值: 无
endingTimestamp
类型:String
查询类型:仅限批处理
时间戳的字符串值(以毫秒为单位),起始时间:
1970-01-01 00:00:00 UTC
,例如 "1686444353000"
。 如果 Kafka 未返回匹配的偏移,则偏移将设置为最新的。 请参阅下方备注,详细了解时间戳的行为方式。 请注意:endingTimestamp
优先于 endingOffsetsByTimestamp
以及
endingOffsets
。
默认值: 无
includeHeaders
类型:Boolean
查询类型:流式处理和批处理
是否在行中包含 Kafka 标头。
默认值:30false
kafka.<consumer_option>
类型:String
查询类型:流式处理和批处理
可使用“kafka.
”前缀传入的任何 Kafka 使用者特定选项。 提供时,需要使用反引号将这些选项括起来,否则将会收到分析程序错误。 可在 Kafka 文档中找到这些选项。
注意:不应使用此函数设置以下选项:
key.deserializer
、value.deserializer
、bootstrap.servers
、group.id
默认值: 无
maxOffsetsPerTrigger
类型:Long
查询类型:仅限流式处理
每个触发器间隔处理的最大偏移或行数的速率限制。 指定的编移总数将按比例分配到 TopicPartitions 中。
默认值: 无
startingOffsets
类型:String
查询类型:流式处理和批处理
启动查询时的起点,可以是 "earliest"
(从最早偏移开始),也可以是 "latest"
(从最新的偏移开始),或是指定每个 TopicPartition 的起始偏移的 JSON 字符串。 在 JSON 中,可使用 -2
作为偏移量来表示最早,使用 -1
表示最新。
注意:对于批处理查询,不允许使用最新(无论是隐式还是在 JSON 中使用 -1)。 仅在启动新的查询时,才能采用流式处理查询。 重启后的流式处理查询会从查询检查点中定义的偏移继续。 查询期间新发现的分区将最早启动。
默认值:流式处理的为 "latest"
,批量处理的为 "earliest"
startingOffsetsByTimestamp
类型:String
查询类型:流式处理和批处理
一个 JSON 字符串,指定每个 TopicPartition 的起始时间戳。 需要以自 1970-01-01 00:00:00 UTC
起的时间戳的长值提供时间戳(以毫秒为单位),例如 1686444353000
。 请参阅下方备注,详细了解时间戳的行为方式。 如果 Kafka 不返回匹配的偏移,则行为将会遵循选项 startingOffsetsByTimestampStrategy
的值。
startingOffsetsByTimestamp
优先于 startingOffsets
。
注意:仅在启动新的查询时,才能采用流式处理查询。 重启后的流式处理查询会从查询检查点中定义的偏移继续。 查询期间新发现的分区将最早启动。
默认值: 无
startingOffsetsByTimestampStrategy
类型:String
查询类型:流式处理和批处理
当时间戳(全局或每个分区)指定的起始偏移与返回的偏移 Kafka 不匹配时,使用此策略。 可以使用以下策略:
* "error"
:使查询失败
* "latest"
:为这些分区分配最新偏移量,以便 Spark 可以在以后的微批处理中从这些分区读取较新的记录。
默认值:30"error"
startingTimestamp
类型:String
查询类型:流式处理和批处理
时间戳的字符串值(以毫秒为单位),起始时间:
1970-01-01 00:00:00 UTC
,例如 "1686444353000"
。 请参阅下方备注,详细了解时间戳的行为方式。 如果 Kafka 不返回匹配的偏移,则行为将会遵循选项 startingOffsetsByTimestampStrategy
的值。
startingTimestamp
优先于 startingOffsetsByTimestamp
和 startingOffsets
。
注意:仅在启动新的查询时,才能采用流式处理查询。 重启后的流式处理查询会从查询检查点中定义的偏移继续。 查询期间新发现的分区将最早启动。
默认值: 无
每个分区返回的偏移是其时间戳大于或等于相应分区中给定时间戳的最早的偏移。 如果 Kafka 不返回匹配的偏移,则各选项的行为会有所不同(请查看每个选项的说明)。
Spark 只是将时间戳信息传递给 KafkaConsumer.offsetsForTimes
,不会解释或说明有关该值的原因。 有关 KafkaConsumer.offsetsForTimes
的详细信息,请参阅此文档。 此外,此处时间戳的含义可能因 Kafka 配置 (log.message.timestamp.type
) 而异。 有关详细信息,请参阅 Apache Kafka 文档。
CREATE STREAMING TABLE
read_files 表值函数
read_pubsub 流式处理表值函数