Compartilhar via

Azure Functions的Azure Database for MySQL触发器绑定

Azure Database for MySQL触发器绑定监视用户表以获取更改(插入和更新),并使用更新的行数据调用函数。

Azure Database for MySQL触发器绑定使用 az_func_updated_at 和列数据监视用户表的更改。 因此,在使用触发器支持之前,需要更改表结构以允许 MySQL 表上的change tracking。 可以通过以下查询对表启用change tracking。 例如,在 Products 表上启用它:

ALTER TABLE Products
ADD az_func_updated_at TIMESTAMP DEFAULT 
CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;

租约表包含与用户表中的主键对应的所有列,以及另外三列: az_func_AttemptCountaz_func_LeaseExpirationTimeaz_func_SyncCompletedTime。 如果任一主键列具有相同的名称,则结果是列出冲突的错误消息。 在这种情况下,必须重命名列出的主键列才能使触发器正常工作。

功能概述

触发器函数启动时,它会启动两个单独的循环:更改轮询loop和租约续订loop。 这些循环会持续运行,直到函数停止。

Azure Database for MySQL触发器绑定使用轮询loop来检查更改。 轮询loop在检测到更改时触发用户函数。 概括而言,loop如以下示例所示:

while (true) {
    1. Get list of changes on table - up to a maximum number controlled by the MySql_Trigger_MaxBatchSize setting
    2. Trigger function with list of changes
    3. Wait for delay controlled by MySql_Trigger_PollingIntervalMs setting
}

更改的处理顺序是按照更改的顺序进行处理。 首先处理最早的更改。 请考虑以下关于更改处理的要点:

  • 如果一次在多行中发生更改,则发送到函数的确切顺序取决于列和主键列的 az_func_updated_at 升序。
  • 对行的更改进行批处理。 如果loop每次迭代之间发生多个更改,则只考虑该行存在的最新更改项。

注释

目前,Azure Functions和Azure Database for MySQL之间的连接不支持托管标识。

示例用法

GitHub 存储库中提供了Azure Database for MySQL触发器的更多示例。

这些示例引用 Product 类和相应的数据库表:

namespace AzureMySqlSamples.Common
{
    public class Product
    {
        public int? ProductId { get; set; }

        public string Name { get; set; }

        public int Cost { get; set; }

        public override bool Equals(object obj)
        {
            if (obj is Product)
            {
                var that = obj as Product;
                return this.ProductId == that.ProductId && this.Name == that.Name && this.Cost == that.Cost;
            }
            return false;
        }
    }
DROP TABLE IF EXISTS Products;

CREATE TABLE Products (
  ProductId int PRIMARY KEY,
  Name varchar(100) NULL,
  Cost int NULL
);

通过向表添加一列,对数据库启用change tracking:

ALTER TABLE <table name>  
ADD COLUMN az_func_updated_at TIMESTAMP 
DEFAULT CURRENT_TIMESTAMP 
ON UPDATE CURRENT_TIMESTAMP;

Azure Database for MySQL触发器绑定到 IReadOnlyList<MySqlChange<T>>,其中列出了 MySqlChange 对象。 每个对象都有两个属性:

  • Item:已更改的项。 项的类型应遵循表架构,如类中 ToDoItem 所示。
  • Operation:枚举中的 MySqlChangeOperation 值。 可能的值为 Update 插入和更新。

以下示例演示在表中发生更改时调用的 Product

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Extensions.MySql;
using Microsoft.Extensions.Logging;
using AzureMySqlSamples.Common;

namespace AzureMySqlSamples.TriggerBindingSamples
{
        private static readonly Action<ILogger, string, Exception> _loggerMessage = LoggerMessage.Define<string>(LogLevel.Information, eventId: new EventId(0, "INFO"), formatString: "{Message}");

        [Function(nameof(ProductsTrigger))]
        public static void Run(
            [MySqlTrigger("Products", "MySqlConnectionString")]
            IReadOnlyList<MySqlChange<Product>> changes, FunctionContext context)
        {
            ILogger logger = context.GetLogger("ProductsTrigger");
            // The output is used to inspect the trigger binding parameter in test methods.
            foreach (MySqlChange<Product> change in changes)
            {
                Product product = change.Item;
                _loggerMessage(logger, $"Change operation: {change.Operation}", null);
                _loggerMessage(logger, $"Product Id: {product.ProductId}, Name: {product.Name}, Cost: {product.Cost}", null);
            }
        }
}

示例用法

GitHub 存储库中提供了Azure Database for MySQL触发器的更多示例。

该示例引用类 ProductMySqlChangeProduct 类、 MySqlChangeOperation 枚举和相应的数据库表。

在名为Product.java的单独文件中:

package com.function.Common;

import com.fasterxml.jackson.annotation.JsonProperty;

public class Product {
    @JsonProperty("ProductId")
    private int ProductId;
    @JsonProperty("Name")
    private String Name;
    @JsonProperty("Cost")
    private int Cost;

    public Product() {
    }

    public Product(int productId, String name, int cost) {
        ProductId = productId;
        Name = name;
        Cost = cost;
    }
}

在名为MySqlChangeProduct.java的单独文件中:

package com.function.Common;

public class MySqlChangeProduct {
    private MySqlChangeOperation Operation;
    private Product Item;

    public MySqlChangeProduct() {
    }

    public MySqlChangeProduct(MySqlChangeOperation operation, Product item) {
        this.Operation = operation;
        this.Item = item;
    }
}

在名为MySqlChangeOperation.java的单独文件中:

package com.function.Common;

import com.google.gson.annotations.SerializedName;

public enum MySqlChangeOperation {
    @SerializedName("0")
    Update
}
DROP TABLE IF EXISTS Products;

CREATE TABLE Products (
  ProductId int PRIMARY KEY,
  Name varchar(100) NULL,
  Cost int NULL
);

通过将以下列添加到表中,对数据库启用change tracking:

ALTER TABLE <table name>  
ADD COLUMN az_func_updated_at TIMESTAMP 
DEFAULT CURRENT_TIMESTAMP 
ON UPDATE CURRENT_TIMESTAMP;

Azure Database for MySQL触发器绑定到 MySqlChangeProduct[],它是 MySqlChangeProduct 对象的数组。 每个对象都有两个属性:

  • item:已更改的项。 项的类型应遵循表架构,如类中 Product 所示。
  • operation:枚举中的 MySqlChangeOperation 值。 可能的值为 Update 插入和更新。

以下示例演示在表中发生更改时调用的 Product Java 函数:

/**
 * Copyright (c) Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License. See License.txt in the project root for
 * license information.
 */

package com.function;

import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.mysql.annotation.MySqlTrigger;
import com.function.Common.MySqlChangeProduct;
import com.google.gson.Gson;

import java.util.logging.Level;

public class ProductsTrigger {
    @FunctionName("ProductsTrigger")
    public void run(
            @MySqlTrigger(
                name = "changes",
                tableName = "Products",
                connectionStringSetting = "MySqlConnectionString")
                MySqlChangeProduct[] changes,
            ExecutionContext context) {

        context.getLogger().log(Level.INFO, "MySql Changes: " + new Gson().toJson(changes));
    }
}

示例用法

GitHub 存储库中提供了Azure Database for MySQL触发器的更多示例。

该示例引用 Product 数据库表:

DROP TABLE IF EXISTS Products;

CREATE TABLE Products (
  ProductId int PRIMARY KEY,
  Name varchar(100) NULL,
  Cost int NULL
);

通过向表添加一列,对数据库启用change tracking:

ALTER TABLE <table name>  
ADD COLUMN az_func_updated_at TIMESTAMP 
DEFAULT CURRENT_TIMESTAMP 
ON UPDATE CURRENT_TIMESTAMP;

Azure Database for MySQL触发器绑定到列出对象的 Product。 每个对象都有两个属性:

  • item:已更改的项。 项的结构遵循表架构。
  • operation:可能的值为 Update 插入和更新。

以下示例演示在表中发生 Product 更改时调用的 PowerShell 函数。

以下示例是 function.json 文件中的绑定数据:

{
    "bindings": [
      {
        "name": "changes",
        "type": "mysqlTrigger",
        "direction": "in",
        "tableName": "Products",
        "connectionStringSetting": "MySqlConnectionString"
      }
    ],
    "disabled": false
  }

“配置”部分介绍了这些属性。

以下示例是 run.ps1 文件中函数的示例 PowerShell 代码:

using namespace System.Net

param($changes)
# The output is used to inspect the trigger binding parameter in test methods.
# Use -Compress to remove new lines and spaces for testing purposes.
$changesJson = $changes | ConvertTo-Json -Compress
Write-Host "MySql Changes: $changesJson"

示例用法

GitHub 存储库中提供了Azure Database for MySQL触发器的更多示例。

该示例引用 Product 数据库表:

DROP TABLE IF EXISTS Products;

CREATE TABLE Products (
  ProductId int PRIMARY KEY,
  Name varchar(100) NULL,
  Cost int NULL
);

通过向表添加一列,对数据库启用change tracking:

ALTER TABLE <table name>  
ADD COLUMN az_func_updated_at TIMESTAMP 
DEFAULT CURRENT_TIMESTAMP 
ON UPDATE CURRENT_TIMESTAMP;

Azure Database for MySQL触发器绑定到Changes,这是对象的数组。 每个对象都有两个属性:

  • item:已更改的项。 项的结构遵循表架构。
  • operation:可能的值为 Update 插入和更新。

以下示例演示在表中发生更改时调用的 Product JavaScript 函数。

以下示例是 function.json 文件中的绑定数据:

{
    "bindings": [
      {
        "name": "changes",
        "type": "mysqlTrigger",
        "direction": "in",
        "tableName": "Products",
        "connectionStringSetting": "MySqlConnectionString",
      }
    ],
    "disabled": false
  }

“配置”部分介绍了这些属性。

以下示例是文件中函数 index.js 的示例 JavaScript 代码:

module.exports = async function (context, changes) {
    context.log(`MySql Changes: ${JSON.stringify(changes)}`)
}

示例用法

GitHub 存储库中提供了Azure Database for MySQL触发器的更多示例。

该示例引用 Product 数据库表:

DROP TABLE IF EXISTS Products;

CREATE TABLE Products (
  ProductId int PRIMARY KEY,
  Name varchar(100) NULL,
  Cost int NULL
);

通过向表添加一列,对数据库启用change tracking:

ALTER TABLE <table name>  
ADD COLUMN az_func_updated_at TIMESTAMP 
DEFAULT CURRENT_TIMESTAMP 
ON UPDATE CURRENT_TIMESTAMP;

注释

必须使用 Azure Functions 版本 1.22.0b4 for Python。

Azure Database for MySQL触发器绑定到名为 Product 的变量,其中列出了对象。 每个对象都有两个属性:

  • item:已更改的项。 项的结构遵循表架构。
  • operation:可能的值为 Update 插入和更新。

以下示例演示在表中发生更改时调用的 Product Python 函数。

以下示例是function_app.py文件的示例 Python 代码:

import json
import logging
import azure.functions as func

app = func.FunctionApp()

# The function is triggered when a change (insert, update)
# is made to the Products table.
@app.function_name(name="ProductsTrigger")
@app.mysql_trigger(arg_name="products",
table_name="Products",
connection_string_setting="MySqlConnectionString")

def products_trigger(products: str) -> None:
logging.info("MySQL Changes: %s", json.loads(products))

特性

Attribute 属性 DESCRIPTION
TableName 必填。 触发器监视的表的名称。
ConnectionStringSetting 必填。 包含数据库connection string的应用设置的名称,该数据库包含受监视的更改表。 connection string设置的名称对应于包含要Azure Database for MySQL的 connection string 的应用程序设置(local.settings.json)。
LeasesTableName 可选。 用于存储租约的表的名称。 如果未指定,则名称为 Leases_{FunctionId}_{TableId}.

批注

Java 函数运行时库中,对值来自Azure Database for MySQL的参数使用 @MySQLTrigger 注释。 此批注支持以下元素:

元素 DESCRIPTION
name 必填。 触发器绑定到的参数的名称。
tableName 必填。 触发器监视的表的名称。
connectionStringSetting 必填。 包含数据库connection string的应用设置的名称,该数据库包含受监视的更改表。 connection string设置的名称对应于包含要Azure Database for MySQL的 connection string 的应用程序设置(local.settings.json)。
LeasesTableName 可选。 用于存储租约的表的名称。 如果未指定,则名称为 Leases_{FunctionId}_{TableId}.

配置

下表介绍了在 function.json 文件中设置的绑定配置属性:

资产 DESCRIPTION
name 必填。 触发器绑定到的参数的名称。
type 必填。 必须设置为 MysqlTrigger
direction 必填。 必须设置为 in
tableName 必填。 触发器监视的表的名称。
connectionStringSetting 必填。 包含数据库connection string的应用设置的名称,该数据库包含受监视的更改表。 connection string设置的名称对应于包含要Azure Database for MySQL的 connection string 的应用程序设置(local.settings.json)。
LeasesTableName 可选。 用于存储租约的表的名称。 如果未指定,则名称为 Leases_{FunctionId}_{TableId}.

可选配置

可以为本地开发或云部署的Azure Database for MySQL触发器配置以下可选设置。

host.json

本部分介绍版本 2.x 及更高版本中此绑定可用的配置设置。 host.json 文件中的设置将应用于函数应用实例中的所有函数。 有关函数app configuration设置的详细信息,请参阅 host.json 参考Azure Functions

设置 违约 DESCRIPTION
MaxBatchSize 100 在将更改发送到触发函数之前,每个触发器迭代loop处理的最大更改数。
PollingIntervalMs 1000 处理每批更改之间的延迟(以毫秒为单位)。 (1,000 毫秒为 1 秒)。
MaxChangesPerWorker 1000 每个应用程序辅助角色允许的用户表中挂起的更改数上限。 如果更改计数超过此限制,则可能会导致横向扩展。此设置仅适用于启用了 runtime 驱动的缩放的Azure函数应用。

示例 host.json 文件

下面是包含可选设置host.json文件的示例:

{
  "version": "2.0",
  "extensions": {
      "MySql": {
        "MaxBatchSize": 300,
        "PollingIntervalMs": 1000,
        "MaxChangesPerWorker": 100
      }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    },
    "logLevel": {
      "default": "Trace"
    }
  }
}

