使用结构化流式处理写入 Azure Synapse

重要

本文档已过时,将来可能不会更新。

Azure Synapse 连接器为 Azure Synapse 提供高效且可缩放的结构化流式写入支持,通过提供一致的用户体验和批量写入功能,并使用 COPY 在 Azure Databricks 群集与 Azure Synapse 实例之间进行大规模数据传输。

Azure Databricks 与 Synapse 之间的结构化流式处理支持为配置增量 ETL 作业提供简单的语义。 用于将 Azure Databricks 中的数据加载到 Synapse 的模型会造成可能不符合准实时工作负载 SLA 要求的延迟。 请参阅在 Azure Synapse Analytics 中查询数据

支持用于流式写入 Synapse 的输出模式

Azure Synapse 连接器支持用于记录追加和聚合的 AppendComplete 输出模式。 有关输出模式和兼容性矩阵的更多详细信息,请参阅结构化流式处理指南

Synapse 容错语义

默认情况下,Azure Synapse 流式处理提供端到端“恰好一次”保证,可确保将数据写入 Azure Synapse 表,方法是:将 DBFS 中的检查点位置、Azure Synapse 中的检查点表以及锁定机制组合使用,从而可靠地跟踪查询进度,以确保流式处理可以应对任何类型的故障、重试和查询重启。

也可为 Azure Synapse 流式处理选择限制较少的“至少一次”语义,方法是将 spark.databricks.sqldw.streaming.exactlyOnce.enabled 选项设置为 false,这样,如果在连接到 Azure Synapse 时出现间歇性故障,或者查询意外终止,则会进行数据复制。

用于写入 Azure Synapse 的结构化流式处理语法

以下代码示例演示如何在 Scala 和 Python 中使用结构化流式处理流式写入 Synapse:

Scala

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()

Python

# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

有关完整的配置列表,请参阅在 Azure Synapse Analytics 中查询数据

Synapse 流式处理检查点表管理

Azure Synapse 连接器不会删除在新的流式查询启动时创建的流式处理检查点表。 此行为与通常为对象存储指定的 checkpointLocation 相一致。 Databricks 建议定期删除将来不会运行的查询的检查点表。

默认情况下,所有检查点表的名称都为 <prefix>_<query-id>,其中 <prefix> 是一个可配置前缀(默认值为 databricks_streaming_checkpoint),query_id 是删除了 _ 字符的流式查询 ID。

若要查找陈旧的或已删除的流式查询的所有检查点表,请运行以下查询:

SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

可以通过 Spark SQL 配置选项 spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix 来配置前缀。

Azure Databricks Synapse 连接器流式处理选项参考

除了批处理选项外,Spark SQL 中提供的 OPTIONS 还支持以下流式处理选项:

参数 必须 默认 说明
checkpointLocation 无默认值 DBFS 上的位置,可供结构化流式处理用来写入元数据和检查点信息。 请参阅结构化流式处理编程指南中的 Recovering from Failures with Checkpointing(使用检查点功能从故障中恢复)。
numStreamingTempDirsToKeep 0 指示要保留多少(最新的)临时目录,以便定期清理流式处理中的微型批。 如果将此项设置为 0,则系统会在微型批提交后立即触发目录删除操作;如果将此项设置为其他值,则系统会保留所设定数量的最新微型批并删除其余目录。 使用 -1 可禁用定期清理。

备注

checkpointLocationnumStreamingTempDirsToKeep 仅适用于将数据从 Azure Databricks 流式写入到 Azure Synapse 中的新表。