快速入门:使用 Python 创建数据工厂和管道
适用于:Azure 数据工厂 Azure Synapse Analytics
在本快速入门中,你将使用 Python 创建数据工厂。 此数据工厂中的管道将数据从 Azure Blob 存储中的一个文件夹复制到另一个文件夹。
Azure 数据工厂是基于云的数据集成服务,用于创建数据驱动型工作流,以便协调和自动完成数据移动和数据转换。 可以使用 Azure 数据工厂创建和计划数据驱动型工作流(称为管道)。
管道可以从不同的数据存储引入数据。 管道使用计算服务(例如 Azure HDInsight Hadoop、Spark 和 Azure 机器学习)来处理或转换数据。 管道将输出数据发布到数据存储(例如 Azure Synapse Analytics),供商业智能 (BI) 应用程序使用。
先决条件
具有活动订阅的 Azure 帐户。 创建一个试用帐户。
Azure 存储资源管理器(可选)。
Microsoft Entra ID 中的应用程序。 按照此链接中的步骤,使用身份验证选项 2(应用程序机密)创建应用程序,并按照同一文章中的说明将应用程序分配到“参与者”角色。 记下本文显示的以下值,以便在后续步骤中使用:应用程序(客户端)ID、客户端密码值和租户 ID。
创建并上传输入文件
启动记事本。 复制以下文本并在磁盘上将其另存为 input.txt 文件。
John|Doe Jane|Doe
使用 Azure 存储资源管理器等工具创建 adfv2tutorial 容器,并在该容器中创建 input 文件夹。 然后,将 input.txt 文件上传到 input 文件夹。
安装 Python 包
使用管理员特权打开一个终端或命令提示符。
首先,安装 Azure 管理资源的 Python 包:
pip install azure-mgmt-resource
若要为数据工厂安装 Python 包,请运行以下命令:
pip install azure-mgmt-datafactory
用于数据工厂的 Python SDK 支持 Python 2.7 和 3.6+。
要为 Azure 标识身份验证安装 Python 包,请运行以下命令:
pip install azure-identity
注意
在某些常见依赖关系上,“azure-identity”包可能与“azure-cli”冲突。 如果遇到任何身份验证问题,请删除“azure-cli”及其依赖关系,或使用未安装“azure-cli”包的初始状态计算机,以确保“azure-identity”包正常工作。 对于主权云,必须使用适当的云特定常量。 请参阅使用用于 Python 的 Azure 库连接到所有区域 - 多云 | 有关在主权云中连接 Python 的 Microsoft Docs 说明。
创建数据工厂客户端
创建一个名为 datafactory.py 的文件。 添加以下语句来添加对命名空间的引用。
from azure.identity import DefaultAzureCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta from msrestazure.azure_cloud import AZURE_CHINA_CLOUD import time
添加用于打印信息的以下函数。
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))
向 Main 方法中添加用于创建 DataFactoryManagementClient 类的实例的以下代码。 将使用此对象来创建数据工厂、链接服务、数据集和管道。 还将使用此对象来监视管道运行详细信息。 将 subscription_id 变量设置为 Azure 订阅的 ID。 若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析”以找到“数据工厂”:可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库,等等)和计算资源(HDInsight 等)可以位于其他区域中。
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory tenant ID credentials = DefaultAzureCredential(authority = AZURE_CHINA_CLOUD.endpoints.active_directory, tenant_id = '<tenant ID>') resource_client = ResourceManagementClient( credentials, subscription_id, base_url = AZURE_CHINA_CLOUD.endpoints.resource_manager, credential_scopes = [AZURE_CHINA_CLOUD.endpoints.resource_manager + "/.default"] ) adf_client = DataFactoryManagementClient( credentials, subscription_id, base_url = AZURE_CHINA_CLOUD.endpoints.resource_manager, credential_scopes = [AZURE_CHINA_CLOUD.endpoints.resource_manager + "/.default"] ) rg_params = {'location':'chinaeast2'} df_params = {'location':'chinaeast2'}
创建数据工厂
向 Main 方法中添加用于创建数据工厂的以下代码。 如果资源组已存在,请注释掉第一个 create_or_update
语句。
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='chinaeast2')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
创建链接服务
在 Main 方法中添加用于创建 Azure 存储链接服务的以下代码。
可在数据工厂中创建链接服务,将数据存储和计算服务链接到数据工厂。 在此快速入门中,只需创建一个同时作为复制源和接收器存储的 Azure 存储链接服务,在示例中名为“AzureStorageLinkedService”。 将 <storageaccountname>
和 <storageaccountkey>
替换为 Azure 存储帐户的名称和密钥。
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=core.chinacloudapi.cn')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
创建数据集
在本部分中创建两个数据集:一个用于源,另一个用于接收器。
为源 Azure Blob 创建数据集
向 Main 方法中添加用于创建 Azure blob 数据集的以下代码。 有关 Azure Blob 数据集的属性的信息,请参阅 Azure Blob 连接器一文。
在 Azure Blob 中定义表示源数据的数据集。 此 Blob 数据集引用在上一步中创建的 Azure 存储链接服务。
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
为接收器 Azure Blob 创建数据集
向 Main 方法中添加用于创建 Azure blob 数据集的以下代码。 有关 Azure Blob 数据集的属性的信息,请参阅 Azure Blob 连接器一文。
在 Azure Blob 中定义表示源数据的数据集。 此 Blob 数据集引用在上一步中创建的 Azure 存储链接服务。
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
创建管道
向 Main 方法中添加用于创建包含复制活动的管道的以下代码。
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { "ParameterName1" : "ParameterValue1" } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
创建管道运行
在 Main 方法中添加用于触发管道运行的以下代码。
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
监视管道运行
若要监视管道运行,请在 Main 方法中添加以下代码:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
现在添加以下语句,以便在运行程序时调用 main 方法:
# Start the main method
main()
完整脚本
下面是完整的 Python 代码:
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
from msrestazure.azure_cloud import AZURE_CHINA_CLOUD
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory tenant ID
credentials = DefaultAzureCredential(authority = AZURE_CHINA_CLOUD.endpoints.active_directory, tenant_id = '<tenant ID>')
resource_client = ResourceManagementClient(
credentials, subscription_id,
base_url = AZURE_CHINA_CLOUD.endpoints.resource_manager,
credential_scopes = [AZURE_CHINA_CLOUD.endpoints.resource_manager + "/.default"]
)
adf_client = DataFactoryManagementClient(
credentials, subscription_id,
base_url = AZURE_CHINA_CLOUD.endpoints.resource_manager,
credential_scopes = [AZURE_CHINA_CLOUD.endpoints.resource_manager + "/.default"]
)
rg_params = {'location':'chinaeast2'}
df_params = {'location':'chinaeast2'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='chinaeast2')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=core.chinacloudapi.cn')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
运行代码
生成并启动应用程序,然后验证管道执行。
控制台会输出数据工厂、链接服务、数据集、管道和管道运行的创建进度。 请等到出现包含数据读取/写入大小的复制活动运行详细信息。 然后,使用 Azure 存储资源管理器等工具检查 blob 是否已根据变量中的指定从“inputBlobPath”复制到“outputBlobPath”。
下面是示例输出:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: chinaeast2
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
清理资源
若要删除数据工厂,请向程序中添加以下代码:
adf_client.factories.delete(rg_name, df_name)
相关内容
此示例中的管道将数据从 Azure Blob 存储中的一个位置复制到另一个位置。 完成相关教程来了解如何在更多方案中使用数据工厂。