Incrementally load data from Azure SQL Database to Azure Blob storage using PowerShell

Applies to: Azure Data Factory, Azure Synapse Analytics
In this tutorial, you use Azure Data Factory to create a pipeline that loads delta data from a table in Azure SQL Database to Azure Blob storage.

You perform the following steps in this tutorial:

- Prepare the data store to store the watermark value.
- Create a data factory.
- Create linked services.
- Create source, sink, and watermark datasets.
- Create a pipeline.
- Run the pipeline.
- Monitor the pipeline run.
Overview

Here is the high-level solution diagram:

Here are the important steps to create this solution:
1. Select the watermark column. Pick one column in the source data store that can be used to slice the new or updated records for every run. Normally, the data in this column (for example, last_modify_time or ID) keeps increasing as rows are created or updated. The maximum value in this column is used as a watermark.
2. Prepare a data store to store the watermark value. In this tutorial, you store the watermark value in a SQL database.
3. Create a pipeline with the following workflow:
The pipeline in this solution has the following activities:

- Two Lookup activities. The first Lookup activity retrieves the last watermark value; the second retrieves the new watermark value. These watermark values are passed to the Copy activity.
- A Copy activity that copies rows from the source data store whose watermark-column value is greater than the old watermark value and less than or equal to the new watermark value. It then copies the delta data from the source data store to Blob storage as a new file. (A sketch of the resulting query follows this list.)
- A StoredProcedure activity that updates the watermark value for the next pipeline run.
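
To make the watermark handoff concrete, here is a minimal sketch of the delta query the Copy activity effectively issues on each run. The watermark values below are hypothetical, for illustration only; the real query appears later in the pipeline definition, where the two Lookup activities supply the values at run time.

```powershell
# Hypothetical watermark values; at run time the two Lookup activities supply them.
$oldWatermark = '2010-01-01 00:00:00'   # last recorded value in watermarktable
$newWatermark = '2017-09-05 08:06:00'   # current MAX(LastModifytime) in data_source_table

# The Copy activity reads only rows created or updated between the two watermarks.
$deltaQuery = @"
select * from data_source_table
where LastModifytime > '$oldWatermark' and LastModifytime <= '$newWatermark'
"@
$deltaQuery
```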
If you don't have an Azure subscription, create a trial account before you begin.
Prerequisites

Note

This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure PowerShell.
- Azure SQL Database. You use the database as the source data store. If you don't have a database in Azure SQL Database, see Create a database in Azure SQL Database for steps to create one.
- Azure Storage. You use the blob storage as the sink data store. If you don't have a storage account, see Create a storage account for steps to create one. Create a container named adftutorial.
- Azure PowerShell. Follow the instructions in Install and configure Azure PowerShell. (A sample sign-in session follows this list.)
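
Before running the commands in this tutorial, sign in to Azure from PowerShell. A typical session might look like the following sketch; the `-Environment AzureChinaCloud` switch matches the chinacloudapi.cn endpoints used throughout this tutorial.

```powershell
# Install the Az module if it isn't installed yet (one-time setup).
Install-Module -Name Az -Scope CurrentUser -Repository PSGallery -Force

# Sign in to the Azure China cloud, which this tutorial's endpoints target.
Connect-AzAccount -Environment AzureChinaCloud
```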
Create a data source table in your SQL database

Open SQL Server Management Studio. In Server Explorer, right-click the database, and choose New Query.
Run the following SQL command against your SQL database to create a table named data_source_table as the data source store:

```sql
create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');
```
In this tutorial, you use LastModifytime as the watermark column. The data in the data source store is shown in the following table:

| PersonID | Name | LastModifytime |
| --- | --- | --- |
| 1 | aaaa | 2017-09-01 00:56:00.000 |
| 2 | bbbb | 2017-09-02 05:23:00.000 |
| 3 | cccc | 2017-09-03 02:36:00.000 |
| 4 | dddd | 2017-09-04 03:21:00.000 |
| 5 | eeee | 2017-09-05 08:06:00.000 |
Create another table in your SQL database to store the high watermark value

Run the following SQL command against your SQL database to create a table named watermarktable to store the watermark value:

```sql
create table watermarktable
(
    TableName varchar(255),
    WatermarkValue datetime
);
```
Set the default value of the high watermark with the table name of the source data store. In this tutorial, the table name is data_source_table.

```sql
INSERT INTO watermarktable
VALUES ('data_source_table','1/1/2010 12:00:00 AM')
```
Review the data in the watermarktable table:

```sql
Select * from watermarktable
```
Output:

| TableName | WatermarkValue |
| --- | --- |
| data_source_table | 2010-01-01 00:00:00.000 |
Create a stored procedure in your SQL database

Run the following command to create a stored procedure in your SQL database:
```sql
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

    UPDATE watermarktable
    SET [WatermarkValue] = @LastModifiedtime
    WHERE [TableName] = @TableName

END
```
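
Optionally, you can sanity-check the procedure before wiring it into the pipeline. The following sketch assumes the SqlServer PowerShell module (Install-Module SqlServer) and the same <server>, <database>, <user id>, and <password> placeholders used later in this tutorial. It writes back the same default watermark value, so it doesn't disturb the tutorial's state.

```powershell
# Re-write the default watermark and read it back to confirm the procedure works.
$query = @"
EXEC usp_write_watermark @LastModifiedtime = '1/1/2010 12:00:00 AM', @TableName = 'data_source_table';
SELECT * FROM watermarktable;
"@
Invoke-Sqlcmd -ServerInstance "<server>.database.chinacloudapi.cn" -Database "<database>" `
    -Username "<user id>" -Password "<password>" -Query $query
```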
Create a data factory

Define a variable for the resource group name that you use in PowerShell commands later. Copy the following command text to PowerShell, specify a name for the Azure resource group in double quotation marks (for example, "adfrg"), and then run the command.

```powershell
$resourceGroupName = "ADFTutorialResourceGroup";
```
If the resource group already exists, you might not want to overwrite it. Assign a different value to the $resourceGroupName variable, and run the command again.

Define a variable for the location of the data factory.

```powershell
$location = "China East 2"
```
To create the Azure resource group, run the following command:

```powershell
New-AzResourceGroup $resourceGroupName $location
```
If the resource group already exists, you might not want to overwrite it. Assign a different value to the $resourceGroupName variable, and run the command again.

Define a variable for the data factory name.

Important

Update the data factory name to make it globally unique. An example is ADFTutorialFactorySP1127.

```powershell
$dataFactoryName = "ADFIncCopyTutorialFactory";
```
To create the data factory, run the following Set-AzDataFactoryV2 cmdlet:

```powershell
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "China East 2" -Name $dataFactoryName
```
Note the following points:

- The name of the data factory must be globally unique. If you receive the following error, change the name and try again:

  ```
  The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
  ```

- To create Data Factory instances, the user account you use to sign in to Azure must be a member of the contributor or owner role, or an administrator of the Azure subscription.
- For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. The data stores (Storage, SQL Database, Azure SQL Managed Instance, and so on) and computes (Azure HDInsight, etc.) used by the data factory can be in other regions.
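
As an optional check, you can confirm that the factory was provisioned, assuming the variables defined above are still set in your session:

```powershell
# Returns the factory's name, location, and provisioning details if it exists.
Get-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Name $dataFactoryName
```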
Create linked services

You create linked services in a data factory to link your data stores and compute services to the data factory. In this section, you create linked services to your storage account and SQL Database.

Create a Storage linked service
Create a JSON file named AzureStorageLinkedService.json in the C:\ADF folder with the following content. (Create the folder ADF if it doesn't already exist.) Replace <accountName> and <accountKey> with the name and key of your storage account before you save the file.

```json
{
    "name": "AzureStorageLinkedService",
    "properties": {
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn"
        }
    }
}
```
In PowerShell, switch to the ADF folder.

Run the Set-AzDataFactoryV2LinkedService cmdlet to create the linked service AzureStorageLinkedService. In the following example, you pass values for the ResourceGroupName and DataFactoryName parameters:

```powershell
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
```
Here is the sample output:

```
LinkedServiceName : AzureStorageLinkedService
ResourceGroupName : <resourceGroupName>
DataFactoryName   : <dataFactoryName>
Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
```
Create a SQL Database linked service

Create a JSON file named AzureSQLDatabaseLinkedService.json in the C:\ADF folder with the following content. (Create the folder ADF if it doesn't already exist.) Replace <server>, <database>, <user>, and <password> with your server name, database, user ID, and password before you save the file.

```json
{
    "name": "AzureSQLDatabaseLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Server=tcp:<server>.database.chinacloudapi.cn,1433;Initial Catalog=<database>;Persist Security Info=False;User ID=<user>;Password=<password>;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;"
        }
    }
}
```
In PowerShell, switch to the ADF folder.

Run the Set-AzDataFactoryV2LinkedService cmdlet to create the linked service AzureSQLDatabaseLinkedService.

```powershell
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
```
Here is the sample output:

```
LinkedServiceName : AzureSQLDatabaseLinkedService
ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
ProvisioningState :
```
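
With both linked services in place, an optional listing confirms they were registered:

```powershell
# Lists every linked service registered in the factory; expect two entries here.
Get-AzDataFactoryV2LinkedService -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName
```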
Create datasets

In this step, you create datasets to represent the source and sink data.

Create a source dataset

Create a JSON file named SourceDataset.json in the same folder with the following content:
{ "name": "SourceDataset", "properties": { "type": "AzureSqlTable", "typeProperties": { "tableName": "data_source_table" }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" } } }
本教程使用表名 data_source_table。In this tutorial, you use the table name data_source_table. 如果使用其他名称的表,请替换名称。Replace it if you use a table with a different name.
Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset SourceDataset.

```powershell
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
```

Here is the sample output of the cmdlet:

```
DatasetName       : SourceDataset
ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
Structure         :
Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
```
Create a sink dataset

Create a JSON file named SinkDataset.json in the same folder with the following content:

```json
{
    "name": "SinkDataset",
    "properties": {
        "type": "AzureBlob",
        "typeProperties": {
            "folderPath": "adftutorial/incrementalcopy",
            "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
            "format": {
                "type": "TextFormat"
            }
        },
        "linkedServiceName": {
            "referenceName": "AzureStorageLinkedService",
            "type": "LinkedServiceReference"
        }
    }
}
```
Important

This snippet assumes that you have a blob container named adftutorial in your blob storage. Create the container if it doesn't exist, or set it to the name of an existing one. The output folder incrementalcopy is created automatically if it doesn't exist in the container. In this tutorial, the file name is generated dynamically by the expression @CONCAT('Incremental-', pipeline().RunId, '.txt').

Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset SinkDataset.
```powershell
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
```
Here is the sample output of the cmdlet:

```
DatasetName       : SinkDataset
ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
Structure         :
Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
```
Create a dataset for a watermark

In this step, you create a dataset for storing a high watermark value.

Create a JSON file named WatermarkDataset.json in the same folder with the following content:

```json
{
    "name": "WatermarkDataset",
    "properties": {
        "type": "AzureSqlTable",
        "typeProperties": {
            "tableName": "watermarktable"
        },
        "linkedServiceName": {
            "referenceName": "AzureSQLDatabaseLinkedService",
            "type": "LinkedServiceReference"
        }
    }
}
```
Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset WatermarkDataset.

```powershell
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
```

Here is the sample output of the cmdlet:

```
DatasetName       : WatermarkDataset
ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
Structure         :
Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
```
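
All three datasets are now defined. As an optional check, you can list them:

```powershell
# Expect SourceDataset, SinkDataset, and WatermarkDataset in the output.
Get-AzDataFactoryV2Dataset -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName
```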
Create a pipeline

In this tutorial, you create a pipeline with two Lookup activities, one Copy activity, and one StoredProcedure activity chained together.

Create a JSON file named IncrementalCopyPipeline.json in the same folder with the following content:
{ "name": "IncrementalCopyPipeline", "properties": { "activities": [ { "name": "LookupOldWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from watermarktable" }, "dataset": { "referenceName": "WatermarkDataset", "type": "DatasetReference" } } }, { "name": "LookupNewWaterMarkActivity", "type": "Lookup", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table" }, "dataset": { "referenceName": "SourceDataset", "type": "DatasetReference" } } }, { "name": "IncrementalCopyActivity", "type": "Copy", "typeProperties": { "source": { "type": "SqlSource", "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'" }, "sink": { "type": "BlobSink" } }, "dependsOn": [ { "activity": "LookupNewWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] }, { "activity": "LookupOldWaterMarkActivity", "dependencyConditions": [ "Succeeded" ] } ], "inputs": [ { "referenceName": "SourceDataset", "type": "DatasetReference" } ], "outputs": [ { "referenceName": "SinkDataset", "type": "DatasetReference" } ] }, { "name": "StoredProceduretoWriteWatermarkActivity", "type": "SqlServerStoredProcedure", "typeProperties": { "storedProcedureName": "usp_write_watermark", "storedProcedureParameters": { "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" }, "TableName": { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"} } }, "linkedServiceName": { "referenceName": "AzureSQLDatabaseLinkedService", "type": "LinkedServiceReference" }, "dependsOn": [ { "activity": "IncrementalCopyActivity", "dependencyConditions": [ "Succeeded" ] } ] } ] } }
Run the Set-AzDataFactoryV2Pipeline cmdlet to create the pipeline IncrementalCopyPipeline.

```powershell
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
```

Here is the sample output:

```
PipelineName      : IncrementalCopyPipeline
ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
Activities        : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
Parameters        :
```
Run the pipeline

Run the pipeline IncrementalCopyPipeline by using the Invoke-AzDataFactoryV2Pipeline cmdlet. Replace the placeholders with your own resource group and data factory names.

```powershell
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName
```

Check the status of the pipeline by running the Get-AzDataFactoryV2ActivityRun cmdlet until you see all the activities running successfully. Replace the placeholders with your own appropriate times for the RunStartedAfter and RunStartedBefore parameters. In this tutorial, you use -RunStartedAfter "2017/09/14" and -RunStartedBefore "2017/09/15".

```powershell
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
```
Here is the sample output:

```
ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
ActivityName      : LookupNewWaterMarkActivity
PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
PipelineName      : IncrementalCopyPipeline
Input             : {source, dataset}
Output            : {NewWatermarkvalue}
LinkedServiceName :
ActivityRunStart  : 9/14/2017 7:42:42 AM
ActivityRunEnd    : 9/14/2017 7:42:50 AM
DurationInMs      : 7777
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
ActivityName      : LookupOldWaterMarkActivity
PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
PipelineName      : IncrementalCopyPipeline
Input             : {source, dataset}
Output            : {TableName, WatermarkValue}
LinkedServiceName :
ActivityRunStart  : 9/14/2017 7:42:42 AM
ActivityRunEnd    : 9/14/2017 7:43:07 AM
DurationInMs      : 25437
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
ActivityName      : IncrementalCopyActivity
PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
PipelineName      : IncrementalCopyPipeline
Input             : {source, sink}
Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
LinkedServiceName :
ActivityRunStart  : 9/14/2017 7:43:10 AM
ActivityRunEnd    : 9/14/2017 7:43:29 AM
DurationInMs      : 19769
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
ActivityName      : StoredProceduretoWriteWatermarkActivity
PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
PipelineName      : IncrementalCopyPipeline
Input             : {storedProcedureName, storedProcedureParameters}
Output            : {}
LinkedServiceName :
ActivityRunStart  : 9/14/2017 7:43:32 AM
ActivityRunEnd    : 9/14/2017 7:43:47 AM
DurationInMs      : 14467
Status            : Succeeded
Error             : {errorCode, message, failureType, target}
```
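
If you'd rather not pick the RunStartedAfter and RunStartedBefore windows by hand, one possible alternative is a small polling loop over the overall run status, using the run ID captured above. This is a sketch, not part of the official tutorial scripts:

```powershell
# Poll the pipeline run every 30 seconds until it leaves the InProgress state.
while ($true) {
    $run = Get-AzDataFactoryV2PipelineRun -ResourceGroupName $resourceGroupName `
        -DataFactoryName $dataFactoryName -PipelineRunId $RunId
    if ($run.Status -ne "InProgress") { break }
    Start-Sleep -Seconds 30
}
$run.Status
```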
Review the results

In the blob storage (sink store), you see that the data was copied to the file defined in SinkDataset. In the current tutorial, the file name is Incremental-d4bf3ce2-5d60-43f3-9318-923155f61037.txt. Open the file, and you can see that the records in it are the same as the data in the SQL database.

```
1,aaaa,2017-09-01 00:56:00.0000000
2,bbbb,2017-09-02 05:23:00.0000000
3,cccc,2017-09-03 02:36:00.0000000
4,dddd,2017-09-04 03:21:00.0000000
5,eeee,2017-09-05 08:06:00.0000000
```
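
You can also verify the output from PowerShell instead of the portal. This sketch assumes the same <accountName> and <accountKey> placeholders you used in AzureStorageLinkedService.json:

```powershell
# Build a storage context for the Azure China cloud and list the generated output files.
$ctx = New-AzStorageContext -StorageAccountName "<accountName>" -StorageAccountKey "<accountKey>" `
    -Environment AzureChinaCloud
Get-AzStorageBlob -Container "adftutorial" -Prefix "incrementalcopy/" -Context $ctx
```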
Check the latest value from watermarktable. You see that the watermark value was updated.

```sql
Select * from watermarktable
```

Here is the sample output:

| TableName | WatermarkValue |
| --- | --- |
| data_source_table | 2017-09-05 08:06:00.000 |
Insert data into the data source store to verify delta data loading

Insert new data into the SQL database (data source store).

```sql
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
```

The updated data in the SQL database is:

| PersonID | Name | LastModifytime |
| --- | --- | --- |
| 1 | aaaa | 2017-09-01 00:56:00.000 |
| 2 | bbbb | 2017-09-02 05:23:00.000 |
| 3 | cccc | 2017-09-03 02:36:00.000 |
| 4 | dddd | 2017-09-04 03:21:00.000 |
| 5 | eeee | 2017-09-05 08:06:00.000 |
| 6 | newdata | 2017-09-06 02:23:00.000 |
| 7 | newdata | 2017-09-07 09:01:00.000 |
Run the pipeline IncrementalCopyPipeline again by using the Invoke-AzDataFactoryV2Pipeline cmdlet. Replace the placeholders with your own resource group and data factory names.

```powershell
$RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName
```

Check the status of the pipeline by running the Get-AzDataFactoryV2ActivityRun cmdlet until you see all the activities running successfully. Replace the placeholders with your own appropriate times for the RunStartedAfter and RunStartedBefore parameters. In this tutorial, you use -RunStartedAfter "2017/09/14" and -RunStartedBefore "2017/09/15".

```powershell
Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
```
Here is the sample output:

```
ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
ActivityName      : LookupNewWaterMarkActivity
PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
PipelineName      : IncrementalCopyPipeline
Input             : {source, dataset}
Output            : {NewWatermarkvalue}
LinkedServiceName :
ActivityRunStart  : 9/14/2017 8:52:26 AM
ActivityRunEnd    : 9/14/2017 8:52:58 AM
DurationInMs      : 31758
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
ActivityName      : LookupOldWaterMarkActivity
PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
PipelineName      : IncrementalCopyPipeline
Input             : {source, dataset}
Output            : {TableName, WatermarkValue}
LinkedServiceName :
ActivityRunStart  : 9/14/2017 8:52:26 AM
ActivityRunEnd    : 9/14/2017 8:52:52 AM
DurationInMs      : 25497
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
ActivityName      : IncrementalCopyActivity
PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
PipelineName      : IncrementalCopyPipeline
Input             : {source, sink}
Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
LinkedServiceName :
ActivityRunStart  : 9/14/2017 8:53:00 AM
ActivityRunEnd    : 9/14/2017 8:53:20 AM
DurationInMs      : 20194
Status            : Succeeded
Error             : {errorCode, message, failureType, target}

ResourceGroupName : ADF
DataFactoryName   : incrementalloadingADF
ActivityName      : StoredProceduretoWriteWatermarkActivity
PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
PipelineName      : IncrementalCopyPipeline
Input             : {storedProcedureName, storedProcedureParameters}
Output            : {}
LinkedServiceName :
ActivityRunStart  : 9/14/2017 8:53:23 AM
ActivityRunEnd    : 9/14/2017 8:53:41 AM
DurationInMs      : 18502
Status            : Succeeded
Error             : {errorCode, message, failureType, target}
```
In the blob storage, you see that another file was created. In this tutorial, the new file name is Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt. Open that file, and you see two rows of records in it.

Check the latest value from watermarktable. You see that the watermark value was updated again.

```sql
Select * from watermarktable
```

Here is the sample output:

| TableName | WatermarkValue |
| --- | --- |
| data_source_table | 2017-09-07 09:01:00.000 |
Next steps

You performed the following steps in this tutorial:
- Prepare the data store to store the watermark value.
- Create a data factory.
- Create linked services.
- Create source, sink, and watermark datasets.
- Create a pipeline.
- Run the pipeline.
- Monitor the pipeline run.
In this tutorial, the pipeline copied data from a single table in Azure SQL Database to Blob storage. Advance to the following tutorial to learn how to copy data from multiple tables in a SQL Server database to SQL Database.