read_kafka 表值函数

适用于:check marked yes Databricks SQL check marked yes Databricks Runtime 13.1 及更高版本

从 Apache Kafka 群集读取数据,并以表格形式返回数据。

可从一个或多个 Kafka 主题中读取数据。 支持批处理查询和流式处理引入。

语法

read_kafka([option_key => option_value ] [, ...])

参数

此函数需要命名参数调用

  • option_key:要配置的选项的名称。 对于包含点 (.) 的选项必须使用反引号。
  • option_value:用于设置选项的常数表达式。 接受文本和标量函数。

返回

记录从使用以下架构的 Apache Kafka 群集中读取的内容:

  • key BINARY:Kafka 记录的密钥。
  • value BINARY NOT NULL:Kafka 记录的值。
  • topic STRING NOT NULL:从其中读取记录的 Kafka 主题的名称。
  • partition INT NOT NULL:从其中读取记录的 Kafka 分区的 ID。
  • offset BIGINT NOT NULL:Kafka“TopicPartition”中记录的偏移量。
  • timestamp TIMESTAMP NOT NULL:记录的时间戳值。 “timestampType”列定义此时间戳对应的内容。
  • timestampType INTEGER NOT NULL:“timestamp”列中指定的时间戳的类型。
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>:作为记录一部分而提供的标头值(如已启用)。

示例

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

选项

可以在 Apache Spark 文档中查看详细的选项列表。

必需选项

提供以下用于连接到 Kafka 群集的选项。

选项
bootstrapServers

类型:String

以逗号分隔的主机/商品对列表,均指向 Kafka 群集。

默认值: 无

仅提供以下其中一个选项来配置从哪个 Kafka 主题拉取数据。

选项
assign

类型:String

一个 JSON 字符串,其中包含供使用的特定主题分区。 例如,对于 '{"topicA":[0,1],"topicB":[2,4]}',将会使用 topicA 的第 0 个和第 1 个分区。

默认值: 无
subscribe

类型:String

要从中读取内容的 Kafka 主题的逗号分隔列表。

默认值: 无
subscribePattern

类型:String

与要订阅的主题匹配的正则表达式。

默认值: 无

杂项选项

read_kafka 可以在批处理查询和流式处理查询中使用。 以下选项指定它们适用于的查询类型。

选项
endingOffsets

类型:String 查询类型:仅限批处理

进行批处理查询前要读取的偏移,要么 "latest" 来指定最新记录,要么 JSON 字符串指定每个 TopicPartition 的结束偏移。 在 JSON 中,可使用 -1 作为偏移量来表示最新。 不允许将 -2(最早)作为偏移。

默认值:30"latest"
endingOffsetsByTimestamp

类型:String 查询类型:仅限批处理

一个 JSON 字符串,指定要为每个 TopicPartition 读取到的结束时间戳。 需要以自 1970-01-01 00:00:00 UTC 起的时间戳的长值提供时间戳(以毫秒为单位),例如
1686444353000。 请参阅下方备注,详细了解时间戳的行为方式。
endingOffsetsByTimestamp 优先于 endingOffsets

默认值: 无
endingTimestamp

类型:String 查询类型:仅限批处理

时间戳的字符串值(以毫秒为单位),起始时间:
1970-01-01 00:00:00 UTC,例如 "1686444353000"。 如果 Kafka 未返回匹配的偏移,则偏移将设置为最新的。 请参阅下方备注,详细了解时间戳的行为方式。 请注意:endingTimestamp 优先于 endingOffsetsByTimestamp 以及
endingOffsets

默认值: 无
includeHeaders

类型:Boolean 查询类型:流式处理和批处理

是否在行中包含 Kafka 标头。

默认值:30false
kafka.<consumer_option>

类型:String 查询类型:流式处理和批处理

可使用“kafka.”前缀传入的任何 Kafka 使用者特定选项。 提供时,需要使用反引号将这些选项括起来,否则将会收到分析程序错误。 可在 Kafka 文档中找到这些选项。

注意:不应使用此函数设置以下选项:
key.deserializervalue.deserializerbootstrap.serversgroup.id

默认值: 无
maxOffsetsPerTrigger

类型:Long 查询类型:仅限流式处理

每个触发器间隔处理的最大偏移或行数的速率限制。 指定的编移总数将按比例分配到 TopicPartitions 中。

默认值: 无
startingOffsets

类型:String 查询类型:流式处理和批处理

启动查询时的起点,可以是 "earliest"(从最早偏移开始),也可以是 "latest"(从最新的偏移开始),或是指定每个 TopicPartition 的起始偏移的 JSON 字符串。 在 JSON 中,可使用 -2 作为偏移量来表示最早,使用 -1 表示最新。

注意:对于批处理查询,不允许使用最新(无论是隐式还是在 JSON 中使用 -1)。 仅在启动新的查询时,才能采用流式处理查询。 重启后的流式处理查询会从查询检查点中定义的偏移继续。 查询期间新发现的分区将最早启动。

默认值:流式处理的为 "latest",批量处理的为 "earliest"
startingOffsetsByTimestamp

类型:String 查询类型:流式处理和批处理

一个 JSON 字符串,指定每个 TopicPartition 的起始时间戳。 需要以自 1970-01-01 00:00:00 UTC 起的时间戳的长值提供时间戳(以毫秒为单位),例如 1686444353000。 请参阅下方备注,详细了解时间戳的行为方式。 如果 Kafka 不返回匹配的偏移,则行为将会遵循选项“startingOffsetsByTimestampStrategy”的值。
startingOffsetsByTimestamp 优先于 startingOffsets

注意:仅在启动新的查询时,才能采用流式处理查询。 重启后的流式处理查询会从查询检查点中定义的偏移继续。 查询期间新发现的分区将最早启动。

默认值: 无
startingOffsetsByTimestampStrategy

类型:String 查询类型:流式处理和批处理

当时间戳(全局或每个分区)指定的起始偏移与返回的偏移 Kafka 不匹配时,使用此策略。 可以使用以下策略:

* "error":使查询失败
* "latest":为这些分区分配最新偏移量,以便 Spark 可以在以后的微批处理中从这些分区读取较新的记录。

默认值:30"error"
startingTimestamp

类型:String 查询类型:流式处理和批处理

时间戳的字符串值(以毫秒为单位),起始时间:
1970-01-01 00:00:00 UTC,例如 "1686444353000"。 请参阅下方备注,详细了解时间戳的行为方式。 如果 Kafka 不返回匹配的偏移,则行为将会遵循选项“startingOffsetsByTimestampStrategy”的值。
startingTimestamp 优先于 startingOffsetsByTimestampstartingOffsets

注意:仅在启动新的查询时,才能采用流式处理查询。 重启后的流式处理查询会从查询检查点中定义的偏移继续。 查询期间新发现的分区将最早启动。

默认值: 无

注意

每个分区返回的偏移是其时间戳大于或等于相应分区中给定时间戳的最早的偏移。 如果 Kafka 不返回匹配的偏移(请查看每个选项的说明),则各选项的行为方式有所不同。

Spark 只是将时间戳信息传递给 KafkaConsumer.offsetsForTimes,不会解释或解释有关该值的原因。 有关 KafkaConsumer.offsetsForTimes 的详细信息,请参阅此文档。 此外,此处时间戳的含义可能因 Kafka 配置 (log.message.timestamp.type) 而异。 有关详细信息,请参阅 Apache Kafka 文档