Management .NET SDK:使用用于 .NET 的 Azure 流分析 API 设置和运行分析作业Management .NET SDK: Set up and run analytics jobs using the Azure Stream Analytics API for .NET

了解如何通过管理 .NET SDK 设置和运行使用 .NET 版流分析 API 的分析作业。Learn how to set up and run analytics jobs using the Stream Analytics API for .NET using the Management .NET SDK. 设置项目、创建输入和输出源、转换,以及开始和停止作业。Set up a project, create input and output sources, transformations, and start and stop jobs. 就分析作业来说,可以从 Blob 存储或事件中心流式传输数据。For your analytics jobs, you can stream data from Blob storage or from an event hub.

请参阅 .NET 版流分析 API 的管理参考文档See the management reference documentation for the Stream Analytics API for .NET.

Azure 流分析是一种完全托管的服务,可以在云中通过流式数据处理复杂的事件,具有延迟性低、可用性高和大小灵活等特点。Azure Stream Analytics is a fully managed service providing low-latency, highly available, scalable, complex event processing over streaming data in the cloud. 客户可以使用流分析来设置流式处理作业,分析数据流和进行近实时分析。Stream Analytics enables customers to set up streaming jobs to analyze data streams, and allows them to drive near real-time analytics.

备注

本文中的示例代码已使用 Azure 流分析的 Management .NET SDK v2.x 版本进行了更新。We have updated the sample code in this article with Azure Stream Analytics Management .NET SDK v2.x version. 有关使用旧版 (1.x) SDK 的示例代码,请参阅使用流分析的 Management .NET SDK v1.x For sample code using the uses lagecy (1.x) SDK version, please see Use the Management .NET SDK v1.x for Stream Analytics.

先决条件Prerequisites

在开始阅读本文前,必须完成以下要求:Before you begin this article, you must have the following requirements:

  • 安装 Visual Studio 2019 或 2015。Install Visual Studio 2019 or 2015.

  • 下载并安装 Azure .NET SDKDownload and install Azure .NET SDK.

  • 在订阅中创建 Azure 资源组。Create an Azure Resource Group in your subscription. 以下示例是 Azure PowerShell 脚本示例。The following example is a sample Azure PowerShell script. 有关 Azure PowerShell 的信息,请参阅 安装和配置 Azure PowerShellFor Azure PowerShell information, see Install and configure Azure PowerShell;

    # Log in to your Azure account
    Connect-AzAccount -Environment AzureChinaCloud
    
    # Select the Azure subscription you want to use to create the resource group
    Select-AzureSubscription -SubscriptionName <subscription name>
    
    # If Stream Analytics has not been registered to the subscription, remove the remark    symbol (#) to run the Register-AzProvider cmdlet to register the provider namespace
    #Register-AzProvider -Force -ProviderNamespace 'Microsoft.StreamAnalytics'
    
    # Create an Azure resource group
    New-AzureResourceGroup -Name <YOUR RESOURCE GROUP NAME> -Location <LOCATION>
    
  • 设置作业要连接到的输入源和输出目标。Set up an input source and output target for the job to connect to.

设置项目Set up a project

若要创建分析作业,请使用适用于 .NET 的流分析 API,首先设置项目。To create an analytics job, use the Stream Analytics API for .NET, first set up your project.

  1. 创建 Visual Studio C# .NET 控制台应用程序。Create a Visual Studio C# .NET console application.

  2. 在程序包管理器控制台中运行以下命令来安装 NuGet 包。In the Package Manager Console, run the following commands to install the NuGet packages. 第一个是 Azure 流分析管理 .NET SDK。The first one is the Azure Stream Analytics Management .NET SDK. 第二个用于 Azure 客户端身份验证。The second one is for Azure client authentication.

    Install-Package Microsoft.Azure.Management.StreamAnalytics -Version 2.0.0
    Install-Package Microsoft.Rest.ClientRuntime.Azure.Authentication -Version 2.3.1
    
  3. 将下面的 appSettings 部分添加到 App.config 文件:Add the following appSettings section to the App.config file:

    <appSettings>
        <add key="ClientId" value="1950a258-227b-4e31-a9cf-717495945fc2" />
        <add key="RedirectUri" value="urn:ietf:wg:oauth:2.0:oob" />
        <add key="SubscriptionId" value="YOUR SUBSCRIPTION ID" />
        <add key="ActiveDirectoryTenantId" value="YOUR TENANT ID" />
    </appSettings>
    

    SubscriptionIdActiveDirectoryTenantId 的值替换为 Azure 订阅 ID 和租户 ID。Replace values for SubscriptionId and ActiveDirectoryTenantId with your Azure subscription and tenant IDs. 可以通过运行以下 Azure PowerShell cmdlet 来获取这些值:You can get these values by running the following Azure PowerShell cmdlet:

       Get-AzureAccount
    
  4. 在 .csproj 文件中添加以下引用:Add the following reference in your .csproj file:

    <Reference Include="System.Configuration" />
    
  5. 将以下 using 语句添加到项目中的源文件 (Program.cs):Add the following using statements to the source file (Program.cs) in the project:

    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Threading;
    using System.Threading.Tasks;
    
    using Microsoft.Azure.Management.StreamAnalytics;
    using Microsoft.Azure.Management.StreamAnalytics.Models;
    using Microsoft.Rest.Azure.Authentication;
    using Microsoft.Rest;
    
  6. 添加一个身份验证帮助器方法:Add an authentication helper method:

    private static async Task<ServiceClientCredentials> GetCredentials()
    {
        var activeDirectoryClientSettings = ActiveDirectoryClientSettings.UsePromptOnly(ConfigurationManager.AppSettings["ClientId"], new Uri("urn:ietf:wg:oauth:2.0:oob"));
        ServiceClientCredentials credentials = await UserTokenProvider.LoginWithPromptAsync(ConfigurationManager.AppSettings["ActiveDirectoryTenantId"], activeDirectoryClientSettings);
    
        return credentials;
     }
    

创建流分析管理客户端Create a Stream Analytics management client

一个 StreamAnalyticsManagementClient 对象,用于管理作业和作业组件,例如输入、输出和转换。A StreamAnalyticsManagementClient object allows you to manage the job and the job components, such as input, output, and transformation.

将以下代码添加到 Main 方法的开头:Add the following code to the beginning of the Main method:

 string resourceGroupName = "<YOUR AZURE RESOURCE GROUP NAME>";
 string streamingJobName = "<YOUR STREAMING JOB NAME>";
 string inputName = "<YOUR JOB INPUT NAME>";
 string transformationName = "<YOUR JOB TRANSFORMATION NAME>";
 string outputName = "<YOUR JOB OUTPUT NAME>";
 
 SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
 
 // Get credentials
 ServiceClientCredentials credentials = GetCredentials().Result;
 
 // Create Stream Analytics management client
 StreamAnalyticsManagementClient streamAnalyticsManagementClient = new StreamAnalyticsManagementClient(credentials)
 {
     SubscriptionId = ConfigurationManager.AppSettings["SubscriptionId"]
 };

resourceGroupName 变量的值应该与你在先决条件步骤中创建或选取的资源组的名称相同。The resourceGroupName variable's value should be the same as the name of the resource group you created or picked in the prerequisite steps.

若要自动执行凭据演示方面的作业创建,请参阅使用 Azure Resource Manager 对服务主体进行身份验证To automate the credential presentation aspect of job creation, refer to Authenticating a service principal with Azure Resource Manager.

本文的剩余部分假定此代码位于 Main 方法的开头。The remaining sections of this article assume that this code is at the beginning of the Main method.

创建流分析作业Create a Stream Analytics job

以下代码在你所定义的资源组下创建流分析作业。The following code creates a Stream Analytics job under the resource group that you have defined. 可以在以后向作业添加输入、输出和转换。You will add an input, output, and transformation to the job later.

