从 Apache Pulsar 流式传输

重要

此功能目前以公共预览版提供。

在 Databricks Runtime 14.1 及更高版本中,可以使用结构化流式传输功能从 Azure Databricks 上的 Apache Pulsar 流式传输数据。

结构化流式传输为从 Pulsar 源读取的数据提供一次性处理语义。

语法示例

下面是使用结构化流式传输从 Pulsar 进行读取的基本示例:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

必须始终提供 service.url 和下列选项之一来指定主题:

  • topic
  • topics
  • topicsPattern

有关选项的完整列表,请参阅配置 Pulsar 流式传输读取的选项

向 Pulsar 进行身份验证

Azure Databricks 支持向 Pulsar 进行信任存储和密钥存储身份验证。 Databricks 建议使用机密来存储配置详细信息。

可以在流式传输配置期间设置以下选项:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

如果流式传输使用 PulsarAdmin,还必须设置以下项:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

下面的示例演示了如何配置验证选项:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar 架构

从 Pulsar 读取的记录架构取决于主题如何对其架构进行编码。

  • 对于具有 Avro 或 JSON 架构的主题,字段名称和字段类型将保留在生成的 Spark 数据帧中。
  • 对于没有架构或具有 Pulsar 中简单数据类型的主题,有效负载将加载到 value 列。
  • 如果将读取器配置为读取具有不同架构的多个主题,请设置 allowDifferentTopicSchemas 以将原始内容加载到 value 列。

Pulsar 记录具有以下元数据字段:

类型
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

配置 Pulsar 流式传输读取的选项

所有选项都在使用 .option("<optionName>", "<optionValue>") 语法进行结构化流式传输读取的过程中配置。 也可以使用选项配置身份验证。 请参阅向 Pulsar 进行身份验证

下表描述了 Pulsar 的所需配置。 必须仅指定 topictopicstopicsPattern 选项中的一个。

选项 默认值 说明
service.url Pulsar 服务的 Pulsar serviceUrl 配置。
topic 要使用的主题的主题名称字符串。
topics 要使用的主题的逗号分隔列表。
topicsPattern 与要使用的主题相匹配的 Java 正则表达式字符串。

下表描述了 Pulsar 支持的其他选项:

选项 默认值 说明
predefinedSubscription 连接器用于跟踪 Spark 应用程序进度的预定义订阅名称。
subscriptionPrefix 连接器用于生成随机订阅以跟踪 Spark 应用程序进度的前缀。
pollTimeoutMs 120000 从 Pulsar 读取消息的超时(以毫秒为单位)。
waitingForNonExistedTopic false 连接器是否应等待,直到创建所需的主题。
failOnDataLoss true 控制数据丢失(例如,删除主题,或者由于保留策略而删除消息)时查询是否失败。
allowDifferentTopicSchemas false 如果读取了具有不同架构的多个主题,请使用此参数关闭基于架构的自动主题值反序列化。 仅当此项为 true 时,才会返回原始值。
startingOffsets latest 如果为 latest,读取器在开始运行后会读取最新记录。 如果为 earliest,读取器将从最早的偏移量开始读取。 用户还可以指定一个指定特定偏移量的 JSON 字符串。
maxBytesPerTrigger 要按微批处理的最大字节数的软限制。 如果指定此项,还需指定 admin.url
admin.url Pulsar serviceHttpUrl 配置。 仅当指定 maxBytesPerTrigger 时才需要。

还可以使用以下模式指定任何 Pulsar 客户端、管理员和读取器配置:

模式 配置选项的链接
pulsar.client.* Pulsar 客户端配置
pulsar.admin.* Pulsar 管理员配置
pulsar.reader.* Pulsar 读取器配置

构造起始偏移量 JSON

可以手动构造消息 ID 来指定特定的偏移量,并将其作为 JSON 传递给 startingOffsets 选项。 以下代码示例演示了此语法:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()