从 Apache Pulsar 流式传输
重要
此功能目前以公共预览版提供。
在 Databricks Runtime 14.1 及更高版本中,可以使用结构化流式传输功能从 Azure Databricks 上的 Apache Pulsar 流式传输数据。
结构化流式传输为从 Pulsar 源读取的数据提供一次性处理语义。
语法示例
下面是使用结构化流式传输从 Pulsar 进行读取的基本示例:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
必须始终提供 service.url
和下列选项之一来指定主题:
topic
topics
topicsPattern
有关选项的完整列表,请参阅配置 Pulsar 流式传输读取的选项。
向 Pulsar 进行身份验证
Azure Databricks 支持向 Pulsar 进行信任存储和密钥存储身份验证。 Databricks 建议使用机密来存储配置详细信息。
可以在流式传输配置期间设置以下选项:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
如果流式传输使用 PulsarAdmin
,还必须设置以下项:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
下面的示例演示了如何配置验证选项:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsar 架构
从 Pulsar 读取的记录架构取决于主题如何对其架构进行编码。
- 对于具有 Avro 或 JSON 架构的主题,字段名称和字段类型将保留在生成的 Spark 数据帧中。
- 对于没有架构或具有 Pulsar 中简单数据类型的主题,有效负载将加载到
value
列。 - 如果将读取器配置为读取具有不同架构的多个主题,请设置
allowDifferentTopicSchemas
以将原始内容加载到value
列。
Pulsar 记录具有以下元数据字段:
列 | 类型 |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
配置 Pulsar 流式传输读取的选项
所有选项都在使用 .option("<optionName>", "<optionValue>")
语法进行结构化流式传输读取的过程中配置。 也可以使用选项配置身份验证。 请参阅向 Pulsar 进行身份验证。
下表描述了 Pulsar 的所需配置。 必须仅指定 topic
、topics
或 topicsPattern
选项中的一个。
选项 | 默认值 | 说明 |
---|---|---|
service.url |
无 | Pulsar 服务的 Pulsar serviceUrl 配置。 |
topic |
无 | 要使用的主题的主题名称字符串。 |
topics |
无 | 要使用的主题的逗号分隔列表。 |
topicsPattern |
无 | 与要使用的主题相匹配的 Java 正则表达式字符串。 |
下表描述了 Pulsar 支持的其他选项:
选项 | 默认值 | 说明 |
---|---|---|
predefinedSubscription |
无 | 连接器用于跟踪 Spark 应用程序进度的预定义订阅名称。 |
subscriptionPrefix |
无 | 连接器用于生成随机订阅以跟踪 Spark 应用程序进度的前缀。 |
pollTimeoutMs |
120000 | 从 Pulsar 读取消息的超时(以毫秒为单位)。 |
waitingForNonExistedTopic |
false |
连接器是否应等待,直到创建所需的主题。 |
failOnDataLoss |
true |
控制数据丢失(例如,删除主题,或者由于保留策略而删除消息)时查询是否失败。 |
allowDifferentTopicSchemas |
false |
如果读取了具有不同架构的多个主题,请使用此参数关闭基于架构的自动主题值反序列化。 仅当此项为 true 时,才会返回原始值。 |
startingOffsets |
latest |
如果为 latest ,读取器在开始运行后会读取最新记录。 如果为 earliest ,读取器将从最早的偏移量开始读取。 用户还可以指定一个指定特定偏移量的 JSON 字符串。 |
maxBytesPerTrigger |
无 | 要按微批处理的最大字节数的软限制。 如果指定此项,还需指定 admin.url 。 |
admin.url |
无 | Pulsar serviceHttpUrl 配置。 仅当指定 maxBytesPerTrigger 时才需要。 |
还可以使用以下模式指定任何 Pulsar 客户端、管理员和读取器配置:
模式 | 配置选项的链接 |
---|---|
pulsar.client.* |
Pulsar 客户端配置 |
pulsar.admin.* |
Pulsar 管理员配置 |
pulsar.reader.* |
Pulsar 读取器配置 |
构造起始偏移量 JSON
可以手动构造消息 ID 来指定特定的偏移量,并将其作为 JSON 传递给 startingOffsets
选项。 以下代码示例演示了此语法:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()