Compartir a través de

使用 PowerShell 通过 Azure 数据工厂批量复制多个表

适用于:Azure 数据工厂 Azure Synapse Analytics

本教程演示如何将 Azure SQL 数据库中的多个表复制到 Azure Synapse Analytics。 在其他复制方案中,也可以应用相同的模式。 例如,将 SQL Server/Oracle 中的表复制到 Azure SQL 数据库/数据仓库/Azure Blob,将 Blob 中的不同路径复制到 Azure SQL 数据库表。

从较高层面讲,本教程涉及以下步骤:

  • 创建数据工厂。
  • 创建 Azure SQL 数据库、Azure Synapse Analytics 和 Azure 存储链接服务。
  • 创建 Azure SQL 数据库和 Azure Synapse Analytics 数据集。
  • 创建一个管道用于查找要复制的表,创建另一个管道用于执行实际复制操作。
  • 启动管道运行。
  • 监视管道和活动运行。

本教程使用 Azure PowerShell。 若要了解如何使用其他工具/SDK 创建数据工厂,请参阅快速入门

端到端工作流

在本场景中,Azure SQL 数据库中包含一些表,我们要将其复制到 Azure Synapse Analytics。 下面是管道中发生的工作流中的逻辑步骤顺序:

工作流

  • 第一个管道查找需要复制到接收器数据存储的表列表。 也可以维护一个元数据表用于列出要复制到接收器数据存储的所有表。 然后,该管道触发另一个管道,后者循环访问数据库中的每个表并执行数据复制操作。
  • 第二个管道执行实际复制。 它使用表列表作为参数。 对于列表中的每个表,为获得最佳性能,会使用通过 Blob 存储和 PolyBase 进行的分阶段复制,将 Azure SQL 数据库中的特定表复制到 Azure Synapse Analytics 中的相应表。 在本示例中,第一个管道传递表列表作为参数值。

如果没有 Azure 订阅,可在开始前创建一个试用帐户

先决条件

注意

建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az

  • Azure PowerShell。 遵循如何安装和配置 Azure PowerShell 中的说明。
  • Azure 存储帐户。 Azure 存储帐户用作批量复制操作中的过渡 Blob 存储。
  • Azure SQL 数据库。 此数据库包含源数据。
  • Azure Synapse Analytics。 此数据仓库包含从 SQL 数据库复制的数据。

准备 SQL 数据库和 Azure Synapse Analytics

准备源 Azure SQL 数据库

按照在 Azure SQL 数据库中创建数据库一文,使用 Adventure Works LT 示例数据在 Azure SQL 数据库中创建一个数据库。 本教程将此示例数据库中的所有表复制到 Azure Synapse Analytics。

准备接收器 Azure Synapse Analytics

  1. 如果没有 Azure Synapse Analytics 工作区,请参阅 Azure Synapse Analytics 入门一文了解创建步骤。

  2. 在 Azure Synapse Analytics 中创建相应的表架构。 后面的步骤使用 Azure 数据工厂迁移/复制数据。

Azure 服务访问 SQL 服务器

对于 SQL 数据库和 Azure Synapse Analytics,请允许 Azure 服务访问 SQL Server。 确保服务器的“允许访问 Azure 服务”设置已切换为“打开”状态 。 此设置允许数据工厂服务从 Azure SQL 数据库中读取数据,并将数据写入 Azure Synapse Analytics。 若要验证并启用此设置,请执行以下步骤:

  1. 单击左侧的“所有服务”,然后单击“SQL Server”。
  2. 选择服务器,并单击“设置”下的“防火墙”。
  3. 在“防火墙设置”页中,单击“允许访问 Azure 服务”对应的“打开”。

创建数据工厂

  1. 启动 PowerShell。 在本教程结束之前,请将 Azure PowerShell 保持打开状态。 如果将它关闭再重新打开,则需要再次运行下述命令。

    运行以下命令并输入用于登录 Azure 门户的用户名和密码:

    Connect-AzAccount -Environment AzureChinaCloud
    

    运行以下命令查看此帐户的所有订阅:

    Get-AzSubscription
    

    运行以下命令选择要使用的订阅。 请将 SubscriptionId 替换为自己的 Azure 订阅的 ID:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"
    
  2. 运行 Set-AzDataFactoryV2 cmdlet 创建数据工厂。 执行该命令之前,请将占位符替换为自己的值。

    $resourceGroupName = "<your resource group to create the factory>"
    $dataFactoryName = "<specify the name of data factory to create. It must be globally unique.>"
    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "China East 2" -Name $dataFactoryName
    

    请注意以下几点:

    • Azure 数据工厂的名称必须全局唯一。 如果收到以下错误,请更改名称并重试。

      The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
      
    • 只有 Azure 订阅的参与者或管理员才可以创建数据工厂实例。

    • 若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析”以找到“数据工厂”:可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库,等等)和计算资源(HDInsight 等)可以位于其他区域中。

创建链接服务

本教程分别为源、接收器和过渡 Blob 创建了三个链接服务,其中包括数据存储的连接:

创建源 Azure SQL 数据库链接服务

  1. C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 AzureSqlDatabaseLinkedService.json 的 JSON 文件:(如果 ADFv2TutorialBulkCopy 文件夹尚不存在,则创建该文件夹。)

    重要

    保存文件之前,请将 <servername>、<databasename>、<username>@<servername> 和 <password> 替换为 Azure SQL 数据库的值。

    {
        "name": "AzureSqlDatabaseLinkedService",
        "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server=tcp:<servername>.database.chinacloudapi.cn,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
            }
        }
    }
    
  2. Azure PowerShell 中,切换到 ADFv2TutorialBulkCopy 文件夹。

  3. 运行 Set-AzDataFactoryV2LinkedService cmdlet 来创建链接服务:AzureSqlDatabaseLinkedService

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSqlDatabaseLinkedService" -File ".\AzureSqlDatabaseLinkedService.json"
    

    下面是示例输出:

    LinkedServiceName : AzureSqlDatabaseLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

创建接收器 Azure Synapse Analytics 链接服务

  1. C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 AzureSqlDWLinkedService.json 的 JSON 文件:

    重要

    保存文件之前,请将 <servername>、<databasename>、<username>@<servername> 和 <password> 替换为 Azure SQL 数据库的值。

    {
        "name": "AzureSqlDWLinkedService",
        "properties": {
            "type": "AzureSqlDW",
            "typeProperties": {
                "connectionString": "Server=tcp:<servername>.database.chinacloudapi.cn,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
            }
        }
    }
    
  2. 若要创建链接服务 AzureSqlDWLinkedService,请运行 Set-AzDataFactoryV2LinkedService cmdlet。

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSqlDWLinkedService" -File ".\AzureSqlDWLinkedService.json"
    

    下面是示例输出:

    LinkedServiceName : AzureSqlDWLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDWLinkedService
    

创建过渡 Azure 存储链接服务

本教程使用 Azure Blob 存储作为临时过渡区域,以利用 PolyBase 来实现更好的复制性能。

  1. C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 AzureStorageLinkedService.json 的 JSON 文件:

    重要

    将 <accountName> 和 <accountKey> 分别替换为 Azure 存储帐户的名称和密钥,然后保存文件。

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn"
            }
        }
    }
    
  2. 若要创建链接服务 AzureStorageLinkedService,请运行 Set-AzDataFactoryV2LinkedService cmdlet。

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    下面是示例输出:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

创建数据集

在本教程中创建源和接收器数据集,用于指定数据的存储位置:

为源 SQL 数据库创建数据集

  1. C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 AzureSqlDatabaseDataset.json 的 JSON 文件。 “tableName”是一个虚构名称,因为稍后要在复制活动中使用 SQL 查询检索数据。

    {
        "name": "AzureSqlDatabaseDataset",
        "properties": {
            "type": "AzureSqlTable",
            "linkedServiceName": {
                "referenceName": "AzureSqlDatabaseLinkedService",
                "type": "LinkedServiceReference"
            },
            "typeProperties": {
                "tableName": "dummy"
            }
        }
    }
    
  2. 若要创建数据集 AzureSqlDatabaseDataset,请运行 Set-AzDataFactoryV2Dataset cmdlet。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSqlDatabaseDataset" -File ".\AzureSqlDatabaseDataset.json"
    

    下面是示例输出:

    DatasetName       : AzureSqlDatabaseDataset
    ResourceGroupName : <resourceGroupname>
    DataFactoryName   : <dataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

