Get started using Azure Stream Analytics: Real-time fraud detection

This tutorial provides an end-to-end illustration of how to use Azure Stream Analytics. You learn how to:

  • Bring streaming events into an instance of Azure Event Hubs. In this tutorial, you'll use an app that simulates a stream of mobile-phone metadata records.

  • Write SQL-like Stream Analytics queries to transform data, aggregating information or looking for patterns. You'll see how to use a query to examine the incoming stream and look for calls that might be fraudulent.

  • Send the results to an output sink (storage) that you can analyze for additional insights. In this case, you'll send the suspicious call data to Azure Blob storage.

This tutorial uses the example of real-time fraud detection based on phone-call data. The technique illustrated is also suited for other types of fraud detection, such as credit card fraud or identity theft.

Scenario: Telecommunications and SIM fraud detection in real time

A telecommunications company has a large volume of data for incoming calls. The company wants to detect fraudulent calls in real time so that it can notify customers or shut down service for a specific number. One type of SIM fraud involves multiple calls from the same identity around the same time but in geographically different locations. To detect this type of fraud, the company needs to examine incoming phone records and look for specific patterns; in this case, for calls made around the same time in different countries/regions. Any phone records that fall into this category are written to storage for subsequent analysis.

Prerequisites

In this tutorial, you'll simulate phone-call data by using a client app that generates sample phone-call metadata. Some of the records that the app produces look like fraudulent calls.

Before you start, make sure you have the following:

  • An Azure account.

  • The call-event generator app, TelcoGenerator.zip, which you can download from the Microsoft Download Center. Unzip this package into a folder on your computer. If you want to see the source code and run the app in a debugger, you can get the app source code from GitHub.

    Note

    Windows might block the downloaded .zip file. If you can't unzip it, right-click the file and select Properties. If you see the "This file came from another computer and might be blocked to help protect this computer" message, select the Unblock option and then click Apply.

If you want to examine the results of the Stream Analytics job, you also need a tool for viewing the contents of an Azure Blob storage container. If you use Visual Studio, you can use Azure Tools for Visual Studio or Visual Studio Cloud Explorer. Alternatively, you can install standalone tools like Azure Storage Explorer or Azure Explorer.

Create an Azure event hub to ingest events

To analyze a data stream, you ingest it into Azure. A typical way to ingest data is to use Azure Event Hubs, which lets you ingest millions of events per second and then process and store the event information. For this tutorial, you'll create an event hub and then have the call-event generator app send call data to that event hub.

Note

For a more detailed version of this procedure, see Create an Event Hubs namespace and an event hub using the Azure portal.

Create a namespace and event hub

In this procedure, you first create an Event Hubs namespace, and then you add an event hub to that namespace. Event Hubs namespaces are used to logically group related event hub instances.

  1. Sign in to the Azure portal, and click Create a resource at the top left of the screen.

  2. Select All services in the left menu, and select the star (*) next to Event Hubs in the Analytics category. Confirm that Event Hubs is added to FAVORITES in the left navigation menu.

    Search for Event Hubs

  3. Select Event Hubs under FAVORITES in the left navigation menu, and select Add on the toolbar.

    Add button

  4. In the Create namespace pane, enter a namespace name such as <yourname>-eh-ns-demo. You can use any name for the namespace, but the name must be valid for a URL and it must be unique across Azure.

  5. Select a subscription and create or choose a resource group, then click Create.

    Create event hub namespace in Azure portal

  6. When the namespace has finished deploying, find the Event Hubs namespace in your list of Azure resources.

  7. Click the new namespace, and in the namespace pane, click Event Hub.

    Event Hub button for creating a new event hub

  8. Name the new event hub asa-eh-frauddetection-demo. You can use a different name. If you do, make a note of it, because you need the name later. You don't need to set any other options for the event hub right now.

    Name event hub in Azure portal

  9. Click Create.

Grant access to the event hub and get a connection string

Before a process can send data to an event hub, the event hub must have a policy that allows appropriate access. The access policy produces a connection string that includes authorization information.

  1. In the event namespace pane, click Event Hubs and then click the name of your new event hub.

  2. In the event hub pane, click Shared access policies and then click + Add.

    Note

    Make sure you're working with the event hub, not the Event Hubs namespace.

  3. Add a policy named asa-policy-manage-demo and for Claim, select Manage.

    Create shared access policy for Stream Analytics

  4. Click Create.

  5. After the policy has been deployed, click it in the list of shared access policies.

  6. Find the box labeled CONNECTION STRING-PRIMARY KEY and click the copy button next to the connection string.

    Stream Analytics shared access policy

  7. Paste the connection string into a text editor. You need this connection string for the next section, after you make some small edits to it.

    The connection string looks like this:

    Endpoint=sb://YOURNAME-eh-ns-demo.servicebus.chinacloudapi.cn/;SharedAccessKeyName=asa-policy-manage-demo;SharedAccessKey=Gw2NFZwU1Di+rxA2T+6hJYAtFExKRXaC2oSQa0ZsPkI=;EntityPath=asa-eh-frauddetection-demo

    Notice that the connection string contains multiple key-value pairs, separated by semicolons: Endpoint, SharedAccessKeyName, SharedAccessKey, and EntityPath.
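
The structure of the connection string matters for the next section, where you remove the EntityPath pair before pasting the string into the generator's config file. As a minimal sketch of that edit, the pairs can be split apart and rejoined programmatically; the string below is a made-up example in the same shape as the one above (the key is not a real credential):

```python
# Split an Event Hubs connection string into its key-value pairs.
conn = (
    "Endpoint=sb://example-eh-ns-demo.servicebus.chinacloudapi.cn/;"
    "SharedAccessKeyName=asa-policy-manage-demo;"
    "SharedAccessKey=FAKEKEY123=;"
    "EntityPath=asa-eh-frauddetection-demo"
)

# Each pair is "Key=Value"; split on the first "=" only, because the
# SharedAccessKey value itself can contain "=" padding characters.
pairs = dict(part.split("=", 1) for part in conn.split(";"))
print(pairs["EntityPath"])  # the event hub name, noted for later

# Removing the EntityPath pair (and its preceding semicolon) leaves the
# string that goes into the app's .config file in the next section.
without_entity_path = ";".join(
    f"{k}={v}" for k, v in pairs.items() if k != "EntityPath"
)
print(without_entity_path)
```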

Configure and start the event generator application

Before you start the TelcoGenerator app, you must configure it so that it will send call records to the event hub you created.

Configure the TelcoGenerator app

  1. In the editor where you copied the connection string, make a note of the EntityPath value, and then remove the EntityPath pair (don't forget to remove the semicolon that precedes it).

  2. In the folder where you unzipped the TelcoGenerator.zip file, open the telcodatagen.exe.config file in an editor. (There is more than one .config file, so be sure that you open the right one.)

  3. In the <appSettings> element:

    • Set the value of the EventHubName key to the event hub name (that is, to the value of the entity path).
    • Set the value of the Microsoft.ServiceBus.ConnectionString key to the connection string.

    The <appSettings> section will look like the following example:

    <appSettings>
     <!-- Service Bus specific app settings for messaging connections -->
     <add key="EventHubName" value="asa-eh-frauddetection-demo"/>
     <add key="Microsoft.ServiceBus.ConnectionString" value="Endpoint=sb://asa-eh-ns-demo.servicebus.chinacloudapi.cn/;SharedAccessKeyName=asa-policy-manage-demo;SharedAccessKey=GEcnTKf2//1MRn6SN1A2u0O76MP9pj3v0Ccyf1su4Zo="/>
    </appSettings>
    
  4. Save the file.

Start the app

  1. Open a command window and change to the folder where the TelcoGenerator app is unzipped.

  2. Enter the following command:

    telcodatagen.exe 1000 0.2 2
    

    The parameters are:

    • Number of CDRs (call data records) per hour.
    • SIM card fraud probability: how often, as a percentage of all calls, the app should simulate a fraudulent call. The value 0.2 means that about 20% of the call records will look fraudulent.
    • Duration in hours: the number of hours that the app should run. You can also stop the app at any time by pressing Ctrl+C at the command line.

    After a few seconds, the app starts displaying phone-call records on the screen as it sends them to the event hub.

Some of the key fields that you will be using in this real-time fraud detection application are the following:

Record        Definition
CallrecTime   The timestamp for the call start time.
SwitchNum     The telephone switch used to connect the call. For this example, the switches are strings that represent the country/region of origin (US, China, UK, Germany, or Australia).
CallingNum    The phone number of the caller.
CallingIMSI   The International Mobile Subscriber Identity (IMSI). This is the unique identifier of the caller.
CalledNum     The phone number of the call recipient.
CalledIMSI    The International Mobile Subscriber Identity (IMSI). This is the unique identifier of the call recipient.
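
A single generated call record, as read from the event hub, can be pictured as a JSON object carrying these fields. The sketch below is illustrative only: the field names come from the table above, while every value is invented for the example.

```python
import json

# Hypothetical example of one call record; values are made up.
record = {
    "CallRecTime": "2018-03-19T14:22:05.0000000Z",  # call start timestamp
    "SwitchNum": "US",                  # switch string = country/region of origin
    "CallingNum": "4257773456",         # caller's phone number
    "CallingIMSI": "466920403025604",   # unique identifier of the caller
    "CalledNum": "2066667890",          # recipient's phone number
    "CalledIMSI": "466923101048691",    # unique identifier of the recipient
}

# Events arrive at the event hub as serialized JSON payloads like this one.
payload = json.dumps(record)
print(payload)
```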

Create a Stream Analytics job to manage streaming data

Now that you have a stream of call events, you can set up a Stream Analytics job. The job will read data from the event hub that you set up.

Create the job

  1. In the Azure portal, click Create a resource > Internet of Things > Stream Analytics job.

  2. Name the job asa_frauddetection_job_demo, and specify a subscription, resource group, and location.

    It's a good idea to place the job and the event hub in the same region for best performance, and so that you don't pay to transfer data between regions.

    Create Stream Analytics job in portal

  3. Click Create.

    The job is created, and the portal displays job details. Nothing is running yet, though; you have to configure the job before it can be started.

Configure job input

  1. In the dashboard or the All resources pane, find and select the asa_frauddetection_job_demo Stream Analytics job.

  2. In the Overview section of the Stream Analytics job pane, click the Input box.

    Input box under Topology in the Stream Analytics job pane

  3. Click Add stream input and select Event Hub. Then fill out the New input page with the following information:

    Setting                  Suggested value               Description
    Input alias              CallStream                    Enter a name to identify the job's input.
    Subscription             <Your subscription>           Select the Azure subscription that has the event hub you created.
    Event Hub namespace      asa-eh-ns-demo                Enter the name of the Event Hubs namespace.
    Event Hub name           asa-eh-frauddetection-demo    Select the name of your event hub.
    Event Hub policy name    asa-policy-manage-demo        Select the access policy that you created earlier.

    Create Stream Analytics input in portal

  4. Click Create.

Create queries to transform real-time data

At this point, you have a Stream Analytics job set up to read an incoming data stream. The next step is to create a query that analyzes the data in real time. Stream Analytics supports a simple, declarative query model that describes transformations for real-time processing. The queries use a SQL-like language that has some extensions specific to Stream Analytics.

A simple query might just read all the incoming data. However, you often create queries that look for specific data or for relationships in the data. In this section of the tutorial, you create and test several queries to learn a few ways in which you can transform an input stream for analysis.

The queries you create here will just display the transformed data to the screen. In a later section, you'll configure an output sink and a query that writes the transformed data to that sink.

To learn more about the language, see the Azure Stream Analytics Query Language Reference.

Get sample data for testing queries

The TelcoGenerator app is sending call records to the event hub, and your Stream Analytics job is configured to read from the event hub. You can use a query to test the job to make sure that it's reading correctly. To test a query in the Azure console, you need sample data. For this walkthrough, you'll extract sample data from the stream that's coming into the event hub.

  1. Make sure that the TelcoGenerator app is running and producing call records.

  2. In the portal, return to the Stream Analytics job pane. (If you closed the pane, search for asa_frauddetection_job_demo in the All resources pane.)

  3. Click the Query box. Azure lists the inputs and outputs that are configured for the job, and lets you create a query that transforms the input stream as it is sent to the output.

  4. In the Query pane, click the dots next to the CallStream input and then select Sample data from input.

    Menu options for sampling a Stream Analytics job input, with Sample data from input selected

  5. Set Minutes to 3 and then click OK.

    Options for sampling the input stream, with 3 minutes selected

    Azure samples 3 minutes' worth of data from the input stream and notifies you when the sample data is ready. (This takes a short while.)

The sample data is stored temporarily and is available while you have the query window open. If you close the query window, the sample data is discarded, and you'll have to create a new set of sample data.

As an alternative, you can get a .json file that has sample data in it from GitHub, and then upload that .json file to use as sample data for the CallStream input.

Test using a pass-through query

If you want to archive every event, you can use a pass-through query to read all the fields in the payload of the event.

  1. In the query window, enter this query:

    SELECT 
        *
    FROM 
        CallStream
    

    Note

    As with SQL, keywords are not case-sensitive, and whitespace is not significant.

    In this query, CallStream is the alias that you specified when you created the input. If you used a different alias, use that name instead.

  2. Click Test.

    The Stream Analytics job runs the query against the sample data and displays the output at the bottom of the window. The results indicate that the event hub and the Stream Analytics job are configured correctly. (As noted, later you'll create an output sink that the query can write data to.)

    Stream Analytics job output, showing 73 records generated

    The exact number of records you see depends on how many records were captured in your 3-minute sample.

Reduce the number of fields by using a column projection

In many cases, your analysis doesn't need all the columns from the input stream. You can use a query to project a smaller set of returned fields than in the pass-through query.

  1. Change the query in the code editor to the following:

    SELECT CallRecTime, SwitchNum, CallingIMSI, CallingNum, CalledNum 
    FROM 
        CallStream
    
  2. Click Test again.

    Stream Analytics job output for the projection, showing 25 records

Count incoming calls by region: Tumbling window with aggregation

Suppose you want to count the number of incoming calls per region. In streaming data, when you want to perform aggregate functions like counting, you need to segment the stream into temporal units, because the data stream itself is effectively endless. You do this by using a Stream Analytics window function. You can then work with the data inside that window as a unit.

For this transformation, you want a sequence of temporal windows that don't overlap; each window will have a discrete set of data that you can group and aggregate. This type of window is referred to as a tumbling window. Within the tumbling window, you can get a count of the incoming calls grouped by SwitchNum, which represents the country/region where the call originated.
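
Conceptually, a tumbling window assigns each event to exactly one fixed-size, non-overlapping time bucket and then aggregates per bucket. The sketch below imitates the semantics of TUMBLINGWINDOW(s, 5) with a GROUP BY on SwitchNum over a few in-memory records. The timestamps and values are invented, and this models only the windowing idea, not how Stream Analytics actually executes the query:

```python
from collections import Counter
from datetime import datetime, timedelta

# Invented sample events: (CallRecTime, SwitchNum)
events = [
    (datetime(2018, 3, 19, 14, 0, 1), "US"),
    (datetime(2018, 3, 19, 14, 0, 3), "China"),
    (datetime(2018, 3, 19, 14, 0, 4), "US"),
    (datetime(2018, 3, 19, 14, 0, 7), "UK"),
    (datetime(2018, 3, 19, 14, 0, 8), "US"),
]

WINDOW = timedelta(seconds=5)
EPOCH = datetime(1970, 1, 1)

counts = Counter()
for ts, switch in events:
    # A tumbling window puts each event into exactly one 5-second bucket;
    # WindowEnd is the timestamp at which that bucket closes.
    bucket = int((ts - EPOCH) / WINDOW)
    window_end = EPOCH + (bucket + 1) * WINDOW
    counts[(window_end, switch)] += 1

for (window_end, switch), call_count in sorted(counts.items()):
    print(window_end.time(), switch, call_count)
```

Each printed row corresponds to one output row of the aggregation query: a window end time, a switch, and the call count for that combination.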

  1. Change the query in the code editor to the following:

    SELECT 
        System.Timestamp as WindowEnd, SwitchNum, COUNT(*) as CallCount 
    FROM
        CallStream TIMESTAMP BY CallRecTime 
    GROUP BY TUMBLINGWINDOW(s, 5), SwitchNum
    

    This query uses the TIMESTAMP BY keyword in the FROM clause to specify which timestamp field in the input stream to use to define the tumbling window. In this case, the window divides the data into segments by the CallRecTime field in each record. (If no field is specified, the windowing operation uses the time that each event arrives at the event hub. See "Arrival Time Vs Application Time" in the Stream Analytics Query Language Reference.)

    The projection includes System.Timestamp, which returns a timestamp for the end of each window.

    To specify that you want to use a tumbling window, you use the TUMBLINGWINDOW function in the GROUP BY clause. In the function, you specify a time unit (anywhere from a microsecond to a day) and a window size (how many units). In this example, the tumbling window consists of 5-second intervals, so you get a count by country/region for every 5 seconds' worth of calls.

  2. Click Test again. In the results, notice that the timestamps under WindowEnd are in 5-second increments.

    Stream Analytics job output for the aggregation, showing 13 records

Detect SIM fraud by using a self-join

For this example, consider fraudulent usage to be calls that originate from the same user but in different locations within 5 seconds of one another. For example, the same user can't legitimately make a call from the US and Australia at the same time.

To check for these cases, you can use a self-join of the streaming data to join the stream to itself based on the CallRecTime value. You can then look for call records where the CallingIMSI value (the originating number) is the same, but the SwitchNum value (country/region of origin) is not the same.

When you use a join with streaming data, the join must provide some limits on how far the matching rows can be separated in time. (As noted earlier, the streaming data is effectively endless.) The time bounds for the relationship are specified inside the ON clause of the join, by using the DATEDIFF function. In this case, the join is based on a 5-second interval of call data.
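
The self-join logic amounts to comparing pairs of records whose timestamps are 1 to 5 seconds apart and flagging pairs that share a CallingIMSI but not a SwitchNum. The records below are invented, and this plain nested-loop comparison is only a model of the join's semantics, not how the streaming engine executes it:

```python
from datetime import datetime

# Invented call records: (CallRecTime, CallingIMSI, SwitchNum)
calls = [
    (datetime(2018, 3, 19, 14, 0, 0), "466920403025604", "US"),
    (datetime(2018, 3, 19, 14, 0, 3), "466920403025604", "Australia"),  # suspicious
    (datetime(2018, 3, 19, 14, 0, 2), "466923101048691", "UK"),
    (datetime(2018, 3, 19, 14, 0, 6), "466923101048691", "UK"),         # same switch: fine
]

suspicious = []
for t1, imsi1, switch1 in calls:
    for t2, imsi2, switch2 in calls:
        gap = (t2 - t1).total_seconds()
        # Mirrors ON CS1.CallingIMSI = CS2.CallingIMSI
        #         AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
        # and     WHERE CS1.SwitchNum != CS2.SwitchNum
        if imsi1 == imsi2 and 1 <= gap <= 5 and switch1 != switch2:
            suspicious.append((imsi1, switch1, switch2))

print(suspicious)
```

Only the US/Australia pair is flagged: same identity, different switches, 3 seconds apart. The UK pair is within the window but originates from the same switch, so it passes.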

  1. Change the query in the code editor to the following:

    SELECT  System.Timestamp as Time, 
        CS1.CallingIMSI, 
        CS1.CallingNum as CallingNum1, 
        CS2.CallingNum as CallingNum2, 
        CS1.SwitchNum as Switch1, 
        CS2.SwitchNum as Switch2 
    FROM CallStream CS1 TIMESTAMP BY CallRecTime 
        JOIN CallStream CS2 TIMESTAMP BY CallRecTime 
        ON CS1.CallingIMSI = CS2.CallingIMSI 
        AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5 
    WHERE CS1.SwitchNum != CS2.SwitchNum
    

    This query is like any SQL join except for the DATEDIFF function in the join. This version of DATEDIFF is specific to Stream Analytics, and it must appear in the ON...BETWEEN clause. The parameters are a time unit (seconds in this example) and the aliases of the two sources for the join. This is different from the standard SQL DATEDIFF function.

    The WHERE clause includes the condition that flags the fraudulent call: the originating switches are not the same.

  2. Click Test again.

    Stream Analytics job output for the self-join, showing 6 records generated

  3. Click Save to save the self-join query as part of the Stream Analytics job. (It doesn't save the sample data.)

    Save Stream Analytics query in portal

Create an output sink to store transformed data

You've defined an event stream, an event hub input to ingest events, and a query to perform a transformation over the stream. The last step is to define an output sink for the job, that is, a place to write the transformed stream to.

You can use many resources as output sinks: a SQL Server database, table storage, and even another event hub. For this tutorial, you'll write the stream to Azure Blob storage, which is a typical choice for collecting event information for later analysis, since it accommodates unstructured data.

If you have an existing Blob storage account, you can use it. For this tutorial, you'll learn how to create a new storage account.

Create an Azure Blob storage account

  1. From the upper-left corner of the Azure portal, select Create a resource > Storage > Storage account. Fill out the Storage account page with Name set to "asaehstorage", Location set to "China North", and Resource group set to "asa-eh-ns-rg" (host the storage account in the same resource group as the streaming job for increased performance). The remaining settings can be left at their default values.

    Create a storage account in the Azure portal

  2. In the Azure portal, return to the Stream Analytics job pane. (If you closed the pane, search for asa_frauddetection_job_demo in the All resources pane.)

  3. In the Job Topology section, click the Output box.

  4. In the Outputs pane, click Add and select Blob storage. Then fill out the New output page with the following information:

    Setting          Suggested value               Description
    Output alias     CallStream-FraudulentCalls    Enter a name to identify the job's output.
    Subscription     <Your subscription>           Select the Azure subscription that has the storage account you created. The storage account can be in the same or in a different subscription. This example assumes that you created the storage account in the same subscription.
    Storage account  asaehstorage                  Enter the name of the storage account you created.
    Container        asa-fraudulentcalls-demo      Choose Create new and enter a container name.

    Create blob output for Stream Analytics job

  5. Click Save.

Start the Stream Analytics job

The job is now configured. You've specified an input (the event hub), a transformation (the query to look for fraudulent calls), and an output (Blob storage). You can now start the job.

  1. Make sure the TelcoGenerator app is running.

  2. In the job pane, click Start. In the Start job pane, for Job output start time, select Now.

    Start the Stream Analytics job

Examine the transformed data

You now have a complete Stream Analytics job. The job is examining a stream of phone-call metadata, looking for fraudulent phone calls in real time, and writing information about those fraudulent calls to storage.

To complete this tutorial, you might want to look at the data being captured by the Stream Analytics job. The data is being written to Azure Blob storage in chunks (files). You can use any tool that reads Azure Blob storage. As noted in the Prerequisites section, you can use the Azure extensions in Visual Studio, or you can use a tool like Azure Storage Explorer or Cerulean.

When you examine the contents of a file in Blob storage, you see something like the following:

Azure Blob storage with Stream Analytics output

Clean up resources

There are additional articles that continue with the fraud-detection scenario and use the resources you've created in this tutorial. If you want to continue, see the suggestions under Next steps.

However, if you're done and you don't need the resources you've created, you can delete them so that you don't incur unnecessary Azure charges. In that case, we suggest that you do the following:

  1. Stop the Stream Analytics job. In the Jobs pane, click Stop at the top.
  2. Stop the TelcoGenerator app. In the command window where you started the app, press Ctrl+C.
  3. If you created a new Blob storage account just for this tutorial, delete it.
  4. Delete the Stream Analytics job.
  5. Delete the event hub.
  6. Delete the Event Hubs namespace.

Get support

For further assistance, try the Microsoft Q&A question page for Azure Stream Analytics.

Next steps

For more information about Stream Analytics in general, see these articles: