Quickstart: Create a Stream Analytics job using Azure PowerShell

The Azure PowerShell module is used to create and manage Azure resources using PowerShell cmdlets or scripts. This quickstart details how to use the Azure PowerShell module to deploy and run an Azure Stream Analytics job.

The example job reads streaming data from an IoT Hub device. The input data is generated by a Raspberry Pi online simulator. The Stream Analytics job then transforms the data using the Stream Analytics query language to filter messages with a temperature greater than 27°. Finally, it writes the resulting output events to a file in blob storage.

Before you begin

Note

This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure PowerShell.
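If you have existing scripts written against the older AzureRM cmdlet names, the Az module can resolve them through aliases. A minimal sketch, assuming the Az.Accounts module (which ships the Enable-AzureRmAlias cmdlet) is installed:

```powershell
# Sketch: let legacy AzureRM cmdlet names resolve to their Az equivalents
# for the current user. Enable-AzureRmAlias is part of Az.Accounts.
Enable-AzureRmAlias -Scope CurrentUser

# After enabling aliases, a legacy call such as Get-AzureRmResourceGroup
# resolves to Get-AzResourceGroup.
```

This is optional for this quickstart, which uses the Az cmdlet names throughout.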

  • If you don't have an Azure subscription, create a trial account.

  • This quickstart requires the Azure PowerShell module. Run Get-Module -ListAvailable Az to find the version installed on your local machine. If you need to install or upgrade, see Install Azure PowerShell module.

  • Some IoT Hub actions aren't supported by Azure PowerShell and must be completed using Azure CLI version 2.0.70 or later and the IoT extension for Azure CLI. Install the Azure CLI and use az extension add --name azure-iot to install the IoT extension.
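The prerequisite check above can be scripted; as a sketch, the following commands (standard Azure CLI extension management) verify the CLI version and install the IoT extension from the PowerShell window:

```powershell
# Check the installed Azure CLI version (must be 2.0.70 or later).
az --version

# Install the IoT extension for Azure CLI.
az extension add --name azure-iot

# Confirm the extension is installed.
az extension list --output table
```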

Sign in to Azure

Sign in to your Azure subscription with the Connect-AzAccount -Environment AzureChinaCloud command, and enter your Azure credentials in the pop-up browser:

# Connect to your Azure account
Connect-AzAccount -Environment AzureChinaCloud

If you have more than one subscription, select the subscription you want to use for this quickstart by running the following cmdlets. Make sure to replace <your subscription name> with the name of your subscription:

# List all available subscriptions.
Get-AzSubscription

# Select the Azure subscription you want to use to create the resource group and resources.
Get-AzSubscription -SubscriptionName "<your subscription name>" | Select-AzSubscription
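Alternatively, the subscription can be selected in one step with Set-AzContext; a minimal sketch:

```powershell
# Alternative: select the target subscription directly by name (or ID).
Set-AzContext -Subscription "<your subscription name>"
```

Either approach sets the subscription that subsequent Az cmdlets operate against.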

Create a resource group

Create an Azure resource group with New-AzResourceGroup. A resource group is a logical container into which Azure resources are deployed and managed.

$resourceGroup = "StreamAnalyticsRG"
$location = "chinaeast"
New-AzResourceGroup `
    -Name $resourceGroup `
    -Location $location

Prepare the input data

Before defining the Stream Analytics job, prepare the data that is configured as input to the job.

The following Azure CLI code block runs several commands to prepare the input data required by the job. Review the steps to understand each command.

  1. In your PowerShell window, run the az login command to sign in to your Azure account.

    When you sign in successfully, Azure CLI returns a list of your subscriptions. Copy the subscription you're using for this quickstart and run the az account set command to select it. Choose the same subscription you selected with PowerShell in the previous section. Make sure to replace <your subscription name> with the name of your subscription.

    az login
    
    az account set --subscription "<your subscription>"
    
  2. Create an IoT Hub using the az iot hub create command. This example creates an IoT Hub called MyASAIoTHub. Because IoT Hub names must be unique, you need to come up with your own IoT Hub name. The example below uses the S1 SKU; set the SKU to F1 to use the free tier instead, if it's available with your subscription.

    az iot hub create --name "<your IoT Hub name>" --resource-group $resourceGroup --sku S1
    

    Once the IoT hub has been created, get the IoT Hub connection string using the az iot hub show-connection-string command. Copy the entire connection string and save it; you'll use it when you add the IoT Hub as an input to your Stream Analytics job.

    az iot hub show-connection-string --hub-name "MyASAIoTHub"
    
  3. Add a device to the IoT Hub using the az iot hub device-identity create command. This example creates a device called MyASAIoTDevice.

    az iot hub device-identity create --hub-name "MyASAIoTHub" --device-id "MyASAIoTDevice"
    
  4. Get the device connection string using the az iot hub device-identity show-connection-string command. Copy the entire connection string and save it; you'll use it when you create the Raspberry Pi simulator.

    az iot hub device-identity show-connection-string --hub-name "MyASAIoTHub" --device-id "MyASAIoTDevice" --output table
    

    Example output:

    HostName=MyASAIoTHub.azure-devices.net;DeviceId=MyASAIoTDevice;SharedAccessKey=a2mnUsg52+NIgYudxYYUNXI67r0JmNubmfVafojG8=
    

Create blob storage

The following Azure PowerShell code block uses commands to create blob storage that is used for job output. Review the steps to understand the code.

  1. Create a standard general-purpose storage account using the New-AzStorageAccount cmdlet. This example creates a storage account called myasaquickstartstorage with locally redundant storage (LRS) and blob encryption (enabled by default).

  2. Retrieve the storage account context $storageAccount.Context, which defines the storage account to be used. When working with storage accounts, you reference the context instead of repeatedly providing the credentials.

  3. Create a storage container using New-AzStorageContainer.

  4. Copy the storage key that is output by the code, and save it to create the streaming job's output later on.

    $storageAccountName = "myasaquickstartstorage"
    $storageAccount = New-AzStorageAccount `
      -ResourceGroupName $resourceGroup `
      -Name $storageAccountName `
      -Location $location `
      -SkuName Standard_LRS `
      -Kind Storage
    
    $ctx = $storageAccount.Context
    $containerName = "container1"
    
    New-AzStorageContainer `
      -Name $containerName `
      -Context $ctx
    
    $storageAccountKey = (Get-AzStorageAccountKey `
      -ResourceGroupName $resourceGroup `
      -Name $storageAccountName).Value[0]
    
    Write-Host "The <storage account key> placeholder needs to be replaced in your output json files with this key value:"
    Write-Host $storageAccountKey -ForegroundColor Cyan
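Before wiring the container into the job output, you can confirm it was created; a sketch using the same storage context from the block above:

```powershell
# Verify the container exists in the new storage account.
Get-AzStorageContainer -Name $containerName -Context $ctx

# List any blobs already in the container (it stays empty until the
# streaming job writes its first output).
Get-AzStorageBlob -Container $containerName -Context $ctx
```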
    

Create a Stream Analytics job

Create a Stream Analytics job with the New-AzStreamAnalyticsJob cmdlet. This cmdlet takes the job name, resource group name, and job definition as parameters. The job name can be any friendly name that identifies your job. It can contain alphanumeric characters, hyphens, and underscores only, and it must be between 3 and 63 characters long. The job definition is a JSON file that contains the properties required to create a job. On your local machine, create a file named JobDefinition.json and add the following JSON data to it:

{
  "location":"chinaeast",
  "properties":{
    "sku":{
      "name":"standard"
    },
    "eventsOutOfOrderPolicy":"adjust",
    "eventsOutOfOrderMaxDelayInSeconds":10,
    "compatibilityLevel": 1.1
  }
}
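Rather than creating the file by hand, you can write it from the same PowerShell session; a sketch using a here-string (the path C:\JobDefinition.json is just an example location):

```powershell
# Write the job definition JSON to disk using a PowerShell here-string.
$jobDefinitionJson = @"
{
  "location":"chinaeast",
  "properties":{
    "sku":{ "name":"standard" },
    "eventsOutOfOrderPolicy":"adjust",
    "eventsOutOfOrderMaxDelayInSeconds":10,
    "compatibilityLevel": 1.1
  }
}
"@
Set-Content -Path "C:\JobDefinition.json" -Value $jobDefinitionJson
```

The same pattern works for the input, output, and transformation definition files used later in this quickstart.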

Next, run the New-AzStreamAnalyticsJob cmdlet. Replace the value of the jobDefinitionFile variable with the path where you've stored the job definition JSON file.

$jobName = "MyStreamingJob"
$jobDefinitionFile = "C:\JobDefinition.json"
New-AzStreamAnalyticsJob `
  -ResourceGroupName $resourceGroup `
  -File $jobDefinitionFile `
  -Name $jobName `
  -Force

Configure input to the job

Add an input to your job by using the New-AzStreamAnalyticsInput cmdlet. This cmdlet takes the job name, job input name, resource group name, and job input definition as parameters. The job input definition is a JSON file that contains the properties required to configure the job's input. In this example, you'll configure the IoT Hub you created earlier as the input.

On your local machine, create a file named JobInputDefinition.json and add the following JSON data to it. Make sure to replace the value for accesspolicykey with the SharedAccessKey portion of the IoT Hub connection string you saved in a previous section.

{
    "properties": {
        "type": "Stream",
        "datasource": {
            "type": "Microsoft.Devices/IotHubs",
            "properties": {
                "iotHubNamespace": "MyASAIoTHub",
                "sharedAccessPolicyName": "iothubowner",
                "sharedAccessPolicyKey": "accesspolicykey",
                "endpoint": "messages/events",
                "consumerGroupName": "$Default"
                }
        },
        "compression": {
            "type": "None"
        },
        "serialization": {
            "type": "Json",
            "properties": {
                "encoding": "UTF8"
            }
        }
    },
    "name": "IoTHubInput",
    "type": "Microsoft.StreamAnalytics/streamingjobs/inputs"
}

Next, run the New-AzStreamAnalyticsInput cmdlet. Make sure to replace the value of the jobInputDefinitionFile variable with the path where you've stored the job input definition JSON file.

$jobInputName = "IoTHubInput"
$jobInputDefinitionFile = "C:\JobInputDefinition.json"
New-AzStreamAnalyticsInput `
  -ResourceGroupName $resourceGroup `
  -JobName $jobName `
  -File $jobInputDefinitionFile `
  -Name $jobInputName

Configure output to the job

Add an output to your job by using the New-AzStreamAnalyticsOutput cmdlet. This cmdlet takes the job name, job output name, resource group name, and job output definition as parameters. The job output definition is a JSON file that contains the properties required to configure the job's output. This example uses blob storage as the output.

On your local machine, create a file named JobOutputDefinition.json, and add the following JSON data to it. Make sure to replace the value for accountKey with your storage account's access key, which is the value stored in $storageAccountKey.

{
    "properties": {
        "datasource": {
            "type": "Microsoft.Storage/Blob",
            "properties": {
                "storageAccounts": [
                    {
                      "accountName": "asaquickstartstorage",
                      "accountKey": "<storage account key>"
                    }
                ],
                "container": "container1",
                "pathPattern": "output/",
                "dateFormat": "yyyy/MM/dd",
                "timeFormat": "HH"
            }
        },
        "serialization": {
            "type": "Json",
            "properties": {
                "encoding": "UTF8",
                "format": "LineSeparated"
            }
        }
    },
    "name": "BlobOutput",
    "type": "Microsoft.StreamAnalytics/streamingjobs/outputs"
}

Next, run the New-AzStreamAnalyticsOutput cmdlet. Make sure to replace the value of the jobOutputDefinitionFile variable with the path where you've stored the job output definition JSON file.

$jobOutputName = "BlobOutput"
$jobOutputDefinitionFile = "C:\JobOutputDefinition.json"
New-AzStreamAnalyticsOutput `
  -ResourceGroupName $resourceGroup `
  -JobName $jobName `
  -File $jobOutputDefinitionFile `
  -Name $jobOutputName -Force

Define the transformation query

Add a transformation to your job by using the New-AzStreamAnalyticsTransformation cmdlet. This cmdlet takes the job name, job transformation name, resource group name, and job transformation definition as parameters. On your local machine, create a file named JobTransformationDefinition.json and add the following JSON data to it. The JSON file contains a query parameter that defines the transformation query:

{
    "name":"MyTransformation",
    "type":"Microsoft.StreamAnalytics/streamingjobs/transformations",
    "properties":{
        "streamingUnits":1,
        "script":null,
        "query":" SELECT * INTO BlobOutput FROM IoTHubInput HAVING Temperature > 27"
    }
}

Next, run the New-AzStreamAnalyticsTransformation cmdlet. Make sure to replace the value of the jobTransformationDefinitionFile variable with the path where you've stored the job transformation definition JSON file.

$jobTransformationName = "MyJobTransformation"
$jobTransformationDefinitionFile = "C:\JobTransformationDefinition.json"
New-AzStreamAnalyticsTransformation `
  -ResourceGroupName $resourceGroup `
  -JobName $jobName `
  -File $jobTransformationDefinitionFile `
  -Name $jobTransformationName -Force

Run the IoT simulator

  1. Open the Raspberry Pi Azure IoT Online Simulator.

  2. Replace the placeholder in Line 15 with the entire Azure IoT Hub device connection string you saved in a previous section.

  3. Click Run. The output should show the sensor data and messages that are being sent to your IoT Hub.

    [Figure: Raspberry Pi Azure IoT Online Simulator]

Start the Stream Analytics job and check the output

Start the job by using the Start-AzStreamAnalyticsJob cmdlet. This cmdlet takes the job name, resource group name, output start mode, and start time as parameters. OutputStartMode accepts values of JobStartTime, CustomTime, or LastOutputEventTime. To learn more about what each of these values refers to, see the parameters section in the PowerShell documentation.

After you run the following cmdlet, it returns True as output if the job starts. An output folder containing the transformed data is created in the storage container.

Start-AzStreamAnalyticsJob `
  -ResourceGroupName $resourceGroup `
  -Name $jobName `
  -OutputStartMode 'JobStartTime'
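Once the job is started, you can poll its state and inspect the output it has written; a sketch, assuming the $ctx and $containerName variables from the blob storage section are still in scope:

```powershell
# Check the job's current state; it should transition to "Running".
(Get-AzStreamAnalyticsJob `
  -ResourceGroupName $resourceGroup `
  -Name $jobName).JobState

# List the blobs the job has written under the output/ path pattern
# (output may take a few minutes to appear after the job starts).
Get-AzStorageBlob -Container $containerName -Context $ctx |
  Where-Object { $_.Name -like "output/*" }
```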

Clean up resources

When no longer needed, delete the resource group, the streaming job, and all related resources. Deleting the job avoids billing for the streaming units it consumes. If you plan to use the job in the future, you can skip deleting it and just stop it for now. If you aren't going to continue to use this job, delete all resources created by this quickstart by running the following cmdlet:

Remove-AzResourceGroup `
  -Name $resourceGroup

Next steps

In this quickstart, you deployed a simple Stream Analytics job using PowerShell. You can also deploy Stream Analytics jobs using the Azure portal.

To learn about configuring other input sources and performing real-time detection, continue to the following article: