使用 PowerShell 以增量方式将 Azure SQL 数据库中的数据加载到 Azure Blob 存储
适用于: Azure 数据工厂 Azure Synapse Analytics
提示
试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用!
本教程将使用 Azure 数据工厂创建一个管道,用于将 Azure SQL 数据库的表中的增量数据加载到 Azure Blob 存储。
在本教程中执行以下步骤:
- 准备用于存储水印值的数据存储。
- 创建数据工厂。
- 创建链接服务。
- 创建源、接收器和水印数据集。
- 创建管道。
- 运行管道。
- 监视管道运行。
概述
下面是高级解决方案示意图:
下面是创建此解决方案所要执行的重要步骤:
选择水印列。 在源数据存储中选择一个列,该列可用于将每个运行的新记录或已更新记录切片。 通常,在创建或更新行时,此选定列中的数据(例如 last_modify_time 或 ID)会不断递增。 此列中的最大值用作水印。
准备用于存储水印值的数据存储。
本教程在 SQL 数据库中存储水印值。创建采用以下工作流的管道:
此解决方案中的管道具有以下活动:
- 创建两个 Lookup 活动。 使用第一个 Lookup 活动检索上一个水印值。 使用第二个 Lookup 活动检索新的水印值。 这些水印值会传递到 Copy 活动。
- 创建 Copy 活动,用于复制源数据存储中其水印列值大于旧水印值但小于或等于新水印值的行。 然后,该活动将源数据存储中的增量数据作为新文件复制到 Blob 存储。
- 创建 StoredProcedure 活动,用于更新下一次运行的管道的水印值。
如果没有 Azure 订阅,请在开始前创建一个试用帐户。
先决条件
注意
建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az。
- Azure SQL 数据库。 将数据库用作源数据存储。 如果 Azure SQL 数据库没有数据库,请参阅在 Azure SQL 数据库中创建数据库,了解创建步骤。
- Azure 存储。 将 Blob 存储用作接收器数据存储。 如果没有存储帐户,请参阅创建存储帐户以获取创建步骤。 创建名为 adftutorial 的容器。
- Azure PowerShell。 遵循安装和配置 Azure PowerShell 中的说明。
在 SQL 数据库中创建数据源表
打开 SQL Server Management Studio。 在“服务器资源管理器”中,右键单击数据库,然后选择“新建查询”。
针对 SQL 数据库运行以下 SQL 命令,创建名为
data_source_table
的表作为数据源存储:create table data_source_table ( PersonID int, Name varchar(255), LastModifytime datetime ); INSERT INTO data_source_table (PersonID, Name, LastModifytime) VALUES (1, 'aaaa','9/1/2017 12:56:00 AM'), (2, 'bbbb','9/2/2017 5:23:00 AM'), (3, 'cccc','9/3/2017 2:36:00 AM'), (4, 'dddd','9/4/2017 3:21:00 AM'), (5, 'eeee','9/5/2017 8:06:00 AM');
本教程使用 LastModifytime 作为水印列。 下表显示了数据源存储中的数据:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000
在 SQL 数据库中创建另一个表,用于存储高水印值
针对 SQL 数据库运行以下 SQL 命令,创建名为
watermarktable
的表,用于存储水印值:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
使用源数据存储的表名设置高水印的默认值。 在本教程中,表名为 data_source_table。
INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
查看
watermarktable
表中的数据。Select * from watermarktable
输出:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
在 SQL 数据库中创建存储过程
运行以下命令,在 SQL 数据库中创建存储过程:
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
创建数据工厂
为资源组名称定义一个变量,稍后会在 PowerShell 命令中使用该变量。 将以下命令文本复制到 PowerShell,在双引号中指定 Azure 资源组的名称,然后运行命令。 示例为
"adfrg"
。$resourceGroupName = "ADFTutorialResourceGroup";
如果该资源组已存在,请勿覆盖它。 为
$resourceGroupName
变量分配另一个值,然后再次运行命令定义一个用于数据工厂位置的变量。
$location = "China East 2"
若要创建 Azure 资源组,请运行以下命令:
New-AzResourceGroup $resourceGroupName $location
如果该资源组已存在,请勿覆盖它。 为
$resourceGroupName
变量分配另一个值,然后再次运行命令定义一个用于数据工厂名称的变量。
重要
更新数据工厂名称,使之全局唯一。 例如 ADFTutorialFactorySP1127。
$dataFactoryName = "ADFIncCopyTutorialFactory";
要创建数据工厂,请运行以下 Set-AzDataFactoryV2 cmdlet:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "China East 2" -Name $dataFactoryName
请注意以下几点:
数据工厂的名称必须全局唯一。 如果收到以下错误,请更改名称并重试:
The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
若要创建数据工厂实例,用于登录到 Azure 的用户帐户必须属于参与者或所有者角色,或者是 Azure 订阅的管理员。
若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析”以找到“数据工厂”:可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、SQL 数据库、Azure SQL 托管实例等)和计算资源(Azure HDInsight 等)可以位于其他区域中。
创建链接服务
可在数据工厂中创建链接服务,将数据存储和计算服务链接到数据工厂。 在本部分中,创建到存储帐户和 SQL 数据库的链接服务。
创建存储链接服务
在 C:\ADF 文件夹中,创建包含以下内容的名为 AzureStorageLinkedService.json 的 JSON 文件。 (如果文件夹 ADF 不存在,请创建。)将
<accountName>
和<accountKey>
替换为存储帐户的名称和密钥,然后保存文件。{ "name": "AzureStorageLinkedService", "properties": { "type": "AzureStorage", "typeProperties": { "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn" } } }
在 PowerShell 中切换到 ADF 文件夹。
运行 Set-AzDataFactoryV2LinkedService cmdlet 以创建链接服务 AzureStorageLinkedService。 在以下示例中,传递 ResourceGroupName 和 DataFactoryName 参数的值:
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
下面是示例输出:
LinkedServiceName : AzureStorageLinkedService ResourceGroupName : <resourceGroupName> DataFactoryName : <dataFactoryName> Properties : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
创建 SQL 数据库链接服务
在 C:\ADF 文件夹中,创建包含以下内容的名为 AzureSQLDatabaseLinkedService.json 的 JSON 文件。 (如果文件夹 ADF 不存在,请创建。)在保存文件之前,将 <your-server-name> 和 <your-database-name> 替换为你的服务器和数据库的名称。 此外,必须配置 Azure SQL Server,以便为数据工厂的托管标识授予访问权限。
{ "name": "AzureSqlDatabaseLinkedService", "properties": { "type": "AzureSqlDatabase", "typeProperties": { "connectionString": "Server=tcp:<your-server-name>.database.chinacloudapi.cn,1433;Database=<your-database-name>;" }, "authenticationType": "ManagedIdentity", "annotations": [] } }
在 PowerShell 中切换到 ADF 文件夹。
运行 Set-AzDataFactoryV2LinkedService cmdlet 以创建链接服务 AzureSQLDatabaseLinkedService。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
下面是示例输出:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService ProvisioningState :
创建数据集
在此步骤中,创建表示源和接收器数据的数据集。
创建源数据集
在同一文件夹中,创建包含以下内容的名为 SourceDataset.json 的 JSON 文件:
{ "name": "SourceDataset", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "data_source_table" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
本教程使用表名 data_source_table。 如果使用其他名称的表,请替换名称。
运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集 SourceDataset。
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
下面是该 cmdlet 的示例输出:
DatasetName : SourceDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
创建接收器数据集
在同一文件夹中,创建包含以下内容的名为 SinkDataset.json 的 JSON 文件:
{ "name": "SinkDataset", "properties": { "type": "AzureBlob", "typeProperties": { "folderPath": "adftutorial/incrementalcopy", "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')", "format": { "type": "TextFormat" } }, "linkedServiceName": { "referenceName": "AzureStorageLinkedService", "type": "LinkedServiceReference" } } }
重要
此代码片段假设 Blob 存储中有一个名为
adftutorial
的 Blob 容器。 创建容器(如果不存在),或者将容器设置为现有容器的名称。 会自动创建输出文件夹incrementalcopy
(如果容器中不存在)。 在本教程中,文件名是使用表达式@CONCAT('Incremental-', pipeline().RunId, '.txt')
动态生成的。运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集 SinkDataset。
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
下面是该 cmdlet 的示例输出:
DatasetName : SinkDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
为水印创建数据集
在此步骤中,创建用于存储高水印值的数据集。
在同一文件夹中,创建包含以下内容的名为 WatermarkDataset.json 的 JSON 文件:
{ "name": " WatermarkDataset ", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "watermarktable" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集 WatermarkDataset。
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
下面是该 cmdlet 的示例输出:
DatasetName : WatermarkDataset ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
创建管道
本教程创建包含两个 Lookup 活动、一个 Copy 活动和一个 StoredProcedure 活动的管道,这些活动链接在一个管道中。
在同一文件夹中,创建包含以下内容的 JSON 文件 IncrementalCopyPipeline.json:
{ "name": "IncrementalCopyPipeline", "properties": { "activities": [ { "name": "LookupOldWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from watermarktable" }, "dataset": { "referenceName": "WatermarkDataset", "type": "DatasetReference" } } }, { "name": "LookupNewWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table" }, "dataset": { "referenceName": "SourceDataset", "type": "DatasetReference" } } }, { "name": "IncrementalCopyActivity", "type": "Copy", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'" }, "sink": { "type": "BlobSink" } }, "dependsOn": [ { "activity": "LookupNewWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] }, { "activity": "LookupOldWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] } ], "inputs": [ { "referenceName": "SourceDataset", "type": "DatasetReference" } ], "outputs": [ { "referenceName": "SinkDataset", "type": "DatasetReference" } ] }, { "name": "StoredProceduretoWriteWatermarkActivity", "type": "SqlServerStoredProcedure", "typeProperties": { "storedProcedureName": "usp_write_watermark", "storedProcedureParameters": { "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" }, "TableName": { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"} } }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" }, "dependsOn": [ { "activity": "IncrementalCopyActivity", "dependencyConditions": [ "Succeeded" ] } ] } ] } }
运行 Set-AzDataFactoryV2Pipeline cmdlet 以创建管道 IncrementalCopyPipeline。
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
下面是示例输出:
PipelineName : IncrementalCopyPipeline ResourceGroupName : ADF DataFactoryName : incrementalloadingADF Activities : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity} Parameters :
运行管道
使用 Invoke-AzDataFactoryV2Pipeline cmdlet 运行管道 IncrementalCopyPipeline。 将占位符替换为自己的资源组和数据工厂名称。
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
运行 Get-AzDataFactoryV2ActivityRun cmdlet 检查管道的状态,直到看到所有活动成功运行的消息。 将占位符替换为针对 RunStartedAfter 和 RunStartedBefore 参数指定的自己的适当时间。 本教程使用 -RunStartedAfter "2017/09/14" 和 -RunStartedBefore "2017/09/15" 。
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
下面是示例输出:
ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupNewWaterMarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {NewWatermarkvalue} LinkedServiceName : ActivityRunStart : 9/14/2017 7:42:42 AM ActivityRunEnd : 9/14/2017 7:42:50 AM DurationInMs : 7777 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupOldWaterMarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {TableName, WatermarkValue} LinkedServiceName : ActivityRunStart : 9/14/2017 7:42:42 AM ActivityRunEnd : 9/14/2017 7:43:07 AM DurationInMs : 25437 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : IncrementalCopyActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {source, sink} Output : {dataRead, dataWritten, rowsCopied, copyDuration...} LinkedServiceName : ActivityRunStart : 9/14/2017 7:43:10 AM ActivityRunEnd : 9/14/2017 7:43:29 AM DurationInMs : 19769 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : StoredProceduretoWriteWatermarkActivity PipelineRunId : d4bf3ce2-5d60-43f3-9318-923155f61037 PipelineName : IncrementalCopyPipeline Input : {storedProcedureName, storedProcedureParameters} Output : {} LinkedServiceName : ActivityRunStart : 9/14/2017 7:43:32 AM ActivityRunEnd : 9/14/2017 7:43:47 AM DurationInMs : 14467 Status : Succeeded Error : {errorCode, message, failureType, target}
查看结果
在 Blob 存储(接收器存储)中,可看到数据已复制到 SinkDataset 中定义的文件。 在当前的教程中,文件名为
Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt
。 打开该文件,可以看到其中与 SQL 数据库中的数据相同的记录。1,aaaa,2017-09-01 00:56:00.0000000 2,bbbb,2017-09-02 05:23:00.0000000 3,cccc,2017-09-03 02:36:00.0000000 4,dddd,2017-09-04 03:21:00.0000000 5,eeee,2017-09-05 08:06:00.0000000
在
watermarktable
中查看最新值。 可看到水印值已更新。Select * from watermarktable
下面是示例输出:
TableName WatermarkValue data_source_table 2017-09-05 8:06:00.000
将数据插入数据源存储,验证增量数据的加载
在 SQL 数据库(数据源存储)中插入新数据。
INSERT INTO data_source_table VALUES (6, 'newdata','9/6/2017 2:23:00 AM') INSERT INTO data_source_table VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
SQL 数据库中的更新数据为:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000 6 | newdata | 2017-09-06 02:23:00.000 7 | newdata | 2017-09-07 09:01:00.000
通过使用 Invoke-AzDataFactoryV2Pipeline cmdlet 再次运行管道 IncrementalCopyPipeline。 将占位符替换为自己的资源组和数据工厂名称。
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
运行 Get-AzDataFactoryV2ActivityRun cmdlet 检查管道的状态,直到看到所有活动成功运行的消息。 将占位符替换为针对 RunStartedAfter 和 RunStartedBefore 参数指定的自己的适当时间。 本教程使用 -RunStartedAfter "2017/09/14" 和 -RunStartedBefore "2017/09/15" 。
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
下面是示例输出:
ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupNewWaterMarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {NewWatermarkvalue} LinkedServiceName : ActivityRunStart : 9/14/2017 8:52:26 AM ActivityRunEnd : 9/14/2017 8:52:58 AM DurationInMs : 31758 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : LookupOldWaterMarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, dataset} Output : {TableName, WatermarkValue} LinkedServiceName : ActivityRunStart : 9/14/2017 8:52:26 AM ActivityRunEnd : 9/14/2017 8:52:52 AM DurationInMs : 25497 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : IncrementalCopyActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {source, sink} Output : {dataRead, dataWritten, rowsCopied, copyDuration...} LinkedServiceName : ActivityRunStart : 9/14/2017 8:53:00 AM ActivityRunEnd : 9/14/2017 8:53:20 AM DurationInMs : 20194 Status : Succeeded Error : {errorCode, message, failureType, target} ResourceGroupName : ADF DataFactoryName : incrementalloadingADF ActivityName : StoredProceduretoWriteWatermarkActivity PipelineRunId : 2fc90ab8-d42c-4583-aa64-755dba9925d7 PipelineName : IncrementalCopyPipeline Input : {storedProcedureName, storedProcedureParameters} Output : {} LinkedServiceName : ActivityRunStart : 9/14/2017 8:53:23 AM ActivityRunEnd : 9/14/2017 8:53:41 AM DurationInMs : 18502 Status : Succeeded Error : {errorCode, message, failureType, target}
在 Blob 存储中,可以看到另一文件已创建。 在本教程中,新文件名为
Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt
。 打开该文件,会看到其中包含两行记录。在
watermarktable
中查看最新值。 可看到水印值已再次更新。Select * from watermarktable
示例输出:
TableName WatermarkValue data_source_table 2017-09-07 09:01:00.000
相关内容
已在本教程中执行了以下步骤:
- 准备用于存储水印值的数据存储。
- 创建数据工厂。
- 创建链接服务。
- 创建源、接收器和水印数据集。
- 创建管道。
- 运行管道。
- 监视管道运行。
在本教程中,管道将数据从 Azure SQL 数据库中的单个表复制到 Blob 存储。 转到下面的教程,了解如何将数据从 SQL Server 数据库中的多个表复制到 SQL 数据库。