为接收器 Azure Synapse Analytics 创建数据集

  1. C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 AzureSqlDWDataset.json 的 JSON 文件:将“tableName”设置为参数,稍后引用此数据集的复制活动会将实际值传递给数据集。

    {
        "name": "AzureSqlDWDataset",
        "properties": {
            "type": "AzureSqlDWTable",
            "linkedServiceName": {
                "referenceName": "AzureSqlDWLinkedService",
                "type": "LinkedServiceReference"
            },
            "typeProperties": {
                "tableName": {
                    "value": "@{dataset().DWTableName}",
                    "type": "Expression"
                }
            },
            "parameters":{
                "DWTableName":{
                    "type":"String"
                }
            }
        }
    }
    
  2. 若要创建数据集 AzureSqlDWDataset,请运行 Set-AzDataFactoryV2Dataset cmdlet。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSqlDWDataset" -File ".\AzureSqlDWDataset.json"
    

    下面是示例输出:

    DatasetName       : AzureSqlDWDataset
    ResourceGroupName : <resourceGroupname>
    DataFactoryName   : <dataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDwTableDataset
    

创建管道

在本教程中创建两个管道:

创建管道“IterateAndCopySQLTables”

此管道使用表列表作为参数。 对于列表中的每个表,此管道会使用分阶段复制和 PolyBase,将 Azure SQL 数据库中的表的数据复制到 Azure Synapse Analytics。

  1. C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 IterateAndCopySQLTables.json 的 JSON 文件:

    {
        "name": "IterateAndCopySQLTables",
        "properties": {
            "activities": [
                {
                    "name": "IterateSQLTables",
                    "type": "ForEach",
                    "typeProperties": {
                        "isSequential": "false",
                        "items": {
                            "value": "@pipeline().parameters.tableList",
                            "type": "Expression"
                        },
                        "activities": [
                            {
                                "name": "CopyData",
                                "description": "Copy data from Azure SQL Database to Azure Synapse Analytics",
                                "type": "Copy",
                                "inputs": [
                                    {
                                        "referenceName": "AzureSqlDatabaseDataset",
                                        "type": "DatasetReference"
                                    }
                                ],
                                "outputs": [
                                    {
                                        "referenceName": "AzureSqlDWDataset",
                                        "type": "DatasetReference",
                                        "parameters": {
                                            "DWTableName": "[@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]"
                                        }
                                    }
                                ],
                                "typeProperties": {
                                    "source": {
                                        "type": "SqlSource",
                                        "sqlReaderQuery": "SELECT * FROM [@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]"
                                    },
                                    "sink": {
                                        "type": "SqlDWSink",
                                        "preCopyScript": "TRUNCATE TABLE [@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]",
                                        "allowPolyBase": true
                                    },
                                    "enableStaging": true,
                                    "stagingSettings": {
                                        "linkedServiceName": {
                                            "referenceName": "AzureStorageLinkedService",
                                            "type": "LinkedServiceReference"
                                        }
                                    }
                                }
                            }
                        ]
                    }
                }
            ],
            "parameters": {
                "tableList": {
                    "type": "Object"
                }
            }
        }
    }
    
  2. 若要创建管道 IterateAndCopySQLTables,请运行 Set-AzDataFactoryV2Pipeline cmdlet。

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IterateAndCopySQLTables" -File ".\IterateAndCopySQLTables.json"
    

    下面是示例输出:

    PipelineName      : IterateAndCopySQLTables
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Activities        : {IterateSQLTables}
    Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

创建管道“GetTableListAndTriggerCopyData”

