read_kinesis streaming table-valued function
Applies to: Databricks SQL Databricks Runtime 13.3 LTS and above
Returns a table with records read from one or more Kinesis streams.
Syntax
read_kinesis ( { parameter => value } [, ...] )
Arguments
read_kinesis requires named parameter invocation.
The only required argument is streamName. All other arguments are optional.
The descriptions of the arguments are brief here. For more details, see the Amazon Kinesis documentation.
There are several options for connecting to and authenticating with AWS.
awsAccessKey and awsSecretKey can be passed in the function arguments using the secret function, set manually in the arguments, or configured as environment variables, as indicated below.
roleArn, roleExternalId, and roleSessionName can also be used to authenticate with AWS by using instance profiles.
If none of these are specified, the function uses the default AWS credential provider chain.
Parameter | Type | Description |
---|---|---|
streamName | STRING | Required. A comma-separated list of one or more Kinesis streams. |
awsAccessKey | STRING | The AWS access key, if any. Can also be supplied through any option supported by the AWS default credential provider chain, including the environment variable AWS_ACCESS_KEY_ID and a credential profiles file. |
awsSecretKey | STRING | The secret key that corresponds to the access key. Can be specified either in the arguments or through any option supported by the AWS default credential provider chain, including the environment variables AWS_SECRET_KEY or AWS_SECRET_ACCESS_KEY and a credential profiles file. |
roleArn | STRING | Amazon Resource Name (ARN) of the role to assume when accessing Kinesis. |
roleExternalId | STRING | An external ID, used when delegating access to the AWS account. |
roleSessionName | STRING | AWS role session name. |
stsEndpoint | STRING | An endpoint for requesting temporary access credentials. |
region | STRING | Region of the streams. The default is the locally resolved region. |
endpoint | STRING | Regional endpoint for Kinesis data streams. The default is the locally resolved region. |
initialPosition | STRING | Starting position for reading from the stream. One of: 'latest' (default), 'trim_horizon', 'earliest', 'at_timestamp'. |
consumerMode | STRING | One of: 'polling' (default) or 'EFO' (enhanced fan-out). |
consumerName | STRING | The name of the consumer. All consumers are prefixed with 'databricks_'. The default is an empty string. |
registerConsumerTimeoutInterval | STRING | The maximum time to wait for the Kinesis EFO consumer to be registered with the Kinesis stream before throwing an error. The default is '300s'. |
requireConsumerDeregistration | BOOLEAN | true to deregister the EFO consumer on query termination. The default is false. |
deregisterConsumerTimeoutInterval | STRING | The maximum time to wait for the Kinesis EFO consumer to be deregistered from the Kinesis stream before throwing an error. The default is '300s'. |
consumerRefreshInterval | STRING | The interval at which the consumer is checked and refreshed. The default is '300s'. |
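As an illustration of how the enhanced fan-out options in the table above fit together, the following is a minimal sketch rather than a recommended configuration. It assumes credentials are resolved through the default AWS credential provider chain; the table name testing.efo_streaming_table, the consumer name 'my_consumer', and the region 'us-west-2' are hypothetical placeholders.

```sql
-- Sketch: read with an enhanced fan-out (EFO) consumer.
-- Table name, consumer name, and region are hypothetical placeholders.
CREATE STREAMING TABLE testing.efo_streaming_table AS
  SELECT * FROM STREAM read_kinesis (
    streamName => 'test_databricks',
    region => 'us-west-2',
    initialPosition => 'latest',
    consumerMode => 'EFO',
    consumerName => 'my_consumer',
    -- Deregister the EFO consumer when the query terminates (default is false).
    requireConsumerDeregistration => true,
    -- Wait up to 300 seconds (the documented default) for registration.
    registerConsumerTimeoutInterval => '300s');
```

Per the table, the registered consumer name is prefixed with 'databricks_'.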
The following arguments are used for controlling the read throughput and latency for Kinesis:
Parameter | Type | Description |
---|---|---|
maxRecordsPerFetch | INTEGER (>0) | The number of records to read per API request to Kinesis. The default is 10,000. |
maxFetchRate | STRING | How fast to prefetch data per shard, in MB/s. A value between '1.0' and '2.0'. The default is '1.0'. |
minFetchPeriod | STRING | The maximum wait time between consecutive prefetch attempts. The default is '400ms'. |
maxFetchDuration | STRING | The maximum duration to buffer newly prefetched data. The default is '10s'. |
fetchBufferSize | STRING | The amount of data to buffer for the next trigger. The default is '20gb'. |
shardsPerTask | INTEGER (>0) | The number of Kinesis shards to prefetch from in parallel per Spark task. The default is 5. |
shardFetchInterval | STRING | How often to poll for resharding. The default is '1s'. |
coalesceThresholdBlockSize | INTEGER (>0) | The threshold at which automatic coalesce occurs. The default is 10,000,000. |
coalesce | BOOLEAN | true to coalesce prefetched requests. The default is true. |
coalesceBinSize | INTEGER (>0) | The approximate block size after coalescing. The default is 128,000,000. |
reuseKinesisClient | BOOLEAN | true to reuse the Kinesis client stored in the cache. The default is true except on a PE cluster. |
clientRetries | INTEGER (>0) | The number of retries in the retry scenario. The default is 5. |
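The throughput and latency arguments above are passed the same way as the connection arguments. The following sketch shows the syntax only: the values are illustrative and not tuned for any workload, the table name testing.tuned_streaming_table is a hypothetical placeholder, and credentials are assumed to come from the default AWS credential provider chain.

```sql
-- Sketch: override a few prefetch settings. Values are illustrative, not recommendations.
CREATE STREAMING TABLE testing.tuned_streaming_table AS
  SELECT * FROM STREAM read_kinesis (
    streamName => 'test_databricks',
    initialPosition => 'latest',
    -- INTEGER arguments are passed as numeric literals.
    maxRecordsPerFetch => 5000,
    shardsPerTask => 10,
    -- STRING arguments, including rates and durations, are passed as quoted strings.
    maxFetchRate => '2.0',
    maxFetchDuration => '5s');
```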
Returns
A table of Kinesis records with the following schema:
Name | Data type | Nullable | Standard | Description |
---|---|---|---|---|
partitionKey | STRING | No | | A key that is used to distribute data among the shards of a stream. All data records with the same partition key will be read from the same shard. |
data | BINARY | No | | The Kinesis data payload, base-64 encoded. |
stream | STRING | No | | The name of the stream where the data was read from. |
shardId | STRING | No | | A unique identifier for the shard where the data was read from. |
sequenceNumber | BIGINT | No | | The unique identifier of the record within its shard. |
approximateArrivalTimestamp | TIMESTAMP | No | | The approximate time that the record was inserted into the stream. |
The columns (stream, shardId, sequenceNumber) constitute a primary key.
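Because data is returned as BINARY, a query usually needs to convert it before the payload is readable. The following is a minimal sketch that assumes the producers wrote UTF-8 text; the table name testing.decoded_streaming_table and the column alias payload are hypothetical, and credentials are assumed to come from the default AWS credential provider chain.

```sql
-- Sketch: select the documented columns and cast the BINARY payload to a string.
-- Assumes the payload is UTF-8 text; other encodings need different handling.
CREATE STREAMING TABLE testing.decoded_streaming_table AS
  SELECT
    stream,
    shardId,
    sequenceNumber,
    partitionKey,
    CAST(data AS STRING) AS payload,
    approximateArrivalTimestamp
  FROM STREAM read_kinesis (
    streamName => 'test_databricks',
    initialPosition => 'earliest');
```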
Examples
-- Streaming ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret('test-databricks', 'awsAccessKey'),
        awsSecretKey => secret('test-databricks', 'awsSecretKey'),
        initialPosition => 'earliest');
-- The ingested data can now be queried from testing.streaming_table

-- A streaming query when the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are already configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalId are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');