使用 Azure Functions 更新或合并 Azure SQL 数据库中的记录

目前,Azure 流分析 (ASA) 仅支持将行插入(附加)到 SQL 输出(Azure SQL 数据库)。 本文讨论了在以 Azure Functions 作为中间层的 SQL 数据库上启用 UPDATE、Upsert 或 MERGE 的变通方法。

结尾介绍了 Azure Functions 替代选项。

要求

在表中写入数据通常可以通过以下方式完成:

模型 等效的 T-SQL 语句 要求
附加 INSERT
替换 MERGE (UPSERT) 唯一键
Accumulate MERGE (UPSERT),带有复合赋值运算符 (+=, -=...) 唯一密钥和累加器

为了说明这些差异,请看看在引入以下两种记录时会发生什么情况:

Arrival_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

在“追加”模式下,我们插入两条记录。 等效的 T-SQL 语句为:

INSERT INTO [target] VALUES (...);

结果:

Modified_Time Device_Id Measure_Value
10:00 A 1
10:05 A 20

在“替换”模式下,我们仅根据键获取最后一个值。 此处我们使用 Device_Id 作为密钥。等效的 T-SQL 语句为:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

结果:

Modified_Time Device_Key Measure_Value
10:05 A 20

最后,在“累加”模式下,我们使用复合赋值运算符(+=)来对 Value 求和。 此处,我们会使用 Device_Id 作为键:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

结果:

Modified_Time Device_Key Measure_Value
10:05 A 21

出于性能考虑,ASA SQL 数据库输出适配器目前仅本机支持追加模式。 这些适配器使用批量插入来最大化吞吐量并限制反压力。

本文演示如何使用 Azure Functions 来实现 ASA 的替换和累加模式。 使用函数作为中介层时,潜在的写性能不会影响流任务。 因此,使用 Azure Functions 与 Azure SQL 搭配使用效果最佳。 使用 Synapse SQL,从批量语句切换到逐行语句可能会产生更大的性能问题。

Azure Functions 输出

在我们的工作中,我们会将 ASA SQL 输出替换为 ASA Azure Functions 输出。 会在函数中实现 UPDATE、UPSERT 或 MERGE 功能。

当前有两种方法可以通过函数来访问 SQL 数据库。 第一种方法是 Azure SQL 输出绑定。 这种方法目前仅限于 C#,并且仅提供替换模式。 第二种方法是编写一个 SQL 查询,以通过适当的 SQL 驱动程序 (Microsoft.Data.SqlClient for .NET) 提交。

对于以下两个示例,我们会假设采用下表架构。 绑定选项要求在目标表上设置一个主键。 在使用 SQL 驱动程序时,不强制要求执行此操作,但建议执行。

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

函数必须满足以下要求才能用作 ASA 的输出

  • 对于已成功处理的批,Azure 流分析将从函数应用中收到 HTTP 状态 200
  • 当 Azure 流分析从 Azure 函数中收到 413(“http 请求实体过大”)异常时,它将减小发送到 Azure Functions 的批的大小
  • 在测试连接期间,流分析向 Azure Functions 发送带有空白批处理的 POST 请求,并期望返回 HTTP 状态 20x 以验证测试

选项 1:使用 Azure Functions SQL 绑定来更新键

此选项使用 Azure Functions SQL 输出绑定。 此扩展可以替换表中的对象,而无需编写 SQL 语句。 目前不支持复合赋值运算符(累加)。

此示例基于:

为了更好地理解绑定方法,建议遵循本教程

首先按照本教程的说明,创建一个默认 HttpTrigger 函数应用。 使用了以下信息:

  • 语言:C#
  • 运行时:.NET 6(在 function/runtime v4 下面)
  • 模板:HTTP trigger

通过在位于项目文件夹的终端中运行以下命令来安装绑定扩展:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

local.settings.jsonValues 部分中添加 SqlConnectionString 项,并填写目标服务器的连接字符串:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

将整个函数(项目中的 .cs 文件)替换为以下代码片段。 自行更新命名空间、类名和函数名称:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

更新绑定部分中的目标表名称:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

更新 Device 类和映射部分,以匹配自己的架构:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

现在,你可以通过调试(在 VS 代码中为 F5)来测试本地函数和数据库之间的连接。 需要能够从你的计算机上访问 SQL 数据库。 SSMS 可用于检查连接性。 然后,可以使用 Postman 等工具向本地终结点发出 POST 请求。 正文为空的请求应返回 http 204。 具有实际有效负载的请求应该在目标表中持久化(在替换/更新模式下)。 下面是一个与此示例中使用的模式相对应的示例有效负载:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

现在可以将函数发布到 Azure。 应为 SqlConnectionString 设置应用程序设置。 Azure SQL Server 防火墙应该允许 Azure 服务通过,这样实时函数才能到达。

然后可以将该函数定义为 ASA 作业中的输出,并用于替换记录而不是插入记录。

选项 2:通过自定义 SQL 查询与复合赋值合并(累加)

注意

在重新启动和恢复后,ASA 可能会重新发送已经发出的输出事件。 这是一种预期的行为,可能会导致累加逻辑失败(将个别值翻倍)。 为了防止这种情况,建议通过本机 ASA SQL 输出在表中输出相同的数据。 然后,可以使用这个控制表来检测问题,并在必要时重新同步累加。

此选项使用 Microsoft.Data.SqlClient。 通过这个库,我们可以向 SQL 数据库发出任何 SQL 查询。

此示例基于:

首先按照本教程的说明,创建一个默认 HttpTrigger 函数应用。 使用了以下信息:

  • 语言:C#
  • 运行时:.NET 6(在 function/runtime v4 下面)
  • 模板:HTTP trigger

通过在位于项目文件夹的终端中运行以下命令来安装 SqlClient 库:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

local.settings.jsonValues 部分中添加 SqlConnectionString 项,并填写目标服务器的连接字符串:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

将整个函数(项目中的 .cs 文件)替换为以下代码片段。 自行更新命名空间、类名和函数名称:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

更新 sqltext 命令生成部分以匹配你自己的架构(请注意如何在更新时通过 += 运算符实现累加):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

现在,你可以通过调试(在 VS 代码中为 F5)来测试本地函数和数据库之间的连接。 需要能够从你的计算机上访问 SQL 数据库。 SSMS 可用于检查连接性。 然后,可以使用 Postman 等工具向本地终结点发出 POST 请求。 正文为空的请求应返回 http 204。 具有实际有效负载的请求应该在目标表中持久化(在累加/合并模式下)。 下面是一个与此示例中使用的模式相对应的示例有效负载:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

现在可以将函数发布到 Azure。 应为 SqlConnectionString 设置应用程序设置。 Azure SQL Server 防火墙应该允许 Azure 服务通过,这样实时函数才能到达。

然后可以将该函数定义为 ASA 作业中的输出,并用于替换记录而不是插入记录。

备选方法

除了 Azure Functions,还有多种方法可以实现预期结果。 本部分提供了其中一些内容。

目标 SQL 数据库中的后处理

一旦通过标准的 ASA SQL 输出将数据插入到数据库中,后台任务就会开始工作。

对于 Azure SQL,可以使用 INSTEAD OFDML 触发器来拦截 ASA 发出的 INSERT 命令:

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

在 Azure Cosmos DB 中预处理

Azure Cosmos DB 本机支持 UPSERT。 此处只能追加/替换。 在 Azure Cosmos DB,累加必须是托管客户端。

如果符合要求,则可以将目标 SQL 数据库 替换为 Azure Cosmos DB 实例。 这样做需要对整体解决方案体系结构进行重大更改。

对于 Synapse SQL,Azure Cosmos DB 可以通过 Azure Synapse Link for Azure Cosmos DB 来用作中间层。 Azure Synapse Link 可用于创建分析存储。 然后可以直接在 Synapse SQL 中查询该数据存储。

备选项的比较

每种方法都提供不同的价值主张和功能:

类型 选项 模式 Azure SQL Database Azure Synapse Analytics
后处理
触发器 替换、累加 + 不适用,触发器在 Synapse SQL 中不可用
过渡 替换、累加 + +
预处理
Azure Functions 替换、累加 + -(逐行模式下的性能)
Azure Cosmos DB 替换 替换 空值 空值
Azure Cosmos DB Azure Synapse Link Replace 空值 +

获取支持

如需获取进一步的帮助,可前往 Azure 流分析的 Microsoft 问答页面

后续步骤