FAQ

有关将 Kafka 与 Azure Databricks 配合使用的常见问题。

为什么收到不支持或无法识别 Kafka 选项的错误?

设置 Kafka 本机配置选项时,一个常见的错误是忘记 kafka. 前缀。 直接传递到 Kafka 客户端的所有选项都必须以 kafka.以下前缀:

# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")

# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")

特定于 Spark Kafka 连接器(如 subscribestartingOffsetsmaxOffsetsPerTrigger)的选项不需要前缀。 请参阅选项以获取完整列表。

为什么我收到关于 Kafka 遮蔽类的错误?

Azure Databricks 需要使用带阴影的 Kafka 类(带 kafkashaded.shadedmskiam.前缀)。 如果您看到类似RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED的错误,则必须使用阴影类名。

  • org.apache.kafka.* 类需要 kafkashaded. 前缀。 例如:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule
  • software.amazon.msk.* 类需要 shadedmskiam. 前缀。 例如:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule

为什么我在连接到 Kafka 时收到TimeoutException

常见原因包括:

  • 网络连接:计算群集无法访问 Kafka 中转站。 检查防火墙规则、安全组和VPC配置。
  • 错误的启动服务器:验证 kafka.bootstrap.servers 主机名和端口是否正确。
  • DNS 解析:确保可以从 Azure Databricks 网络解析 Kafka 中转站主机名。
  • SSL/TLS 问题:如果使用 SSL,请验证证书是否已正确配置。

对于专用链接或虚拟私有云对等连接设置,请确保正确的网络路由已配置到位。

我应该对 Kafka 使用批处理模式还是流式处理模式?

这取决于你的用例:

  • 流式处理模式spark.readStream):需要连续数据处理或低延迟引入时使用。
  • 批处理模式spark.read):用于一次性数据加载、回填或调试。 需要同时具备startingOffsetsendingOffsets

有关配置触发器间隔(例如AvailableNowProcessingTime)的详细信息,请参阅“配置结构化流式处理触发器间隔”。

是否可以从单个流中的多个 Kafka 主题中读取内容?

是的,可以使用:

  • subscribe:提供以逗号分隔的主题列表,例如 .option("subscribe", "topic1,topic2")
  • subscribePattern:使用 Java 正则表达式模式匹配主题名称,例如 .option("subscribePattern", "topic-.*")

如何将 Kafka 与 Lakeflow Spark 声明性管道配合使用?

Lakeflow Spark 声明性管道为 Kafka 源提供本机支持。 可以定义一个从 Kafka 读取的流表。

Python

import dlt

@dlt.table
def kafka_bronze():
  return (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:port>")
    .option("subscribe", "<topic>")
    .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:port>',
  subscribe => '<topic>'
);

请参阅 管道中加载数据 以获取有关 Lakeflow Spark 声明性管道中流媒体源的更多详细信息。

如何反序列化 Kafka 键和值列?

keyvalue以二进制形式返回(BINARY类型)。 使用 DataFrame 操作根据数据格式反序列化它们:

  • 字符串数据:用于 cast("string") 将二进制文件转换为字符串。
  • JSON 数据:转换到字符串后使用 from_json() 。 请参阅 from_json 函数
  • Avro 数据:使用 from_avro() 来反序列化 Avro 编码的数据。 请参阅流式读取和写入 Avro 数据
  • 协议缓冲区:用于 from_protobuf() 反序列化 protobuf 数据。 请参阅读取和写入协议缓冲区

为什么会出现幂等写入错误?

Databricks Runtime 13.3 LTS 及更高版本包含默认会启用幂等写入的较新版本的 kafka-clients 库。 如果 Kafka 群集使用版本 2.8.0 或更低版本且配置了 ACL 但未IDEMPOTENT_WRITE启用 ACL,则写入失败,并显示: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state

通过升级到 Kafka 2.8.0 或更高版本或者在配置结构化流式处理写入器时设置 .option("kafka.enable.idempotence", "false") 来解决此错误。

KAFKA_DATA_LOSS_ERROR 是什么,如何解决它?

当 Kafka 源检测到存储在检查点中的偏移量不再在 Kafka 中可用时,会发生此错误,通常是因为:

  • 流已暂停的时间超过 Kafka 保留期。
  • 已删除 Kafka 主题数据或重新创建主题。
  • Kafka 中转站遇到数据丢失。

