CREATE EXTERNAL STREAM (Transact-SQL)

The EXTERNAL STREAM object has a dual purpose of both an input and output stream. It can be used as an input to query streaming data from event ingestion services such as Azure Event Hubs, Azure IoT Hub (or Edge Hub), or Kafka, or it can be used as an output to specify where and how to store results from a streaming query.

An EXTERNAL STREAM can also be specified and created as both an output and input for services such as Event Hubs or Blob storage. This facilitates chaining scenarios, where one streaming query persists results to the external stream as output and another streaming query reads from the same external stream as input.
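
As a rough illustration of the chaining scenario, the following sketch creates an intermediate Edge Hub stream and wires two streaming jobs through it using the Azure SQL Edge streaming-job procedures (sys.sp_create_streaming_job and sys.sp_start_streaming_job). It assumes an Edge Hub data source and file format like the ones in the examples later in this article; InputStream, IntermediateStream, and OutputTableStream are hypothetical names.

CREATE EXTERNAL STREAM IntermediateStream 
WITH 
( 
    DATA_SOURCE = MyEdgeHub,      -- assumed Edge Hub data source (see Example 1) 
    FILE_FORMAT = myFileFormat, 
    LOCATION = '<intermediatetopicname>' 
); 

-- The first job persists results to the intermediate stream as output... 
EXEC sys.sp_create_streaming_job 
    @name = N'job1', 
    @statement = N'SELECT * INTO IntermediateStream FROM InputStream'; 

-- ...and a second job reads the same external stream back as input. 
EXEC sys.sp_create_streaming_job 
    @name = N'job2', 
    @statement = N'SELECT * INTO OutputTableStream FROM IntermediateStream'; 

EXEC sys.sp_start_streaming_job @name = N'job1'; 
EXEC sys.sp_start_streaming_job @name = N'job2'; 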

Azure SQL Edge currently only supports the following data sources as stream inputs and outputs.

Data source type | Input | Output | Description
Azure IoT Edge hub | Y | Y | Data source to read and write streaming data to an Azure IoT Edge hub. For more information, see IoT Edge Hub.
SQL Database | N | Y | Data source connection to write streaming data to SQL Database. The database can be a local database in Azure SQL Edge, or a remote database in SQL Server or Azure SQL Database.
Kafka | Y | N | Data source to read streaming data from a Kafka topic. Kafka support is not available for the ARM64 version of Azure SQL Edge.

Syntax

CREATE EXTERNAL STREAM {external_stream_name}  
( <column_definition> [, <column_definition> ] * ) -- Used for Inputs - optional 
WITH  ( <with_options> )

<column_definition> ::=
  column_name <column_data_type>

<column_data_type> ::=
[ type_schema_name . ] type_name
    [ ( precision [ , scale ] | max ) ]

<with_options> ::=
  DATA_SOURCE = data_source_name, 
  LOCATION = location_name, 
  [FILE_FORMAT = external_file_format_name], --Used for Inputs - optional 
  [<optional_input_options>],
  [<optional_output_options>], 
  TAGS = <tag_column_value>

<optional_input_options> ::= 
  INPUT_OPTIONS = '[<input_option_data>]'

<input_option_data> ::= 
      <input_option_values> [ , <input_option_values> ]

<input_option_values> ::=
  PARTITIONS: [number_of_partitions]
  | CONSUMER_GROUP: [ consumer_group_name ] 
  | TIME_POLICY: [ time_policy ] 
  | LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ] 
  | OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]
  
<optional_output_options> ::= 
  OUTPUT_OPTIONS = '[<output_option_data>]'

<output_option_data> ::= 
      <output_option_values> [ , <output_option_values> ]

<output_option_values> ::=
   REJECT_POLICY: [ reject_policy ] 
   | MINIMUM_ROWS: [ row_value ] 
   | MAXIMUM_TIME: [ time_value_minutes] 
   | PARTITION_KEY_COLUMN: [ partition_key_column_name ] 
   | PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ] 
   | SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ] 
   | PARTITION_KEY: [ partition_key_name ] 
   | ROW_KEY: [ row_key_name ] 
   | BATCH_SIZE: [ batch_size_value ] 
   | MAXIMUM_BATCH_COUNT: [ batch_value ] 
   | STAGING_AREA: [ blob_data_source ]
 
<tag_column_value> ::= -- Reserved for Future Usage
); 

Arguments

  • DATA_SOURCE

  • FILE_FORMAT

  • LOCATION: Specifies the name for the actual data or location in the data source.

    • For Edge Hub or Kafka stream objects, location specifies the name of the Edge Hub or Kafka topic to read from or write to.
    • For SQL stream objects (SQL Server, Azure SQL Database, or Azure SQL Edge), location specifies the name of the table. If the stream is created in the same database and schema as the destination table, just the table name suffices. Otherwise, you need to fully qualify the table name (<database_name>.<schema_name>.<table_name>).
    • For Azure Blob Storage stream objects, location refers to the path pattern to use inside the blob container. For more information on this feature, see /articles/stream-analytics/stream-analytics-define-outputs.md#blob-storage-and-azure-data-lake-gen2.
  • INPUT_OPTIONS: Specify options as key-value pairs for services such as Kafka and IoT Edge Hub that are inputs to streaming queries.

    • PARTITIONS: Number of partitions defined for a topic. The maximum number of partitions that can be used is limited to 32.
      • Applies to Kafka input streams.
    • CONSUMER_GROUP: Event Hubs and IoT Hub limit the number of readers within one consumer group (to 5). Leaving this field empty will use the '$Default' consumer group.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
    • TIME_POLICY: Describes whether to drop events or adjust the event time when late events or out-of-order events pass their tolerance value.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
    • LATE_EVENT_TOLERANCE: The maximum acceptable time delay. The delay represents the difference between the event's timestamp and the system clock.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
    • OUT_OF_ORDER_EVENT_TOLERANCE: Events can arrive out of order after they've made the trip from the input to the streaming query. These events can be accepted as-is, or you can choose to pause for a set period to reorder them.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
  • OUTPUT_OPTIONS: Specify options as key-value pairs for supported services that are outputs of streaming queries; see the sketch after this list for a combined example.

    • REJECT_POLICY: DROP | RETRY. Specifies the data error handling policy when data conversion errors occur.
      • Applies to all supported outputs.
    • MINIMUM_ROWS:
      Minimum rows required per batch written to an output. For Parquet, every batch creates a new file.
      • Applies to all supported outputs.
    • MAXIMUM_TIME:
      Maximum wait time in minutes per batch. After this time, the batch is written to the output even if the minimum rows requirement is not met.
      • Applies to all supported outputs.
    • PARTITION_KEY_COLUMN:
      The column that is used for the partition key.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
    • PROPERTY_COLUMNS:
      A comma-separated list of the names of output columns that will be attached to messages as custom properties, if provided.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
    • SYSTEM_PROPERTY_COLUMNS:
      A JSON-formatted collection of name/value pairs of system property names and output columns to be populated on Service Bus messages. For example, { "MessageId": "column1", "PartitionKey": "column2" }.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
    • PARTITION_KEY:
      The name of the output column containing the partition key. The partition key is a unique identifier for the partition within a given table and forms the first part of an entity's primary key. It is a string value that may be up to 1 KB in size.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
    • ROW_KEY:
      The name of the output column containing the row key. The row key is a unique identifier for an entity within a given partition and forms the second part of an entity's primary key. The row key is a string value that may be up to 1 KB in size.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
    • BATCH_SIZE:
      This represents the number of transactions for table storage, where the maximum can go up to 100 records. For Azure Functions, this represents the batch size in bytes sent to the function per call; the default is 256 kB.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
    • MAXIMUM_BATCH_COUNT:
      Maximum number of events sent to the function per call for Azure Functions; the default is 100. For SQL Database, this represents the maximum number of records sent with every bulk insert transaction; the default is 10,000.
      • Applies to all SQL-based outputs.
    • STAGING_AREA: An EXTERNAL DATA SOURCE object pointing to Blob storage; the staging area for high-throughput data ingestion into Azure Synapse Analytics.
      • Reserved for future usage. Does not apply to Azure SQL Edge.
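
For example, batching options for a SQL-based output can be combined in a single OUTPUT_OPTIONS string. The following is a minimal sketch only; the data source and table names reuse the placeholders from Example 2, and the numeric values are illustrative rather than recommendations.

CREATE EXTERNAL STREAM BatchedSqlOutputStream 
WITH 
( 
    DATA_SOURCE = MyTargetSQLTable, 
    LOCATION = '<DatabaseName>.<SchemaName>.<TableName>', 
    OUTPUT_OPTIONS = 
      'REJECT_POLICY: Drop, MINIMUM_ROWS: 100, MAXIMUM_TIME: 5, MAXIMUM_BATCH_COUNT: 1000'
); 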

Examples

Example 1 - EdgeHub

Type: Input or Output

Syntax:

CREATE EXTERNAL DATA SOURCE MyEdgeHub 
WITH  
(      
  LOCATION = 'edgehub://'       
); 
 
CREATE EXTERNAL FILE FORMAT myFileFormat  
WITH (  
   FORMAT_TYPE = JSON
); 
 
CREATE EXTERNAL STREAM Stream_A  
WITH    
(   
   DATA_SOURCE = MyEdgeHub, 
   FILE_FORMAT = myFileFormat, 
   LOCATION = '<mytopicname>', 
   OUTPUT_OPTIONS =   
     'REJECT_POLICY: Drop'
);
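
Because an Edge Hub stream can also serve as an input, the same data source and file format can back an input stream that declares its columns. A minimal sketch with hypothetical column names:

CREATE EXTERNAL STREAM Stream_B (device_id VARCHAR(100), temperature FLOAT) 
WITH 
( 
   DATA_SOURCE = MyEdgeHub, 
   FILE_FORMAT = myFileFormat, 
   LOCATION = '<mytopicname>' 
); 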

Example 2 - Azure SQL Database, Azure SQL Edge, SQL Server

Type: Output

Syntax:

CREATE DATABASE SCOPED CREDENTIAL SQLCredName 
WITH IDENTITY = '<user>', 
SECRET = '<password>'; 
 
-- Azure SQL Database 
CREATE EXTERNAL DATA SOURCE MyTargetSQLTable 
WITH 
(     
  LOCATION = '<my_server_name>.database.windows.net', 
  CREDENTIAL = SQLCredName 
); 
 
--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTable 
WITH 
(     
  LOCATION = 'sqlserver://<ipaddress>,<port>', 
  CREDENTIAL = SQLCredName 
); 

CREATE EXTERNAL STREAM Stream_A 
WITH   
(  
    DATA_SOURCE = MyTargetSQLTable, 
    LOCATION = '<DatabaseName>.<SchemaName>.<TableName>' ,
   --Note: If the table is in the same database and schema as the stream, <TableName> alone is sufficient 
    OUTPUT_OPTIONS =  
      'REJECT_POLICY: Drop'
); 
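
Note that CREATE DATABASE SCOPED CREDENTIAL requires a database master key. If the database does not already have one, create it first; this is a standard T-SQL step, shown here with a placeholder password:

CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<strong_password>'; 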

Example 3 - Kafka

Type: Input

Syntax:

CREATE EXTERNAL DATA SOURCE MyKafka_tweets 
WITH 
( 
  --The location maps to KafkaBootstrapServer 
  LOCATION = 'kafka://<kafkaserver>:<port>', 
  CREDENTIAL = kafkaCredName 
); 
 
CREATE EXTERNAL FILE FORMAT myFileFormat 
WITH ( 
    FORMAT_TYPE = JSON, 
    DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
); 

CREATE EXTERNAL STREAM Stream_A (user_id VARCHAR, tweet VARCHAR) 
WITH   
(  
    DATA_SOURCE = MyKafka_tweets, 
    LOCATION = '<KafkaTopicName>', 
    FILE_FORMAT = myFileFormat,  
    INPUT_OPTIONS =  
      'PARTITIONS: 5'
); 
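
The Kafka data source above references kafkaCredName, which must exist before the data source is created. A minimal sketch, assuming the broker uses simple username/password authentication (adjust IDENTITY and SECRET to match your Kafka security configuration):

CREATE DATABASE SCOPED CREDENTIAL kafkaCredName 
WITH IDENTITY = '<kafka_user>', 
SECRET = '<kafka_password>'; 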

See also