Mapping data flows performance and tuning guide

Applies to: Azure Data Factory and Azure Synapse Analytics

Mapping data flows in Azure Data Factory provide a code-free interface to design and run data transformations at scale. If you're not familiar with mapping data flows, see the Mapping Data Flow Overview. This article highlights various ways to tune and optimize your data flows so that they meet your performance benchmarks.

Testing data flow logic

When designing and testing data flows from the ADF UX, debug mode allows you to interactively test against a live Spark cluster. This lets you preview data and execute your data flows without waiting for a cluster to warm up. For more information, see Debug Mode.

Monitoring data flow performance

Once you verify your transformation logic using debug mode, run your data flow end-to-end as an activity in a pipeline. Data flows are operationalized in a pipeline using the execute data flow activity. Compared to other Azure Data Factory activities, the data flow activity has a unique monitoring experience that displays a detailed execution plan and performance profile of the transformation logic. To view detailed monitoring information for a data flow, click the eyeglasses icon in the activity run output of a pipeline. For more information, see Monitoring mapping data flows.

Data flow monitor

When monitoring data flow performance, there are four possible bottlenecks to look out for:

  • Cluster start-up time
  • Reading from a source
  • Transformation time
  • Writing to a sink

Data flow monitoring

Cluster start-up time is the time it takes to spin up an Apache Spark cluster. This value is located in the top-right corner of the monitoring screen. Data flows run on a just-in-time model where each job uses an isolated cluster. This start-up time generally takes 3-5 minutes. For sequential jobs, it can be reduced by enabling a time to live value. For more information, see Optimizing the Azure Integration Runtime.

Data flows utilize a Spark optimizer that reorders and runs your business logic in 'stages' to perform as quickly as possible. For each sink that your data flow writes to, the monitoring output lists the duration of each transformation stage, along with the time it takes to write data into the sink. The stage with the largest duration is likely the bottleneck of your data flow. If the longest stage contains a source, you may want to look at further optimizing your read time. If a transformation is taking a long time, you may need to repartition or increase the size of your integration runtime. If the sink processing time is large, you may need to scale up your database or verify that you are not outputting to a single file.

Once you have identified the bottleneck of your data flow, use the optimization strategies below to improve performance.

Optimize tab

The Optimize tab contains settings to configure the partitioning scheme of the Spark cluster. This tab exists in every transformation of a data flow and specifies whether you want to repartition the data after the transformation has completed. Adjusting the partitioning gives you control over the distribution of your data across compute nodes and over data locality optimizations, which can have both positive and negative effects on your overall data flow performance.

Screenshot of the Optimize tab, which includes Partition option, Partition type, and Number of partitions.

By default, Use current partitioning is selected, which instructs Azure Data Factory to keep the current output partitioning of the transformation. Because repartitioning data takes time, Use current partitioning is recommended in most scenarios. Scenarios where you may want to repartition your data include after aggregates and joins that significantly skew your data, or when using source partitioning on a SQL DB.

To change the partitioning on any transformation, select the Optimize tab and select the Set Partitioning radio button. You are presented with a series of options for partitioning. The best method of partitioning differs based on your data volumes, candidate keys, null values, and cardinality.

Important

Single partition combines all the distributed data into a single partition. This is a very slow operation that also significantly affects all downstream transformations and writes. Azure Data Factory highly recommends against using this option unless there is an explicit business reason to do so.

The following partitioning options are available in every transformation:

Round robin

Round robin distributes data equally across partitions. Use round robin when you don't have good key candidates to implement a solid, smart partitioning strategy. You can set the number of physical partitions.

Hash

Azure Data Factory produces a hash of columns to create uniform partitions such that rows with similar values fall into the same partition. When you use the Hash option, test for possible partition skew. You can set the number of physical partitions.

Dynamic range

Dynamic range uses Spark dynamic ranges based on the columns or expressions that you provide. You can set the number of physical partitions.

Fixed range

Build an expression that provides a fixed range for values within your partitioned data columns. To avoid partition skew, you should have a good understanding of your data before you use this option. The values you enter for the expression are used as part of a partition function. You can set the number of physical partitions.

Key

If you have a good understanding of the cardinality of your data, key partitioning might be a good strategy. Key partitioning creates a partition for each unique value in your column. You can't set the number of partitions because the number is based on the unique values in the data.

Tip

Manually setting the partitioning scheme reshuffles the data and can offset the benefits of the Spark optimizer. As a best practice, don't manually set the partitioning unless you need to.

Logging level

If you don't require every pipeline execution of your data flow activities to fully log all verbose telemetry, you can optionally set your logging level to "Basic" or "None". When executing your data flows in "Verbose" mode (the default), you are requesting that ADF fully log activity at each individual partition level during your data transformation. This can be an expensive operation, so only enabling verbose mode when troubleshooting can improve your overall data flow and pipeline performance. "Basic" mode only logs transformation durations, while "None" only provides a summary of durations.

Logging level setting

Optimizing the Azure Integration Runtime

Data flows run on Spark clusters that are spun up at run time. The configuration for the cluster used is defined in the integration runtime (IR) of the activity. There are three performance considerations to make when defining your integration runtime: cluster type, cluster size, and time to live.

For more information on how to create an integration runtime, see Integration Runtime in Azure Data Factory.

Cluster type

There are three available options for the type of Spark cluster that is spun up: general purpose, memory optimized, and compute optimized.

General purpose clusters are the default selection and are ideal for most data flow workloads. They tend to be the best balance of performance and cost.

If your data flow has many joins and lookups, you may want to use a memory optimized cluster. Memory optimized clusters can store more data in memory and minimize any out-of-memory errors you may get. Memory optimized clusters have the highest price point per core, but also tend to result in more successful pipelines. If you experience any out-of-memory errors when executing data flows, switch to a memory optimized Azure IR configuration.

Compute optimized clusters aren't ideal for ETL workflows and aren't recommended by the Azure Data Factory team for most production workloads. For simpler, non-memory-intensive data transformations such as filtering data or adding derived columns, compute optimized clusters can be used at a cheaper price per core.

Cluster size

Data flows distribute the data processing over different nodes in a Spark cluster to perform operations in parallel. A Spark cluster with more cores increases the number of nodes in the compute environment. More nodes increase the processing power of the data flow. Increasing the size of the cluster is often an easy way to reduce the processing time.

The default cluster size is four driver nodes and four worker nodes. As you process more data, larger clusters are recommended. Below are the possible sizing options:

Worker cores    Driver cores    Total cores    Notes
4               4               8              Not available for compute optimized
8               8               16
16              16              32
32              16              48
64              16              80
128             16              144
256             16              272

Data flows are priced at vcore-hours, meaning that both cluster size and execution time factor into the cost. As you scale up, your cluster cost per minute increases, but your overall time decreases.
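As a rough, hedged illustration of that trade-off (per-vcore-hour prices vary by region and aren't assumed here): a run that takes 60 minutes on a 16 + 16 core IR consumes 32 cores × 1 hour = 32 vcore-hours, while the same run finishing in 25 minutes on a 64 + 16 core IR consumes 80 × (25/60) ≈ 33 vcore-hours, roughly the same charge for less than half the wall-clock time, provided the data is large enough to keep the extra cores busy.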

Tip

There is a ceiling on how much the size of a cluster affects the performance of a data flow. Depending on the size of your data, there is a point where increasing the size of a cluster stops improving performance. For example, if you have more nodes than partitions of data, adding more nodes won't help. A best practice is to start small and scale up to meet your performance needs.

Time to live

By default, every data flow activity spins up a new cluster based upon the IR configuration. Cluster start-up takes a few minutes, and data processing can't start until it is complete. If your pipelines contain multiple sequential data flows, you can enable a time to live (TTL) value. Specifying a time to live value keeps a cluster alive for a certain period of time after its execution completes. If a new job starts using the IR during the TTL window, it reuses the existing cluster and start-up time is greatly reduced. After the second job completes, the cluster again stays alive for the TTL period.
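For example (the numbers are illustrative only): with a 15-minute TTL, a data flow that starts 5 minutes after the previous one finishes reuses the warm cluster and skips most of the 3-5 minute start-up, whereas one that starts 20 minutes later pays the full start-up cost again.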

Only one job can run on a single cluster at a time. If there is an available cluster but two data flows start, only one will use the live cluster. The second job will spin up its own isolated cluster.

If most of your data flows execute in parallel, it is not recommended that you enable TTL.

Note

Time to live is not available when using the auto-resolve integration runtime.

Optimizing sources

For every source except Azure SQL Database, it is recommended that you keep Use current partitioning as the selected value. When reading from all other source systems, data flows automatically partition data evenly based upon its size. A new partition is created for about every 128 MB of data. As your data size increases, the number of partitions increases.

Any custom partitioning happens after Spark reads in the data and will negatively impact your data flow performance. Because the data is evenly partitioned on read, custom partitioning of sources is not recommended.

Note

Read speeds can be limited by the throughput of your source system.

Azure SQL Database sources

Azure SQL Database has a unique partitioning option called source partitioning. Enabling source partitioning can improve your read times from Azure SQL DB by enabling parallel connections on the source system. Specify the number of partitions and how to partition your data. Use a partition column with high cardinality. You can also enter a query that matches the partitioning scheme of your source table.

Tip

For source partitioning, the I/O of the SQL Server is the bottleneck. Adding too many partitions may saturate your source database. Generally, four or five partitions are ideal when using this option.

Source partitioning

Isolation level

The isolation level of the read on an Azure SQL source system has an impact on performance. Choosing 'Read uncommitted' provides the fastest performance and prevents any database locks. To learn more about SQL isolation levels, see Understanding isolation levels.
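For reference, the 'Read uncommitted' option in the source settings corresponds to the standard T-SQL isolation level of the same name. A minimal sketch of the equivalent behavior if you were querying the table yourself (the table and column names are hypothetical):

SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

-- Rows are read without taking shared locks, so the load doesn't block writers,
-- at the cost of possibly seeing uncommitted ("dirty") data.
SELECT OrderId, CustomerId, OrderTotal
FROM dbo.SalesOrders;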

Read using query

You can read from Azure SQL Database using a table or a SQL query. If you are executing a SQL query, the query must complete before transformation can start. SQL queries can be useful for pushing down operations such as SELECT, WHERE, and JOIN statements that may execute faster in the database and reduce the amount of data read from SQL Server. When pushing down operations, you lose the ability to track lineage and performance of the transformations before the data comes into the data flow.
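As an illustration (the tables and columns are hypothetical), a source query like the following pushes the join and filter down to the database, so the data flow only receives the rows and columns it actually needs:

SELECT o.OrderId, o.OrderDate, c.Region, o.OrderTotal
FROM dbo.Orders AS o
JOIN dbo.Customers AS c ON c.CustomerId = o.CustomerId
WHERE o.OrderDate >= '2021-01-01';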

Azure Synapse Analytics sources

When using Azure Synapse Analytics, a setting called Enable staging exists in the source options. This allows ADF to read from Synapse using staging, which greatly improves read performance. Enabling staging requires you to specify an Azure Blob Storage or Azure Data Lake Storage Gen2 staging location in the data flow activity settings.

Enable staging

File-based sources

While data flows support a variety of file types, Azure Data Factory recommends using the Spark-native Parquet format for optimal read and write times.

If you're running the same data flow on a set of files, we recommend reading from a folder, using wildcard paths, or reading from a list of files. A single data flow activity run can process all of your files in batch. More information on how to configure these settings can be found in the connector documentation, such as Azure Blob Storage.
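For example (the path is illustrative only), a wildcard source path such as container/sales/2021/*.parquet lets a single activity run pick up every matching file in that folder rather than triggering one run per file.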

If possible, avoid using the For-Each activity to run data flows over a set of files. This causes each iteration of the for-each loop to spin up its own Spark cluster, which is often unnecessary and can be expensive.

Optimizing sinks

When data flows write to sinks, any custom partitioning happens immediately before the write. Like the source, in most cases it is recommended that you keep Use current partitioning as the selected partition option. Partitioned data writes significantly faster than unpartitioned data, even if your destination is not partitioned. Below are the individual considerations for various sink types.

Azure SQL Database sinks

With Azure SQL Database, the default partitioning should work in most cases. There is a chance that your sink may have too many partitions for your SQL database to handle. If you are running into this, reduce the number of partitions output by your SQL Database sink.

Impact of error row handling on performance

When you enable error row handling ("continue on error") in the sink transformation, ADF takes an additional step before writing the compatible rows to your destination table. This additional step has a small performance penalty, in the range of 5%, and there is a further small performance hit if you also set the option to write the incompatible rows to a log file.

Disabling indexes using a SQL script

Disabling indexes before a load in a SQL database can greatly improve performance of writing to the table. Run the command below before writing to your SQL sink.

ALTER INDEX ALL ON dbo.[Table Name] DISABLE

After the write has completed, rebuild the indexes using the following command:

ALTER INDEX ALL ON dbo.[Table Name] REBUILD

Both of these commands can be run natively using pre-SQL and post-SQL scripts within an Azure SQL DB or Synapse sink in mapping data flows.

Disable indexes

Warning

When disabling indexes, the data flow is effectively taking control of the database, and queries are unlikely to succeed during this time. As a result, many ETL jobs are triggered in the middle of the night to avoid this conflict. For more information, learn about the constraints of disabling indexes.

Scaling up your database

Schedule a resizing of your source and sink Azure SQL DB and DW before your pipeline run to increase throughput and minimize Azure throttling once you reach DTU limits. After your pipeline execution is complete, resize your databases back to their normal run rate.
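As a minimal sketch (the database name and service objectives are placeholders; you can also resize through the Azure portal, PowerShell, or the REST API), the resize itself is a single T-SQL statement:

-- Before the pipeline run: scale the database up
ALTER DATABASE [SalesDb] MODIFY (SERVICE_OBJECTIVE = 'P2');

-- After the pipeline run: scale it back down
ALTER DATABASE [SalesDb] MODIFY (SERVICE_OBJECTIVE = 'S2');

The statement returns immediately and the resize completes asynchronously, so allow time for it to finish before the pipeline's trigger fires.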

Azure Synapse Analytics sinks

When writing to Azure Synapse Analytics, make sure that Enable staging is set to true. This enables ADF to write using the SQL COPY command, which effectively loads the data in bulk. You will need to reference an Azure Data Lake Storage Gen2 or Azure Blob Storage account for staging the data when using staging.

Other than staging, the same best practices apply to Azure Synapse Analytics as to Azure SQL Database.

File-based sinks

While data flows support a variety of file types, Azure Data Factory recommends using the Spark-native Parquet format for optimal read and write times.

If the data is evenly distributed, Use current partitioning is the fastest partitioning option for writing files.

File name options

When writing files, you have a choice of naming options, each of which has a performance impact.

Sink options

Selecting the Default option writes the fastest. Each partition equates to a file with the Spark default name. This is useful if you are just reading from the folder of data.

Setting a naming Pattern renames each partition file to a more user-friendly name. This operation happens after the write and is slightly slower than choosing the default. Per partition allows you to name each individual partition manually.

If a column corresponds to how you wish to output the data, you can select As data in column. This reshuffles the data and can impact performance if the columns are not evenly distributed.

Output to single file combines all the data into a single partition. This leads to long write times, especially for large datasets. The Azure Data Factory team highly recommends not choosing this option unless there is an explicit business reason to do so.

CosmosDB sinks

When writing to CosmosDB, altering the throughput and batch size during data flow execution can improve performance. These changes only take effect during the data flow activity run and will return to the original collection settings after it concludes.

Batch size: Calculate the rough row size of your data, and make sure that row size * batch size is less than two million. If it is, increase the batch size to get better throughput.
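For example (the sizes are illustrative), if a typical document is about 2 KB (2,000 bytes), a batch size of 500 gives 2,000 × 500 = 1,000,000, well under the two-million limit, so you could raise the batch size toward 1,000 before reaching it.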

Throughput: Set a higher throughput setting here to allow documents to write faster to CosmosDB. Keep in mind the higher RU costs that come with a higher throughput setting.

Write throughput budget: Use a value that is smaller than the total RUs per minute. If you have a data flow with a high number of Spark partitions, setting a budget throughput allows more balance across those partitions.

Optimizing transformations

Optimizing Joins, Exists, and Lookups

Broadcasting

In joins, lookups, and exists transformations, if one or both data streams are small enough to fit into worker node memory, you can optimize performance by enabling Broadcasting. Broadcasting is when you send small data frames to all nodes in the cluster. This allows the Spark engine to perform a join without reshuffling the data in the large stream. By default, the Spark engine automatically decides whether or not to broadcast one side of a join. If you are familiar with your incoming data and know that one stream will be significantly smaller than the other, you can select Fixed broadcasting. Fixed broadcasting forces Spark to broadcast the selected stream.

If the size of the broadcasted data is too large for the Spark node, you may get an out-of-memory error. To avoid out-of-memory errors, use memory optimized clusters. If you experience broadcast timeouts during data flow executions, you can switch off the broadcast optimization. However, this will result in slower-performing data flows.

Join transformation optimize

Cross joins

If you use literal values in your join conditions or have multiple matches on both sides of a join, Spark runs the join as a cross join. A cross join is a full Cartesian product that then filters out the joined values. This is significantly slower than other join types. Ensure that you have column references on both sides of your join conditions to avoid the performance impact.
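As an informal illustration (the stream and column names are hypothetical), a condition such as orders.year == 2020 compares one stream against a literal and gives Spark no key to distribute on, while orders.customerId == customers.customerId references a column from each side and lets Spark run a normal partitioned join.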

Sorting before joins

Unlike merge join in tools like SSIS, the join transformation isn't a mandatory merge join operation. The join keys don't require sorting prior to the transformation. The Azure Data Factory team doesn't recommend using Sort transformations in mapping data flows.

Window transformation performance

The Window transformation partitions your data by values in the columns that you select as part of the over() clause in the transformation settings. A number of very popular aggregate and analytical functions are exposed in the Window transformation. However, if your use case is to generate a window over your entire dataset for the purpose of ranking (rank()) or row numbering (rowNumber()), it is recommended that you instead use the Rank transformation and the Surrogate Key transformation. Those transformations perform better for full-dataset operations using those functions.

Repartitioning skewed data

Certain transformations such as joins and aggregates reshuffle your data partitions and can occasionally lead to skewed data. Skewed data means that data is not evenly distributed across the partitions. Heavily skewed data can lead to slower downstream transformations and sink writes. You can check the skewness of your data at any point in a data flow run by clicking on the transformation in the monitoring display.

Skewness and kurtosis

The monitoring display shows how the data is distributed across each partition, along with two metrics: skewness and kurtosis. Skewness is a measure of how asymmetrical the data is and can have a positive, zero, negative, or undefined value. Negative skew means the left tail is longer than the right. Kurtosis is a measure of whether the data is heavy-tailed or light-tailed. High kurtosis values are not desirable. Ideal values of skewness lie between -3 and 3, and ideal values of kurtosis are less than 10. An easy way to interpret these numbers is to look at the partition chart and see whether one bar is significantly larger than the rest.

If your data is not evenly partitioned after a transformation, you can use the Optimize tab to repartition. Reshuffling data takes time and may not improve your data flow performance.

Tip

If you repartition your data but have downstream transformations that reshuffle it, use hash partitioning on a column used as a join key.

Using data flows in pipelines

When building complex pipelines with multiple data flows, your logical flow can have a big impact on timing and cost. This section covers the impact of different architecture strategies.

Executing data flows in parallel

If you execute multiple data flows in parallel, ADF spins up separate Spark clusters for each activity. This allows each job to be isolated and run in parallel, but leads to multiple clusters running at the same time.

If your data flows execute in parallel, it's recommended that you don't enable the Azure IR time to live property, because it will lead to multiple unused warm pools.

Tip

Instead of running the same data flow multiple times in a for-each activity, stage your data in a data lake and use wildcard paths to process the data in a single data flow.

Execute data flows sequentially

If you execute your data flow activities in sequence, it is recommended that you set a TTL in the Azure IR configuration. ADF will reuse the compute resources, resulting in a faster cluster start-up time. Each activity is still isolated and receives a new Spark context for each execution.

Running jobs sequentially will likely take the longest time to execute end-to-end, but provides a clean separation of logical operations.

Overloading a single data flow

If you put all of your logic inside a single data flow, ADF will execute the entire job on a single Spark instance. While this may seem like a way to reduce costs, it mixes together different logical flows and can be difficult to monitor and debug. If one component fails, all other parts of the job fail as well. The Azure Data Factory team recommends organizing data flows by independent flows of business logic. If your data flow becomes too large, splitting it into separate components makes monitoring and debugging easier. While there is no hard limit on the number of transformations in a data flow, having too many will make the job complex.

Execute sinks in parallel

The default behavior of data flow sinks is to execute each sink sequentially, in a serial manner, and to fail the data flow when an error is encountered in a sink. Additionally, all sinks default to the same group unless you go into the data flow properties and set different priorities for the sinks.

Data flows allow you to group sinks together from the Data flow properties tab in the UI designer. You can both set the order of execution of your sinks and group sinks together using the same group number. To help manage groups, you can ask ADF to run sinks in the same group in parallel.

On the pipeline's execute data flow activity, under the "Sink properties" section, there is an option to turn on parallel sink loading. When you enable "Run in parallel", you are instructing data flows to write to connected sinks at the same time rather than sequentially. In order to use the parallel option, the sinks must be grouped together and connected to the same stream via a New Branch or Conditional Split.

Next steps

See other Data Flow articles related to performance: