Compartir a través de

在 Azure SQL Edge 中创建数据流作业

重要

Azure SQL Edge 将于 2025 年 9 月 30 日停用。 有关详细信息和迁移选项,请参阅停用通知

注意

Azure SQL Edge 不再支持 ARM64 平台。

本文介绍了如何在 Azure SQL Edge 中创建 T-SQL 流式处理作业。 你将创建外部流输入和输出对象,然后在创建流式处理作业的过程中定义流式处理作业查询。

配置外部流输入和输出对象

T-SQL 流式处理使用 SQL Server 的外部数据源功能,来定义与流式处理作业的外部流输入和输出相关联的数据源。 使用以下 T-SQL 命令创建外部流输入或输出对象:

此外,如果将 Azure SQL Edge、SQL Server 或 Azure SQL 数据库用作输出流,则需要使用 CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL)。 此 T-SQL 命令定义用于访问数据库的凭据。

支持的输入和输出流数据源

Azure SQL Edge 目前仅支持以下数据源作为流输入和输出。

数据源类型 输入 输出 说明
Azure IoT Edge 中心 Y Y 将流式处理数据读/写到 Azure IoT Edge 中心的数据源。 有关详细信息,请参阅 IoT Edge 中心
SQL 数据库 N Y 将流式处理数据写入 SQL 数据库的数据源连接。 数据库可以是 Azure SQL Edge 中的本地数据库,也可以是 SQL Server 或 Azure SQL 数据库中的远程数据库。
Kafka Y N 从 Kafka 主题读取流式处理数据的数据源。

示例:为 Azure IoT Edge 中心创建外部流输入/输出对象

以下示例为 Azure IoT Edge 中心创建外部流对象。 若要为 Azure IoT Edge 中心创建外部流输入/输出数据源,首先也需针对要读取或写入的数据的布局创建一个外部文件格式。

  1. 创建 JSON 类型的外部文件格式。

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. 为 Azure IoT Edge 中心创建外部数据源。 以下 T-SQL 脚本创建与 IoT Edge 中心的数据源连接,该中心与 Azure SQL Edge 在同一 Docker 主机上运行。

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. 为 Azure IoT Edge 中心创建外部流对象。 以下 T-SQL 脚本为 IoT Edge 中心创建流对象。 对于 IoT Edge 中心流对象,LOCATION 参数是要读取或写入的 IoT Edge 中心主题或通道的名称。

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

示例:创建 Azure SQL 数据库的外部流对象

下面的示例为 Azure SQL Edge 中的本地数据库创建了一个外部流对象。

  1. 在数据库上创建主密钥。 这是加密凭据密钥所必需的。

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. 创建用于访问 SQL Server 源的、以数据库为作用域的凭据。 下面的示例为外部数据源创建一个凭据,其中 IDENTITY = username 且 SECRET = password。

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. 使用 CREATE EXTERNAL DATA SOURCE 创建外部数据源。 下面的示例:

    • 创建名为 LocalSQLOutput 的外部数据源。
    • 标识外部数据源 (LOCATION = '<vendor>://<server>[:<port>]')。 在示例中,它指向 Azure SQL Edge 的本地实例。
    • 使用先前创建的凭据。
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. 创建外部流对象。 下面的示例创建一个指向 MySQLDatabase 数据库中的 dbo.TemperatureMeasurements 表的外部流对象。

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

示例:为 Kafka 创建外部流对象

下面的示例为 Azure SQL Edge 中的本地数据库创建了一个外部流对象。 此示例假设已将 kafka 服务器配置为使用匿名访问。

  1. 使用 CREATE EXTERNAL DATA SOURCE 创建外部数据源。 下面的示例:

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. 为 Kafka 输入创建外部文件格式。 以下示例创建了采用 GZipped 压缩的 JSON 文件格式。

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. 创建外部流对象。 以下示例创建指向 Kafka 主题 TemperatureMeasurement 的外部流对象。

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

创建流式处理作业和流式处理查询

使用 sys.sp_create_streaming_job 系统存储过程来定义流式处理查询并创建流式处理作业。 sp_create_streaming_job 存储过程采用以下参数:

  • @job_name:流式处理作业的名称。 流式处理作业名称在实例中是唯一的。
  • @statement:基于流分析查询语言的流式处理查询语句。

下面的示例创建一个简单的流式处理作业,其中包含一个流式处理查询。 此查询从 IoT Edge 中心读取输入,并写入到数据库中的 dbo.TemperatureMeasurements

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

以下示例创建一个更复杂的流式处理作业,其中包含多个不同查询。 这些查询中有一个查询使用内置的 AnomalyDetection_ChangePoint 函数来识别温度数据中的异常。

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

启动、停止、删除和监视流式处理作业

若要在 Azure SQL Edge 中启动流式处理作业,请运行 sys.sp_start_streaming_job 存储过程。 该存储过程需要使用要启动的流式处理作业的名称作为输入。

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

若要停止流式处理作业,请运行 sys.sp_stop_streaming_job 存储过程。 该存储过程需要使用要停止的流式处理作业的名称作为输入。

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

若要丢弃(或删除)流式处理作业,请运行 sys.sp_drop_streaming_job 存储过程。 该存储过程需要使用要丢弃的流式处理作业的名称作为输入。

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

若要获取流式处理作业的当前状态,请运行 sys.sp_get_streaming_job 存储过程。 该存储过程需要使用要丢弃的流式处理作业的名称作为输入。 它输出流式处理作业的名称和当前状态。

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

流式处理作业可以处于以下任一状态:

状态 说明
创建 流式处理作业已创建,但尚未启动。
正在启动 流式处理作业处于开始阶段。
空闲 流式处理作业正在运行,但没有要处理的输入。
Processing 流式处理作业正在运行,且正在处理输入。 此状态指示流式处理作业的正常运行状态。
已降级 流式处理作业正在运行,但在处理输入期间出现一些非致命错误。 输入作业将继续运行,但将删除遇到错误的输入。
已停止 流式处理作业已停止。
Failed 流式处理作业失败。 这通常表示在处理过程中出现灾难性错误。

注意

由于流式处理作业是异步执行的,因此作业可能会在运行时遇到错误。 若要排查流式处理作业失败问题,请使用 sys.sp_get_streaming_job 存储过程,或查看来自 Azure SQL Edge 容器的 Docker 日志,该容器可以提供流式处理作业中的错误详细信息。