local.setting.json

local.settings.json 文件存储本地开发工具使用的应用设置和设置。 仅当在本地运行project时,才会使用 local.settings.json 文件中的设置。 将project发布到Azure时,请务必将任何必需的设置添加到函数应用的应用设置中。

重要

由于 local.settings.json 文件可能包含机密(如连接字符串),因此不应将其存储在远程存储库中。 支持Azure Functions的工具提供了将 local.settings.json 文件中的设置与部署project的函数应用中的 app 设置同步的方法。

设置 违约 DESCRIPTION
MySql_Trigger_BatchSize 100 在将更改发送到触发函数之前,每个触发器迭代loop处理的最大更改数。
MySql_Trigger_PollingIntervalMs 1000 处理每批更改之间的延迟(以毫秒为单位)。 (1,000 毫秒为 1 秒)。
MySql_Trigger_MaxChangesPerWorker 1000 每个应用程序辅助角色允许的用户表中挂起的更改数上限。 如果更改计数超过此限制,则可能会导致横向扩展。此设置仅适用于启用了 runtime 驱动的缩放的Azure函数应用。

示例 local.settings.json 文件

下面是包含可选设置local.settings.json文件的示例:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "MySqlConnectionString": "",
    "MySql_Trigger_MaxBatchSize": 300,
    "MySql_Trigger_PollingIntervalMs": 1000,
    "MySql_Trigger_MaxChangesPerWorker": 100
  }
}

设置change tracking(必需)

设置用于Azure Database for MySQL触发器的change tracking要求使用函数在表中添加列。 可以从支持运行查询的任何 MySQL 工具完成这些步骤,包括 Visual Studio Code

Azure Database for MySQL触发器绑定使用 az_func_updated_at 和列数据监视用户表的更改。 因此,在使用触发器支持之前,需要更改表结构以允许 MySQL 表上的change tracking。 可以通过以下查询对表启用change tracking。 例如,在 Products 表上启用它:

ALTER TABLE Products;
ADD az_func_updated_at 
TIMESTAMP DEFAULT CURRENT_TIMESTAMP 
ON UPDATE CURRENT_TIMESTAMP;

租约表包含与用户表的主键对应的所有列,以及另外两列: az_func_AttemptCountaz_func_LeaseExpirationTime 如果任一主键列具有相同的名称,则结果是列出冲突的错误消息。 在这种情况下,必须重命名列出的主键列才能使触发器正常工作。

启用运行时驱动的缩放

根据需要,函数可以根据用户表中待处理的更改数自动缩放。 若要允许函数在使用Azure Database for MySQL触发器时正确缩放高级计划,需要启用运行时缩放监视。

  1. 在 Azure portal 中,在函数应用中,选择 Configuration

  2. “函数运行时设置 ”选项卡上,对于 运行时缩放监视,请选择“ 打开”。

     Azure portal 区域的Screenshot,用于启用运行时缩放.

重试支持

启动重试

如果在启动期间发生异常,主机运行时会自动尝试使用指数退避策略重启触发器侦听器。 这样的重启会一直进行,直到侦听器成功启动或者启动被取消。

函数异常重试

如果在更改处理期间用户函数中出现异常,则当前正在处理的行批会在 60 秒内重试。 在此期间,其他更改将按正常方式进行处理,但导致异常的批处理中的行将被忽略,直到超时期限过。

如果某个特定行的函数执行连续五次失败,则所有将来更改都会忽略该行。 由于批处理中的行不是确定性的,因此失败批处理中的行可能会在后续调用中以不同的批结尾。 此行为意味着不一定忽略失败批处理中的所有行。 如果批处理中的其他行导致异常,“良好”行最终可能会出现在将来调用中不会失败的其他批处理中。