Create a data streaming job in Azure SQL Edge

This article explains how to create a T-SQL streaming job in Azure SQL Edge. You create the external stream input and output objects, and then you define the streaming job query as part of the streaming job creation.

Configure the external stream input and output objects

T-SQL streaming uses the external data source functionality of SQL Server to define the data sources associated with the external stream inputs and outputs of the streaming job. Use the CREATE EXTERNAL DATA SOURCE, CREATE EXTERNAL FILE FORMAT, and CREATE EXTERNAL STREAM T-SQL commands to create an external stream input or output object.
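The general shape of those three statements, as used in the examples later in this article, is sketched below; every name in angle brackets is a placeholder, not a real object name.

```sql
-- Define where the streamed data lives (an IoT Edge hub, a SQL database, or Kafka)
CREATE EXTERNAL DATA SOURCE <data_source_name>
WITH (LOCATION = '<location>');

-- Define the layout of the data being read or written
CREATE EXTERNAL FILE FORMAT <file_format_name>
WITH (FORMAT_TYPE = JSON);

-- Tie the data source and file format together as a stream object
CREATE EXTERNAL STREAM <stream_name>
WITH (
    DATA_SOURCE = <data_source_name>,
    FILE_FORMAT = <file_format_name>,
    LOCATION = N'<topic_or_table_name>'
);
```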

Additionally, if Azure SQL Edge, SQL Server, or Azure SQL Database is used as an output stream, you need the CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL) statement. This T-SQL command defines the credentials used to access the database.
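A minimal sketch of that statement follows; the credential name, login, and password are placeholders, and a full example appears later in this article.

```sql
-- Placeholder names; IDENTITY is the login name, SECRET is its password
CREATE DATABASE SCOPED CREDENTIAL <credential_name>
WITH IDENTITY = '<sql_login>', SECRET = '<sql_login_password>';
```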

Supported input and output stream data sources

Azure SQL Edge currently only supports the following data sources as stream inputs and outputs.

| Data source type | Input | Output | Description |
| --- | --- | --- | --- |
| Azure IoT Edge hub | Y | Y | Data source to read and write streaming data to an Azure IoT Edge hub. For more information, see IoT Edge Hub. |
| SQL Database | N | Y | Data source connection to write streaming data to SQL Database. The database can be a local database in Azure SQL Edge, or a remote database in SQL Server or Azure SQL Database. |
| Kafka | Y | N | Data source to read streaming data from a Kafka topic. This adapter is currently only available for Intel or AMD versions of Azure SQL Edge. It isn't available for the ARM64 version of Azure SQL Edge. |

Example: Create an external stream input/output object for Azure IoT Edge hub

The following example creates an external stream object for Azure IoT Edge hub. To create an external stream input/output data source for Azure IoT Edge hub, you first need to create an external file format for the layout of the data being read or written to.

  1. Create an external file format of the type JSON.

    CREATE EXTERNAL FILE FORMAT InputFileFormat
    WITH
    (
        FORMAT_TYPE = JSON
    );
    GO
    
  2. Create an external data source for Azure IoT Edge hub. The following T-SQL script creates a data source connection to an IoT Edge hub that runs on the same Docker host as Azure SQL Edge.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput 
    WITH 
    (
        LOCATION = 'edgehub://'
    )
    go
    
  3. Create the external stream object for Azure IoT Edge hub. The following T-SQL script creates a stream object for the IoT Edge hub. For an IoT Edge hub stream object, the LOCATION parameter is the name of the IoT Edge hub topic or channel being read from or written to.

    CREATE EXTERNAL STREAM MyTempSensors 
    WITH 
    (
        DATA_SOURCE = EdgeHubInput,
        FILE_FORMAT = InputFileFormat,
        LOCATION = N'TemperatureSensors',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    go
    

Example: Create an external stream object to Azure SQL Database

The following example creates an external stream object to the local database in Azure SQL Edge.

  1. Create a master key on the database. This is required to encrypt the credential secret.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. Create a database-scoped credential for accessing the SQL Server source. The following example creates a credential to the external data source, with IDENTITY = username and SECRET = password.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'
    go
    
  3. Create an external data source with CREATE EXTERNAL DATA SOURCE. The following example:

    • Creates an external data source named LocalSQLOutput.
    • Identifies the external data source (LOCATION = '<protocol>://<server>[:<port>]'). In the example, it points to a local instance of Azure SQL Edge.
    • Uses the credential created previously.

    CREATE EXTERNAL DATA SOURCE LocalSQLOutput 
    WITH 
    (
        LOCATION = 'sqlserver://tcp:.,1433',
        CREDENTIAL = SQLCredential
    )
    go
    
  4. Create the external stream object. The following example creates an external stream object pointing to the table dbo.TemperatureMeasurements in the database MySQLDatabase.

    CREATE EXTERNAL STREAM TemperatureMeasurements 
    WITH 
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Example: Create an external stream object for Kafka

The following example creates an external stream object that reads from a Kafka topic. This example assumes that the Kafka server is configured for anonymous access.

  1. Create an external data source with CREATE EXTERNAL DATA SOURCE. The following example creates an external data source pointing to a Kafka server.

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH
    (
        LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'
    )
    GO
    
  2. Create an external file format for the Kafka input. The following example creates a JSON file format with gzip compression.

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH
    (
        FORMAT_TYPE = JSON,
        DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    )
    
  3. Create the external stream object. The following example creates an external stream object pointing to the Kafka topic *TemperatureMeasurement*.

    CREATE EXTERNAL STREAM TemperatureMeasurement 
    WITH 
    (  
        DATA_SOURCE = KafkaInput, 
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10' 
    ); 
    

Create the streaming job and the streaming queries

Use the sys.sp_create_streaming_job system stored procedure to define the streaming queries and create the streaming job. The sp_create_streaming_job stored procedure takes the following parameters:

  • job_name: The name of the streaming job. Streaming job names are unique across the instance.
  • statement: Stream Analytics Query Language-based streaming query statements.

The following example creates a simple streaming job with one streaming query. This query reads the inputs from the IoT Edge hub, and writes to dbo.TemperatureMeasurements in the database.

EXEC sys.sp_create_streaming_job @name=N'StreamingJob1',
@statement= N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

The following example creates a more complex streaming job with multiple queries. These queries include one that uses the built-in AnomalyDetection_ChangePoint function to identify anomalies in the temperature data.

EXEC sys.sp_create_streaming_job @name=N'StreamingJob2', @statement=
N' Select * INTO TemperatureMeasurements1 from MyEdgeHubInput1

Select * Into TemperatureMeasurements2 from MyEdgeHubInput2

Select * Into TemperatureMeasurements3 from MyEdgeHubInput3

SELECT
Timestamp as [Time],
[Temperature] As [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''Score'') as ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''IsAnomaly'') as IsChangePointAnomaly
INTO TemperatureAnomalies FROM MyEdgeHubInput2;
'
go

Start, stop, drop, and monitor streaming jobs

To start a streaming job in Azure SQL Edge, run the sys.sp_start_streaming_job stored procedure. The stored procedure requires the name of the streaming job to start, as input.

exec sys.sp_start_streaming_job @name=N'StreamingJob1'
go

To stop a streaming job, run the sys.sp_stop_streaming_job stored procedure. The stored procedure requires the name of the streaming job to stop, as input.

exec sys.sp_stop_streaming_job @name=N'StreamingJob1'
go

To drop (or delete) a streaming job, run the sys.sp_drop_streaming_job stored procedure. The stored procedure requires the name of the streaming job to drop, as input.

exec sys.sp_drop_streaming_job @name=N'StreamingJob1'
go

To get the current status of a streaming job, run the sys.sp_get_streaming_job stored procedure. The stored procedure requires the name of the streaming job as input. It outputs the name and the current status of the streaming job.

exec sys.sp_get_streaming_job @name=N'StreamingJob1'
        WITH RESULT SETS
(
       (
       name nvarchar(256),
       status nvarchar(256),
       error nvarchar(256)
       )
)

The streaming job can have any one of the following statuses:

| Status | Description |
| --- | --- |
| Created | The streaming job was created, but hasn't yet been started. |
| Starting | The streaming job is in the starting phase. |
| Idle | The streaming job is running, but there's no input to process. |
| Processing | The streaming job is running, and is processing inputs. This state indicates a healthy state for the streaming job. |
| Degraded | The streaming job is running, but there were some non-fatal errors during input processing. The job will continue to run, but will drop inputs that encounter errors. |
| Stopped | The streaming job has been stopped. |
| Failed | The streaming job failed. This is generally an indication of a fatal error during processing. |

Next steps