重要
本文档已过时,将来可能不会更新。
Azure Synapse 连接器为 Azure Synapse 提供高效且可缩放的结构化流式写入支持,通过提供一致的用户体验和批量写入功能,并使用 COPY
在 Azure Databricks 群集与 Azure Synapse 实例之间进行大规模数据传输。
Azure Databricks 与 Synapse 之间的结构化流式处理支持为配置增量 ETL 作业提供简单的语义。 用于将 Azure Databricks 中的数据加载到 Synapse 的模型会造成可能不符合准实时工作负载 SLA 要求的延迟。 请参阅在 Azure Synapse Analytics 中查询数据。
Azure Synapse 连接器支持用于记录追加和聚合的 Append
和 Complete
输出模式。 有关输出模式和兼容性矩阵的更多详细信息,请参阅结构化流式处理指南。
默认情况下,Azure Synapse 流式处理提供端到端“恰好一次”保证,可确保将数据写入 Azure Synapse 表,方法是:将 DBFS 中的检查点位置、Azure Synapse 中的检查点表以及锁定机制组合使用,从而可靠地跟踪查询进度,以确保流式处理可以应对任何类型的故障、重试和查询重启。
也可为 Azure Synapse 流式处理选择限制较少的“至少一次”语义,方法是将 spark.databricks.sqldw.streaming.exactlyOnce.enabled
选项设置为 false
,这样,如果在连接到 Azure Synapse 时出现间歇性故障,或者查询意外终止,则会进行数据复制。
以下代码示例演示如何在 Scala 和 Python 中使用结构化流式处理流式写入 Synapse:
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
"<your-storage-account-access-key>")
// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "100000")
.option("numPartitions", "16")
.load()
// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("checkpointLocation", "/tmp_checkpoint_location")
.start()
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
"<your-storage-account-access-key>")
# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", "100000") \
.option("numPartitions", "16") \
.load()
# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("checkpointLocation", "/tmp_checkpoint_location") \
.start()
有关完整的配置列表,请参阅在 Azure Synapse Analytics 中查询数据。
Azure Synapse 连接器不会删除在新的流式查询启动时创建的流式处理检查点表。 此行为与通常为对象存储指定的 checkpointLocation
相一致。 Databricks 建议定期删除将来不会运行的查询的检查点表。
默认情况下,所有检查点表的名称都为 <prefix>_<query-id>
,其中 <prefix>
是一个可配置前缀(默认值为 databricks_streaming_checkpoint
),query_id
是删除了 _
字符的流式查询 ID。
若要查找陈旧的或已删除的流式查询的所有检查点表,请运行以下查询:
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'
可以通过 Spark SQL 配置选项 spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix
来配置前缀。
除了批处理选项外,Spark SQL 中提供的 OPTIONS
还支持以下流式处理选项:
参数 | 必须 | 默认 | 说明 |
---|---|---|---|
checkpointLocation |
是 | 无默认值 | DBFS 上的位置,可供结构化流式处理用来写入元数据和检查点信息。 请参阅结构化流式处理编程指南中的 Recovering from Failures with Checkpointing(使用检查点功能从故障中恢复)。 |
numStreamingTempDirsToKeep |
否 | 0 | 指示要保留多少(最新的)临时目录,以便定期清理流式处理中的微型批。 如果将此项设置为 0 ,则系统会在微型批提交后立即触发目录删除操作;如果将此项设置为其他值,则系统会保留所设定数量的最新微型批并删除其余目录。 使用 -1 可禁用定期清理。 |
备注
checkpointLocation
和 numStreamingTempDirsToKeep
仅适用于将数据从 Azure Databricks 流式写入到 Azure Synapse 中的新表。