使用 PowerShell 根据更改跟踪信息,以增量方式将 Azure SQL 数据库中的数据加载到 Azure Blob 存储
适用于: Azure 数据工厂 Azure Synapse Analytics
提示
试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用!
在本教程中,请创建一个带管道的 Azure 数据工厂,该管道根据 Azure SQL 数据库的源数据库中的“更改跟踪”信息将增量数据加载到 Azure Blob 存储。
在本教程中执行以下步骤:
- 准备源数据存储
- 创建数据工厂。
- 创建链接服务。
- 创建源、接收器和更改跟踪数据集。
- 创建、运行和监视完整复制管道
- 在源表中添加或更新数据
- 创建、运行和监视增量复制管道
注意
建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az。
概述
在数据集成解决方案中,一种广泛使用的方案是在完成初始数据加载后以增量方式加载数据。 在某些情况下,可以通过某种方式(例如,使用 LastModifyTime、CreationTime 等属性)将源数据存储中某个时段的更改数据轻松地进行切分。 在某些情况下,没有明确的方式可以将增量数据从上一次处理过的数据中区分出来。 可以使用 Azure SQL 数据库、SQL Server 等数据存储支持的更改跟踪技术来确定增量数据。 本教程介绍如何将 Azure 数据工厂与 SQL 更改跟踪技术配合使用,通过增量方式将增量数据从 Azure SQL 数据库加载到 Azure Blob 存储中。 有关 SQL 更改跟踪技术的更具体的信息,请参阅 SQL Server 中的更改跟踪。
端到端工作流
下面是典型的端到端工作流步骤,用于通过更改跟踪技术以增量方式加载数据。
注意
Azure SQL 数据库和 SQL Server 都支持更改跟踪技术。 本教程使用 Azure SQL 数据库作为源数据存储。 此外,还可以使用 SQL Server 实例。
- 首次加载历史数据(运行一次):
- 在 Azure SQL 数据库的源数据库中启用更改跟踪技术。
- 在数据库中获取 SYS_CHANGE_VERSION 的初始值,作为捕获更改数据的基线。
- 将完整数据从源数据库加载到 Azure Blob 存储中。
- 以增量方式按计划加载增量数据(在首次加载数据后定期运行):
- 获取旧的和新的 SYS_CHANGE_VERSION 值。
- 将 sys.change_tracking_tables 中已更改行(两个 SYS_CHANGE_VERSION 值之间)的主键与源表中的数据联接,以便加载增量数据,然后将增量数据移到目标位置。
- 更新 SYS_CHANGE_VERSION,以便下次进行增量加载。
高级解决方案
在本教程中,请创建两个管道来执行下述两项操作:
首次加载: 创建一个包含复制活动的管道,将完整数据从源数据存储(Azure SQL 数据库)复制到目标数据存储(Azure Blob 存储)。
增量加载: 创建一个包含以下活动的管道并定期运行。
- 创建两项查找活动,从 Azure SQL 数据库获取旧的和新的 SYS_CHANGE_VERSION,然后将其传递至复制活动。
- 创建一项复制活动,将两个 SYS_CHANGE_VERSION 值之间的插入/更新/删除数据从 Azure SQL 数据库复制到 Azure Blob 存储。
- 创建一项存储过程活动,更新 SYS_CHANGE_VERSION 的值,以便进行下一次的管道运行。
如果没有 Azure 订阅,请在开始前创建一个试用帐户。
先决条件
- Azure PowerShell。 按如何安装和配置 Azure PowerShell 中的说明安装最新的 Azure PowerShell 模块。
- Azure SQL 数据库。 将数据库用作源数据存储。 如果没有 Azure SQL 数据库,请参阅创建 Azure SQL 数据库一文获取创建步骤。
- Azure 存储帐户。 将 Blob 存储用作接收器数据存储。 如果没有 Azure 存储帐户,请参阅创建存储帐户一文获取创建步骤。 创建名为 adftutorial 的容器。
在数据库中创建数据源表
启动 SQL Server Management Studio,连接到 SQL 数据库。
在“服务器资源管理器”中,右键单击你的数据库,然后选择“新建查询”。
针对数据库运行以下 SQL 命令,创建名为
data_source_table
的表作为数据源存储。create table data_source_table ( PersonID int NOT NULL, Name varchar(255), Age int PRIMARY KEY (PersonID) ); INSERT INTO data_source_table (PersonID, Name, Age) VALUES (1, 'aaaa', 21), (2, 'bbbb', 24), (3, 'cccc', 20), (4, 'dddd', 26), (5, 'eeee', 22);
通过运行以下 SQL 查询,在数据库和源表 (data_source_table) 上启用更改跟踪机制:
注意
- 将 <your database name> 替换为你的数据库的名称,其中包含 data_source_table。
- 在当前的示例中,更改的数据保留两天。 如果每隔三天或三天以上加载更改的数据,则不会包括某些更改的数据。 需将 CHANGE_RETENTION 的值更改为更大的值。 或者,确保在两天内加载一次更改的数据。 有关详细信息,请参阅对数据库启用更改跟踪
ALTER DATABASE <your database name> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON) ALTER TABLE data_source_table ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)
通过运行以下查询,创建一个新表并存储带默认值的 ChangeTracking_version:
create table table_store_ChangeTracking_version ( TableName varchar(255), SYS_CHANGE_VERSION BIGINT, ); DECLARE @ChangeTracking_version BIGINT SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION(); INSERT INTO table_store_ChangeTracking_version VALUES ('data_source_table', @ChangeTracking_version)
注意
如果对 SQL 数据库启用更改跟踪后数据并未更改,则更改跟踪版本的值为 0。
运行以下查询,在数据库中创建存储过程。 管道会调用此存储过程,以便更新上一步创建的表中的更改跟踪版本。
CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50) AS BEGIN UPDATE table_store_ChangeTracking_version SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion WHERE [TableName] = @TableName END
Azure PowerShell
按如何安装和配置 Azure PowerShell 中的说明安装最新的 Azure PowerShell 模块。
创建数据工厂
为资源组名称定义一个变量,稍后会在 PowerShell 命令中使用该变量。 将以下命令文本复制到 PowerShell,在双引号中指定 Azure 资源组的名称,然后运行命令。 例如:
"adfrg"
。$resourceGroupName = "ADFTutorialResourceGroup";
如果该资源组已存在,请勿覆盖它。 为
$resourceGroupName
变量分配另一个值,然后再次运行命令定义一个用于数据工厂位置的变量:
$location = "China East 2"
若要创建 Azure 资源组,请运行以下命令:
New-AzResourceGroup $resourceGroupName $location
如果该资源组已存在,请勿覆盖它。 为
$resourceGroupName
变量分配另一个值,然后再次运行命令。定义一个用于数据工厂名称的变量。
重要
更新数据工厂名称,使之全局唯一。
$dataFactoryName = "IncCopyChgTrackingDF";
要创建数据工厂,请运行以下 Set-AzDataFactoryV2 cmdlet:
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
请注意以下几点:
Azure 数据工厂的名称必须全局唯一。 如果收到以下错误,请更改名称并重试。
The specified Data Factory name 'ADFIncCopyChangeTrackingTestFactory' is already in use. Data Factory names must be globally unique.
若要创建数据工厂实例,用于登录到 Azure 的用户帐户必须属于参与者或所有者角色,或者是 Azure 订阅的管理员。
若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析”以找到“数据工厂”:可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库,等等)和计算资源(HDInsight 等)可以位于其他区域中。
创建链接服务
可在数据工厂中创建链接服务,将数据存储和计算服务链接到数据工厂。 在本部分中,创建 Azure 存储帐户和 Azure SQL 数据库中数据库的链接服务。
创建 Azure 存储链接服务。
在此步骤中,请将 Azure 存储帐户链接到数据工厂。
在 C:\ADFTutorials\IncCopyChangeTrackingTutorial 文件夹中,创建包含以下内容的名为 AzureStorageLinkedService.json 的 JSON 文件:(如果此文件夹尚未存在,请创建。) 将
<accountName>
和<accountKey>
分别替换为 Azure 存储帐户的名称和密钥,然后保存文件。{ "name": "AzureStorageLinkedService", "properties": { "type": "AzureStorage", "typeProperties": { "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn" } } }
在 Azure PowerShell 中切换到 C:\ADFTutorials\IncCopyChangeTrackingTutorial 文件夹。
运行 Set-AzDataFactoryV2LinkedService cmdlet 来创建链接服务:AzureStorageLinkedService。 在以下示例中,传递 ResourceGroupName 和 DataFactoryName 参数的值。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
下面是示例输出:
LinkedServiceName : AzureStorageLinkedService ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Properties : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
创建 Azure SQL 数据库链接服务
在此步骤中,请将数据库链接到数据工厂。
在 C:\ADFTutorials\IncCopyChangeTrackingTutorial 文件夹中创建名为 AzureSQLDatabaseLinkedService.json 的 JSON 文件,并在其中包含以下内容:在保存该文件之前,请将 <your-server-name> 和 <your-database-name> 替换为你的服务器和数据库的名称。 此外,必须配置 Azure SQL Server,以便为数据工厂的托管标识授予访问权限。
{ "name": "AzureSqlDatabaseLinkedService", "properties": { "type": "AzureSqlDatabase", "typeProperties": { "connectionString": "Server=tcp:<your-server-name>.database.chinacloudapi.cn,1433;Database=<your-database-name>;" }, "authenticationType": "ManagedIdentity", "annotations": [] } }
在 Azure PowerShell 中,运行 Set-AzDataFactoryV2LinkedService cmdlet 来创建链接服务:AzureSQLDatabaseLinkedService。
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
下面是示例输出:
LinkedServiceName : AzureSQLDatabaseLinkedService ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
创建数据集
在此步骤中,请创建表示数据源和数据目标的数据集, 并创建用于存储 SYS_CHANGE_VERSION 的位置。
创建源数据集
在此步骤中,请创建一个代表源数据的数据集。
在同一文件夹中,创建包含以下内容的名为 SourceDataset.json 的 JSON 文件:
{ "name": "SourceDataset", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "data_source_table" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集:SourceDataset
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
下面是该 cmdlet 的示例输出:
DatasetName : SourceDataset ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
创建接收器数据集
在此步骤中,请创建一个数据集,代表从源数据存储复制的数据。
在同一文件夹中,创建包含以下内容的名为 SinkDataset.json 的 JSON 文件:
{ "name": "SinkDataset", "properties": { "type": "AzureBlob", "typeProperties": { "folderPath": "adftutorial/incchgtracking", "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')", "format": { "type": "TextFormat" } }, "linkedServiceName": { "referenceName": "AzureStorageLinkedService", "type": "LinkedServiceReference" } } }
在 Azure Blob 存储中创建 adftutorial 容器,这是先决条件的部分要求。 创建容器(如果不存在),或者将容器设置为现有容器的名称。 在本教程中,输出文件名是使用以下表达式动态生成的:@CONCAT('Incremental-', pipeline().RunId, '.txt')。
运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集:SinkDataset
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
下面是该 cmdlet 的示例输出:
DatasetName : SinkDataset ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
创建更改跟踪数据集
在此步骤中,请创建用于存储更改跟踪版本的数据集。
在同一文件夹中,创建包含以下内容的名为 ChangeTrackingDataset.json 的 JSON 文件:
{ "name": " ChangeTrackingDataset", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "table_store_ChangeTracking_version" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
创建 table_store_ChangeTracking_version 表,这是先决条件的部分要求。
运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集:ChangeTrackingDataset
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "ChangeTrackingDataset" -File ".\ChangeTrackingDataset.json"
下面是该 cmdlet 的示例输出:
DatasetName : ChangeTrackingDataset ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Structure : Properties : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
创建用于完整复制的管道
在这一步,请创建一个包含复制活动的管道,将完整数据从源数据存储(Azure SQL 数据库)复制到目标数据存储(Azure Blob 存储)。
创建一个 JSON 文件:在同一文件夹中,创建包含以下内容的 FullCopyPipeline.json:
{ "name": "FullCopyPipeline", "properties": { "activities": [{ "name": "FullCopyActivity", "type": "Copy", "typeProperties": { "source": { "type": "SqlSource" }, "sink": { "type": "BlobSink" } }, "inputs": [{ "referenceName": "SourceDataset", "type": "DatasetReference" }], "outputs": [{ "referenceName": "SinkDataset", "type": "DatasetReference" }] }] } }
运行 Set-AzDataFactoryV2Pipeline cmdlet 来创建管道:FullCopyPipeline。
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "FullCopyPipeline" -File ".\FullCopyPipeline.json"
下面是示例输出:
PipelineName : FullCopyPipeline ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Activities : {FullCopyActivity} Parameters :
运行完整的复制管道
运行管道:使用 Invoke-AzDataFactoryV2Pipeline cmdlet 运行管道 FullCopyPipeline。
Invoke-AzDataFactoryV2Pipeline -PipelineName "FullCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName
监视完整的复制管道
登录到 Azure 门户。
单击“所有服务”,使用关键字
data factories
进行搜索,然后选择“数据工厂”。在数据工厂列表中搜索你的数据工厂,然后选择它来启动“数据工厂”页。
在“数据工厂”页中,单击“监视和管理”磁贴。
数据集成应用程序在单独的选项卡中启动。可以看到所有管道运行及其状态。 请注意,在以下示例中,管道运行的状态为“成功”。 单击“参数”列中的链接即可查看传递至管道的参数。 如果有错误,在“错误”列可以看到链接。 单击“操作”列中的链接。
单击“操作”列中的链接时,可以看到以下页面,其中显示管道的所有 活动运行。
若要切换回“管道运行”视图,请单击“管道”,如图所示。
查看结果
可以在 adftutorial
容器的 incchgtracking
文件夹中看到名为 incremental-<GUID>.txt
的文件。
该文件应包含数据库中的数据:
1,aaaa,21
2,bbbb,24
3,cccc,20
4,dddd,26
5,eeee,22
向源表中添加更多数据
对数据库运行以下查询来添加和更新行。
INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new','50');
UPDATE data_source_table
SET [Age] = '10', [name]='update' where [PersonID] = 1
创建用于增量复制的管道
在此步骤中,请创建一个包含以下活动的管道并定期运行。 查找活动从 Azure SQL 数据库获取旧的和新的 SYS_CHANGE_VERSION,然后将其传递至复制活动。 复制活动将两个 SYS_CHANGE_VERSION 值之间的插入/更新/删除数据从 Azure SQL 数据库复制到 Azure Blob 存储。 存储过程活动更新 SYS_CHANGE_VERSION 的值,以便进行下一次的管道运行。
创建一个 JSON 文件:在同一文件夹中,创建包含以下内容的 IncrementalCopyPipeline.json:
{ "name": "IncrementalCopyPipeline", "properties": { "activities": [ { "name": "LookupLastChangeTrackingVersionActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from table_store_ChangeTracking_version" }, "dataset": { "referenceName": "ChangeTrackingDataset", "type": "DatasetReference" } } }, { "name": "LookupCurrentChangeTrackingVersionActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion" }, "dataset": { "referenceName": "SourceDataset", "type": "DatasetReference" } } }, { "name": "IncrementalCopyActivity", "type": "Copy", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) as CT on data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}" }, "sink": { "type": "BlobSink" } }, "dependsOn": [ { "activity": "LookupLastChangeTrackingVersionActivity", "dependencyConditions": [ "Succeeded" ] }, { "activity": "LookupCurrentChangeTrackingVersionActivity", "dependencyConditions": [ "Succeeded" ] } ], "inputs": [ { "referenceName": "SourceDataset", "type": "DatasetReference" } ], "outputs": [ { "referenceName": "SinkDataset", "type": "DatasetReference" } ] }, { "name": "StoredProceduretoUpdateChangeTrackingActivity", "type": "SqlServerStoredProcedure", "typeProperties": { "storedProcedureName": "Update_ChangeTracking_Version", "storedProcedureParameters": { "CurrentTrackingVersion": { "value": "@{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}", "type": "INT64" }, "TableName": { "value": "@{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}", "type": "String" } } }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" }, "dependsOn": [ { "activity": "IncrementalCopyActivity", "dependencyConditions": [ "Succeeded" ] } ] } ] } }
运行 Set-AzDataFactoryV2Pipeline cmdlet 来创建管道:FullCopyPipeline。
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
下面是示例输出:
PipelineName : IncrementalCopyPipeline ResourceGroupName : ADFTutorialResourceGroup DataFactoryName : IncCopyChgTrackingDF Activities : {LookupLastChangeTrackingVersionActivity, LookupCurrentChangeTrackingVersionActivity, IncrementalCopyActivity, StoredProceduretoUpdateChangeTrackingActivity} Parameters :
运行增量复制管道
运行管道:使用 Invoke-AzDataFactoryV2Pipeline cmdlet 运行管道 IncrementalCopyPipeline。
Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName
监视增量复制管道
在数据集成应用程序中,刷新“管道运行”视图。 确认在列表中看到 IncrementalCopyPipeline。 单击“操作”列中的链接。
单击“操作”列中的链接时,可以看到以下页面,其中显示管道的所有 活动运行。
若要切换回“管道运行”视图,请单击“管道”,如图所示。
查看结果
可以在 adftutorial
容器的 incchgtracking
文件夹中看到第二个文件。
该文件应该只包含数据库中的增量数据。 带 U
的记录是数据库中的更新行,带 I
的记录是添加的行。
1,update,10,2,U
6,new,50,1,I
前三个列是 data_source_table 中的更改数据。 最后两个列是更改跟踪系统表中的元数据。 第四列是每个更改行的 SYS_CHANGE_VERSION。 第五列是操作:U = update(更新),I = insert(插入)。 如需详细了解更改跟踪信息,请参阅 CHANGETABLE。
==================================================================
PersonID Name Age SYS_CHANGE_VERSION SYS_CHANGE_OPERATION
==================================================================
1 update 10 2 U
6 new 50 1 I
相关内容
继续查看以下教程,了解如何仅基于 LastModifiedDate 来复制新的和更改的文件: