快速入门:使用 Azure CLI 创建 Azure 流分析作业

在本快速入门中,将使用 Azure CLI 定义一个流分析作业,以便筛选温度读数高于 27 的实时传感器消息。 该流分析作业会从 IoT 中心读取数据,对数据进行转换,然后将输出数据写入到 Blob 存储中的容器。 在本快速入门中使用的输入数据由 Raspberry Pi 联机模拟器生成。

开始之前

如果没有 Azure 订阅,可在开始前创建一个试用帐户

注意

在可以在由世纪互联运营的 Microsoft Azure 中使用 Azure CLI 之前,请先运行 az cloud set -n AzureChinaCloud 来更改云环境。 若要切换回 Azure 公有云,请再次运行 az cloud set -n AzureCloud

先决条件

可以使用本地 Azure CLI。

  • 如果需要,请安装 Azure CLI 来运行 CLI 参考命令。

  • 本地 Azure CLI,请了解如何安装 Azure CLI。 如果在 Windows 或 macOS 上运行,请考虑在 Docker 容器中运行 Azure CLI。 有关详细信息,请参阅如何在 Docker 容器中运行 Azure CLI

    • 通过使用 az login 命令登录到 Azure CLI。 若要完成身份验证过程,请遵循终端中显示的步骤。 有关其他登录选项,请参阅使用 Azure CLI 登录

    • 出现提示时,请在首次使用时安装 Azure CLI 扩展。 有关扩展详细信息,请参阅使用 Azure CLI 的扩展

    • 运行 az version 以查找安装的版本和依赖库。 若要升级到最新版本,请运行 az upgrade

  • 创建资源组。 必须将所有 Azure 资源部署到资源组。 使用资源组可以组织和管理相关的 Azure 资源。

    对于本快速入门,请使用以下 az group create 命令在 chinanorth 位置创建名为 streamanalyticsrg 的资源组 :

    az group create --name streamanalyticsrg --location chinanorth
    

对输入数据进行准备

在定义流分析作业之前,请准备用于作业输入的数据。 以下 Azure CLI 命名将准备作业所需的输入数据。

  1. 使用 az iot hub create 命令创建 IoT 中心。 此示例创建名为 MyASAIoTHub 的 IoT 中心。 由于 IoT 中心名称是唯一的,因此需使用你自己的 IoT 中心名称。 将 SKU 设置为 F1 即可使用免费层,前提是它在订阅中可用。 否则,请选择下一个最低的层。

    iotHubName=MyASAIoTHub
    az iot hub create --name $iotHubName --resource-group streamanalyticsrg --sku S1
    

    创建 IoT 中心以后,请使用 az iot hub connection-string show 命令获取 IoT 中心连接字符串。 复制整个连接字符串并进行保存。 在将 IoT 中心作为输入添加到流分析作业时使用它。

    az iot hub connection-string show --hub-name $iotHubName
    
  2. 使用 az iothub device-identity create 命令将设备添加到 IoT 中心。 此示例创建名为 MyASAIoTDevice 的设备。

    az iot hub device-identity create --hub-name $iotHubName --device-id "MyASAIoTDevice"
    
  3. 使用 az iot hub device-identity connection-string show 命令获取设备连接字符串。 复制整个连接字符串并将其保存。这样,在创建 Raspberry Pi 模拟器时,就可以使用该字符串。

    az iot hub device-identity connection-string show --hub-name $iotHubName --device-id "MyASAIoTDevice" --output table
    

    输出示例:

    HostName=MyASAIoTHub.azure-devices.net;DeviceId=MyASAIoTDevice;SharedAccessKey=a2mnUsg52+NIgYudxYYUNXI67r0JmNubmfVafojG8=
    

创建 Blob 存储帐户

以下 Azure CLI 命令将创建用于作业输出的 Blob 存储帐户

  1. 使用 az storage account create 命令创建常规用途存储帐户。 常规用途的存储帐户可用于以下四种服务:Blob、文件、表和队列。

    storageAccountName="asatutorialstorage$RANDOM"
    az storage account create \
        --name $storageAccountName \
        --resource-group streamanalyticsrg \
        --location chinanorth \
        --sku Standard_LRS \
        --encryption-services blob
    
  1. 通过运行 az storage account keys list 命令获取存储帐户的密钥。 保存此密钥以便在下一步中使用。

    key=$(az storage account keys list -g streamanalyticsrg -n $storageAccountName --query "[0].value" -o tsv)
    echo $key
    

    重要

    记下 Azure 存储帐户的访问密钥。 稍后将在本快速入门中使用此密钥。

  2. 使用 az storage container create 命令创建一个名为 state 的容器来存储 Blob。 使用存储帐户密钥来授权操作创建容器。 有关使用 Azure CLI 授权数据操作的详细信息,请参阅使用 Azure CLI 授权访问 blob 或队列数据

    az storage container create \
        --account-name $storageAccountName \
        --name state \
        --account-key $key \
        --auth-mode key
    

创建流分析作业

使用 az stream-analytics job create 命令创建流分析作业。

az stream-analytics job create \
    --job-name "streamanalyticsjob" \
    --resource-group "streamanalyticsrg" \
    --location "chinanorth" \
    --output-error-policy "Drop" \
    --out-of-order-policy "Drop" \
    --order-max-delay 5 \
    --arrival-max-delay 16 \
    --data-locale "en-US"

配置作业输入

使用 az stream-analytics input cmdlet 将输入添加到作业。 此 cmdlet 将作业名称、作业输入名称、资源组名称和 JSON 格式的输入属性作为参数。 在此示例中,需将 IoT 中心创建为输入。

重要

  • IOT HUB ACCESS KEY 替换为你保存的 IOT 中心连接字符串中的共享访问密钥值。 例如,如果 IOT 中心连接字符串为 HostName=MyASAIoTHub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=xxxxxxxxxxxxxx=,则共享访问密钥值为 xxxxxxxxxxxxxx=。 替换值时,请确保不要删除 "(双引号)的 \(转义)字符。
  • 如果使用了 MyASAIoTHub 以外的名称,请在以下命令中更新 iotHubNamespace 的值。 运行 echo $iotHubName 可查看 IoT 中心的名称。
az stream-analytics input create \
    --properties "{\"type\":\"Stream\",\"datasource\":{\"type\":\"Microsoft.Devices/IotHubs\",\"properties\":{\"consumerGroupName\":\"\$Default\",\"endpoint\":\"messages/events\",\"iotHubNamespace\":\"MyASAIoTHub\",\"sharedAccessPolicyKey\":\"IOT HUB ACCESS KEY\",\"sharedAccessPolicyName\":\"iothubowner\"}},\"serialization\":{\"type\":\"Json\",\"encoding\":\"UTF8\"}}" \
    --input-name "asaiotinput" \
    --job-name "streamanalyticsjob" \
    --resource-group "streamanalyticsrg"

配置作业输出

使用 az stream-analytics output create cmdlet 将输出添加到作业。 此 cmdlet 将作业名称、作业输出名称、资源组名称、JSON 格式的数据源和序列化类型作为参数。

重要

STORAGEACCOUNTNAME> 替换为 Azure 存储帐户的名称,并将 STORAGEACCESSKEY> 替换为存储帐户的访问密钥。 如果未记下这些值,请运行以下命令来获取这些值:echo $storageAccountNameecho $key。 替换这些值时,请确保不要删除 "(双引号)的 \(转义)字符。

az stream-analytics output create \
    --job-name streamanalyticsjob \
    --datasource "{\"type\":\"Microsoft.Storage/Blob\",\"properties\":{\"container\":\"state\",\"dateFormat\":\"yyyy/MM/dd\",\"pathPattern\":\"{date}/{time}\",\"storageAccounts\":[{\"accountKey\":\"STORAGEACCESSKEY\",\"accountName\":\"STORAGEACCOUNTNAME\"}],\"timeFormat\":\"HH\"}}" \
    --serialization "{\"type\":\"Json\",\"properties\":{\"format\":\"Array\",\"encoding\":\"UTF8\"}}" \
    --output-name asabloboutput \
    --resource-group streamanalyticsrg

定义转换查询

使用 az stream-analytics transformation create cmdlet 将转换添加到作业。

az stream-analytics transformation create \
    --resource-group streamanalyticsrg \
    --job-name streamanalyticsjob \
    --name Transformation \
    --streaming-units "6" \
    --saql "SELECT * INTO asabloboutput FROM asaiotinput WHERE Temperature > 27"

运行 IoT 模拟器

  1. 打开 Raspberry Pi Azure IoT 联机模拟器

  2. 将第 15 行中的占位符替换为在快速入门开头保存的整个 Azure IoT 中心设备连接字符串(而不是 IoT 中心连接字符串)。

  3. 选择“运行”。 输出会显示传感器数据和发送到 IoT 中心的消息。

    Raspberry Pi Azure IoT Online Simulator

启动流分析作业并检查输出

使用 az stream-analytics job start cmdlet 启动作业。 此 cmdlet 使用作业名称、资源组名称、输出启动模式和启动时间作为参数。 OutputStartMode 接受的值为 JobStartTimeCustomTimeLastOutputEventTime

以下 cmdlet 在运行以后会返回 True 作为输出(如果作业启动)。

az stream-analytics job start \
    --resource-group streamanalyticsrg \
    --name streamanalyticsjob \
    --output-start-mode JobStartTime

等待几分钟,然后验证是否已在 state Blob 容器中创建输出文件。

Screenshot showing the output file in the State blob container.

下载并打开该文件,可以看到类似于以下内容的几个条目:

{
    "messageId": 229,
    "deviceId": "Raspberry Pi Web Client",
    "temperature": 31.85214010589595,
    "humidity": 60.278830289656284,
    "EventProcessedUtcTime": "2023-02-28T22:06:33.5567789Z",
    "PartitionId": 3,
    "EventEnqueuedUtcTime": "2023-02-28T22:05:49.6520000Z",
    "IoTHub": {
        "MessageId": null,
        "CorrelationId": null,
        "ConnectionDeviceId": "MyASAIoTDevice",
        "ConnectionDeviceGenerationId": "638132150746523845",
        "EnqueuedTime": "2023-02-28T22:05:49.6520000Z",
        "StreamId": null
    }
}

清理资源

删除资源组,这将删除资源组中的所有资源,包括流分析作业、IoT 中心和 Azure 存储帐户。

az group delete \
    --name streamanalyticsrg \
    --no-wait

后续步骤

在本快速入门中,你使用 Azure CLI 部署了一个简单的流分析作业。 你还可以使用 Azure 门户部署流分析作业。

若要了解如何配置其他输入源并执行实时检测,请继续阅读以下文章: