read_pulsar
流式表值函数
适用于: Databricks SQL Databricks Runtime 14.1 及更高版本
重要
此功能目前以公共预览版提供。
返回一个表,其中包含从 Pulsar 读取的记录。
该表值函数仅支持流式查询,不支持批量查询。
语法
read_pulsar ( { option_key => option_value } [, ...] )
参数
此函数需要用于选项键的命名参数调用。
serviceUrl
和 topic
选项是必需的。
此处对参数的说明很简短。 有关详细说明,请参阅结构化流式处理 Pulsar 文档。
选项 | 类型 | 默认 | 说明 |
---|---|---|---|
serviceUrl | STRING | 必需 | Pulsar 服务的 URI。 |
主题 | STRING | 必需 | 要从中读取的主题。 |
predefinedSubscription | STRING | 无 | 连接器用于跟踪 Spark 应用程序进度的预定义订阅名称。 |
subscriptionPrefix | STRING | 无 | 连接器用于生成随机订阅以跟踪 Spark 应用程序进度的前缀。 |
pollTimeoutMs | LONG | 120000 | 从 Pulsar 读取消息的超时(以毫秒为单位)。 |
failOnDataLoss | BOOLEAN | true | 控制数据丢失(例如,删除主题,或者由于保留策略而删除消息)时查询是否失败。 |
startingOffsets | STRING | 最新 | 查询开始时的起点,可以是最早的、最晚的,也可以是一个指定特定偏移的 JSON 字符串。 如果是最晚的,则读取器在开始运行后会读取最新记录。 如果是最早的,则读取器将从最早的偏移开始读取。 用户还可以指定一个指定特定偏移量的 JSON 字符串。 |
startingTime | STRING | 无 | 指定此项时,Pulsar 源将从指定 startingTime 的位置开始读取消息。 |
以下参数用于 pulsar 客户端的身份验证:
选项 | 类型 | 默认 | 说明 |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | 无 | 身份验证插件的名称。 |
pulsarClientAuthParams | STRING | 无 | 身份验证插件的参数。 |
pulsarClientUseKeyStoreTls | STRING | 无 | 是否要使用 KeyStore 进行 tls 身份验证。 |
pulsarClientTlsTrustStoreType | STRING | 无 | tls 身份验证的 TrustStore 文件类型。 |
pulsarClientTlsTrustStorePath | STRING | 无 | tls 身份验证的 TrustStore 文件路径。 |
pulsarClientTlsTrustStorePassword | STRING | 无 | tls 身份验证的 TrustStore 密码。 |
这些参数用于 pulsar 准入控制的配置和验证。仅当启用准入控制时(当设置 maxBytesPerTrigger 时),才需要 pulsar 管理配置
选项 | 类型 | 默认 | 说明 |
---|---|---|---|
maxBytesPerTrigger | BIGINT | 无 | 要按微批处理的最大字节数的软限制。 如果指定此项,则还需指定 admin.url。 |
adminUrl | STRING | 无 | Pulsar serviceHttpUrl 配置。 只有在指定 maxBytesPerTrigger 的情况下才需要。 |
pulsarAdminAuthPlugin | STRING | 无 | 身份验证插件的名称。 |
pulsarAdminAuthParams | STRING | 无 | 身份验证插件的参数。 |
pulsarClientUseKeyStoreTls | STRING | 无 | 是否要使用 KeyStore 进行 tls 身份验证。 |
pulsarAdminTlsTrustStoreType | STRING | 无 | tls 身份验证的 TrustStore 文件类型。 |
pulsarAdminTlsTrustStorePath | STRING | 无 | tls 身份验证的 TrustStore 文件路径。 |
pulsarAdminTlsTrustStorePassword | STRING | 无 | tls 身份验证的 TrustStore 密码。 |
返回
具有以下架构的 pulsar 记录表。
__key STRING NOT NULL
:Pulsar 消息键。value BINARY NOT NULL
:Pulsar 消息值。注意:对于具有 Avro 或 JSON 架构的主题,会对内容进行扩展以保留 Pulsar 主题的字段名称和字段类型,而不是将内容加载到二进制值字段中。
__topic STRING NOT NULL
:Pulsar 主题名称。__messageId BINARY NOT NULL
:Pulsar 消息 ID。__publishTime TIMESTAMP NOT NULL
:Pulsar 消息发布时间。__eventTime TIMESTAMP NOT NULL
:Pulsar 消息事件时间。__messageProperties MAP<STRING, STRING>
:Pulsar 消息属性。
示例
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.