Pipelines and activities in Azure Data Factory

This article helps you understand pipelines and activities in Azure Data Factory and use them to construct end-to-end data-driven workflows for your data movement and data processing scenarios.

Overview

A data factory can have one or more pipelines. A pipeline is a logical grouping of activities that together perform a task. For example, a pipeline could contain a set of activities that ingest and clean log data, and then kick off a Spark job on an HDInsight cluster to analyze the log data. The beauty of this is that the pipeline allows you to manage the activities as a set instead of each one individually. For example, you can deploy and schedule the pipeline, instead of the activities independently.

The activities in a pipeline define actions to perform on your data. For example, you may use a copy activity to copy data from an on-premises SQL Server to Azure Blob storage. Then, use a Hive activity that runs a Hive script on an Azure HDInsight cluster to process/transform data from the blob storage to produce output data. Finally, use a second copy activity to copy the output data to an Azure SQL Data Warehouse, on top of which business intelligence (BI) reporting solutions are built.

Data Factory has three groupings of activities: data movement activities, data transformation activities, and control activities. An activity can take zero or more input datasets and produce one or more output datasets. The following diagram shows the relationship between pipeline, activity, and dataset in Data Factory:

Relationship between datasets, activities, and pipelines

An input dataset represents the input for an activity in the pipeline, and an output dataset represents the output of the activity. Datasets identify data within different data stores, such as tables, files, folders, and documents. After you create a dataset, you can use it with activities in a pipeline. For example, a dataset can be an input/output dataset of a Copy activity or an HDInsight Hive activity. For more information about datasets, see the Datasets in Azure Data Factory article.
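
For illustration only, here is a minimal sketch of what a Blob storage dataset definition could look like; the dataset name, linked service name, folder path, and format shown here are hypothetical:

{
    "name": "InputDataset",
    "properties": {
        "type": "AzureBlob",
        "linkedServiceName": "AzureStorageLinkedService",
        "typeProperties": {
            "folderPath": "adfcontainer/inputdata",
            "format": {
                "type": "TextFormat"
            }
        }
    }
}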

Data movement activities

Copy Activity in Data Factory copies data from a source data store to a sink data store. Data Factory supports the data stores listed in this section. Data from any source can be written to any sink. Click a data store to learn how to copy data to and from that store.

For each data store, the corresponding connector article also notes whether the store is supported as a source, as a sink, and whether it can be used with the Azure integration runtime and the self-hosted integration runtime.

Azure
• Azure Blob storage
• Azure Cosmos DB (SQL API)
• Azure Cosmos DB's API for MongoDB
• Azure Data Explorer
• Azure Data Lake Storage Gen2
• Azure Database for MariaDB
• Azure Database for MySQL
• Azure Database for PostgreSQL
• Azure File Storage
• Azure SQL Database
• Azure SQL Database Managed Instance
• Azure Synapse Analytics (formerly SQL Data Warehouse)
• Azure Table storage

Database
• Amazon Redshift
• DB2
• Drill
• Google BigQuery
• Greenplum
• HBase
• Hive
• Apache Impala
• Informix
• MariaDB
• Microsoft Access
• MySQL
• Netezza
• Oracle
• Phoenix
• PostgreSQL
• Presto (Preview)
• SAP Business Warehouse via Open Hub
• SAP Business Warehouse via MDX
• SAP HANA
• SAP table
• Spark
• SQL Server
• Sybase
• Teradata
• Vertica

NoSQL
• Cassandra
• Couchbase (Preview)
• MongoDB

File
• Amazon S3
• File system
• FTP
• Google Cloud Storage
• HDFS
• SFTP

Generic protocol
• Generic HTTP
• Generic OData
• Generic ODBC
• Generic REST

Services and apps
• Amazon Marketplace Web Service
• Common Data Service
• Concur (Preview)
• Dynamics 365
• Dynamics AX
• Dynamics CRM
• Google AdWords
• HubSpot (Preview)
• Jira
• Magento (Preview)
• Marketo (Preview)
• Office 365
• Oracle Eloqua (Preview)
• Oracle Responsys (Preview)
• Oracle Service Cloud (Preview)
• PayPal (Preview)
• QuickBooks (Preview)
• Salesforce
• Salesforce Service Cloud
• Salesforce Marketing Cloud
• SAP Cloud for Customer (C4C)
• SAP ECC
• ServiceNow
• Shopify (Preview)
• Square (Preview)
• Web table (HTML table)
• Xero
• Zoho (Preview)

Note

If a connector is marked Preview, you can try it out and give us feedback. If you want to take a dependency on preview connectors in your solution, contact Azure support.

For more information, see the Copy Activity - Overview article.
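
As a quick sketch of the shape of a copy operation (the activity and dataset names are hypothetical), a Copy activity pairs a source type and a sink type that correspond to the stores involved, for example BlobSource and SqlSink when copying from Blob storage to Azure SQL Database:

{
    "name": "CopyBlobToSql",
    "type": "Copy",
    "inputs": [
        {
            "name": "InputDataset"
        }
    ],
    "outputs": [
        {
            "name": "OutputDataset"
        }
    ],
    "typeProperties": {
        "source": {
            "type": "BlobSource"
        },
        "sink": {
            "type": "SqlSink"
        }
    }
}

A complete pipeline built around a copy activity like this appears in the Sample copy pipeline section later in this article.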

Data transformation activities

Azure Data Factory supports the following transformation activities, which can be added to pipelines either individually or chained with another activity.

Data transformation activity | Compute environment
Hive | HDInsight [Hadoop]
Pig | HDInsight [Hadoop]
MapReduce | HDInsight [Hadoop]
Hadoop Streaming | HDInsight [Hadoop]
Spark | HDInsight [Hadoop]
Stored Procedure | Azure SQL, Azure SQL Data Warehouse, or SQL Server
Custom Activity | Azure Batch

For more information, see the data transformation activities article.
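
For example, here is a minimal sketch of what a Stored Procedure activity could look like; the linked service, procedure name, and parameter shown here are hypothetical:

{
    "name": "CleanStagingTables",
    "type": "SqlServerStoredProcedure",
    "linkedServiceName": "AzureSqlDatabaseLinkedService",
    "typeProperties": {
        "storedProcedureName": "usp_CleanStagingTables",
        "storedProcedureParameters": {
            "TableName": {
                "value": "staging_weblogs",
                "type": "String"
            }
        }
    }
}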

Control flow activities

The following control flow activities are supported:

Control activity | Description
Append Variable | Add a value to an existing array variable.
Execute Pipeline | The Execute Pipeline activity allows a Data Factory pipeline to invoke another pipeline.
Filter | Apply a filter expression to an input array.
ForEach | The ForEach activity defines a repeating control flow in your pipeline. This activity is used to iterate over a collection and executes specified activities in a loop. The loop implementation of this activity is similar to the Foreach looping structure in programming languages.
Get Metadata | The GetMetadata activity can be used to retrieve metadata of any data in Azure Data Factory.
If Condition | The If Condition can be used to branch based on a condition that evaluates to true or false. The If Condition activity provides the same functionality that an if statement provides in programming languages. It evaluates a set of activities when the condition evaluates to true and another set of activities when the condition evaluates to false.
Lookup | The Lookup activity can be used to read or look up a record, table name, or value from any external source. This output can further be referenced by succeeding activities.
Set Variable | Set the value of an existing variable.
Until | Implements a Do-Until loop that is similar to the Do-Until looping structure in programming languages. It executes a set of activities in a loop until the condition associated with the activity evaluates to true. You can specify a timeout value for the Until activity in Data Factory.
Validation | Ensure a pipeline continues execution only if a reference dataset exists, meets specified criteria, or a timeout has been reached.
Wait | When you use a Wait activity in a pipeline, the pipeline waits for the specified period of time before continuing with the execution of subsequent activities.
Web | The Web activity can be used to call a custom REST endpoint from a Data Factory pipeline. You can pass datasets and linked services to be consumed and accessed by the activity.
Webhook | Using the Webhook activity, call an endpoint and pass a callback URL. The pipeline run waits for the callback to be invoked before proceeding to the next activity.
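
As an illustration, here is a minimal sketch of an If Condition activity that branches on a hypothetical pipeline parameter and runs a Wait activity on the true branch:

{
    "name": "CheckRouteToStaging",
    "type": "IfCondition",
    "typeProperties": {
        "expression": {
            "value": "@bool(pipeline().parameters.routeToStaging)",
            "type": "Expression"
        },
        "ifTrueActivities": [
            {
                "name": "WaitBeforeStaging",
                "type": "Wait",
                "typeProperties": {
                    "waitTimeInSeconds": 30
                }
            }
        ]
    }
}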

Pipeline JSON

Here is how a pipeline is defined in JSON format:

{
    "name": "PipelineName",
    "properties":
    {
        "description": "pipeline description",
        "activities":
        [
        ],
        "parameters": {
        },
        "concurrency": <your max pipeline concurrency>,
        "annotations": [
        ]
    }
}
Tag | Description | Type | Required
name | Name of the pipeline. Specify a name that represents the action that the pipeline performs. Maximum number of characters: 140. Must start with a letter, a number, or an underscore (_). The following characters are not allowed: ".", "+", "?", "/", "<", ">", "*", "%", "&", ":", "\". | String | Yes
description | Specify the text describing what the pipeline is used for. | String | No
activities | The activities section can have one or more activities defined within it. See the Activity JSON section for details about the activities JSON element. | Array | Yes
parameters | The parameters section can have one or more parameters defined within the pipeline, making your pipeline flexible for reuse (see the sketch after this table). | List | No
concurrency | The maximum number of concurrent runs the pipeline can have. By default, there is no maximum. If the concurrency limit is reached, additional pipeline runs are queued until earlier ones complete. | Number | No
annotations | A list of tags associated with the pipeline. | Array | No
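
For example, here is a minimal sketch of a pipeline that declares an array parameter and iterates over it with a ForEach activity; the parameter name, default values, and inner activity are hypothetical:

{
    "name": "ParameterizedPipeline",
    "properties": {
        "parameters": {
            "tableNames": {
                "type": "Array",
                "defaultValue": [ "customers", "orders" ]
            }
        },
        "activities": [
            {
                "name": "ForEachTable",
                "type": "ForEach",
                "typeProperties": {
                    "items": {
                        "value": "@pipeline().parameters.tableNames",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "WaitPerTable",
                            "type": "Wait",
                            "typeProperties": {
                                "waitTimeInSeconds": 5
                            }
                        }
                    ]
                }
            }
        ]
    }
}

Parameter values are referenced at run time through expressions of the form @pipeline().parameters.<parameterName>.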

Activity JSON

The activities section can have one or more activities defined within it. There are two main types of activities: execution and control activities.

Execution activities

执行活动包括数据移动数据转换活动Execution activities include data movement and data transformation activities. 它们具有以下顶级结构:They have the following top-level structure:

{
    "name": "Execution Activity Name",
    "description": "description",
    "type": "<ActivityType>",
    "typeProperties":
    {
    },
    "linkedServiceName": "MyLinkedService",
    "policy":
    {
    },
    "dependsOn":
    {
    }
}

The following table describes properties in the activity JSON definition:

Tag | Description | Required
name | Name of the activity. Specify a name that represents the action that the activity performs. Maximum number of characters: 55. Must start with a letter, a number, or an underscore (_). The following characters are not allowed: ".", "+", "?", "/", "<", ">", "*", "%", "&", ":", "\". | Yes
description | Text describing what the activity is used for. | Yes
type | Type of the activity. See the Data movement activities, Data transformation activities, and Control flow activities sections for different types of activities. | Yes
linkedServiceName | Name of the linked service used by the activity. An activity may require that you specify the linked service that links to the required compute environment. | Yes for HDInsight Activity and Stored Procedure Activity; No for all others
typeProperties | Properties in the typeProperties section depend on each type of activity. To see type properties for an activity, click the link to that activity in the previous section. | No
policy | Policies that affect the run-time behavior of the activity. This property includes timeout and retry behavior. If it is not specified, default values are used. For more information, see the Activity policy section. | No
dependsOn | This property is used to define activity dependencies, and how subsequent activities depend on previous activities. For more information, see Activity dependency. | No

Activity policy

Policies affect the run-time behavior of an activity, giving configurability options. Activity policies are only available for execution activities.

Activity policy JSON definition

{
    "name": "MyPipelineName",
    "properties": {
      "activities": [
        {
          "name": "MyCopyBlobtoSqlActivity",
          "type": "Copy",
          "typeProperties": {
            ...
          },
         "policy": {
            "timeout": "00:10:00",
            "retry": 1,
            "retryIntervalInSeconds": 60,
            "secureOutput": true
         }
        }
      ],
        "parameters": {
           ...
        }
    }
}
JSON name | Description | Allowed values | Required
timeout | Specifies the timeout for the activity to run. | Timespan | No. The default timeout is 7 days.
retry | Maximum retry attempts. | Integer | No. The default is 0.
retryIntervalInSeconds | The delay between retry attempts, in seconds. | Integer | No. The default is 30 seconds.
secureOutput | When set to true, output from the activity is considered secure and is not logged to monitoring. | Boolean | No. The default is false.

Control activity

Control activities have the following top-level structure:

{
    "name": "Control Activity Name",
    "description": "description",
    "type": "<ActivityType>",
    "typeProperties":
    {
    },
    "dependsOn":
    {
    }
}
Tag | Description | Required
name | Name of the activity. Specify a name that represents the action that the activity performs. Maximum number of characters: 55. Must start with a letter, a number, or an underscore (_). The following characters are not allowed: ".", "+", "?", "/", "<", ">", "*", "%", "&", ":", "\". | Yes
description | Text describing what the activity is used for. | Yes
type | Type of the activity. See the Data movement activities, Data transformation activities, and Control flow activities sections for different types of activities. | Yes
typeProperties | Properties in the typeProperties section depend on each type of activity. To see type properties for an activity, click the link to that activity in the previous section. | No
dependsOn | This property is used to define activity dependencies, and how subsequent activities depend on previous activities. For more information, see Activity dependency. | No

Activity dependency

Activity dependency defines how subsequent activities depend on previous activities, determining the condition of whether to continue executing the next task. An activity can depend on one or more previous activities with different dependency conditions.

The different dependency conditions are: Succeeded, Failed, Skipped, and Completed.

For example, if a pipeline has Activity A -> Activity B, the different scenarios that can happen are:

• Activity B has a dependency condition on Activity A with Succeeded: Activity B only runs if Activity A has a final status of Succeeded.
• Activity B has a dependency condition on Activity A with Failed: Activity B only runs if Activity A has a final status of Failed.
• Activity B has a dependency condition on Activity A with Completed: Activity B runs if Activity A has a final status of Succeeded or Failed.
• Activity B has a dependency condition on Activity A with Skipped: Activity B runs if Activity A has a final status of Skipped. Skipped occurs in the scenario of Activity X -> Activity Y -> Activity Z, where each activity runs only if the previous activity succeeds. If Activity X fails, then Activity Y has a status of Skipped because it never executes. Similarly, Activity Z has a status of Skipped as well.

Example: Activity 2 depends on Activity 1 succeeding

{
    "name": "PipelineName",
    "properties":
    {
        "description": "pipeline description",
        "activities": [
            {
                "name": "MyFirstActivity",
                "type": "Copy",
                "typeProperties": {
                },
                "linkedServiceName": {
                }
            },
            {
                "name": "MySecondActivity",
                "type": "Copy",
                "typeProperties": {
                },
                "linkedServiceName": {
                },
                "dependsOn": [
                    {
                        "activity": "MyFirstActivity",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ]
            }
        ],
        "parameters": {
        }
    }
}
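
Building on the same example, an activity can also wait on more than one predecessor by listing several entries in dependsOn; this hypothetical third activity runs only after both of the earlier activities succeed:

{
    "name": "MyThirdActivity",
    "type": "Copy",
    "typeProperties": {
    },
    "linkedServiceName": {
    },
    "dependsOn": [
        {
            "activity": "MyFirstActivity",
            "dependencyConditions": [
                "Succeeded"
            ]
        },
        {
            "activity": "MySecondActivity",
            "dependencyConditions": [
                "Succeeded"
            ]
        }
    ]
}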
    
    

Sample copy pipeline

In the following sample pipeline, there is one activity of type Copy in the activities section. In this sample, the copy activity copies data from an Azure Blob storage to an Azure SQL database.

{
  "name": "CopyPipeline",
  "properties": {
    "description": "Copy data from a blob to Azure SQL table",
    "activities": [
      {
        "name": "CopyFromBlobToSQL",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputDataset"
          }
        ],
        "outputs": [
          {
            "name": "OutputDataset"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "SqlSink",
            "writeBatchSize": 10000,
            "writeBatchTimeout": "60:00:00"
          }
        },
        "policy": {
          "retry": 2,
          "timeout": "01:00:00"
        }
      }
    ]
  }
}
    

Note the following points:

• In the activities section, there is only one activity whose type is set to Copy.
• Input for the activity is set to InputDataset, and output for the activity is set to OutputDataset. See the Datasets article for defining datasets in JSON.
• In the typeProperties section, BlobSource is specified as the source type and SqlSink is specified as the sink type. In the Data movement activities section, click the data store that you want to use as a source or a sink to learn more about moving data to and from that data store.

For a complete walkthrough of creating this pipeline, see Quickstart: create a data factory.

Sample transformation pipeline

In the following sample pipeline, there is one activity of type HDInsightHive in the activities section. In this sample, the HDInsight Hive activity transforms data from an Azure Blob storage by running a Hive script file on an Azure HDInsight Hadoop cluster.

{
    "name": "TransformPipeline",
    "properties": {
        "description": "My first Azure Data Factory pipeline",
        "activities": [
            {
                "type": "HDInsightHive",
                "typeProperties": {
                    "scriptPath": "adfgetstarted/script/partitionweblogs.hql",
                    "scriptLinkedService": "AzureStorageLinkedService",
                    "defines": {
                        "inputtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.chinacloudapi.cn/inputdata",
                        "partitionedtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.chinacloudapi.cn/partitioneddata"
                    }
                },
                "inputs": [
                    {
                        "name": "AzureBlobInput"
                    }
                ],
                "outputs": [
                    {
                        "name": "AzureBlobOutput"
                    }
                ],
                "policy": {
                    "retry": 3
                },
                "name": "RunSampleHiveActivity",
                "linkedServiceName": "HDInsightOnDemandLinkedService"
            }
        ]
    }
}
    

Note the following points:

• In the activities section, there is only one activity whose type is set to HDInsightHive.
• The Hive script file, partitionweblogs.hql, is stored in the Azure storage account (specified by scriptLinkedService, called AzureStorageLinkedService), in the script folder in the container adfgetstarted.
• The defines section is used to specify the runtime settings that are passed to the Hive script as Hive configuration values (for example, ${hiveconf:inputtable}, ${hiveconf:partitionedtable}).

The typeProperties section is different for each transformation activity. To learn about the type properties supported for a transformation activity, click that transformation activity in the Data transformation activities section.

For a complete walkthrough of creating this pipeline, see Tutorial: transform data using Spark.

Multiple activities in a pipeline

The previous two sample pipelines have only one activity in them. You can have more than one activity in a pipeline. If you have multiple activities in a pipeline and subsequent activities are not dependent on previous activities, the activities may run in parallel.

You can chain two activities by using activity dependency, which defines how subsequent activities depend on previous activities, determining the condition of whether to continue executing the next task. An activity can depend on one or more previous activities with different dependency conditions.

Scheduling pipelines

Pipelines are scheduled by triggers. There are different types of triggers: a schedule trigger, which allows pipelines to be triggered on a wall-clock schedule, and a manual trigger, which triggers pipelines on demand. For more information about triggers, see the pipeline execution and triggers article.

To have your trigger kick off a pipeline run, you must include a pipeline reference to the particular pipeline in the trigger definition. Pipelines and triggers have an n-m relationship: multiple triggers can kick off a single pipeline, and the same trigger can kick off multiple pipelines. Once the trigger is defined, you must start the trigger to have it begin triggering the pipeline.

For example, say you have a schedule trigger, "Trigger A," that you want to use to kick off your pipeline, "MyCopyPipeline." You define the trigger as shown in the following example:

Trigger A definition

{
    "name": "TriggerA",
    "properties": {
        "type": "ScheduleTrigger",
        "typeProperties": {
            ...
        },
        "pipeline": {
            "pipelineReference": {
                "type": "PipelineReference",
                "referenceName": "MyCopyPipeline"
            },
            "parameters": {
                "copySourceName": "FileSource"
            }
        }
    }
}
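
The typeProperties of a schedule trigger (elided above) describe its recurrence. As a sketch, assuming an hourly schedule with a hypothetical start time, that section could look like the following:

"typeProperties": {
    "recurrence": {
        "frequency": "Hour",
        "interval": 1,
        "startTime": "2019-11-01T00:00:00Z",
        "timeZone": "UTC"
    }
}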
    

Next steps

See the following tutorials for step-by-step instructions for creating pipelines with activities: