PowerShell 脚本 - 使用 Azure 数据工厂批量复制多个表
此 PowerShell 示例脚本将数据从 Azure SQL 数据库中的多个表复制到 Azure Synapse Analytics。
注意
建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az。
本示例需要 Azure PowerShell。 运行 Get-Module -ListAvailable Az
即可查找版本。
如果需要进行安装或升级,请参阅安装 Azure PowerShell 模块。
运行 Connect-AzAccount -Environment AzureChinaCloud cmdlet 以连接到世纪互联运营的 Azure。
有关运行此示例的先决条件,请参阅教程:批量复制。
示例脚本
重要
此脚本在硬盘驱动器上的 c:\ 文件夹中创建 JSON 文件,用于定义数据工厂实体(链接服务、数据集和管道)。
# Set variables with your own values
$resourceGroupName = "<resource group name>"
$dataFactoryName = "<data factory name>" # Name of the data factory must be globally unique
$dataFactoryNameLocation = "China East 2"
$azureSqlServer = "<azure sql server name>"
$azureSqlServerUser = "<azure sql server user>"
$azureSqlServerUserPassword = "<azure sql server password>"
$azureSqlDatabase = "<source azure sql database name>"
$azureSqlDataWarehouse = "<sink azure sql data warehouse name>"
$azureStorageAccount = "<Az.Storage accoutn name>"
$azureStorageAccountKey = "<Az.Storage account key>"
# No need to change values for these variables
$azureSqlDatabaseLinkedService = "AzureSqlDatabaseLinkedService"
$azureSqlDataWarehouseLinkedService = "AzureSqlDataWarehouseLinkedService"
$azureStorageLinkedService = "AzureStorageLinkedService"
$azureSqlDatabaseDataset = "AzureSqlDatabaseDataset"
$azureSqlDataWarehouseDataset = "AzureSqlDataWarehouseDataset"
$IterateAndCopySQLTablesPipeline = "IterateAndCopySQLTables"
$pipelineGetTableListAndTriggerCopyData = "GetTableListAndTriggerCopyData"
# create a resource gorup
New-AzResourceGroup -Name $resourceGroupName -Location $dataFactoryNameLocation
# create a data factory
$df = Set-AzDataFactory -ResourceGroupName $resourceGroupName -Location $dataFactoryNameLocation -Name $dataFactoryName
# create a linked service for Azure SQL Database (source)
$azureSQLDatabaseLinkedServiceDefinition = @"
{
"name": "$azureSqlDatabaseLinkedService",
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"connectionString": {
"type": "SecureString",
"value": "Server=tcp:$azureSqlServer.database.chinacloudapi.cn,1433;Database=$azureSqlDatabase;User ID=$azureSqlServerUser;Password=$azureSqlServerUserPassword;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
}
}
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryLinkedService command.
$azureSQLDatabaseLinkedServiceDefinition | Out-File c:\$azureSqlDatabaseLinkedService.json
## Creates an Az.Storage linked service
Set-AzDataFactoryLinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $azureSqlDatabaseLinkedService -File c:\$azureSqlDatabaseLinkedService.json
# create a linked service for Azure SQL Datawarehouse (sink)
$azureSQLDataWarehouseLinkedServiceDefinition = @"
{
"name": "$azureSqlDataWarehouseLinkedService",
"properties": {
"type": "AzureSqlDW",
"typeProperties": {
"connectionString": {
"type": "SecureString",
"value": "Server=tcp:$azureSqlServer.database.chinacloudapi.cn,1433;Database=$azureSqlDataWarehouse;User ID=$azureSqlServerUser;Password=$azureSqlServerUserPassword;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
}
}
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryLinkedService command.
$azureSQLDataWarehouseLinkedServiceDefinition | Out-File c:\$azureSqlDataWarehouseLinkedService.json
## Creates an linked service for Az.Storage Account. Interim storage to enable PolyBase
Set-AzDataFactoryLinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $azureSqlDataWarehouseLinkedService -File c:\$azureSqlDataWarehouseLinkedService.json
$storageLinkedServiceDefinition = @"
{
"name": "$azureStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": {
"value": "DefaultEndpointsProtocol=https;AccountName=$azureStorageAccount;AccountKey=$azureStorageAccountKey;EndpointSuffix=core.chinacloudapi.cn",
"type": "SecureString"
}
}
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryLinkedService command.
$storageLinkedServiceDefinition | Out-File c:\$azureStorageLinkedService.json
## Creates an Az.Storage linked service
Set-AzDataFactoryLinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $azureStorageLinkedService -File c:\$azureStorageLinkedService.json
# create the input dataset (Azure SQL Database)
$azureSqlDatabaseDatasetDefiniton = @"
{
"name": "$azureSqlDatabaseDataset",
"properties": {
"type": "AzureSqlTable",
"linkedServiceName": {
"referenceName": "$azureSqlDatabaseLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"tableName": "dummy"
}
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryDataset command.
$azureSqlDatabaseDatasetDefiniton | Out-File c:\$azureSqlDatabaseDataset.json
## Create a dataset in the data factory
Set-AzDataFactoryDataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $azureSqlDatabaseDataset -File "c:\$azureSqlDatabaseDataset.json"
# create the output dataset (Azure SQL Data Warehouse)
$azureSqlDataWarehouseDatasetDefiniton = @"
{
"name": "$azureSqlDataWarehouseDataset",
"properties": {
"type": "AzureSqlDWTable",
"linkedServiceName": {
"referenceName": "$azureSqlDataWarehouseLinkedService",
"type": "LinkedServiceReference"
},
"typeProperties": {
"tableName": {
"value": "@{dataset().DWTableName}",
"type": "Expression"
}
},
"parameters":{
"DWTableName":{
"type":"String"
}
}
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryDataset command.
$azureSqlDataWarehouseDatasetDefiniton | Out-File c:\$azureSqlDataWarehouseDataset.json
## Create a dataset in the data factory
Set-AzDataFactoryDataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $azureSqlDataWarehouseDataset -File "c:\$azureSqlDataWarehouseDataset.json"
# Create a pipeline in the data factory that copies data from source SQL Database to sink SQL Data Warehouse
$pipelineDefinition = @"
{
"name": "$IterateAndCopySQLTablesPipeline",
"properties": {
"activities": [
{
"name": "IterateSQLTables",
"type": "ForEach",
"typeProperties": {
"isSequential": "false",
"items": {
"value": "@pipeline().parameters.tableList",
"type": "Expression"
},
"activities": [
{
"name": "CopyData",
"description": "Copy data from SQL database to SQL DW",
"type": "Copy",
"inputs": [
{
"referenceName": "$azureSqlDatabaseDataset",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "$azureSqlDataWarehouseDataset",
"type": "DatasetReference",
"parameters": {
"DWTableName": "[@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]"
}
}
],
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "SELECT * FROM [@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]"
},
"sink": {
"type": "SqlDWSink",
"preCopyScript": "TRUNCATE TABLE [@{item().TABLE_SCHEMA}].[@{item().TABLE_NAME}]",
"allowPolyBase": true
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "$azureStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
}
]
}
}
],
"parameters": {
"tableList": {
"type": "Object"
}
}
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryPipeline command.
$pipelineDefinition | Out-File c:\$IterateAndCopySQLTablesPipeline.json
## Create a pipeline in the data factory
Set-AzDataFactoryPipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $IterateAndCopySQLTablesPipeline -File "c:\$IterateAndCopySQLTablesPipeline.json"
# Create a pipeline in the data factory that retrieves a list of tables and invokes the above pipeline for each table to be copied
$pipeline2Definition = @"
{
"name":"$pipelineGetTableListAndTriggerCopyData",
"properties":{
"activities":[
{
"name": "LookupTableList",
"description": "Retrieve the table list from Azure SQL dataabse",
"type": "Lookup",
"typeProperties": {
"source": {
"type": "SqlSource",
"sqlReaderQuery": "SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TABLES WHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = 'SalesLT' and TABLE_NAME <> 'ProductModel'"
},
"dataset": {
"referenceName": "$azureSqlDatabaseDataset",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "TriggerCopy",
"type": "ExecutePipeline",
"typeProperties": {
"parameters": {
"tableList": {
"value": "@activity('LookupTableList').output.value",
"type": "Expression"
}
},
"pipeline": {
"referenceName": "$IterateAndCopySQLTablesPipeline",
"type": "PipelineReference"
},
"waitOnCompletion": true
},
"dependsOn": [
{
"activity": "LookupTableList",
"dependencyConditions": [
"Succeeded"
]
}
]
}
]
}
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Set-AzDataFactoryPipeline command.
$pipeline2Definition | Out-File c:\$pipelineGetTableListAndTriggerCopyData.json
## Create a pipeline in the data factory
Set-AzDataFactoryPipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineGetTableListAndTriggerCopyData -File "c:\$pipelineGetTableListAndTriggerCopyData.json"
# Create a pipeline run
## JSON definition for dummy pipeline parameters
$pipelineParameters = @"
{
"dummy": "b"
}
"@
## IMPORTANT: store the JSON definition in a file that will be used by the Invoke-AzDataFactoryPipeline command.
$pipelineParameters | Out-File c:\PipelineParameters.json
# Create a pipeline run by using parameters
$runId = Invoke-AzDataFactoryPipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineGetTableListAndTriggerCopyData -ParameterFile c:\PipelineParameters.json
# Check the pipeline run status until it finishes the copy operation
Start-Sleep -Seconds 30
while ($True) {
$result = Get-AzDataFactoryActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
if (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
Start-Sleep -Seconds 30
}
else {
Write-Host "Pipeline '$pipelineGetTableListAndTriggerCopyData' run finished. Result:" -foregroundcolor "Yellow"
$result
break
}
}
# Get the activity run details
$result = Get-AzDataFactoryActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName `
-PipelineRunId $runId `
-RunStartedAfter (Get-Date).AddMinutes(-10) `
-RunStartedBefore (Get-Date).AddMinutes(10) `
-ErrorAction Stop
$result
if ($result.Status -eq "Succeeded") {`
$result.Output -join "`r`n"`
}`
else {`
$result.Error -join "`r`n"`
}
# To remove the data factory from the resource gorup
# Remove-AzDataFactory -Name $dataFactoryName -ResourceGroupName $resourceGroupName
#
# To remove the whole resource group
# Remove-AzResourceGroup -Name $resourceGroupName
清理部署
运行示例脚本后,可以使用以下命令删除资源组以及与其关联的所有资源:
Remove-AzResourceGroup -ResourceGroupName $resourceGroupName
若要从资源组中删除数据工厂,请运行以下命令:
Remove-AzDataFactoryV2 -Name $dataFactoryName -ResourceGroupName $resourceGroupName
脚本说明
此脚本使用以下命令:
命令 | 注释 |
---|---|
New-AzResourceGroup | 创建用于存储所有资源的资源组。 |
Set-AzDataFactoryV2 | 创建数据工厂。 |
Set-AzDataFactoryV2LinkedService | 在数据工厂中创建链接服务。 链接服务可将数据存储或计算链接到数据工厂。 |
Set-AzDataFactoryV2Dataset | 在数据工厂中创建数据集。 数据集表示管道中活动的输入/输出。 |
Set-AzDataFactoryV2Pipeline | 在数据工厂中创建管道。 一个管道包含一个或多个执行某项操作的活动。 在此管道中,复制活动在 Azure Blob 存储中将数据从一个位置复制到另一个位置。 |
Invoke-AzDataFactoryV2Pipeline | 为管道创建运行。 换而言之,就是运行管道。 |
Get-AzDataFactoryV2ActivityRun | 获取管道中活动的运行(活动运行)的相关详细信息。 |
Remove-AzResourceGroup | 删除资源组,包括所有嵌套的资源。 |
相关内容
有关 Azure PowerShell 的详细信息,请参阅 Azure PowerShell 文档。
可以在 Azure 数据工厂 PowerShell 脚本中找到其他 Azure 数据工厂 PowerShell 脚本示例。