若要解决问题,请执行以下操作:

  • 如果可以接受数据丢失:设置为 .option("failOnDataLoss", "false") 允许流从最早的可用偏移量继续。
  • 如果数据丢失不可接受:重置检查点并从偏移量重新开始处理earliest,或还原缺少的 Kafka 数据。

请参阅KAFKA_DATA_LOSS错误条件以获取更多信息。

如何控制从 Kafka 读取数据的速率?

使用 maxOffsetsPerTrigger 选项可以限制每个微批处理所处理的偏移量数量(大约等于记录数)。 这有助于防止大量批处理导致下游处理不堪重负,或者在赶上积压工作时导致内存问题。

Python

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:port>")
  .option("subscribe", "<topic>")
  .option("maxOffsetsPerTrigger", 10000)
  .load()
)

Scala

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:port>")
  .option("subscribe", "<topic>")
  .option("maxOffsetsPerTrigger", 10000)
  .load()

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:port>',
  subscribe => '<topic>',
  maxOffsetsPerTrigger => '10000'
);

或者,使用类似 minPartitionsmaxRecordsPerPartition 控制为每个批处理创建多少个 Spark 分区的选项。

如何监视流距最新 Kafka 偏移量有多远?

使用avgOffsetsBehindLatestmaxOffsetsBehindLatestminOffsetsBehindLatest这些在流式处理查询进度中提供的指标。 在所有订阅的主题分区中,这些报告了您的流比最新可用偏移量落后的偏移量数量。 请查看 在 Azure Databricks 上监视结构化流式查询

还可以用于 estimatedTotalBytesBehindLatest 估计尚未处理的数据的总字节数。

为什么 Kafka 流初始化速度较慢?

Kafka 流需要时间去完成某些任务。

  1. 连接到 Kafka 群集并提取元数据。
  2. 识别主题分区。
  3. 提取初始偏移量。

对于本地或远程 Kafka 群集,网络延迟可能会显著影响初始化时间。 如果您经常重启触发或定时运行的管道,请考虑使用连续流模式以避免重复初始化开销。

为什么不添加更多 Spark 执行程序来增加 Kafka 吞吐量?

Kafka 代理饱和后,添加更多 Spark 执行程序会增加成本,而不会增加吞吐量。

表明 Kafka 是瓶颈:

  • 尽管添加了更多核心,但吞吐量仍会停滞不前。
  • Kafka 中转站 CPU 或网络利用率很高。
  • Spark 任务快速完成,但等待新数据。

若要解决此问题,请通过添加新的代理节点或增加分区数量,以分担负载和扩展 Kafka 集群。

如何优化 Kafka 流式处理的成本和计算利用率?

对于“微批处理模式”和“AvailableNow”模式:

  • 调整群集大小:监视指标,并为峰值负载设置适当的固定群集大小。
  • 使用 maxOffsetsPerTrigger:限制批大小以在负载高峰期间控制资源使用情况。
  • 避免自动缩放:流式处理作业持续运行,添加或删除节点会导致任务重新均衡开销。
  • 减少数据偏斜:倾斜分区会导致某些任务处理的数据比其他任务要大得多,从而导致拖延任务,减缓整体批处理的完成速度,并导致计算资源在闲置任务上被浪费。 minPartitions使用此选项可将大型 Kafka 分区拆分为较小的 Spark 分区,以便进行更均衡的处理。

对于实时模式,正确调整大小尤其重要,因为任务在等待数据时可以保持空闲状态。 关键注意事项:

  • 设置 maxPartitions 后,每个任务处理多个 Kafka 分区以减少开销。
  • 优化 spark.sql.shuffle.partitions 处理大量数据流的作业。

有关实时模式下的群集大小指导,请参阅 计算资源大小

为什么我的流没有返回记录,尽管主题中存在数据?

常见原因包括:

  • 错误 startingOffsets 设置:默认值为 latest,仅读取流启动后到达的新数据。 将startingOffsets设置为earliest以读取现有数据。
  • 主题名称错误:验证是否订阅了正确的主题。
  • 身份验证问题:流可能已成功连接,但缺少从主题中读取的权限。 检查 Kafka ACL 列表。
  • 偏移过期:如果流已停止很长时间,并且检查点中的偏移量已过期(被 Kafka 保留策略删除),则可能需要重置检查点或调整 failOnDataLoss