有关将 Kafka 与 Azure Databricks 配合使用的常见问题。
为什么收到不支持或无法识别 Kafka 选项的错误?
设置 Kafka 本机配置选项时,一个常见的错误是忘记 kafka. 前缀。 直接传递到 Kafka 客户端的所有选项都必须以 kafka.以下前缀:
# Incorrect - missing the kafka. prefix
.option("security.protocol", "SASL_SSL")
.option("sasl.mechanism", "PLAIN")
# Correct - using the kafka. prefix
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
特定于 Spark Kafka 连接器(如 subscribe、startingOffsets 和 maxOffsetsPerTrigger)的选项不需要前缀。 请参阅选项以获取完整列表。
为什么我收到关于 Kafka 遮蔽类的错误?
Azure Databricks 需要使用带阴影的 Kafka 类(带 kafkashaded. 或 shadedmskiam.前缀)。 如果您看到类似RESTRICTED_STREAMING_OPTION_PERMISSION_ENFORCED的错误,则必须使用阴影类名。
-
org.apache.kafka.*类需要kafkashaded.前缀。 例如:kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule -
software.amazon.msk.*类需要shadedmskiam.前缀。 例如:shadedmskiam.software.amazon.msk.auth.iam.IAMLoginModule
为什么我在连接到 Kafka 时收到TimeoutException?
常见原因包括:
- 网络连接:计算群集无法访问 Kafka 中转站。 检查防火墙规则、安全组和VPC配置。
-
错误的启动服务器:验证
kafka.bootstrap.servers主机名和端口是否正确。 - DNS 解析:确保可以从 Azure Databricks 网络解析 Kafka 中转站主机名。
- SSL/TLS 问题:如果使用 SSL,请验证证书是否已正确配置。
对于专用链接或虚拟私有云对等连接设置,请确保正确的网络路由已配置到位。
我应该对 Kafka 使用批处理模式还是流式处理模式?
这取决于你的用例:
-
流式处理模式 (
spark.readStream):需要连续数据处理或低延迟引入时使用。 -
批处理模式 (
spark.read):用于一次性数据加载、回填或调试。 需要同时具备startingOffsets和endingOffsets。
有关配置触发器间隔(例如,AvailableNow和ProcessingTime)的详细信息,请参阅“配置结构化流式处理触发器间隔”。
是否可以从单个流中的多个 Kafka 主题中读取内容?
是的,可以使用:
-
subscribe:提供以逗号分隔的主题列表,例如.option("subscribe", "topic1,topic2")。 -
subscribePattern:使用 Java 正则表达式模式匹配主题名称,例如.option("subscribePattern", "topic-.*")。
如何将 Kafka 与 Lakeflow Spark 声明性管道配合使用?
Lakeflow Spark 声明性管道为 Kafka 源提供本机支持。 可以定义一个从 Kafka 读取的流表。
Python
import dlt
@dlt.table
def kafka_bronze():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_bronze AS
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>'
);
请参阅 管道中加载数据 以获取有关 Lakeflow Spark 声明性管道中流媒体源的更多详细信息。
如何反序列化 Kafka 键和值列?
列keyvalue以二进制形式返回(BINARY类型)。 使用 DataFrame 操作根据数据格式反序列化它们:
-
字符串数据:用于
cast("string")将二进制文件转换为字符串。 -
JSON 数据:转换到字符串后使用
from_json()。 请参阅from_json函数。 -
Avro 数据:使用
from_avro()来反序列化 Avro 编码的数据。 请参阅流式读取和写入 Avro 数据。 -
协议缓冲区:用于
from_protobuf()反序列化 protobuf 数据。 请参阅读取和写入协议缓冲区。
为什么会出现幂等写入错误?
Databricks Runtime 13.3 LTS 及更高版本包含默认会启用幂等写入的较新版本的 kafka-clients 库。 如果 Kafka 群集使用版本 2.8.0 或更低版本且配置了 ACL 但未IDEMPOTENT_WRITE启用 ACL,则写入失败,并显示: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
通过升级到 Kafka 2.8.0 或更高版本或者在配置结构化流式处理写入器时设置 .option("kafka.enable.idempotence", "false") 来解决此错误。
KAFKA_DATA_LOSS_ERROR 是什么,如何解决它?
当 Kafka 源检测到存储在检查点中的偏移量不再在 Kafka 中可用时,会发生此错误,通常是因为:
- 流已暂停的时间超过 Kafka 保留期。
- 已删除 Kafka 主题数据或重新创建主题。
- Kafka 中转站遇到数据丢失。
若要解决问题,请执行以下操作:
-
如果可以接受数据丢失:设置为
.option("failOnDataLoss", "false")允许流从最早的可用偏移量继续。 -
如果数据丢失不可接受:重置检查点并从偏移量重新开始处理
earliest,或还原缺少的 Kafka 数据。
请参阅KAFKA_DATA_LOSS错误条件以获取更多信息。
如何控制从 Kafka 读取数据的速率?
使用 maxOffsetsPerTrigger 选项可以限制每个微批处理所处理的偏移量数量(大约等于记录数)。 这有助于防止大量批处理导致下游处理不堪重负,或者在赶上积压工作时导致内存问题。
Python
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
)
Scala
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:port>")
.option("subscribe", "<topic>")
.option("maxOffsetsPerTrigger", 10000)
.load()
SQL
SELECT * FROM STREAM read_kafka(
bootstrapServers => '<server:port>',
subscribe => '<topic>',
maxOffsetsPerTrigger => '10000'
);
或者,使用类似 minPartitions 或 maxRecordsPerPartition 控制为每个批处理创建多少个 Spark 分区的选项。
如何监视流距最新 Kafka 偏移量有多远?
使用avgOffsetsBehindLatest、maxOffsetsBehindLatest和minOffsetsBehindLatest这些在流式处理查询进度中提供的指标。 在所有订阅的主题分区中,这些报告了您的流比最新可用偏移量落后的偏移量数量。 请查看 在 Azure Databricks 上监视结构化流式查询。
还可以用于 estimatedTotalBytesBehindLatest 估计尚未处理的数据的总字节数。
为什么 Kafka 流初始化速度较慢?
Kafka 流需要时间去完成某些任务。
- 连接到 Kafka 群集并提取元数据。
- 识别主题分区。
- 提取初始偏移量。
对于本地或远程 Kafka 群集,网络延迟可能会显著影响初始化时间。 如果您经常重启触发或定时运行的管道,请考虑使用连续流模式以避免重复初始化开销。
为什么不添加更多 Spark 执行程序来增加 Kafka 吞吐量?
Kafka 代理饱和后,添加更多 Spark 执行程序会增加成本,而不会增加吞吐量。
表明 Kafka 是瓶颈:
- 尽管添加了更多核心,但吞吐量仍会停滞不前。
- Kafka 中转站 CPU 或网络利用率很高。
- Spark 任务快速完成,但等待新数据。
若要解决此问题,请通过添加新的代理节点或增加分区数量,以分担负载和扩展 Kafka 集群。
如何优化 Kafka 流式处理的成本和计算利用率?
对于“微批处理模式”和“AvailableNow”模式:
- 调整群集大小:监视指标,并为峰值负载设置适当的固定群集大小。
-
使用
maxOffsetsPerTrigger:限制批大小以在负载高峰期间控制资源使用情况。 - 避免自动缩放:流式处理作业持续运行,添加或删除节点会导致任务重新均衡开销。
-
减少数据偏斜:倾斜分区会导致某些任务处理的数据比其他任务要大得多,从而导致拖延任务,减缓整体批处理的完成速度,并导致计算资源在闲置任务上被浪费。
minPartitions使用此选项可将大型 Kafka 分区拆分为较小的 Spark 分区,以便进行更均衡的处理。
对于实时模式,正确调整大小尤其重要,因为任务在等待数据时可以保持空闲状态。 关键注意事项:
- 设置
maxPartitions后,每个任务处理多个 Kafka 分区以减少开销。 - 优化
spark.sql.shuffle.partitions处理大量数据流的作业。
有关实时模式下的群集大小指导,请参阅 计算资源大小。
为什么我的流没有返回记录,尽管主题中存在数据?
常见原因包括:
-
错误
startingOffsets设置:默认值为latest,仅读取流启动后到达的新数据。 将startingOffsets设置为earliest以读取现有数据。 - 主题名称错误:验证是否订阅了正确的主题。
- 身份验证问题:流可能已成功连接,但缺少从主题中读取的权限。 检查 Kafka ACL 列表。
-
偏移过期:如果流已停止很长时间,并且检查点中的偏移量已过期(被 Kafka 保留策略删除),则可能需要重置检查点或调整
failOnDataLoss。