Azure Databricks Kafka 连接器支持多种用于连接到 Kafka 的身份验证方法。 本文介绍 Databricks 上的一些最常见的身份验证方法。 可以在 Kafka 文档中找到受支持的身份验证方法的完整列表。
使用服务主体连接到 Azure 事件中心
Azure Databricks 支持使用事件中心服务对 Spark 作业进行身份验证。 需通过带有 Microsoft Entra ID 的 OAuth 完成此身份验证。
使用 Unity Catalog 服务凭据连接
自 Databricks Runtime 16.1 发布以来,Azure Databricks 支持 Unity 目录服务凭据,以便向 Azure 事件中心进行身份验证。 Databricks 建议使用此方法,尤其是在共享群集或无服务器计算上运行 Kafka 流式处理时。
若要使用 Unity 目录服务凭据进行身份验证,请执行以下步骤:
- 创建新的 Unity Catalog 服务凭据。 如果不熟悉此过程,请参阅 “创建服务凭据 ”,获取有关创建服务凭据的说明。
- 确保附加到您的服务凭据的访问连接器具有连接到由世纪互联运营的 Azure Event Hubs 所需的权限。
- 在 Kafka 配置中,提供 Unity 目录服务凭据的名称作为源选项。 将选项
databricks.serviceCredential设置为服务凭据的名称。
以下示例使用服务凭据将 Kafka 配置为源:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-hostname>:9092",
"subscribe": "<topic>",
"databricks.serviceCredential": "<service-credential-name>",
# Optional: set this only if Databricks can't infer the scope for your Kafka service.
# "databricks.serviceCredential.scope": "https://<event-hubs-server>/.default",
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-hostname>:9092",
"subscribe" -> "<topic>",
"databricks.serviceCredential" -> "<service-credential-name>",
// Optional: set this only if Databricks can't infer the scope for your Kafka service.
// "databricks.serviceCredential.scope" -> "https://<event-hubs-server>/.default",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-hostname>:9092',
subscribe => '<topic>',
serviceCredential => '<service-credential-name>'
);
注意:使用 Unity 目录服务凭据连接到 Kafka 时,不再需要以下选项:
kafka.sasl.mechanismkafka.sasl.jaas.configkafka.security.protocolkafka.sasl.client.callback.handler.classkafka.sasl.oauthbearer.token.endpoint.url
使用客户端 ID 和机密进行连接
Azure Databricks 支持在以下计算环境中使用客户端 ID 和机密进行 Microsoft Entra ID 身份验证:
- 在配置为专用访问模式(以前称为单用户访问模式)的计算环境上运行的 Databricks Runtime 12.2 LTS 及更高版本。
- 在配置为标准访问模式(以前称为共享访问模式)的计算环境上运行的 Databricks Runtime 14.3 LTS 及更高版本。
- 未配置 Unity Catalog 的 Lakeflow Spark 声明式管道。
Azure Databricks 不支持在任何计算环境中使用证书进行 Microsoft Entra ID 身份验证,也不支持在使用 Unity Catalog 配置的 Lakeflow Spark 声明性管道中使用。
此身份验证不适用于使用标准访问模式或 Unity Catalog Lakeflow Spark 声明性管道的计算。
若要使用 Microsoft Entra ID 执行身份验证,必须具有以下值:
租户 ID。 可以在 Microsoft Entra ID 服务选项卡中找到此项。
客户端 ID(也称为应用程序 ID)。
一个客户端密码。 拥有此密码后,应将其作为机密添加到 Databricks 工作区。 若要添加此机密,请参阅机密管理。
EventHubs 主题。 可以在特定“事件中心命名空间”页上“实体”部分下的“事件中心”部分找到主题列表。 若要处理多个主题,可以在事件中心级别设置 IAM 角色。
EventHubs 服务器。 可以在特定“事件中心命名空间”的概述页面找到此项:
此外,若要使用 Entra ID,我们需要告知 Kafka 使用 OAuth SASL 机制(SASL 是通用协议,而 OAuth 是一种 SASL“机制”):
-
kafka.security.protocol应为SASL_SSL -
kafka.sasl.mechanism应为OAUTHBEARER -
kafka.sasl.login.callback.handler.class应该是 Java 类的完全限定名称,值为kafkashaded表示遮蔽 Kafka 类的登录回调处理程序。 请参阅以下示例以获得具体的类。
以下示例将 Kafka 配置为使用客户端 ID 和机密Microsoft Entra ID 身份验证连接到 Azure 事件中心:
Python
# This is the only section you need to modify for auth purposes
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
"kafka.bootstrap.servers": f"{event_hubs_server}:9093", # Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.partner.microsoftonline.cn/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093", // Port 9093 is the EventHubs Kafka port
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.partner.microsoftonline.cn/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
SQL
CREATE OR REFRESH STREAMING TABLE <table_name>
AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<event-hubs-server>:9093',
subscribe => '<event-hubs-topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'OAUTHBEARER',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="<client-id>" clientSecret="<client-secret>" scope="https://<event-hubs-server>/.default" ssl.protocol="SSL";',
`kafka.sasl.oauthbearer.token.endpoint.url` => 'https://login.partner.microsoftonline.cn/<tenant-id>/oauth2/v2.0/token',
`kafka.sasl.login.callback.handler.class` => 'kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler'
);
使用 SASL/PLAIN 进行身份验证
若要使用 SASL/PLAIN(用户名和密码)身份验证连接到 Kafka,请配置以下选项。 使用着色 PlainLoginModule 类名称:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "PLAIN",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'PLAIN',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";'
);
Azure Databricks 建议将密码存储为机密,而不是将其直接包含在代码中。 有关详细信息,请参阅机密管理。
使用 SASL/SCRAM 进行身份验证
若要使用 SASL/SCRAM 连接到 Kafka(SCRAM-SHA-256 或 SCRAM-SHA-512),请配置以下选项。 使用着色 ScramLoginModule 类名称:
Python
kafka_options = {
"kafka.bootstrap.servers": "<bootstrap-server>:9093",
"subscribe": "<topic>",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config":
'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";',
}
df = spark.readStream.format("kafka").options(**kafka_options).load()
Scala
val kafkaOptions = Map(
"kafka.bootstrap.servers" -> "<bootstrap-server>:9093",
"subscribe" -> "<topic>",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512",
"kafka.sasl.jaas.config" ->
"""kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";""",
)
val df = spark.readStream.format("kafka").options(kafkaOptions).load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SASL_SSL',
`kafka.sasl.mechanism` => 'SCRAM-SHA-512',
`kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>";'
);
注释
如果 Kafka 群集配置为使用 SCRAM-SHA-256,请将SCRAM-SHA-512替换为SCRAM-SHA-256。
Azure Databricks 建议将密码存储为机密,而不是将其直接包含在代码中。 有关详细信息,请参阅机密管理。
使用 SSL 将 Azure Databricks 连接到 Kafka
若要启用与 Kafka 的 SSL/TLS 连接,请将 kafka.security.protocol 设置为 SSL,并提供以 kafka. 为前缀的信任存储和密钥存储配置选项。 对于仅需要服务器身份验证(单向 TLS)的 SSL 连接,需要信任存储。 在 Kafka 代理对客户端进行身份验证的相互 TLS(mTLS)环境中,您需要信任库和密钥库。
可以使用以下 SSL/TLS 选项。 有关 SSL 属性的完整列表,请参阅 Confluent 文档中的 Apache Kafka SSL 配置文档 和 使用 SSL 进行加密和身份验证 。
| 选项 | Description |
|---|---|
kafka.security.protocol |
将SSL设置为启用TLS加密。 |
kafka.ssl.truststore.location |
包含受信任 CA 证书的信任存储文件的路径。 |
kafka.ssl.truststore.password |
信任存储文件的密码。 |
kafka.ssl.truststore.type |
信任存储文件格式(默认值:JKS) |
kafka.ssl.keystore.location |
包含客户端证书和私钥的密钥存储文件的路径(mTLS 是必需的)。 |
kafka.ssl.keystore.password |
密钥存储文件的密码。 |
kafka.ssl.key.password |
密钥存储中私钥的密码。 |
kafka.ssl.endpoint.identification.algorithm |
主机名验证算法。 默认为 https。 设置为空字符串以禁用。 |
如果使用 SSL,Databricks 建议:
- 将证书存储在 Unity 目录卷中。 有权从卷中读取的用户可以使用 Kafka 证书。 有关详细信息,请参阅什么是 Unity Catalog 卷?。
- 将您的证书密码作为机密存储在秘密范围中。 有关详细信息,请参阅 “管理机密范围”。
以下示例使用对象存储位置和 Databricks 机密启用 SSL 连接:
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-server>:9093")
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope = <certificate-scope-name>, key = <truststore-password-key-name>))
SQL
SELECT * FROM read_kafka(
bootstrapServers => '<bootstrap-server>:9093',
subscribe => '<topic>',
`kafka.security.protocol` => 'SSL',
`kafka.ssl.truststore.location` => '<truststore-location>',
`kafka.ssl.keystore.location` => '<keystore-location>',
`kafka.ssl.keystore.password` => secret('<certificate-scope-name>', '<keystore-password-key-name>'),
`kafka.ssl.truststore.password` => secret('<certificate-scope-name>', '<truststore-password-key-name>')
);
将 Kafka on HDInsight 连接到 Azure Databricks
创建 HDInsight Kafka 群集。
有关说明,请参阅通过 Azure 虚拟网络连接到 Kafka on HDInsight。
对 Kafka 中转站进行配置以播发正确的地址。
按照为 IP 播发配置 Kafka 中的说明进行操作。 如果在 Azure 虚拟机上自行管理 Kafka,请确保将中转站的
advertised.listeners配置设置为主机的内部 IP。创建 Azure Databricks 群集。
将 Kafka 群集与 Azure Databricks 群集进行对等互连。
按照将虚拟网络对等互连中的说明进行操作。
使用 Databricks 着色的 Kafka 类名称
Azure Databricks 捆绑 Kafka 客户端库的专有着色版本。 在身份验证配置选项中引用的所有 Kafka 客户端类名称都必须使用着色类名称前缀,而不是标准开源类名。 这适用于选项中引用的任何类,例如 kafka.sasl.jaas.config, kafka.sasl.login.callback.handler.class和 kafka.sasl.client.callback.handler.class。
使用无阴影类名会导致 RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED 错误。 有关更多详细信息,请参阅 常见问题解答 。
处理潜在错误
无法创建新
KafkaAdminClient如果以下任一身份验证选项不正确,则会引发此内部 Kafka 错误:
- 客户端 ID(也称为应用程序 ID)
- 租户 ID
- 事件中心服务器
若要解决此错误,请验证这些选项的值是否正确。 此外,如果修改示例(如
kafka.security.protocol)中默认提供的配置选项,则可能会看到此错误。未返回任何记录
如果你尝试显示或处理 DataFrame 但未获得结果,则将在 UI 中看到以下内容。
此消息表示身份验证成功,但 EventHubs 未返回任何数据。 一些可能的(尽管并不详尽)原因包括:
- 指定的 EventHubs 主题有误。
-
startingOffsets的默认 Kafka 配置选项是latest,你目前尚未通过主题接收任何数据。 你可以将startingOffsets设为earliest,以开始从 Kafka 的最早偏移量读取数据。