Tutorial: Run Azure Functions from Azure Stream Analytics jobs

You can run Azure Functions from Azure Stream Analytics by configuring Functions as one of the output sinks for your Stream Analytics job. Functions is an event-driven, compute-on-demand experience that lets you implement code that's triggered by events occurring in Azure or third-party services. This ability to respond to triggers makes Functions a natural output for Stream Analytics jobs.

Stream Analytics invokes Functions through HTTP triggers. The Functions output adapter lets users connect Functions to Stream Analytics, so that events can be triggered based on Stream Analytics queries.
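Each invocation is an HTTP POST whose body is a JSON array of events produced by the job's query. For orientation only, here is a sketch of what a batch might look like; the field names are lowercased to match how the run.csx code below reads them, the time and callingnum1 values echo the cache key shown later in this tutorial, and the remaining values are made up:

    [
      {
        "time": "12/19/2017 21:32:24",
        "callingimsi": "466921602131264",
        "callingnum1": "123414732",
        "callingnum2": "789012345",
        "switch1": "China",
        "switch2": "Germany"
      }
    ]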

In this tutorial, you learn how to:

  • Create and run a Stream Analytics job
  • Create an Azure Cache for Redis instance
  • Create an Azure Function
  • Check Azure Cache for Redis for results

If you don't have an Azure subscription, create a trial account before you begin.

Configure a Stream Analytics job to run a function

This section demonstrates how to configure a Stream Analytics job to run a function that writes data to Azure Cache for Redis. The Stream Analytics job reads events from Azure Event Hubs and runs a query that invokes the function. The function reads data from the Stream Analytics job and writes it to Azure Cache for Redis.

Diagram showing the relationships among the Azure services used in this tutorial

Create a Stream Analytics job with Event Hubs as input

Follow the Real-time fraud detection tutorial to create an event hub, start the event generator application, and create a Stream Analytics job. Skip the steps to create the query and the output. Instead, see the following sections to set up an Azure Functions output.

Create an Azure Cache for Redis instance

  1. Create a cache in Azure Cache for Redis by using the steps described in Create a cache.

  2. After you create the cache, under Settings, select Access Keys. Make a note of the Primary connection string.

    Screenshot of the Azure Cache for Redis connection string
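For reference, a StackExchange.Redis connection string for Azure Cache for Redis generally takes the following form; the host name and key here are placeholders, not values from this tutorial:

    <cache-name>.redis.cache.windows.net:6380,password=<primary-access-key>,ssl=True,abortConnect=False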

Create a function in Azure Functions that can write data to Azure Cache for Redis

  1. See the Create a function app section of the Functions documentation. It walks you through creating a function app and an HTTP-triggered function in Azure Functions by using the C# language. (A sketch of the default HTTP trigger binding that such a function uses appears after this list.)

  2. Browse to the run.csx function, and update it with the following code. Replace "<your Azure Cache for Redis connection string goes here>" with the Azure Cache for Redis primary connection string that you retrieved in the previous section.

    using System;
    using System.Net;
    using System.Threading.Tasks;
    using StackExchange.Redis;
    using Newtonsoft.Json;
    using System.Configuration;
    
    public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
    {
        log.Info($"C# HTTP trigger function processed a request. RequestUri={req.RequestUri}");
    
        // Get the request body
        dynamic dataArray = await req.Content.ReadAsAsync<object>();
    
        // Return HTTP 413 (Request Entity Too Large) when the incoming batch (dataArray)
        // exceeds 256 KB. Make sure that this size is consistent with the Max Batch Size
        // value entered in the Stream Analytics portal.
    
        if (dataArray.ToString().Length > 262144)
        {
            return new HttpResponseMessage(HttpStatusCode.RequestEntityTooLarge);
        }
        var connection = ConnectionMultiplexer.Connect("<your Azure Cache for Redis connection string goes here>");
        log.Info($"Connected to cache: {connection}");

        // Get a handle to the cache database through the multiplexer
        IDatabase db = connection.GetDatabase();
        log.Info($"Created database {db}");
    
        log.Info($"Message Count {dataArray.Count}");
    
        // Write each event to the cache as a string, keyed by its timestamp and calling number
        for (var i = 0; i < dataArray.Count; i++)
        {
            string time = dataArray[i].time;
            string callingnum1 = dataArray[i].callingnum1;
            string key = time + " - " + callingnum1;
            db.StringSet(key, dataArray[i].ToString());
            log.Info($"Object put in database. Key is {key} and value is {dataArray[i].ToString()}");
    
            // Simple get of data types from the cache
            string value = db.StringGet(key);
            log.Info($"Database got: {value}");
        }
    
        return req.CreateResponse(HttpStatusCode.OK, "Got");
    }
    
    

    When Stream Analytics receives an "HTTP Request Entity Too Large" exception from the function, it reduces the size of the batches it sends to Functions. The following code ensures that Stream Analytics doesn't send oversized batches. Make sure that the maximum batch count and size values used in the function are consistent with the values entered in the Stream Analytics portal.

    if (dataArray.ToString().Length > 262144)
    {
        return new HttpResponseMessage(HttpStatusCode.RequestEntityTooLarge);
    }
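
    One performance note on the code above: it calls ConnectionMultiplexer.Connect on every invocation, which is fine for a tutorial but costly under sustained load. A common alternative, shown here as a sketch rather than as part of the original sample (it reuses the same connection string placeholder), is to create the connection once and share it across invocations, calling Connection.GetDatabase() inside Run:

    // Sketch: create the ConnectionMultiplexer once and reuse it across
    // invocations instead of reconnecting on every request.
    private static readonly Lazy<ConnectionMultiplexer> lazyConnection =
        new Lazy<ConnectionMultiplexer>(() =>
            ConnectionMultiplexer.Connect("<your Azure Cache for Redis connection string goes here>"));

    public static ConnectionMultiplexer Connection => lazyConnection.Value;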
    
  3. In a text editor of your choice, create a JSON file named project.json. Paste in the following code, and save it on your local computer. This file contains the NuGet package dependencies required by the C# function.

    {
        "frameworks": {
            "net46": {
                "dependencies": {
                    "StackExchange.Redis":"1.1.603",
                    "Newtonsoft.Json": "9.0.1"
                }
            }
        }
    }
    
    
  4. Go back to the Azure portal. From the Platform features tab, browse to your function. Under Development Tools, select App Service Editor.

    Screenshot of the App Service Editor

  5. In the App Service Editor, right-click your root directory, and upload the project.json file. After the upload succeeds, refresh the page. You should now see an autogenerated file named project.lock.json, which contains references to the .dll files specified in the project.json file.

    Screenshot of the App Service Editor
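
For reference, an HTTP-triggered C# script function like the one created in step 1 typically has a function.json with bindings similar to the following sketch; the portal generates this file for you, so you normally don't need to edit it:

    {
        "bindings": [
            {
                "authLevel": "function",
                "name": "req",
                "type": "httpTrigger",
                "direction": "in"
            },
            {
                "name": "$return",
                "type": "http",
                "direction": "out"
            }
        ]
    }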

Update the Stream Analytics job with the function as output

  1. Open your Stream Analytics job in the Azure portal.

  2. In the job, select Overview > Outputs > Add. To add a new output, select Azure Function for the sink option. The Functions output adapter has the following properties:

    Property name | Description
    Output alias | A user-friendly name that you use in the job's query to reference the output.
    Import option | You can use the function from the current subscription, or provide the settings manually if the function is located in another subscription.
    Function App | Name of your Functions app.
    Function | Name of the function in your Functions app (the name of your run.csx function).
    Max Batch Size | Sets the maximum size, in bytes, of each output batch that's sent to your function. By default, this value is 262,144 bytes (256 KB).
    Max Batch Count | Specifies the maximum number of events in each batch that's sent to the function. The default value is 100. This property is optional.
    Key | Allows you to use a function from another subscription. Provide the key value to access your function. This property is optional.
  3. Provide a name for the output alias. In this tutorial it's named saop1, but you can use any name of your choice. Fill in the other details.

  4. Open your Stream Analytics job, and update the query to the following. (If you didn't name your output sink saop1, remember to change it in the query.) The query self-joins the call stream to find calls that use the same SIM card (CallingIMSI) but originate from different switches within five seconds of each other, which is the fraud pattern detected in the Real-time fraud detection tutorial.

     SELECT
         System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
         CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
     INTO saop1
     FROM CallStream CS1 TIMESTAMP BY CallRecTime
     JOIN CallStream CS2 TIMESTAMP BY CallRecTime
         ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
     WHERE CS1.SwitchNum != CS2.SwitchNum
    
  5. Start the telcodatagen.exe application by running the following command at the command line. The command uses the format telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours].

    telcodatagen.exe 1000 0.2 2
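
    Given the argument format above, this example generates 1,000 call data records per hour with a 20 percent SIM card fraud probability, and runs for 2 hours.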
    
  6. Start the Stream Analytics job.

Check Azure Cache for Redis for results

  1. Browse to the Azure portal, and find your Azure Cache for Redis instance. Select Console.

  2. Use Azure Cache for Redis commands to verify that your data is in the cache. (The command takes the form Get {key}.) For example:

    Get "12/19/2017 21:32:24 - 123414732"Get "12/19/2017 21:32:24 - 123414732"

    This command should print the value for the specified key:

    Screenshot of the Azure Cache for Redis console output
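
    If you aren't sure which keys the job has written, you can sample them with the standard Redis SCAN command, assuming your console session allows it (most read commands are available in the portal console):

    SCAN 0

    SCAN returns a cursor plus a page of key names; repeat the command with the returned cursor to page through the remaining keys.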

Error handling and retries

If a failure occurs while Stream Analytics is sending events to Azure Functions, it retries until the operation succeeds. However, retries are not attempted for the following failures:

  1. HttpRequestExceptions
  2. Request Entity Too Large (HTTP error code 413)
  3. ApplicationExceptions

Known issues

In the Azure portal, if you try to reset the Max Batch Size or Max Batch Count value to empty (the default), the value changes back to the previously entered value when you save. In this case, manually enter the default values for these fields.

Stream Analytics currently doesn't support the use of HTTP routing on your Azure Functions.

Clean up resources

When they're no longer needed, delete the resource group, the streaming job, and all related resources. Deleting the job avoids billing for the streaming units it consumes. If you plan to use the job in the future, you can stop it and restart it later when you need it. If you aren't going to continue to use this job, delete all resources created by this tutorial by using the following steps:

  1. From the left-hand menu in the Azure portal, click Resource groups, and then click the name of the resource group you created.
  2. On your resource group page, click Delete, type the name of the resource group in the text box, and then click Delete.

Next steps

In this tutorial, you created a simple Stream Analytics job that runs an Azure Function. To learn more about Stream Analytics jobs, continue to the next tutorial: