Quickstart: Create an Azure Data Factory and pipeline using Python

This quickstart describes how to use Python to create an Azure data factory. The pipeline in this data factory copies data from one folder to another folder in Azure Blob storage.

Azure Data Factory is a cloud-based data integration service that allows you to create data-driven workflows in the cloud for orchestrating and automating data movement and data transformation. Using Azure Data Factory, you can create and schedule data-driven workflows (called pipelines) that ingest data from disparate data stores, process or transform the data by using compute services such as Azure HDInsight Hadoop and Spark, and publish output data to data stores such as Azure SQL Data Warehouse for business intelligence (BI) applications to consume.

If you don't have an Azure subscription, create a 1 RMB trial account before you begin.

Prerequisites

  • Azure Storage account. You use the blob storage as source and sink data store. If you don't have an Azure storage account, see the Create a storage account article for steps to create one.
  • Create an application in Azure Active Directory by following these instructions. Make note of the following values that you use in later steps: application ID, authentication key, and tenant ID. Assign the application to the Contributor role by following the instructions in the same article. If you prefer not to hard-code these values in the script later, see the optional sketch after this list.
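
Rather than hard-coding the service principal values in the script you build below, you can read them from environment variables. This is a minimal, optional sketch; the environment variable names (ADF_CLIENT_ID, ADF_CLIENT_SECRET, ADF_TENANT_ID, AZURE_SUBSCRIPTION_ID) are arbitrary choices for illustration, not names required by the SDK.

    # Optional: read the service principal values from environment variables
    # instead of hard-coding them. The variable names here are arbitrary.
    import os

    client_id = os.environ['ADF_CLIENT_ID']          # application (client) ID
    client_secret = os.environ['ADF_CLIENT_SECRET']  # authentication key
    tenant_id = os.environ['ADF_TENANT_ID']          # tenant ID
    subscription_id = os.environ['AZURE_SUBSCRIPTION_ID']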

Create and upload an input file

  1. Launch Notepad. Copy the following text and save it as input.txt on your disk.

    John|Doe
    Jane|Doe
    
  2. Use tools such as Azure Storage Explorer to create the adfv2tutorial container, and the input folder in the container. Then, upload the input.txt file to the input folder. If you prefer to script this step, see the optional sketch below.
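
If you prefer to script the container creation and upload instead of using Azure Storage Explorer, the sketch below uses the separate azure-storage-blob package (version 12 or later, installed with pip install azure-storage-blob). This package is not used elsewhere in the quickstart, and the placeholder values are assumptions you replace with your own account name and key.

    # Optional: create the container and upload input.txt with azure-storage-blob (v12)
    from azure.storage.blob import BlobServiceClient

    conn_str = ('DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;'
                'AccountKey=<storageaccountkey>;EndpointSuffix=core.chinacloudapi.cn')
    blob_service = BlobServiceClient.from_connection_string(conn_str)

    # Create the adfv2tutorial container; ignore the error if it already exists
    try:
        blob_service.create_container('adfv2tutorial')
    except Exception:
        pass

    # Upload input.txt into the "input" folder of the container
    with open('input.txt', 'rb') as data:
        blob_service.get_blob_client('adfv2tutorial', 'input/input.txt').upload_blob(data, overwrite=True)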

Install the Python packages

  1. Open a terminal or command prompt with administrator privileges.

  2. First, install the Python package for Azure management resources:

    pip install azure-mgmt-resource
    
  3. To install the Python package for Data Factory, run the following command:

    pip install azure-mgmt-datafactory
    

    The Python SDK for Data Factory supports Python 2.7, 3.3, 3.4, 3.5, 3.6, and 3.7. A quick way to confirm the installed packages is shown below.
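
    To confirm both packages are installed, you can optionally print their versions from Python. This is just a quick sanity check, not part of the tutorial script:

    # Optional sanity check: print the installed package versions
    import pkg_resources

    for pkg in ('azure-mgmt-resource', 'azure-mgmt-datafactory'):
        print(pkg, pkg_resources.get_distribution(pkg).version)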

Create a data factory client

  1. Create a file named datafactory.py. Add the following statements to add references to namespaces.

    from azure.common.credentials import ServicePrincipalCredentials
    from azure.mgmt.resource import ResourceManagementClient
    from azure.mgmt.datafactory import DataFactoryManagementClient
    from azure.mgmt.datafactory.models import *
    from datetime import datetime, timedelta
    import time
    
  2. Add the following functions that print information.

    def print_item(group):
        """Print an Azure object instance."""
        print("\tName: {}".format(group.name))
        print("\tId: {}".format(group.id))
        if hasattr(group, 'location'):
            print("\tLocation: {}".format(group.location))
        if hasattr(group, 'tags'):
            print("\tTags: {}".format(group.tags))
        if hasattr(group, 'properties'):
            print_properties(group.properties)
    
    def print_properties(props):
        """Print a ResourceGroup properties instance."""
        if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
            print("\tProperties:")
            print("\t\tProvisioning State: {}".format(props.provisioning_state))
        print("\n\n")
    
    def print_activity_run_details(activity_run):
        """Print activity run details."""
        print("\n\tActivity run details\n")
        print("\tActivity run status: {}".format(activity_run.status))
        if activity_run.status == 'Succeeded':
            print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
            print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
            print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
        else:
            print("\tErrors: {}".format(activity_run.error['message']))
    
  3. Add the following code to the Main method that creates an instance of the DataFactoryManagementClient class. You use this object to create the data factory, linked service, datasets, and pipeline, and to monitor the pipeline run details. Set the subscription_id variable to the ID of your Azure subscription. For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. The data stores (Azure Storage, Azure SQL Database, and so on) and computes (HDInsight and so on) used by the data factory can be in other regions.

    def main():
    
        # Azure subscription ID
        subscription_id = '<Specify your Azure Subscription ID>'
    
        # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
        rg_name = 'ADFTutorialResourceGroup'
    
        # The data factory name. It must be globally unique.
        df_name = '<Specify a name for the data factory. It must be globally unique>'
    
        # Specify your Active Directory client ID, client secret, and tenant ID
        credentials = ServicePrincipalCredentials(client_id='<Active Directory application/client ID>', secret='<client secret>', tenant='<Active Directory tenant ID>' , china='true')
        resource_client = ResourceManagementClient(credentials, subscription_id , base_url='https://management.chinacloudapi.cn')
        adf_client = DataFactoryManagementClient(credentials, subscription_id , base_url='https://management.chinacloudapi.cn')
    
        rg_params = {'location':'chinaeast2'}
        df_params = {'location':'chinaeast2'}
    

Create a data factory

Add the following code to the Main method that creates a data factory. If your resource group already exists, comment out the first create_or_update statement.

    # create the resource group
    # comment out if the resource group already exists
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    #Create a data factory
    df_resource = Factory(location='chinaeast2')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

Create a linked service

Add the following code to the Main method that creates an Azure Storage linked service.

You create linked services in a data factory to link your data stores and compute services to the data factory. In this quickstart, you only need to create one Azure Storage linked service to use as both the copy source and the sink store; it is named "storageLinkedService" in the sample. Replace <storageaccountname> and <storageaccountkey> with the name and key of your Azure Storage account.

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService'

    # IMPORTANT: specify the name and key of your Azure Storage account.
    storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;AccountKey=<storageaccountkey>;EndpointSuffix=core.chinacloudapi.cn')

    ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
    ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

Create datasets

In this section, you create two datasets: one for the source and the other for the sink.

Create a dataset for the source Azure Blob

Add the following code to the Main method that creates an Azure blob dataset. For information about the properties of the Azure Blob dataset, see the Azure Blob connector article.

You define a dataset that represents the source data in Azure Blob. This Blob dataset refers to the Azure Storage linked service you created in the previous step.

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path= 'adfv2tutorial/input'
    blob_filename = 'input.txt'
    ds_azure_blob= AzureBlobDataset(linked_service_name=ds_ls, folder_path=blob_path, file_name = blob_filename)
    ds = adf_client.datasets.create_or_update(rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

Create a dataset for the sink Azure Blob

Add the following code to the Main method that creates an Azure blob dataset. For information about the properties of the Azure Blob dataset, see the Azure Blob connector article.

You define a dataset for the data that is copied to the sink location in Azure Blob. This Blob dataset also refers to the Azure Storage linked service you created in the previous step.

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adfv2tutorial/output'
    dsOut_azure_blob = AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath)
    dsOut = adf_client.datasets.create_or_update(rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

Create a pipeline

Add the following code to the Main method that creates a pipeline with a copy activity.

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)

    #Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

Create a pipeline run

Add the following code to the Main method that triggers a pipeline run.

    #Create a pipeline run.
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

Monitor a pipeline run

To monitor the pipeline run, add the following code to the Main method (an optional polling variation follows the block):

    #Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])
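
The fixed 30-second sleep above is the simplest approach. As an optional variation, you can poll the run status until it reaches a terminal state; this sketch reuses the adf_client and run_response objects defined earlier, and the set of terminal status strings is an assumption based on the values Data Factory reports.

    # Optional variation: poll until the pipeline run finishes instead of sleeping a fixed time
    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
    while pipeline_run.status not in ('Succeeded', 'Failed', 'Cancelled'):
        time.sleep(10)
        pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))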

Now, add the following statement to invoke the main method when the program is run:

# Start the main method
main()

Full script

Here is the full Python code:

from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time


def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)
    print("\n")


def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n")


def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(
            activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(
            activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(
            activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))


def main():

    # Azure subscription ID
    subscription_id = '<your Azure subscription ID>'

    # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
    rg_name = '<Azure resource group name>'

    # The data factory name. It must be globally unique.
    df_name = '<Your data factory name>'

    # Specify your Active Directory client ID, client secret, and tenant ID
    credentials = ServicePrincipalCredentials(
        client_id='<Active Directory client ID>', secret='<client secret>', tenant='<tenant ID>' , china='true')
    resource_client = ResourceManagementClient(credentials, subscription_id , base_url='https://management.chinacloudapi.cn')
    adf_client = DataFactoryManagementClient(credentials, subscription_id , base_url='https://management.chinacloudapi.cn')

    rg_params = {'location':'chinaeast2'}
    df_params = {'location':'chinaeast2'}

    # create the resource group
    # comment out if the resource group already exists
    resource_client.resource_groups.create_or_update(rg_name, rg_params)

    # Create a data factory
    df_resource = Factory(location='chinaeast2')
    df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
    print_item(df)
    while df.provisioning_state != 'Succeeded':
        df = adf_client.factories.get(rg_name, df_name)
        time.sleep(1)

    # Create an Azure Storage linked service
    ls_name = 'storageLinkedService'

    # Specify the name and key of your Azure Storage account
    storage_string = SecureString(
        value='DefaultEndpointsProtocol=https;AccountName=<storage account name>;AccountKey=<storage account key>;EndpointSuffix=core.chinacloudapi.cn')

    ls_azure_storage = AzureStorageLinkedService(
        connection_string=storage_string)
    ls = adf_client.linked_services.create_or_update(
        rg_name, df_name, ls_name, ls_azure_storage)
    print_item(ls)

    # Create an Azure blob dataset (input)
    ds_name = 'ds_in'
    ds_ls = LinkedServiceReference(reference_name=ls_name)
    blob_path = 'adfv2tutorial/input'
    blob_filename = 'input.txt'
    ds_azure_blob = AzureBlobDataset(
        linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename)
    ds = adf_client.datasets.create_or_update(
        rg_name, df_name, ds_name, ds_azure_blob)
    print_item(ds)

    # Create an Azure blob dataset (output)
    dsOut_name = 'ds_out'
    output_blobpath = 'adfv2tutorial/output'
    dsOut_azure_blob = AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath)
    dsOut = adf_client.datasets.create_or_update(
        rg_name, df_name, dsOut_name, dsOut_azure_blob)
    print_item(dsOut)

    # Create a copy activity
    act_name = 'copyBlobtoBlob'
    blob_source = BlobSource()
    blob_sink = BlobSink()
    dsin_ref = DatasetReference(reference_name=ds_name)
    dsOut_ref = DatasetReference(reference_name=dsOut_name)
    copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[dsOut_ref],
                                 source=blob_source, sink=blob_sink)

    # Create a pipeline with the copy activity
    p_name = 'copyPipeline'
    params_for_pipeline = {}
    p_obj = PipelineResource(
        activities=[copy_activity], parameters=params_for_pipeline)
    p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
    print_item(p)

    # Create a pipeline run
    run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})

    # Monitor the pipeline run
    time.sleep(30)
    pipeline_run = adf_client.pipeline_runs.get(
        rg_name, df_name, run_response.run_id)
    print("\n\tPipeline run status: {}".format(pipeline_run.status))
    filter_params = RunFilterParameters(
        last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
    query_response = adf_client.activity_runs.query_by_pipeline_run(
        rg_name, df_name, pipeline_run.run_id, filter_params)
    print_activity_run_details(query_response.value[0])


# Start the main method
main()

Run the code

Build and start the application, then verify the pipeline execution.

The console prints the progress of creating the data factory, linked service, datasets, pipeline, and pipeline run. Wait until you see the copy activity run details with the size of data read and written. Then, use tools such as Azure Storage Explorer to check that the blob was copied from the input path to the output path specified in the blob_path and output_blobpath variables. An optional programmatic check is shown below.
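
You can also verify the output programmatically. The following sketch downloads the copied blob with the azure-storage-blob package (the same optional package mentioned in the upload step) and prints its contents; it assumes the copied blob keeps its original name, input.txt, under the output folder.

# Optional verification: download and print the copied blob
from azure.storage.blob import BlobServiceClient

conn_str = ('DefaultEndpointsProtocol=https;AccountName=<storageaccountname>;'
            'AccountKey=<storageaccountkey>;EndpointSuffix=core.chinacloudapi.cn')
blob_service = BlobServiceClient.from_connection_string(conn_str)
copied_blob = blob_service.get_blob_client('adfv2tutorial', 'output/input.txt')
print(copied_blob.download_blob().readall().decode('utf-8'))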

Here is the sample output:

Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: chinaeast2
Tags: {}

Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService

Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in

Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out

Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline

Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.

Activity run details

Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4

Clean up resources

To delete the data factory, add the following code to the program (an optional resource group cleanup follows):

adf_client.factories.delete(rg_name, df_name)
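
If you created the ADFTutorialResourceGroup resource group only for this quickstart, you can optionally remove it as well. This sketch reuses the resource_client object from the script; deleting a resource group removes everything in it, so use it with care.

# Optional: also delete the resource group (removes everything it contains)
delete_op = resource_client.resource_groups.delete(rg_name)
delete_op.wait()  # block until the deletion completes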

Next steps

The pipeline in this sample copies data from one location to another location in Azure Blob storage. Go through the tutorials to learn about using Data Factory in more scenarios.