此管道执行两个步骤:

  • 查找 Azure SQL 数据库系统表,以获取要复制的表列表。
  • 触发管道“IterateAndCopySQLTables”来执行实际数据复制。
  1. C:\ADFv2TutorialBulkCopy 文件夹中,创建包含以下内容的名为 GetTableListAndTriggerCopyData.json 的 JSON 文件:

    {
        "name":"GetTableListAndTriggerCopyData",
        "properties":{
            "activities":[
                { 
                    "name": "LookupTableList",
                    "description": "Retrieve the table list from Azure SQL database",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TABLES WHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = 'SalesLT' and TABLE_NAME <> 'ProductModel'"
                        },
                        "dataset": {
                            "referenceName": "AzureSqlDatabaseDataset",
                            "type": "DatasetReference"
                        },
                        "firstRowOnly": false
                    }
                },
                {
                    "name": "TriggerCopy",
                    "type": "ExecutePipeline",
                    "typeProperties": {
                        "parameters": {
                            "tableList": {
                                "value": "@activity('LookupTableList').output.value",
                                "type": "Expression"
                            }
                        },
                        "pipeline": {
                            "referenceName": "IterateAndCopySQLTables",
                            "type": "PipelineReference"
                        },
                        "waitOnCompletion": true
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupTableList",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
        }
    }
    
  2. 若要创建管道 GetTableListAndTriggerCopyData,请运行 Set-AzDataFactoryV2Pipeline cmdlet。

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "GetTableListAndTriggerCopyData" -File ".\GetTableListAndTriggerCopyData.json"
    

    下面是示例输出:

    PipelineName      : GetTableListAndTriggerCopyData
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Activities        : {LookupTableList, TriggerCopy}
    Parameters        :
    

启动并监视管道运行

  1. 针对“GetTableListAndTriggerCopyData”主管道启动管道运行,并捕获管道运行 ID,以便将来进行监视。 随后,此管道根据 ExecutePipeline 活动中的指定,触发管道“IterateAndCopySQLTables”的运行。

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName 'GetTableListAndTriggerCopyData'
    
  2. 运行以下脚本可持续检查管道 GetTableListAndTriggerCopyData 的运行状态,并列显最终管道运行和活动运行的结果。

    while ($True) {
        $run = Get-AzDataFactoryV2PipelineRun -ResourceGroupName $resourceGroupName -DataFactoryName $DataFactoryName -PipelineRunId $runId
    
        if ($run) {
            if ($run.Status -ne 'InProgress') {
                Write-Host "Pipeline run finished. The status is: " $run.Status -ForegroundColor "Yellow"
                Write-Host "Pipeline run details:" -ForegroundColor "Yellow"
                $run
                break
            }
            Write-Host  "Pipeline is running...status: InProgress" -ForegroundColor "Yellow"
        }
    
        Start-Sleep -Seconds 15
    }
    
    $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    Write-Host "Activity run details:" -ForegroundColor "Yellow"
    $result
    

    下面是示例运行的输出:

    Pipeline run details:
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    RunId             : 0000000000-00000-0000-0000-000000000000
    PipelineName      : GetTableListAndTriggerCopyData
    LastUpdated       : 9/18/2017 4:08:15 PM
    Parameters        : {}
    RunStart          : 9/18/2017 4:06:44 PM
    RunEnd            : 9/18/2017 4:08:15 PM
    DurationInMs      : 90637
    Status            : Succeeded
    Message           : 
    
    Activity run details:
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    ActivityName      : LookupTableList
    PipelineRunId     : 0000000000-00000-0000-0000-000000000000
    PipelineName      : GetTableListAndTriggerCopyData
    Input             : {source, dataset, firstRowOnly}
    Output            : {count, value, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/18/2017 4:06:46 PM
    ActivityRunEnd    : 9/18/2017 4:07:09 PM
    DurationInMs      : 22995
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    ActivityName      : TriggerCopy
    PipelineRunId     : 0000000000-00000-0000-0000-000000000000
    PipelineName      : GetTableListAndTriggerCopyData
    Input             : {pipeline, parameters, waitOnCompletion}
    Output            : {pipelineRunId}
    LinkedServiceName : 
    ActivityRunStart  : 9/18/2017 4:07:11 PM
    ActivityRunEnd    : 9/18/2017 4:08:14 PM
    DurationInMs      : 62581
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
  3. 可以获取管道“IterateAndCopySQLTables”的运行 ID,并检查详细的活动运行结果,如下所示。

    Write-Host "Pipeline 'IterateAndCopySQLTables' run result:" -ForegroundColor "Yellow"
    ($result | Where-Object {$_.ActivityName -eq "TriggerCopy"}).Output.ToString()
    

    下面是示例运行的输出:

    {
        "pipelineRunId": "7514d165-14bf-41fb-b5fb-789bea6c9e58"
    }
    
    $result2 = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId <copy above run ID> -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    $result2
    
  4. 连接到接收器 Azure Synapse Analytics,并确认是否已正确地从 Azure SQL 数据库复制数据。

已在本教程中执行了以下步骤:

  • 创建数据工厂。
  • 创建 Azure SQL 数据库、Azure Synapse Analytics 和 Azure 存储链接服务。
  • 创建 Azure SQL 数据库和 Azure Synapse Analytics 数据集。
  • 创建一个管道用于查找要复制的表,创建另一个管道用于执行实际复制操作。
  • 启动管道运行。
  • 监视管道和活动运行。

转到以下教程,了解如何以增量方式将数据从源复制到目标: