Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
在本教程中,你将创建一个 Azure 流分析作业,以从 Azure 事件中心读取事件,对事件数据运行查询,然后调用一个 Azure 函数来写入 Azure Cache for Redis 实例。
注意
- 可以通过将 Functions 配置为流分析作业的接收器(输出)之一,从 Azure 流分析运行 Azure Functions。 Functions 是事件驱动的按需计算体验,它允许实现由 Azure 或第三方服务中出现的事件所触发的代码。 Functions 能够响应触发器事件的能力使得它成为流分析作业的理想输出。
- 流分析通过 HTTP 触发器调用函数。 通过 Functions 输出适配器,用户可以将 Functions 连接到流分析,以便基于流分析查询触发事件。
- 不支持从多租户群集中运行的流分析作业连接到虚拟网络(VNet)中的Azure Functions。
本教程介绍如何执行下列操作:
- 创建 Azure 事件中心实例
- 创建用于 Redis 的 Azure 缓存实例
- 创建 Azure 函数
- 创建流分析作业
- 将事件中心配置为输入,并将函数配置为输出
- 运行流分析作业
- 在用于 Redis 的 Azure 缓存中检查结果
如果没有 Azure 订阅,可在开始前创建一个试用帐户。
先决条件
在开始之前,请确保完成以下步骤:
- 如果没有 Azure 订阅,请创建试用版。
- 请从 Microsoft 下载中心下载电话呼叫事件生成器应用 TelcoGenerator.zip,或者从 GitHub 获取源代码。
登录 Azure
登录到 Azure 门户。
创建事件中心
需要先将一些示例数据发送到事件中心,然后流分析才能分析欺诈性呼叫数据流。 在本教程中,你将使用 Azure 事件中心将数据发送到 Azure。
请按以下步骤创建一个事件中心,然后向该事件中心发送调用数据:
登录到 Azure 门户。
选择左侧菜单中的“所有服务”,选择“物联网”,将鼠标悬停在“事件中心”上,然后选择“+ (添加)”按钮。
在“创建命名空间”页上执行以下步骤:
选择用于创建事件中心的 Azure 订阅。
对于“资源组”,请选择“新建”,然后输入资源组的名称。 事件中心命名空间将在此资源组中创建。
对于“命名空间名称”,请输入事件中心命名空间的唯一名称。
对于“位置”,请选择要在其中创建命名空间的区域。
对于“定价层”,请选择“标准”。
在页面底部选择“查看 + 创建”。
在命名空间创建向导的“查看 + 创建”页上,在查看所有设置后,选择页面底部的“创建”。
成功部署命名空间后,选择“转到资源”以导航到“事件中心命名空间”页。
在 “事件中心命名空间 ”页上,选择命令栏上的 “+事件中心 ”。
在“创建事件中心”页上,为事件中心输入一个名称。 将分区计数设置为 2。 对其余设置使用默认选项,然后选择“查看 + 创建”。
在“查看 + 创建”页上,选择页面底部的“创建”。 然后,等待部署成功完成。
授予对事件中心的访问权限,并获取连接字符串
在应用程序可以将数据发送到 Azure 事件中心之前,事件中心必须具有允许访问的策略。 访问策略生成包含授权信息的连接字符串。
在 “事件中心命名空间 ”页上,选择左侧菜单中 的“共享访问策略 ”。
从策略列表中选择 RootManageSharedAccessKey。
然后选择“连接字符串 - 主密钥”旁边的复制按钮。
将连接字符串粘贴到文本编辑器中。 需要在下一部分使用此连接字符串。
连接字符串如下所示:
Endpoint=sb://<Your event hub namespace>.servicebus.chinacloudapi.cn/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>请注意,连接字符串包含多个用分号隔开的键值对:Endpoint、SharedAccessKeyName 和 SharedAccessKey。
启动事件生成器应用程序
在启动 TelcoGenerator 应用之前,应该对其进行配置,以便将数据发送到此前创建的 Azure 事件中心。
提取 TelcoGenerator.zip 文件的内容。
在所选文本编辑器中打开
TelcoGenerator\TelcoGenerator\telcodatagen.exe.config文件。有多个.config文件,因此请确保打开正确的文件。使用以下详细信息更新配置文件中的
<appSettings>元素:- 将 EventHubName 键的值设置为在上一部分创建的事件中心的名称。
- 将 Microsoft.ServiceBus.ConnectionString 键的值设置为命名空间的连接字符串。 如果将连接字符串用于事件中心(而不是命名空间),请移除末尾的
EntityPath值(;EntityPath=myeventhub)。 别忘了删除 EntityPath 值前面的分号。
保存文件。
接下来打开命令窗口,转到解压缩 TelcoGenerator 应用程序的文件夹。 然后输入以下命令:
.\telcodatagen.exe 1000 0.2 2此命令采用以下参数:
- 每小时的呼叫数据记录数。
- 欺诈概率 (%),即应用模拟欺诈呼叫的频率。 值 0.2 表示大约有 20% 的通话记录似乎具有欺诈性。
- 持续时间(小时),即应用应运行的小时数。 还可以通过在命令行终止此过程 (Ctrl+C) 来随时停止该应用。
几秒钟后,当应用将电话通话记录发送到事件中心时,应用将开始在屏幕上显示通话记录。 这些电话呼叫数据包含以下字段:
记录 定义 CallrecTime 呼叫开始时间的时间戳。 SwitchNum 用于连接呼叫的电话交换机。 在此示例中,选项是表示来源国家/地区名称的字符串(例如美国、中国、英国、德国或者澳大利亚)。 CallingNum 呼叫方的电话号码。 CallingIMSI 国际移动用户标识 (IMSI)。 它是呼叫方的唯一标识符。 CalledNum 呼叫接收人的电话号码。 CalledIMSI 国际移动用户标识 (IMSI)。 它是呼叫接收人的唯一标识符。
创建流分析作业
有了呼叫事件流以后,即可创建流分析作业,以便从事件中心读取数据。
若要创建流分析作业,请导航到 Azure 门户。
在左侧菜单中选择 “所有服务 ”,搜索 流分析作业,将鼠标悬停在 “流分析作业 ”磁贴上,然后在 + 弹出窗口中选择按钮或选择“ 创建 ”。
在 “新建流分析作业 ”页上,执行以下步骤:
对于“订阅”,请选择包含该事件中心命名空间的订阅。
对于“资源组”,请选择先前创建的资源组。
在“实例详细信息”部分,对于“名称”,请输入流分析作业的唯一名称。
对于Region,选择您希望创建流式分析作业的区域。 我们建议将作业和事件中心放在同一区域以获得最佳性能,这样还无需为不同区域之间的数据传输付费。
对于托管环境,如果尚未选择,请选择“云”。 流分析作业可以部署到云或边缘设备。 Cloud 允许你部署到 Azure 云,Edge 允许你部署到 IoT Edge 设备。
对于流处理单元,选择1。 流处理单元表示执行作业所需的计算资源。 默认情况下,此值设置为 1。 若要了解如何缩放流单元,请参阅了解和调整流单元一文。
在页面底部选择“查看 + 创建”。
在“查看 + 创建”页上查看设置,然后选择“创建”以创建流分析作业。
部署作业后,选择“转至资源”以进入“流分析作业”页。
配置作业输入
下一步是使用在上一部分创建的事件中心,为用于读取数据的作业定义输入源。
在“流分析作业”页上,在左侧菜单中的“作业拓扑”部分选择“输入”。
在“输入”页上,选择“+ 添加输入”和“事件中心”。
在“事件中心”页上执行以下步骤:
对于输入别名,请输入CallStream。 输入别名是用于标识输入的易记名称。 输入别名只能包含字母数字字符和连字符,并且长度必须为 3-63 个字符。
对于“订阅”,请选择您创建事件中心所在的 Azure 订阅。 事件中心可以位于与流分析作业相同的订阅计划中,也可以位于不同的订阅计划中。
对于“事件中心命名空间”,请选择在上一部分创建的事件中心命名空间。 当前订阅中可用的所有命名空间均列在下拉列表中。
对于“事件中心名称”,请选择您在上一部分创建的事件中心。 所选命名空间中可用的所有事件中心均列在下拉列表中。
对于“事件中心使用者组”,请保持选中“新建”选项,以便在事件中心上创建新的使用者组。 建议对每个流分析作业使用不同的使用者组。 如果未指定任何使用者组,流分析作业将使用
$Default使用者组。 如果作业包含自联接或具有多个输入,则稍后的某些输入可能会由多个读取器读取。 这种情况会影响单个使用者组中的读取器数量。对于身份验证模式,请选择连接字符串。 使用此选项可以更轻松地测试本教程。
对于 事件中心策略名称,请选择 “使用现有”,然后选择默认策略: RootManageSharedAccessKey。
选择页面底部的“保存” 。
创建用于 Redis 的 Azure 缓存实例
使用创建 Azure Cache for Redis 实例中所述的步骤,在 Azure Cache for Redis 中创建缓存。
创建缓存后,在“设置” 下方选择“访问密钥” 。 记下主要连接字符串 。
在Azure Functions中创建将数据写入Azure Cache for Redis的函数
请参阅 Functions 文档的创建函数应用一节。 此示例使用:
按照本教程的说明,在 Visual Studio Code 中创建默认 HttpTrigger 函数应用。 使用以下信息:语言:
C#,运行时:.NET 6(在函数 v4 下),模板:HTTP trigger。通过在位于项目文件夹的终端中运行以下命令来安装 Redis 客户端库:
dotnet add package StackExchange.Redis --version 2.2.88在
RedisConnectionString的RedisDatabaseIndex部分中添加Values和local.settings.json项,并填写目标服务器的连接字符串。{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "RedisConnectionString": "Your Redis Connection String", "RedisDatabaseIndex":"0" } }Redis 数据库索引是用于标识实例上的数据库的 0 到 15 之间的数字。
将整个函数(.cs项目中的文件)替换为以下代码片段。 使用您自己的命名空间、类名称和函数名称进行更新:
using System; using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using StackExchange.Redis; namespace Company.Function { public static class HttpTrigger1{ [FunctionName("HttpTrigger1")] public static async Task<IActionResult> Run( [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req, ILogger log) { // Extract the body from the request string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check dynamic data = JsonConvert.DeserializeObject(requestBody); // Reject if too large, as per the doc if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString"); int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex")); using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString)) { // Connection refers to a property that returns a ConnectionMultiplexer IDatabase db = connection.GetDatabase(RedisDatabaseIndex); // Parse items and send to binding for (var i = 0; i < data.Count; i++) { string key = data[i].Time + " - " + data[i].CallingNum1; db.StringSet(key, data[i].ToString()); log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}"); // Simple get of data types from the cache string value = db.StringGet(key); log.LogInformation($"Database got: {key} => {value}"); } } return new OkResult(); // 200 } } }当流分析从函数收到“HTTP 请求实体过大”异常时,它会减小发送给函数的批次大小。 下面的代码确保流分析不会发送过大的批。 确保函数中使用的最大批次数和最大批次大小值与在流分析门户中输入的值一致。
发布函数到Azure。
在 Azure 门户中打开该功能,然后为 和
RedisConnectionString设置RedisDatabaseIndex。
更新流分析作业,以函数作为输出
在 Azure 门户网站中打开流分析作业。
浏览到你的函数,然后选择[概述] > [输出] > [添加]。 要添加新的输出,请选择 Azure Function 作为接收器选项。 Functions 输出适配器具有以下属性:
属性名称 说明 输出别名 在作业查询中,用一个用户友好的名称来引用输出。 导入选项 可使用当前订阅中的函数;如果函数位于其他订阅中,也可手动提供设置。 Function App 您的 Functions 应用的名称 函数 Functions 应用中函数的名称(run.csx 函数的名称)。 最大批大小 设置发送到函数的每个输出批的最大大小(以字节为单位)。 默认情况下,此值设置为 262,144 字节 (256 KB)。 最大批数 指定发送给函数的每个批次中的最大事件数。 默认值为 100。 此属性是可选的。 密钥 使您能够使用来自其他订阅的函数。 提供用于访问你的函数的键值。 此属性是可选的。 为输出别名指定一个名称。 在本教程中,它被命名为saop1,但你可以使用任何你选择的名称。 填写其他详细信息。
打开流分析作业,将查询更新为以下内容。
重要
以下示例脚本假定你为输入名称使用了 CallStream,并为输出名称使用了 saop1。 如果使用了不同的名称,请更新查询。
SELECT System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1, CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2 INTO saop1 FROM CallStream CS1 TIMESTAMP BY CallRecTime JOIN CallStream CS2 TIMESTAMP BY CallRecTime ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5 WHERE CS1.SwitchNum != CS2.SwitchNum在命令行中运行以下命令,启动 telcodatagen.exe 应用程序。 该命令使用格式
telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]。telcodatagen.exe 1000 0.2 2启动流分析作业。
在 Azure 函数的“监视”页上,可以看到该函数已调用。
在缓存的“Azure Cache for Redis”页上,在左侧菜单中选择“指标”,添加“缓存写入”指标,并将持续时间设置为“过去一小时”。 你将看到类似于下图的图表。
在用于 Redis 的 Azure 缓存中检查结果
从 Azure Functions 日志中获取键
首先,获取插入到 Azure Cache for Redis 中的记录的密钥。 在代码中,该键是在 Azure 函数中计算的,如以下代码片段所示:
string key = data[i].Time + " - " + data[i].CallingNum1;
db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
浏览到 Azure 门户并找到你的 Azure Functions 应用。
在左侧菜单中选择“函数”。
从函数列表中选择“HTTPTrigger1”。
在左侧菜单中选择“监视”。
切换到“日志”选项卡。
记下信息消息中的某个键,如以下屏幕截图所示。 使用此键在 Azure Cache for Redis 中查找值。
使用该键在 Azure Cache for Redis 中查找记录
浏览到 Azure 门户,并查找你的用于 Redis 的 Azure 缓存。 选择“控制台” 。
使用用于 Redis 的 Azure 缓存命令验证你的数据是否在用于 Redis 的 Azure 缓存中。 (该命令采用 Get {key} 格式。)使用从 Azure 函数的监视器日志中复制的键(在上一部分中)。
获取“上一部分的密钥”
此命令输出指定键的值:
错误处理和重试
如果在将事件发送到 Azure Functions 时失败,流分析会重试大多数操作。 它重试所有 HTTP 异常,直到成功,但 HTTP 错误 413(实体太大)除外。 实体太大错误被视为受 重试或删除策略约束的数据错误。
注意
从流分析到 Azure Functions 的 HTTP 请求的超时设置为 100 秒。 如果Azure Functions应用处理批处理需要 100 秒以上,流分析将返回错误并重试批处理。
重试超时可能会导致向输出接收器写入重复事件。 当流分析重试失败的批处理时,它会重试批处理中的所有事件。 例如,考虑一批由 Stream Analytics 发送到 Azure Functions 的 20 个事件。 假定 Azure Functions 需要 100 秒来处理该批次中的前 10 个事件。 100秒后,流分析暂停执行该请求,因为未收到来自Azure Functions的正面响应,并对同一批次发送另一请求。 Azure Functions再次处理批处理中的前 10 个事件,这会导致重复。
已知问题
在 Azure 门户中,当您尝试将“最大批大小”或“最大批处理计数”值重置为空(即默认值)时,这些值在您保存后会更改回先前输入的值。 这时,请手动输入这些字段的默认值。
流分析目前不支持在Azure Functions上使用 HTTP 路由。
连接到虚拟网络中托管的 Azure Functions 的支持未启用。
清理资源
不再需要资源时,请删除资源组、流式处理作业和所有相关资源。 删除作业将停止对作业消耗的流处理单元计费。 如果计划在将来使用该作业,可以在需要时停止该作业,并在以后重新启动它。 如果不打算继续使用该作业,请按照以下步骤删除本快速入门创建的所有资源:
- 在 Azure 门户的左侧菜单中选择“资源组”,然后选择已创建资源的名称。
- 在资源组页上选择“删除”,在文本框中键入要删除的资源的名称,然后选择“删除”。
后续步骤
在本教程中,你创建了一个运行 Azure 函数的简单流分析作业。 若要详细了解流分析作业,请继续阅读下一教程: