Tutorial: Sync data from SQL Edge to Azure Blob storage by using Azure Data Factory

In this tutorial, you'll use Azure Data Factory to incrementally sync data to Azure Blob storage from a table in an instance of Azure SQL Edge.

Before you begin

If you haven't already created a database or table in your Azure SQL Edge deployment, use one of these methods to create one:

  • Use SQL Server Management Studio or Azure Data Studio to connect to SQL Edge. Run a SQL script to create the database and table.

  • Create the database and table by using sqlcmd, connecting directly to the SQL Edge module. For more information, see Connect to the Database Engine by using sqlcmd.

  • Use SqlPackage.exe to deploy a DAC package file to the SQL Edge container. You can automate this process by specifying the SqlPackage file URI as part of the module's desired properties configuration. You can also use the SqlPackage.exe client tool directly to deploy a DAC package to SQL Edge.

    For information about how to download SqlPackage.exe, see Download and install sqlpackage. The following are some sample commands for SqlPackage.exe. For more information, see the SqlPackage.exe documentation.

    Create a DAC package

    sqlpackage /Action:Extract /SourceConnectionString:"Data Source=<Server_Name>,<port>;Initial Catalog=<DB_name>;User ID=<user>;Password=<password>" /TargetFile:<dacpac_file_name>
    

    Apply a DAC package

    sqlpackage /Action:Publish /Sourcefile:<dacpac_file_name> /TargetServerName:<Server_Name>,<port> /TargetDatabaseName:<DB_Name> /TargetUser:<user> /TargetPassword:<password>
    

Create a SQL table and procedure to store and update the watermark levels

A watermark table stores the last timestamp up to which data has already been synchronized with Azure Storage. A Transact-SQL (T-SQL) stored procedure updates the watermark table after every sync.

Run these commands on the SQL Edge instance:

    CREATE TABLE [dbo].[watermarktable]
    (
        TableName varchar(255),
        WatermarkValue datetime
    )
    GO

    CREATE PROCEDURE usp_write_watermark @timestamp datetime, @TableName varchar(50)
    AS
    BEGIN
        UPDATE [dbo].[watermarktable]
        SET [WatermarkValue] = @timestamp
        WHERE [TableName] = @TableName
    END
    GO
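The watermark pattern that this table and procedure implement can be sketched outside of SQL Edge. The following is a minimal Python simulation using sqlite3 in place of the SQL Edge database; the table, column, and procedure semantics mirror the T-SQL above, while the `TemperatureSensor` source table and its sample rows are illustrative assumptions:

```python
import sqlite3

# In-memory stand-in for the SQL Edge database (illustrative only).
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE watermarktable (TableName TEXT, WatermarkValue TEXT)")
con.execute("CREATE TABLE TemperatureSensor (timestamp TEXT, value REAL)")

# Seed the watermark with an initial low value; the pipeline expects one row per table.
con.execute("INSERT INTO watermarktable VALUES ('TemperatureSensor', '2020-01-01 00:00:00')")

# New source rows arrive after the stored watermark.
rows = [("2020-01-01 01:00:00", 20.5), ("2020-01-01 02:00:00", 21.0)]
con.executemany("INSERT INTO TemperatureSensor VALUES (?, ?)", rows)

# OldWatermark lookup: read the stored watermark.
old = con.execute(
    "SELECT WatermarkValue FROM watermarktable WHERE TableName = 'TemperatureSensor'"
).fetchone()[0]

# NewWatermark lookup: MAX(timestamp) in the source table.
new = con.execute("SELECT MAX(timestamp) FROM TemperatureSensor").fetchone()[0]

# Copy activity: only rows in the (old, new] window are moved.
copied = con.execute(
    "SELECT * FROM TemperatureSensor WHERE timestamp > ? AND timestamp <= ?",
    (old, new),
).fetchall()

# usp_write_watermark equivalent: persist the new watermark for the next run.
con.execute(
    "UPDATE watermarktable SET WatermarkValue = ? WHERE TableName = ?",
    (new, "TemperatureSensor"),
)
print(len(copied), new)
```

On a second run with no new source rows, the `(old, new]` window is empty, so nothing is copied twice; this is what makes the sync incremental.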

Create a Data Factory pipeline

In this section, you'll create an Azure Data Factory pipeline to sync data to Azure Blob storage from a table in Azure SQL Edge.

Create a data factory by using the Data Factory UI

Create a data factory by following the instructions in this tutorial.

Create a Data Factory pipeline

  1. On the Let's get started page of the Data Factory UI, select Create pipeline.

    (Screenshot: Create pipeline.)

  2. On the General page of the Properties window for the pipeline, enter PeriodicSync for the name.

  3. Add a Lookup activity to get the old watermark value. In the Activities pane, expand General and drag the Lookup activity to the pipeline designer surface. Change the name of the activity to OldWatermark.

    (Screenshot: Add the old watermark Lookup activity.)

  4. Switch to the Settings tab and select New for Source Dataset. You'll now create a dataset to represent the data in the watermark table. This table contains the old watermark that was used in the previous copy operation.

  5. In the New Dataset window, select Azure SQL Server, and then select Continue.

  6. In the Set properties window for the dataset, under Name, enter WatermarkDataset.

  7. For Linked Service, select New, and then complete these steps:

    1. Under Name, enter SQLDBEdgeLinkedService.

    2. Under Server name, enter your SQL Edge server details.

    3. Select your Database name from the list.

    4. Enter your User name and Password.

    5. To test the connection to the SQL Edge instance, select Test connection.

    6. Select Create.

    (Screenshot: Create the linked service.)

    7. Select OK.
  8. On the Settings tab, select Edit.

  9. On the Connection tab, select [dbo].[watermarktable] for Table. To preview the data in the table, select Preview data.

  10. Switch to the pipeline editor by selecting the pipeline tab at the top or by selecting the name of the pipeline in the tree view on the left. In the properties window for the Lookup activity, confirm that WatermarkDataset is selected in the Source dataset list.

  11. In the Activities pane, expand General and drag another Lookup activity to the pipeline designer surface. On the General tab of the properties window, set the name to NewWatermark. This Lookup activity gets the new watermark value from the table that contains the source data so the data can be copied to the destination.

  12. In the properties window for the second Lookup activity, switch to the Settings tab and select New to create a dataset that points to the source table containing the new watermark value.

  13. In the New Dataset window, select the SQL Edge instance, and then select Continue.

    1. In the Set properties window, under Name, enter SourceDataset. Under Linked service, select SQLDBEdgeLinkedService.

    2. Under Table, select the table that you want to synchronize. You can also specify a query for this dataset, as described later in this tutorial. The query takes precedence over the table you specify in this step.

    3. Select OK.

  14. Switch to the pipeline editor by selecting the pipeline tab at the top or by selecting the name of the pipeline in the tree view on the left. In the properties window for the Lookup activity, confirm that SourceDataset is selected in the Source dataset list.

  15. Under Use query, select Query. Update the table name in the following query, and then enter the query. You're selecting only the maximum value of timestamp from the table. Be sure to select First row only.

    select MAX(timestamp) as NewWatermarkvalue from [TableName]
    

    (Screenshot: Select the query.)

  16. In the Activities pane, expand Move & Transform and drag the Copy activity from the Activities pane to the designer surface. Set the name of the activity to IncrementalCopy.

  17. Connect both Lookup activities to the Copy activity by dragging the green button attached to each Lookup activity to the Copy activity. Release the mouse button when you see the border color of the Copy activity change to blue.

  18. Select the Copy activity and confirm that you see its properties in the Properties window.

  19. Switch to the Source tab in the Properties window and complete these steps:

    1. In the Source dataset box, select SourceDataset.

    2. Under Use query, select Query.

    3. Enter the SQL query in the Query box. Here's a sample query:

    select * from TemperatureSensor where timestamp > '@{activity('OldWaterMark').output.firstRow.WatermarkValue}' and timestamp <= '@{activity('NewWaterMark').output.firstRow.NewWatermarkvalue}'
    
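How Data Factory resolves this query can be sketched: at run time, each `@{...}` expression is replaced with the `firstRow` value returned by the corresponding Lookup activity. The following is a minimal Python illustration; the lookup output values are hypothetical, and the nested-dictionary shape is only a stand-in for the activity output object:

```python
# Hypothetical Lookup activity outputs, in the shape the expressions reference them.
activities = {
    "OldWaterMark": {"output": {"firstRow": {"WatermarkValue": "2020-01-01 00:00:00"}}},
    "NewWaterMark": {"output": {"firstRow": {"NewWatermarkvalue": "2020-01-01 02:00:00"}}},
}

def resolve(name: str, prop: str) -> str:
    """Stand-in for @{activity('name').output.firstRow.prop}."""
    return activities[name]["output"]["firstRow"][prop]

# The Copy activity's source query after expression substitution.
query = (
    "select * from TemperatureSensor "
    f"where timestamp > '{resolve('OldWaterMark', 'WatermarkValue')}' "
    f"and timestamp <= '{resolve('NewWaterMark', 'NewWatermarkvalue')}'"
)
print(query)
```

The half-open window (strictly greater than the old watermark, up to and including the new one) ensures each row is copied exactly once across runs.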
  20. On the Sink tab, select New under Sink Dataset.

  21. In this tutorial, the sink data store is an Azure Blob storage data store. Select Azure Blob storage, and then select Continue in the New Dataset window.

  22. In the Select Format window, select the format of your data, and then select Continue.

  23. In the Set Properties window, under Name, enter SinkDataset. Under Linked service, select New. You'll now create a connection (a linked service) to your Azure Blob storage.

  24. In the New Linked Service (Azure Blob storage) window, complete these steps:

    1. In the Name box, enter AzureStorageLinkedService.

    2. Under Storage account name, select the Azure storage account for your Azure subscription.

    3. Test the connection, and then select Finish.

  25. In the Set Properties window, confirm that AzureStorageLinkedService is selected under Linked service. Select Create, and then select OK.

  26. On the Sink tab, select Edit.

  27. Go to the Connection tab of SinkDataset and complete these steps:

    1. Under File path, enter asdedatasync/incrementalcopy, where asdedatasync is the blob container name and incrementalcopy is the folder name. Create the container if it doesn't exist, or use the name of an existing one. Azure Data Factory automatically creates the output folder incrementalcopy if it doesn't exist. You can also use the Browse button for the File path to navigate to a folder in a blob container.

    2. For the File part of the File path, select Add dynamic content [Alt+P], and then enter @CONCAT('Incremental-', pipeline().RunId, '.txt') in the window that opens. Select Finish. The file name is dynamically generated by the expression. Each pipeline run has a unique ID, and the Copy activity uses the run ID to generate the file name.
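The effect of this @CONCAT expression can be sketched as follows; each run's unique ID yields a unique blob name, so successive incremental copies never overwrite each other. This is an illustrative Python sketch, and the GUID-style run ID is an assumption:

```python
import uuid

def output_file_name(run_id: str) -> str:
    # Mirrors @CONCAT('Incremental-', pipeline().RunId, '.txt')
    return "Incremental-" + run_id + ".txt"

# Each pipeline run gets a unique ID (a GUID-like value is assumed here).
run_id = str(uuid.uuid4())
name = output_file_name(run_id)
print(name)
```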

  28. Switch to the pipeline editor by selecting the pipeline tab at the top or by selecting the name of the pipeline in the tree view on the left.

  29. In the Activities pane, expand General and drag the Stored Procedure activity from the Activities pane to the pipeline designer surface. Connect the green (success) output of the Copy activity to the Stored Procedure activity.

  30. Select the Stored Procedure activity in the pipeline designer and change its name to SPtoUpdateWatermarkActivity.

  31. Switch to the SQL Account tab, and select SQLDBEdgeLinkedService under Linked service.

  32. Switch to the Stored Procedure tab and complete these steps:

    1. Under Stored procedure name, select [dbo].[usp_write_watermark].

    2. To specify values for the stored procedure parameters, select Import parameter and enter these values for the parameters:

    Name               Type       Value
    LastModifiedtime   DateTime   @{activity('NewWaterMark').output.firstRow.NewWatermarkvalue}
    TableName          String     @{activity('OldWaterMark').output.firstRow.TableName}
  33. To validate the pipeline settings, select Validate on the toolbar. Confirm that there are no validation errors. To close the Pipeline Validation Report window, select >>.

  34. Publish the entities (linked services, datasets, and pipelines) to the Azure Data Factory service by selecting Publish All. Wait until you see a message confirming that the publish operation has succeeded.

Trigger a pipeline based on a schedule

  1. On the pipeline toolbar, select Add Trigger, select New/Edit, and then select New.

  2. Name the trigger HourlySync. Under Type, select Schedule. Set the recurrence to every 1 hour.

  3. Select OK.

  4. Select Publish All.

  5. Select Trigger Now.

  6. Switch to the Monitor tab on the left. You can see the status of the pipeline run started by the manual trigger. Select Refresh to refresh the list.

Next steps

The Azure Data Factory pipeline in this tutorial copies data from a table on a SQL Edge instance to a location in Azure Blob storage once every hour. To learn about using Data Factory in other scenarios, see these tutorials.