read_kinesis 流式表值函数

适用于:勾选“是” Databricks SQL 勾选“是” Databricks Runtime 13.3 LTS 及更高版本

返回一个表,其中包含从一个或多个流的 Kinesis 中读取的记录。

语法

read_kinesis ( { parameter => value } [, ...] )

参数

read_kinesis 需要命名参数调用

唯一必需的参数为 streamName。 所有其他参数都是可选的。

此处对参数的说明很简短。 有关详细信息,请参阅 Amazon Kinesis 文档。

可通过多个连接选项来连接 AWS 并对其进行身份验证。 可以使用 secret 函数在函数参数中指定 awsAccessKeyawsSecretKey,也可以在参数中手动设置,或按如下所示配置为环境变量。 roleArnroleExternalIDroleSessionName 也可通过使用实例配置文件用于对 AWS 进行身份验证。 如果未指定这些项,它将使用默认的 AWS 提供程序链。

参数 类型 描述
streamName STRING 一个或多个 Kinesis 流的必需逗号分隔列表。
awsAccessKey STRING AWS 访问密钥(如果有)。 还可以通过 AWS 默认凭据提供程序链支持的各种选项进行指定,包括环境变量(AWS_ACCESS_KEY_ID)和凭据配置文件。
awsSecretKey STRING 对应于访问密钥的密钥。 可以在参数中指定,也可以通过 AWS 默认凭据提供程序链支持的各种选项指定,包括环境变量(AWS_SECRET_KEYAWS_SECRET_ACCESS_KEY)以及凭据配置文件。
roleArn STRING 访问 Kinesis 时要承担的角色的 Amazon 资源名称。
roleExternalId STRING 在委派对 AWS 帐户的访问权限时使用。
roleSessionName STRING AWS 角色会话名称。
stsEndpoint STRING 用于请求临时访问凭据的终结点。
region STRING 要指定的流的区域。 默认值为本地解析区域。
endpoint STRING Kinesis 数据流的区域终结点。 默认值为本地解析区域。
initialPosition STRING 从流中读取的起始位置。 可取值为:“latest”(默认值)、“trim_horizon”、“earliest”、“at_timestamp”。
consumerMode STRING 可取值为:“polling”(默认值),或“EFO”(增强扇出)。
consumerName STRING 使用者的名称。 所有使用者都以“databricks_”为前缀。 默认值为空字符串。
registerConsumerTimeoutInterval STRING 在引发错误之前等待 Kinesis EFO 使用者注册到 Kinesis 流的最大超时。 默认值为“300s”。
requireConsumerDeregistration BOOLEAN true 则在查询终止时注册 EFO 使用者注册。 默认值为 false
deregisterConsumerTimeoutInterval STRING 在引发错误之前等待 Kinesis EFO 使用者在 Kinesis 流中取消注册的最大超时。 默认值为“300s”。
consumerRefreshInterval STRING 检查和刷新使用者的间隔。 默认值为“300s”。

以下参数用于控制 Kinesis 的读取吞吐量和延迟:

参数 类型 描述
maxRecordsPerFetch INTEGER (>0) 可选,默认值为 10,000,按 API 对 Kinesis 的请求读取的记录。
maxFetchRate STRING 每个分片预提取数据的速度。 以 MB/秒为单位的“1.0”和“2.0”之间的值。 默认值为“1.0”。
minFetchPeriod STRING 连续预提取尝试之间的最长等待时间。 默认值为“400ms”。
maxFetchDuration STRING 缓冲预提取的新数据的最长持续时间。 默认值为“10s”。
fetchBufferSize STRING 下一个触发器的数据量。 默认值为“20gb”。
shardsPerTask INTEGER (>0) 每个 Spark 任务要并行预提取的 Kinesis 分片数。 默认值为 5。
shardFetchinterval STRING 轮询以重新分片的频率。 默认值为“1s”。
coalesceThresholdBlockSize INTEGER (>0) 自动合并发生的阈值。 默认值为 10,000,000。
coalesce BOOLEAN true 则联合预提取的请求。 默认为 true
coalesceBinSize INTEGER (>0) 合并后的近似块大小。 默认值为 128,000,000。
reuseKinesisClient BOOLEAN true 则重用缓存中存储的 Kinesis 客户端。 默认值为 true(PE 群集上除外)。
clientRetries INTEGER (>0) 重试方案中的重试次数。 默认值为 5。

返回

具有以下架构的 Kinesis 记录表:

名称 数据类型 Nullable 标准 描述
partitionKey STRING 用于在流的分片之间分发数据的键。 具有相同分区键的所有数据记录将从同一分片读取。
data BINARY kinesis 数据有效负载,base-64 编码。
stream STRING 从中读取数据的流的名称。
shardId STRING 从中读取数据的分片的唯一标识符。
sequenceNumber BIGINT 记录分片中该记录的唯一标识符。
approximateArrivalTimestamp TIMESTAMP 将记录插入流中的大致时间。

(stream, shardId, sequenceNumber) 列构成主键。

示例

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret('test-databricks', 'awsAccessKey'),
        awsSecretKey => secret('test-databricks', 'awsSecretKey'),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');