// Create a streaming job
StreamingJob streamingJob = new StreamingJob()
{
    Tags = new Dictionary<string, string>()
    {
        { "Origin", ".NET SDK" },
        { "ReasonCreated", "Getting started tutorial" }
    },
    Location = "China North",
    EventsOutOfOrderPolicy = EventsOutOfOrderPolicy.Drop,
    EventsOutOfOrderMaxDelayInSeconds = 5,
    EventsLateArrivalMaxDelayInSeconds = 16,
    OutputErrorPolicy = OutputErrorPolicy.Drop,
    DataLocale = "en-US",
    CompatibilityLevel = CompatibilityLevel.OneFullStopZero,
    Sku = new Sku()
    {
        Name = SkuName.Standard
    }
};
StreamingJob createStreamingJobResult = streamAnalyticsManagementClient.StreamingJobs.CreateOrReplace(streamingJob, resourceGroupName, streamingJobName);

创建流分析输入源Create a Stream Analytics input source

下面的代码使用 blob 输入源类型和 CSV 序列化创建流分析输入源。The following code creates a Stream Analytics input source with the blob input source type and CSV serialization. 若要创建事件中心输入源,请使用 EventHubStreamInputDataSource 而非 BlobStreamInputDataSourceTo create an event hub input source, use EventHubStreamInputDataSource instead of BlobStreamInputDataSource. 同样,可以自定义输入源的序列化类型。Similarly, you can customize the serialization type of the input source.

// Create an input
StorageAccount storageAccount = new StorageAccount()
{
    AccountName = "<YOUR STORAGE ACCOUNT NAME>",
    AccountKey = "<YOUR STORAGE ACCOUNT KEY>"
};
Input input = new Input()
{
    Properties = new StreamInputProperties()
    {
        Serialization = new CsvSerialization()
        {
            FieldDelimiter = ",",
            Encoding = Encoding.UTF8
        },
        Datasource = new BlobStreamInputDataSource()
        {
            StorageAccounts = new[] { storageAccount },
            Container = "<YOUR STORAGE BLOB CONTAINER>",
            PathPattern = "{date}/{time}",
            DateFormat = "yyyy/MM/dd",
            TimeFormat = "HH",
            SourcePartitionCount = 16
        }
    }
};
Input createInputResult = streamAnalyticsManagementClient.Inputs.CreateOrReplace(input, resourceGroupName, streamingJobName, inputName);

输入源(不管是来自 Blob 存储还是来自事件中心)将绑定到特定作业。Input sources, whether from Blob storage or an event hub, are tied to a specific job. 要将同一输入源用于不同的作业,必须再次调用该方法并指定不同的作业名称。To use the same input source for different jobs, you must call the method again and specify a different job name.

测试流分析输入源Test a Stream Analytics input source

TestConnection 方法可测试流分析作业是否能够连接到输入源,并测试特定输入源类型的其他方面。The TestConnection method tests whether the Stream Analytics job is able to connect to the input source as well as other aspects specific to the input source type. 例如,在 blob 输入源(已在此前的步骤中创建过)中,该方法可检查存储帐户名称和密钥对能否用于连接到存储帐户,并检查指定的容器是否存在。For example, in the blob input source you created in an earlier step, the method will check that the Storage account name and key pair can be used to connect to the Storage account as well as check that the specified container exists.

// Test the connection to the input
ResourceTestStatus testInputResult = streamAnalyticsManagementClient.Inputs.Test(resourceGroupName, streamingJobName, inputName);

创建流分析输出目标Create a Stream Analytics output target

创建输出目标类似于创建流分析输入源。Creating an output target is similar to creating a Stream Analytics input source. 像输入源一样,输出目标将绑定到特定作业。Like input sources, output targets are tied to a specific job. 要将同一输出目标用于不同的作业,必须再次调用该方法并指定不同的作业名称。To use the same output target for different jobs, you must call the method again and specify a different job name.

以下代码可创建一个输出目标(Azure SQL 数据库)。The following code creates an output target (Azure SQL Database). 可以自定义输出目标的数据类型和/或序列化类型。You can customize the output target's data type and/or serialization type.

// Create an output
Output output = new Output()
{
    Datasource = new AzureSqlDatabaseOutputDataSource()
    {
        Server = "<YOUR DATABASE SERVER NAME>",
        Database = "<YOUR DATABASE NAME>",
        User = "<YOUR DATABASE LOGIN>",
        Password = "<YOUR DATABASE LOGIN PASSWORD>",
        Table = "<YOUR DATABASE TABLE NAME>"
    }
};
Output createOutputResult = streamAnalyticsManagementClient.Outputs.CreateOrReplace(output, resourceGroupName, streamingJobName, outputName);

测试流分析输出目标Test a Stream Analytics output target

流分析输出目标还有一个用于测试连接的 TestConnection 方法。A Stream Analytics output target also has the TestConnection method for testing connections.

// Test the connection to the output
ResourceTestStatus testOutputResult = streamAnalyticsManagementClient.Outputs.Test(resourceGroupName, streamingJobName, outputName);

创建流分析转换Create a Stream Analytics transformation

下面的代码使用查询“select * from Input”创建流分析转换,并通过指定的方式为流分析作业分配一个流式处理单位。The following code creates a Stream Analytics transformation with the query "select * from Input" and specifies to allocate one streaming unit for the Stream Analytics job. 有关如何调整流式处理单位的详细信息,请参阅缩放 Azure 流分析作业For more information on adjusting streaming units, see Scale Azure Stream Analytics jobs.

// Create a transformation
Transformation transformation = new Transformation()
{
    Query = "Select Id, Name from <your input name>", // '<your input name>' should be replaced with the value you put for the 'inputName' variable above or in a previous step
    StreamingUnits = 1
};
Transformation createTransformationResult = streamAnalyticsManagementClient.Transformations.CreateOrReplace(transformation, resourceGroupName, streamingJobName, transformationName);

与输入和输出一样,转换也会绑定到在创建时所属的特定流分析作业。Like input and output, a transformation is also tied to the specific Stream Analytics job it was created under.

启动流分析作业Start a Stream Analytics job

创建流分析作业及其输入、输出和转换后,可以通过调用 Start 方法来启动该作业。After creating a Stream Analytics job and its input(s), output(s), and transformation, you can start the job by calling the Start method.

下面的示例性代码启动一个流分析作业,其自定义输出开始时间设置为 2012 年 12 月 12 日 12:12:12(UTC 时间):The following sample code starts a Stream Analytics job with a custom output start time set to December 12, 2012, 12:12:12 UTC:

// Start a streaming job
StartStreamingJobParameters startStreamingJobParameters = new StartStreamingJobParameters()
{
    OutputStartMode = OutputStartMode.CustomTime,
    OutputStartTime = new DateTime(2012, 12, 12, 12, 12, 12, DateTimeKind.Utc)
};
streamAnalyticsManagementClient.StreamingJobs.Start(resourceGroupName, streamingJobName, startStreamingJobParameters);

停止流分析作业Stop a Stream Analytics job

可以通过调用 Stop 方法来停止正在运行的流分析作业。You can stop a running Stream Analytics job by calling the Stop method.

// Stop a streaming job
streamAnalyticsManagementClient.StreamingJobs.Stop(resourceGroupName, streamingJobName);

删除流分析作业Delete a Stream Analytics job

Delete 方法会删除作业以及基础性的子资源,包括作业的输入、输出和转换。The Delete method will delete the job as well as the underlying sub-resources, including input(s), output(s), and transformation of the job.

// Delete a streaming job
streamAnalyticsManagementClient.StreamingJobs.Delete(resourceGroupName, streamingJobName);

获取支持Get support

如需进一步的帮助,请参阅 Azure 流分析的 Microsoft 问答问题页面For further assistance, try our Microsoft Q&A question page for Azure Stream Analytics.

后续步骤Next steps

现已学习了使用 .NET SDK 创建和运行分析作业的基础知识。You've learned the basics of using a .NET SDK to create and run analytics jobs. 要了解更多信息,请参阅下列文章:To learn more, see the following articles:

[stream.analytics.scale.jobs]: stream-analytics-scale-jobs.md stream.analytics.query.language.reference: https://go.microsoft.com/fwlink/?LinkID=513299 [stream.analytics.rest.api.reference]: https://go.microsoft.com/fwlink/?LinkId=517301