read_pulsar 流式表值函数

适用于:check marked yes Databricks SQL check marked yes Databricks Runtime 14.1 及更高版本

重要

此功能目前以公共预览版提供。

返回一个表,其中包含从 Pulsar 读取的记录。

该表值函数仅支持流式查询,不支持批量查询。

语法

read_pulsar ( { option_key => option_value } [, ...] )

参数

此函数需要用于选项键的命名参数调用

serviceUrltopic 选项是必需的。

此处对参数的说明很简短。 有关详细说明,请参阅结构化流式处理 Pulsar 文档。

选项 类型 默认 说明
serviceUrl STRING 必需 Pulsar 服务的 URI。
主题 STRING 必需 要从中读取的主题。
predefinedSubscription STRING 连接器用于跟踪 Spark 应用程序进度的预定义订阅名称。
subscriptionPrefix STRING 连接器用于生成随机订阅以跟踪 Spark 应用程序进度的前缀。
pollTimeoutMs LONG 120000 从 Pulsar 读取消息的超时(以毫秒为单位)。
failOnDataLoss BOOLEAN true 控制数据丢失(例如,删除主题,或者由于保留策略而删除消息)时查询是否失败。
startingOffsets STRING 最新 查询开始时的起点,可以是最早的、最晚的,也可以是一个指定特定偏移的 JSON 字符串。 如果是最晚的,则读取器在开始运行后会读取最新记录。 如果是最早的,则读取器将从最早的偏移开始读取。 用户还可以指定一个指定特定偏移量的 JSON 字符串。
startingTime STRING 指定此项时,Pulsar 源将从指定 startingTime 的位置开始读取消息。

以下参数用于 pulsar 客户端的身份验证:

选项 类型 默认 说明
pulsarClientAuthPluginClassName STRING 身份验证插件的名称。
pulsarClientAuthParams STRING 身份验证插件的参数。
pulsarClientUseKeyStoreTls STRING 是否要使用 KeyStore 进行 tls 身份验证。
pulsarClientTlsTrustStoreType STRING tls 身份验证的 TrustStore 文件类型。
pulsarClientTlsTrustStorePath STRING tls 身份验证的 TrustStore 文件路径。
pulsarClientTlsTrustStorePassword STRING tls 身份验证的 TrustStore 密码。

这些参数用于 pulsar 准入控制的配置和验证。仅当启用准入控制时(当设置 maxBytesPerTrigger 时),才需要 pulsar 管理配置

选项 类型 默认 说明
maxBytesPerTrigger BIGINT 要按微批处理的最大字节数的软限制。 如果指定此项,则还需指定 admin.url。
adminUrl STRING Pulsar serviceHttpUrl 配置。 只有在指定 maxBytesPerTrigger 的情况下才需要。
pulsarAdminAuthPlugin STRING 身份验证插件的名称。
pulsarAdminAuthParams STRING 身份验证插件的参数。
pulsarClientUseKeyStoreTls STRING 是否要使用 KeyStore 进行 tls 身份验证。
pulsarAdminTlsTrustStoreType STRING tls 身份验证的 TrustStore 文件类型。
pulsarAdminTlsTrustStorePath STRING tls 身份验证的 TrustStore 文件路径。
pulsarAdminTlsTrustStorePassword STRING tls 身份验证的 TrustStore 密码。

返回

具有以下架构的 pulsar 记录表。

  • __key STRING NOT NULL:Pulsar 消息键。

  • value BINARY NOT NULL:Pulsar 消息值。

    注意:对于具有 Avro 或 JSON 架构的主题,会对内容进行扩展以保留 Pulsar 主题的字段名称和字段类型,而不是将内容加载到二进制值字段中。

  • __topic STRING NOT NULL:Pulsar 主题名称。

  • __messageId BINARY NOT NULL:Pulsar 消息 ID。

  • __publishTime TIMESTAMP NOT NULL:Pulsar 消息发布时间。

  • __eventTime TIMESTAMP NOT NULL:Pulsar 消息事件时间。

  • __messageProperties MAP<STRING, STRING>:Pulsar 消息属性。

示例

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.