Use reference data from a SQL Database for an Azure Stream Analytics job

Azure Stream Analytics supports Azure SQL Database as a source of input for reference data. You can use SQL Database as reference data for your Stream Analytics job in the Azure portal and in Visual Studio with Stream Analytics tools. This article demonstrates both methods.

Azure portal

Use the following steps to add Azure SQL Database as a reference input source in the Azure portal:

Portal prerequisites

  1. Create a Stream Analytics job.

  2. Create a storage account to be used by the Stream Analytics job.

  3. Create your Azure SQL Database with a data set to be used as reference data by the Stream Analytics job.

Define SQL Database reference data input

  1. In your Stream Analytics job, select Inputs under Job topology. Click Add reference input and choose SQL Database.

    Stream Analytics job inputs

  2. Fill out the Stream Analytics input configuration. Choose the database name, server name, username, and password. If you want your reference data input to refresh periodically, choose On and specify the refresh rate in DD:HH:MM format. If you have large data sets with a short refresh rate, you can use a delta query.

    SQL Database reference configuration

  3. Test the snapshot query in the SQL query editor. For more information, see Use the Azure portal's SQL query editor to connect and query data.
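
The DD:HH:MM refresh rate above reads as days:hours:minutes (for example, 00:01:00 refreshes every hour). As an illustration only — the portal simply accepts the string, and this helper is not part of Azure Stream Analytics — the format maps to a duration like this:

```python
import re
from datetime import timedelta

def parse_refresh_rate(value: str) -> timedelta:
    """Parse a DD:HH:MM refresh rate string into a duration.

    Illustrative helper only; not part of Azure Stream Analytics.
    """
    match = re.fullmatch(r"(\d{2}):(\d{2}):(\d{2})", value)
    if not match:
        raise ValueError(f"Expected DD:HH:MM, got {value!r}")
    days, hours, minutes = (int(part) for part in match.groups())
    return timedelta(days=days, hours=hours, minutes=minutes)

print(parse_refresh_rate("00:01:00"))  # 1:00:00
```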

Specify storage account in Job config

Navigate to Storage account settings under Configure and select Add storage account.

Stream Analytics storage account settings

Start the job

Once you have configured the other inputs, outputs, and the query, you can start the Stream Analytics job.

Tools for Visual Studio

Use the following steps to add Azure SQL Database as a reference input source in Visual Studio:

Visual Studio prerequisites

  1. Install the Stream Analytics tools for Visual Studio. The following versions of Visual Studio are supported:

    • Visual Studio 2015
    • Visual Studio 2019
  2. Become familiar with the Stream Analytics tools for Visual Studio quickstart.

  3. Create a storage account.

Create a SQL Database table

Use SQL Server Management Studio to create a table to store your reference data. See Design your first Azure SQL database using SSMS for details.

The example table used in this article was created with the following statement:

CREATE TABLE chemicals (
    Id bigint,
    Name nvarchar(max),
    FullName nvarchar(max)
);

Choose your subscription

  1. In Visual Studio, on the View menu, select Server Explorer.

  2. Right-click Azure, select Connect to Microsoft Azure Subscription, and sign in with your Azure account.

Create a Stream Analytics project

  1. Select File > New Project.

  2. In the templates list on the left, select Stream Analytics, and then select Azure Stream Analytics Application.

  3. Enter the project Name, Location, and Solution name, and select OK.

    New Stream Analytics project in Visual Studio

Define SQL Database reference data input

  1. Create a new input.

    New Stream Analytics input in Visual Studio

  2. Double-click Input.json in the Solution Explorer.

  3. Fill out the Stream Analytics input configuration. Choose the database name, server name, refresh type, and refresh rate. Specify the refresh rate in DD:HH:MM format.

    Stream Analytics input configuration in Visual Studio

    If you choose "Execute only once" or "Execute periodically", one SQL CodeBehind file named [Input Alias].snapshot.sql is generated in the project under the Input.json file node.

    Input CodeBehind file in Visual Studio

    If you choose "Refresh Periodically with Delta", two SQL CodeBehind files are generated: [Input Alias].snapshot.sql and [Input Alias].delta.sql.

    CodeBehind files in Solution Explorer

  4. Open the SQL file in the editor and write the SQL query.

  5. If you are using Visual Studio 2019 and have installed SQL Server Data Tools, you can test the query by clicking Execute. A wizard window pops up to help you connect to the SQL database, and the query result appears in the window at the bottom.

Specify storage account

Open JobConfig.json to specify the storage account for storing SQL reference snapshots.

Stream Analytics job configuration in Visual Studio

Test locally and deploy to Azure

Before deploying the job to Azure, you can test the query logic locally against live input data. For more information on this feature, see Test live data locally using Azure Stream Analytics tools for Visual Studio (Preview). When you're done testing, click Submit to Azure. See the Create a Stream Analytics job using the Azure Stream Analytics tools for Visual Studio quickstart to learn how to start the job.

Delta query

When using the delta query, temporal tables in Azure SQL Database are recommended.

  1. Create a temporal table in Azure SQL Database.

       CREATE TABLE DeviceTemporal 
       (  
          [DeviceId] int NOT NULL PRIMARY KEY CLUSTERED 
          , [GroupDeviceId] nvarchar(100) NOT NULL
          , [Description] nvarchar(100) NOT NULL 
          , [ValidFrom] datetime2 (0) GENERATED ALWAYS AS ROW START
          , [ValidTo] datetime2 (0) GENERATED ALWAYS AS ROW END
          , PERIOD FOR SYSTEM_TIME (ValidFrom, ValidTo)
       )  
       WITH (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.DeviceHistory));  -- DeviceHistory table will be used in Delta query
    
  2. Author the snapshot query.

    Use the @snapshotTime parameter to instruct the Stream Analytics runtime to obtain the reference data set from the SQL Database temporal table as it was valid at the current system time. If you don't provide this parameter, you risk obtaining an inaccurate base reference data set because of clock skew. An example of a full snapshot query is shown below:

       SELECT DeviceId, GroupDeviceId, [Description]
       FROM dbo.DeviceTemporal
       FOR SYSTEM_TIME AS OF @snapshotTime
    
  3. Author the delta query.

    This query retrieves all of the rows in your SQL database that were inserted or deleted between a start time, @deltaStartTime, and an end time, @deltaEndTime. The delta query must return the same columns as the snapshot query, as well as the column _operation_. This column defines whether a row was inserted or deleted between @deltaStartTime and @deltaEndTime: the resulting rows are flagged with 1 if the records were inserted, or 2 if deleted.

    For records that were updated, the temporal table does bookkeeping by capturing an insertion and a deletion operation. The Stream Analytics runtime then applies the results of the delta query to the previous snapshot to keep the reference data up to date. An example of a delta query is shown below:

       SELECT DeviceId, GroupDeviceId, Description, 1 as _operation_
       FROM dbo.DeviceTemporal
       WHERE ValidFrom BETWEEN @deltaStartTime AND @deltaEndTime   -- records inserted
       UNION
       SELECT DeviceId, GroupDeviceId, Description, 2 as _operation_
       FROM dbo.DeviceHistory   -- table we created in step 1
       WHERE ValidTo BETWEEN @deltaStartTime AND @deltaEndTime     -- records deleted
    

    Note that the Stream Analytics runtime may periodically run the snapshot query, in addition to the delta query, to store checkpoints.
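
The merge behavior described above — insertions applied as upserts, deletions removed from the previous snapshot — can be sketched in Python. This is an illustration of the documented semantics, not the actual runtime code; in particular, applying deletions before insertions is an assumption made here so that an update (recorded by the temporal table as a deletion of the old row plus an insertion of the new one) resolves to the new values:

```python
def apply_delta(snapshot, delta_rows):
    """Apply delta query results to the previous snapshot, keyed by DeviceId.

    Each delta row carries an _operation_ flag: 1 = inserted, 2 = deleted.
    Deletions are applied first so that a delete/insert pair for the same
    DeviceId (an update) leaves the newly inserted values in place.
    """
    updated = dict(snapshot)
    for row in delta_rows:
        if row["_operation_"] == 2:          # deletion: drop the row if present
            updated.pop(row["DeviceId"], None)
    for row in delta_rows:
        if row["_operation_"] == 1:          # insertion: add or overwrite the row
            updated[row["DeviceId"]] = {
                k: v for k, v in row.items() if k != "_operation_"
            }
    return updated

snapshot = {1: {"DeviceId": 1, "GroupDeviceId": "g1", "Description": "old"}}
delta = [
    {"DeviceId": 1, "GroupDeviceId": "g1", "Description": "old", "_operation_": 2},
    {"DeviceId": 1, "GroupDeviceId": "g1", "Description": "new", "_operation_": 1},
    {"DeviceId": 2, "GroupDeviceId": "g2", "Description": "added", "_operation_": 1},
]
print(apply_delta(snapshot, delta)[1]["Description"])  # new
```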

Test your query

It is important to verify that your query returns the data set that the Stream Analytics job will use as reference data. To test your query, go to Inputs under the Job topology section in the portal. You can then select Sample Data on your SQL Database reference input. After the sample becomes available, you can download the file and check whether the returned data is as expected. If you want to optimize your development and test iterations, we recommend using the Stream Analytics tools for Visual Studio. You can also use any other tool of your preference to first ensure the query returns the right results from your Azure SQL Database, and then use it in your Stream Analytics job.
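
Whatever tool you use, the check amounts to confirming that every sampled record carries the columns your job's query expects. A generic sketch follows, assuming the downloaded sample has already been parsed into a list of records; the column names come from the chemicals example table earlier in this article:

```python
def find_missing_columns(rows, required_columns):
    """Return (row_index, column_name) pairs for every sampled record
    that lacks a column the Stream Analytics query expects to read."""
    return [
        (i, col)
        for i, row in enumerate(rows)
        for col in required_columns
        if col not in row
    ]

sample = [
    {"Id": 1, "Name": "H2O", "FullName": "Water"},
    {"Id": 2, "Name": "NaCl"},  # FullName missing
]
print(find_missing_columns(sample, ["Id", "Name", "FullName"]))
# [(1, 'FullName')]
```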

FAQs

Will I incur additional cost by using SQL reference data input in Azure Stream Analytics?

There is no additional cost per streaming unit in the Stream Analytics job. However, the Stream Analytics job must have an associated Azure storage account. The Stream Analytics job queries the SQL database (at job start and at each refresh interval) to retrieve the reference data set and stores that snapshot in the storage account. Storing these snapshots incurs additional charges, detailed on the pricing page for Azure storage accounts.

How do I know that the reference data snapshot is being queried from the SQL database and used in the Azure Stream Analytics job?

There are two metrics, filtered by Logical Name (under Metrics in the Azure portal), that you can use to monitor the health of the SQL database reference data input.

  • InputEvents: This metric measures the number of records loaded in from the SQL database reference data set.
  • InputEventBytes: This metric measures the size of the reference data snapshot loaded into the memory of the Stream Analytics job.

The combination of these two metrics can be used to infer whether the job is querying the SQL database to fetch the reference data set and then loading it into memory.
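
For example (purely illustrative — these are the raw metric values as read from the portal, not an Azure Monitor API call), the two numbers can be combined to confirm a load and derive the average record size of the snapshot:

```python
def snapshot_stats(input_events, input_event_bytes):
    """Summarize a reference data load from the two metrics:
    InputEvents (records loaded) and InputEventBytes (snapshot size in memory)."""
    loaded = input_events > 0 and input_event_bytes > 0
    avg_record_bytes = input_event_bytes / input_events if loaded else 0.0
    return {"snapshot_loaded": loaded, "avg_record_bytes": avg_record_bytes}

print(snapshot_stats(1000, 256000))
# {'snapshot_loaded': True, 'avg_record_bytes': 256.0}
```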

Will I require a special type of Azure SQL Database?

Azure Stream Analytics works with any type of Azure SQL Database. However, it is important to understand that the refresh rate set for your reference data input can impact your query load. To use the delta query option, we recommend using temporal tables in Azure SQL Database.

Why does Azure Stream Analytics store snapshots in an Azure Storage account?

Stream Analytics guarantees exactly-once event processing and at-least-once delivery of events. In cases where transient issues impact your job, a small amount of replay is necessary to restore state. To enable replay, these snapshots must be stored in an Azure Storage account. For more information on checkpoint replay, see Checkpoint and replay concepts in Azure Stream Analytics jobs.

Next steps