教程:从 Azure 流分析作业运行 Azure Functions

可以通过将 Functions 配置为流分析作业的接收器(输出)之一,从 Azure 流分析运行 Azure Functions。 Functions 是事件驱动的按需计算体验,它允许实现由 Azure 或第三方服务中出现的事件所触发的代码。 Functions 响应触发的这一功能使其成为流分析作业的自然输出。

流分析通过 HTTP 触发调用 Functions。 通过 Functions 输出适配器,用户可以将 Functions 连接到流分析,以便基于流分析查询触发事件。

注意

不支持从多租户群集中运行的流分析作业连接到虚拟网络 (VNet) 内的 Azure Functions。

在本教程中,你将了解如何执行以下操作:

  • 创建和运行流分析作业
  • 创建用于 Redis 的 Azure 缓存实例
  • 创建 Azure 函数
  • 在用于 Redis 的 Azure 缓存中检查结果

如果没有 Azure 订阅,可在开始前创建一个试用帐户

创建流分析作业以运行函数

本部分演示了如何配置流分析作业来运行一个函数以将数据写入到用于 Redis 的 Azure 缓存。 流分析作业从 Azure 事件中心读取事件,并运行调用函数的查询。 此函数从流分析作业读取数据,并将其写入到用于 Redis 的 Azure 缓存中。

Diagram showing relationships among the Azure services

先决条件

在开始之前,请确保已完成以下步骤:

  • 如果没有 Azure 订阅,请创建试用版
  • 请从 Microsoft 下载中心下载电话呼叫事件生成器应用 TelcoGenerator.zip,或者从 GitHub 获取源代码。

登录 Azure

登录到 Azure 门户

创建事件中心

需要先将一些示例数据发送到事件中心,然后流分析才能分析欺诈性呼叫数据流。 在本教程中,你将使用 Azure 事件中心将数据发送到 Azure。

请按以下步骤创建一个事件中心,然后向该事件中心发送调用数据:

  1. 登录 Azure 门户

  2. 选择左侧菜单中的“所有服务”,选择“物联网”,将鼠标悬停在“事件中心”上,然后选择“+ (添加)”按钮。

    Screenshot showing the Event Hubs creation page.

  3. 在“创建命名空间”页上执行以下步骤:

    1. 选择要在其中创建事件中心的 Azure 订阅。

    2. 对于“资源组”,请选择“新建”,然后输入资源组的名称。 事件中心命名空间将在此资源组中创建。

    3. 对于“命名空间名称”,请输入事件中心命名空间的唯一名称。

    4. 对于“位置”,请选择要在其中创建命名空间的区域。

    5. 对于“定价层”,请选择“标准”。

    6. 在页面底部选择“查看 + 创建”。

      Screenshot showing the Create Namespace page.

    7. 在命名空间创建向导的“查看 + 创建”页上,在查看所有设置后,选择页面底部的“创建”。

  4. 成功部署命名空间后,选择“转到资源”以导航到“事件中心命名空间”页。

  5. 在“事件中心命名空间”页上,选择命令栏中的“+事件中心”。

    Screenshot showing the Add event hub button on the Event Hubs Namespace page.

  6. 在“创建事件中心”页上,为事件中心输入一个名称。 将分区计数设置为 2。 对其余设置使用默认选项,然后选择“查看 + 创建”。

    Screenshot showing the Create event hub page.

  7. 在“查看 + 创建”页上,选择页面底部的“创建”。 然后,等待部署成功完成。

授予对事件中心的访问权限,并获取连接字符串

在应用程序可以将数据发送到 Azure 事件中心之前,事件中心必须具有允许访问的策略。 访问策略生成包含授权信息的连接字符串。

  1. 在“事件中心命名空间”页中的左侧菜单上选择“共享访问策略”。

  2. 从策略列表中选择 RootManageSharedAccessKey。

  3. 然后选择“连接字符串 - 主密钥”旁边的复制按钮。

  4. 将连接字符串粘贴到文本编辑器中。 需要在下一部分使用此连接字符串。

    连接字符串如下所示:

    Endpoint=sb://<Your event hub namespace>.servicebus.chinacloudapi.cn/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>

    请注意,连接字符串包含多个用分号隔开的键值对:Endpoint、SharedAccessKeyName 和 SharedAccessKey。

启动事件生成器应用程序

在启动 TelcoGenerator 应用之前,应该对其进行配置,以便将数据发送到此前创建的 Azure 事件中心。

  1. 提取 TelcoGenerator.zip 文件的内容。

  2. 在所选文本编辑器中打开 TelcoGenerator\TelcoGenerator\telcodatagen.exe.config 文件。有多个 .config 文件,因此请确保打开正确的文件。

  3. 使用以下详细信息更新配置文件中的 <appSettings> 元素:

    • 将 EventHubName 键的值设置为连接字符串末尾的 EntityPath 的值。
    • 将 Microsoft.ServiceBus.ConnectionString 键的值设置为在末尾没有 EntityPath 值 (;EntityPath=myeventhub) 的连接字符串。 别忘了删除 EntityPath 值前面的分号。
  4. 保存文件。

  5. 接下来打开命令窗口,转到解压缩 TelcoGenerator 应用程序的文件夹。 然后输入以下命令:

    .\telcodatagen.exe 1000 0.2 2
    

    此命令采用以下参数:

    • 每小时的呼叫数据记录数。
    • 欺诈概率 (%),即应用模拟欺诈呼叫的频率。 值 0.2 表示大约有 20% 的通话记录似乎具有欺诈性。
    • 持续时间(小时),即应用应运行的小时数。 还可以通过在命令行终止此过程 (Ctrl+C) 来随时停止该应用。

    几秒钟后,当应用将电话通话记录发送到事件中心时,应用将开始在屏幕上显示通话记录。 这些电话呼叫数据包含以下字段:

    记录 定义
    CallrecTime 呼叫开始时间的时间戳。
    SwitchNum 用于连接呼叫的电话交换机。 在此示例中,交换机是表示来源国家/地区(美国、中国、英国、德国或澳大利亚)的字符串。
    CallingNum 呼叫方的电话号码。
    CallingIMSI 国际移动用户标识 (IMSI)。 它是呼叫方的唯一标识符。
    CalledNum 呼叫接收人的电话号码。
    CalledIMSI 国际移动用户标识 (IMSI)。 它是呼叫接收人的唯一标识符。

创建流分析作业

有了呼叫事件流以后,即可创建流分析作业,以便从事件中心读取数据。

  1. 若要创建流分析作业,请导航到 Azure 门户
  2. 选择“创建资源”并搜索“流分析作业” 。 选择“流分析作业”磁贴并选择“创建” 。
  3. 在“新建流分析作业”页上,执行以下步骤:
    1. 对于“订阅”,请选择包含该事件中心命名空间的订阅。

    2. 对于“资源组”,请选择先前创建的资源组。

    3. 在“实例详细信息”部分,对于“名称”,请输入流分析作业的唯一名称。

    4. 对于“区域”,请选择要在其中创建流分析作业的区域。 我们建议将作业和事件中心放在同一区域以获得最佳性能,这样还无需为不同区域之间的数据传输付费。

    5. 对于“托管环境 <”,请选择“云”(如果尚未选择)。 流分析作业可以部署到云或边缘设备。 你可以通过云部署到 Azure 云,利用 Edge 部署到 IoT Edge 设备。

    6. 对于“流单元”,请选择“1”。 流单元表示执行作业所需的计算资源。 默认情况下,此值设置为 1。 若要了解如何缩放流单元,请参阅了解和调整流单元一文。

    7. 在页面底部选择“查看 + 创建”。

      Screenshot that shows the Create Azure Stream Analytics job page.

  4. 在“查看 + 创建”页上查看设置,然后选择“创建”以创建流分析作业。
  5. 部署作业后,选择“转到资源”,以导航到“流分析作业”页。

配置作业输入

下一步是使用在上一部分创建的事件中心,为用于读取数据的作业定义输入源。

  1. 在“流分析作业”页上,在左侧菜单中的“作业拓扑”部分选择“输入”。

  2. 在“输入”页上,选择“+ 添加输入”和“事件中心”。

    Screenshot showing the Input page for a Stream Analytics job.

  3. 在“事件中心”页上执行以下步骤:

    1. 对于“输入别名”,请输入 CallStream。 输入别名是用于标识输入的易记名称。 输入别名只能包含字母数字字符、连字符和下划线,而且长度必须介于 3 到 63 个字符之间。

    2. 对于“订阅”,请选择在其中创建了事件中心的 Azure 订阅。 事件中心可以位于流分析作业所在的订阅中,也可以位于另一订阅中。

    3. 对于“事件中心命名空间”,请选择在上一部分创建的事件中心命名空间。 当前订阅中可用的所有命名空间均列在下拉列表中。

    4. 对于“事件中心名称”,请选择在上一部分创建的事件中心。 所选命名空间中可用的所有事件中心均列在下拉列表中。

    5. 对于“事件中心使用者组”,请保持选中“新建”选项,以便在事件中心上创建新的使用者组。 建议对每个流分析作业使用不同的使用者组。 如果未指定任何使用者组,流分析作业将使用 $Default 使用者组。 如果作业包含自联接或具有多个输入,则稍后的某些输入可能会由多个读取器读取。 这种情况会影响单个使用者组中的读取器数量。

    6. 对于“身份验证模式”,请选择“连接字符串”。 使用此选项可以更轻松地测试本教程。

    7. 对于“事件中心策略名称”,请选择“使用现有”,然后选择先前创建的策略。

    8. 选择页面底部的“保存” 。

      Screenshot showing the Event Hubs configuration page for an input.

创建用于 Redis 的 Azure 缓存实例

  1. 使用创建缓存中所述的步骤,在用于 Redis 的 Azure 缓存中创建缓存。

  2. 创建缓存后,在“设置” 下方选择“访问密钥” 。 记下主要连接字符串 。

    Screenshot showing the selection of the Access Key menu item.

在 Azure Functions 中创建可将数据写入到用于 Redis 的 Azure 缓存的函数

  1. 请参阅 Functions 文档的创建函数应用一节。 此示例基于:

  2. 按照本教程的说明,在 Visual Studio Code 中创建默认 HttpTrigger 函数应用。 将使用以下信息:语言:C#,运行时:.NET 6(在函数 v4 下),模板:HTTP trigger

  3. 通过在位于项目文件夹的终端中运行以下命令来安装 Redis 客户端库:

    dotnet add package StackExchange.Redis --version 2.2.88
    
  4. local.settings.jsonValues 分区中添加 RedisConnectionStringRedisDatabaseIndex 项,并填写目标服务器的连接字符串:

    {
        "IsEncrypted": false,
        "Values": {
            "AzureWebJobsStorage": "",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "RedisConnectionString": "Your Redis Connection String",
            "RedisDatabaseIndex":"0"
        }
    }
    

    Redis 数据库索引是用于标识实例上的数据库的 0 到 15 之间的数字。

  5. 将整个函数(项目中的 .cs 文件)替换为以下代码片段。 自行更新命名空间、类名和函数名称:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    
    using StackExchange.Redis;
    
    namespace Company.Function
    {
        public static class HttpTrigger1{
            [FunctionName("HttpTrigger1")]
            public static async Task<IActionResult> Run(
                [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
                ILogger log)
            {
                // Extract the body from the request
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
                if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
    
                dynamic data = JsonConvert.DeserializeObject(requestBody);
    
                // Reject if too large, as per the doc
                if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
    
                string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString");
                int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex"));
    
                using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString))
                {
                    // Connection refers to a property that returns a ConnectionMultiplexer
                    IDatabase db = connection.GetDatabase(RedisDatabaseIndex);
    
                    // Parse items and send to binding
                    for (var i = 0; i < data.Count; i++)
                    {
                        string key = data[i].Time + " - " + data[i].CallingNum1;
    
                        db.StringSet(key, data[i].ToString());
                        log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
    
                        // Simple get of data types from the cache
                        string value = db.StringGet(key);
                        log.LogInformation($"Database got: {key} => {value}");
    
                    }
                }
                return new OkResult(); // 200
            }
        }
    }
    

    当流分析从函数收到“HTTP 请求实体过大”异常时,将减小发送到 Azure Functions 的批次的大小。 下面的代码确保流分析不会发送过大的批。 确保函数中使用的最大批次数和最大批次大小值与在流分析门户中输入的值一致。

  6. 现在可以将函数发布到 Azure。

  7. 在 Azure 门户中打开函数,并设置 RedisConnectionStringRedisDatabaseIndex应用程序设置

更新流分析作业,以函数作为输出

  1. 在 Azure 门户中打开流分析作业。

  2. 浏览到你的函数,并选择“概述” >“输出” >“添加” 。 若要添加新的输出,请选择“Azure 函数” 接收器选项。 Functions 输出适配器具有以下属性:

    属性名称 说明
    输出别名 在作业查询中输入指代输出的用户友好名称。
    导入选项 可使用当前订阅中的函数;如果函数位于其他订阅中,也可手动提供设置。
    Function App Functions 应用的名称
    函数 Functions 应用中函数的名称(run.csx 函数的名称)。
    最大批大小 设置发送到函数的每个输出批的最大大小(以字节为单位)。 默认情况下,此值设置为 262,144 字节 (256 KB)。
    最大批数 指定发送给函数的每个批次中的最大事件数。 默认值为 100。 此属性是可选的。
    密钥 可以使用其他订阅中的函数。 提供用于访问你的函数的键值。 此属性是可选的。
  3. 命名输出别名。 在本教程中,此别名为 saop1,但你可以使用所选的任何名称。 填写其他详细信息。

  4. 打开流分析作业,将查询更新为以下内容。

    重要

    如果未将输出接收器命名为 saop1,请记住在查询中更改它。

     SELECT
             System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
             CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
         INTO saop1
         FROM CallStream CS1 TIMESTAMP BY CallRecTime
            JOIN CallStream CS2 TIMESTAMP BY CallRecTime
             ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
         WHERE CS1.SwitchNum != CS2.SwitchNum
    
  5. 在命令行中运行以下命令,启动 telcodatagen.exe 应用程序。 该命令使用格式 telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]

    telcodatagen.exe 1000 0.2 2
    
  6. 启动流分析作业。

在用于 Redis 的 Azure 缓存中检查结果

  1. 浏览到 Azure 门户,并查找你的用于 Redis 的 Azure 缓存。 选择“控制台” 。

  2. 使用用于 Redis 的 Azure 缓存命令验证你的数据是否在用于 Redis 的 Azure 缓存中。 (该命令采用 Get {key} 格式。)使用从 Azure 函数的监视器日志中复制的键(在上一部分中)。

    Get "<在上一部分复制的键>"

    此命令应会打印指定键的值:

    Screenshot showing the Redis Cache console showing the output of the Get command.

错误处理和重试

如果在将事件发送到 Azure Functions 时失败,流分析会重试大多数操作。 将会重试所有 http 异常,直至成功,但 http 错误 413(实体太大)除外。 实体太大错误被视为数据错误,遵循重试或删除策略

注意

从流分析到 Azure Functions 的 HTTP 请求的超时设置为 100 秒。 如果 Azure Functions 应用处理批处理所用的时间超过 100 秒,则流分析会出错并重试批处理。

重试超时可能会导致向输出接收器写入重复事件。 流分析重试失败的批处理时,会重试批处理中的所有事件。 例如,假设从流分析发送一批事件(20 个)到 Azure Functions。 假定 Azure Functions 需要 100 秒来处理该批次中的前 10 个事件。 100 秒后,流分析会暂停该请求,因为它尚未收到来自 Azure Functions 的肯定响应,并且已针对同一个批发送了另一个请求。 Azure Functions 会再次处理该批次中的前 10 个事件,这会导致重复。

已知问题

在 Azure 门户中,尝试将最大批次大小/最大批次数值重置为空(默认),值将在保存时改回上次输入的值。 这时,请手动输入这些字段的默认值。

流分析当前不支持在 Azure Functions 上使用 HTTP 路由

不支持连接到虚拟网络中托管的 Azure Functions。

清理资源

若不再需要资源组、流式处理作业以及所有相关资源,请将其删除。 删除作业可避免对作业使用的流单元进行计费。 如果计划在将来使用该作业,可以先停止它,等到以后需要时再重启它。 如果不打算继续使用该作业,请按照以下步骤删除本快速入门创建的所有资源:

  1. 在 Azure 门户的左侧菜单中选择“资源组”,然后选择已创建资源的名称。
  2. 在资源组页上选择“删除”,在文本框中键入要删除的资源的名称,然后选择“删除”。

后续步骤

在本教程中,你已创建一个简单的运行 Azure 函数的流分析作业。 若要详细了解流分析作业,请继续阅读下一教程: