如何在 Azure Cosmos DB for PostgreSQL 中使用 Azure 流分析引入数据

适用对象: Azure Cosmos DB for PostgreSQL(由 PostgreSQL 的 Citus 数据库扩展提供支持)

Azure 流分析是一个实时分析和事件处理引擎,用于处理从设备、传感器和网站快速传入的大量流数据。 它还可用于 Azure IoT Edge 运行时,在 IoT 设备上实现数据处理。

Diagram that shows Stream Analytics architecture with Azure Cosmos DB for PostgreSQL.

对于 IoT 等实时工作负载,Azure Cosmos DB for PostgreSQL 的作用非常显著。 对于这些工作负载,流分析可以充当无代码、高性能且可缩放的替代方案,对来自 Azure 事件中心、Azure IoT 中心和 Azure Blob 存储的数据进行预处理并将其流式传输到 Azure Cosmos DB for PostgreSQL。

设置流分析的步骤

注意

本文使用 Azure IoT 中心作为示例数据源,但其中所述的方法也适用于流分析支持的任何其他源。 此外,以下演示数据来自 Azure IoT 设备遥测模拟器。 本文不介绍如何设置该模拟器。

  1. 在 Azure 门户中,展开左上方的门户菜单并选择“创建资源”。

  2. 从结果列表中选择“分析”>“流分析作业”。

  3. 在“新建流分析作业”页中填充以下信息:

    • 订阅 - 选择要用于此作业的 Azure 订阅。
    • 资源组 - 选择你的 IoT 中心所在的同一资源组。
    • 名称 - 输入用于标识你的流分析作业的名称。
    • 区域 - 选择用于托管你的流分析作业的 Azure 区域。 使用最靠近用户的地理位置,以便改进性能并减少数据传输成本。
    • 托管环境 - 选择“云”以部署到 Azure 云,或选择“Edge”以部署到 IoT Edge 设备。
    • 流单元 - 为执行作业所需的计算资源选择流单元数量。
  4. 选择“查看 + 创建”,然后选择“创建”。 你将在右上方看到“部署正在进行”通知。

    Screenshot that shows the create Stream Analytics job form.

  5. 配置作业输入。

    Screenshot that shows configuring job input in Stream Analytics.

    1. 资源部署完成后,导航到你的流分析作业。 选择“输入”>“添加流输入”>“IoT 中心”。

    2. 使用以下值填写“IoT 中心”页

      • 输入别名 - 输入用于标识作业输入的名称。
      • 订阅 - 选择包含你的 IoT 中心帐户的 Azure 订阅。
      • IoT 中心 – 选择你的 IoT 中心的名称。
    3. 选择“保存”。

    4. 添加输入流后,还可以验证或下载流入的数据集。 以下代码演示了示例事件的数据:

      {
         "deviceId": "sim000001",
         "time": "2022-04-25T13:49:11.6892185Z",
         "counter": 1,
         "EventProcessedUtcTime": "2022-04-25T13:49:41.4791613Z",
         "PartitionId": 3,
         "EventEnqueuedUtcTime": "2022-04-25T13:49:12.1820000Z",
         "IoTHub": {
           "MessageId": null,
           "CorrelationId": "990407b8-4332-4cb6-a8f4-d47f304397d8",
           "ConnectionDeviceId": "sim000001",
           "ConnectionDeviceGenerationId": "637842405470327268",
           "EnqueuedTime": "2022-04-25T13:49:11.7060000Z"
         }
      }
      
  6. 配置作业输出。

    1. 在流分析作业页上,选择“输出”>“添加”>“PostgreSQL 数据库(预览版)”。

      Screenshot that shows selecting PostgreSQL database output.

    2. 在“Azure PostgreSQL”页上填写以下值:

      • 输出别名 - 输入用于标识作业输出的名称。
      • 选择“手动提供 PostgreSQL 数据库设置”,并填写“服务器完全限定的域名”、“数据库”、“表”、“用户名”和“密码”字段。 在示例数据集中,使用表 device_data。
    3. 选择“保存”。

    Configure job output in Azure Stream Analytics.

  7. 定义转换查询。

    Transformation query in Azure Stream Analytics.

    1. 在流分析作业页上,从左侧菜单中选择“查询”。

    2. 本教程仅将来自 IoT 中心的备用事件引入 Azure Cosmos DB for PostgreSQL,以减少总体数据大小。 复制并将以下查询粘贴到查询窗格中:

      select
         counter,
         iothub.connectiondeviceid,
         iothub.correlationid,
         iothub.connectiondevicegenerationid,
         iothub.enqueuedtime
      from
         [src-iot-hub]
      where counter%2 = 0;
      
    3. 选择“保存查询”。

      注意

      使用查询的目的不仅是对数据进行采样,而且还要从数据流中提取所需的属性。 在流分析中使用自定义查询选项有助于在将数据引入数据库之前对其进行预处理/转换。

  8. 启动流分析作业并验证输出。

    1. 返回到作业概览页,然后选择“启动”。

    2. 在“启动作业”页上,为“作业输出开始时间”选择“现在”,然后选择“开始”。

    3. 首次启动该作业需要一段时间,但一旦触发,它就会在传入数据时持续运行。 几分钟后,你可以查询群集以验证数据是否已加载。

      citus=> SELECT * FROM public.device_data LIMIT 10;
      
       counter | connectiondeviceid |            correlationid             | connectiondevicegenerationid |         enqueuedtime
      ---------+--------------------+--------------------------------------+------------------------------+------------------------------
             2 | sim000001          | 7745c600-5663-44bc-a70b-3e249f6fc302 | 637842405470327268           | 2022-05-25T18:24:03.4600000Z
             4 | sim000001          | 389abfde-5bec-445c-a387-18c0ed7af227 | 637842405470327268           | 2022-05-25T18:24:05.4600000Z
             6 | sim000001          | 3932ce3a-4616-470d-967f-903c45f71d0f | 637842405470327268           | 2022-05-25T18:24:07.4600000Z
             8 | sim000001          | 4bd8ecb0-7ee1-4238-b034-4e03cb50f11a | 637842405470327268           | 2022-05-25T18:24:09.4600000Z
            10 | sim000001          | 26cebc68-934e-4e26-80db-e07ade3775c0 | 637842405470327268           | 2022-05-25T18:24:11.4600000Z
            12 | sim000001          | 067af85c-a01c-4da0-b208-e4d31a24a9db | 637842405470327268           | 2022-05-25T18:24:13.4600000Z
            14 | sim000001          | 740e5002-4bb9-4547-8796-9d130f73532d | 637842405470327268           | 2022-05-25T18:24:15.4600000Z
            16 | sim000001          | 343ed04f-0cc0-4189-b04a-68e300637f0e | 637842405470327268           | 2022-05-25T18:24:17.4610000Z
            18 | sim000001          | 54157941-2405-407d-9da6-f142fc8825bb | 637842405470327268           | 2022-05-25T18:24:19.4610000Z
            20 | sim000001          | 219488e5-c48a-4f04-93f6-12c11ed00a30 | 637842405470327268           | 2022-05-25T18:24:21.4610000Z
      (10 rows)
      

注意

Azure Cosmos DB for PostgreSQL 目前不支持“测试连接”功能,即使连接正常,它也可能会引发错误。

后续步骤

了解如何使用 Azure Cosmos DB for PostgreSQL 创建实时仪表板