read_kafka table-valued function

Applies to: Databricks SQL, Databricks Runtime 13.3 LTS and above

Reads data from an Apache Kafka cluster and returns the data in tabular form.

The function can read from one or more Kafka topics and supports both batch queries and streaming ingestion.

Syntax

read_kafka([option_key => option_value ] [, ...])

Arguments

This function requires named parameter invocation.

  • option_key: The name of the option to configure. You must use backticks (`) for options that contain dots (.).
  • option_value: A constant expression to set the option. Accepts literals and scalar functions.
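Because option_value accepts scalar functions as well as literals, credentials do not have to be written inline. The following is a minimal sketch, assuming that the secret function is available in your workspace and that read_kafka can evaluate this concat expression as a constant. The scope name kafka_scope and the key names kafka_user and kafka_password are hypothetical.

```sql
-- Hypothetical secret scope and key names; create them in your workspace first.
SELECT value::string AS value
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe => 'events',
  -- Option names that contain dots must be wrapped in backticks.
  `kafka.security.protocol` => 'SASL_SSL',
  `kafka.sasl.mechanism` => 'PLAIN',
  -- Build the JAAS configuration string from secrets instead of literals.
  `kafka.sasl.jaas.config` => concat(
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="',
    secret('kafka_scope', 'kafka_user'),
    '" password="',
    secret('kafka_scope', 'kafka_password'),
    '";'
  )
) LIMIT 10;
```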

Returns

Records read from an Apache Kafka cluster with the following schema:

  • key BINARY: The key of the Kafka record.
  • value BINARY NOT NULL: The value of the Kafka record.
  • topic STRING NOT NULL: The name of the Kafka topic the record is read from.
  • partition INT NOT NULL: The ID of the Kafka partition the record is read from.
  • offset BIGINT NOT NULL: The offset number of the record in the Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: A timestamp value for the record. The timestampType column defines what this timestamp corresponds to.
  • timestampType INTEGER NOT NULL: The type of the timestamp specified in the timestamp column.
  • headers ARRAY<STRUCT<key: STRING, value: BINARY>>: Header values provided as part of the record (only if includeHeaders is enabled).
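Since key and value are returned as BINARY, a query usually casts them before use. The sketch below selects every column of the schema above. It assumes the payload is UTF-8 text, and it wraps partition and offset in backticks only as a precaution against keyword clashes.

```sql
SELECT
  key::string   AS key,     -- key is BINARY and may be NULL
  value::string AS value,   -- assumes the payload is UTF-8 text
  topic,
  `partition`,
  `offset`,
  timestamp,
  timestampType,
  headers                   -- requested through includeHeaders below
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe => 'events',
  includeHeaders => true
) LIMIT 10;
```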

Examples

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";'
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Options

You can find a detailed list of options in the Apache Spark documentation.

Required options

Provide the option below for connecting to your Kafka cluster.

Option
bootstrapServers

Type: String

A comma-separated list of host:port pairs pointing to the Kafka cluster.

Default value: None

Provide only one of the options below to configure which Kafka topics to pull data from.

Option
assign

Type: String

A JSON string that lists the specific topic-partitions to consume from. For example, '{"topicA":[0,1],"topicB":[2,4]}' consumes partitions 0 and 1 of topicA and partitions 2 and 4 of topicB.

Default value: None

subscribe

Type: String

A comma-separated list of Kafka topics to read from.

Default value: None

subscribePattern

Type: String

A regular expression matching topics to subscribe to.

Default value: None
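As an illustration of the two alternatives to subscribe, the sketch below uses assign in one query and subscribePattern in another. The topic names and the events_ prefix are hypothetical.

```sql
-- Read only partitions 0 and 1 of topicA and partitions 2 and 4 of topicB.
SELECT topic, `partition`, value::string AS value
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  assign => '{"topicA":[0,1],"topicB":[2,4]}'
) LIMIT 10;

-- Read every topic whose name starts with "events_".
SELECT topic, value::string AS value
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribePattern => 'events_.*'
) LIMIT 10;
```

Each query sets exactly one of assign, subscribe, and subscribePattern, as required above.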

Miscellaneous options

read_kafka can be used in batch queries and in streaming queries. The options below specify which type of query they apply to.

Option
endingOffsets

Type: String Query Type: batch only

The offsets at which a batch query stops reading: either "latest", to read up to the latest records, or a JSON string specifying an ending offset for each TopicPartition. In the JSON, an offset of -1 refers to latest. An offset of -2 (earliest) is not allowed.

Default value: "latest"

endingOffsetsByTimestamp

Type: String Query Type: batch only

A JSON string specifying an ending timestamp to read until for each TopicPartition. Provide each timestamp as a long value in milliseconds since 1970-01-01 00:00:00 UTC, for example 1686444353000. See the note below for details on timestamp behavior. endingOffsetsByTimestamp takes precedence over endingOffsets.

Default value: None

endingTimestamp

Type: String Query Type: batch only

A string value of the ending timestamp in milliseconds since 1970-01-01 00:00:00 UTC, for example "1686444353000". If Kafka doesn't return the matched offset, the offset is set to latest. See the note below for details on timestamp behavior.

Note: endingTimestamp takes precedence over endingOffsetsByTimestamp and endingOffsets.

Default value: None

includeHeaders

Type: Boolean Query Type: streaming and batch

Whether to include the Kafka headers in the row.

Default value: false

kafka.<consumer_option>

Type: String Query Type: streaming and batch

Any Kafka consumer-specific option can be passed with the kafka. prefix. Surround these options with backticks when you provide them; otherwise you get a parser error. You can find the options in the Kafka documentation.

Note: Do not set the following options with this function: key.deserializer, value.deserializer, bootstrap.servers, group.id

Default value: None

maxOffsetsPerTrigger

Type: Long Query Type: streaming only

Rate limit on the maximum number of offsets or rows processed per trigger interval. The specified total number of offsets will be proportionally split across TopicPartitions.

Default value: None

startingOffsets

Type: String Query Type: streaming and batch

The point at which a query starts reading: either "earliest", to start from the earliest offsets, "latest", to start from the latest offsets, or a JSON string specifying a starting offset for each TopicPartition. In the JSON, an offset of -2 refers to earliest and -1 to latest.

Note: For batch queries, latest (either implicitly or by using -1 in JSON) is not allowed. For streaming queries, this option applies only when a new query is started. Restarted streaming queries continue from the offsets defined in the query checkpoint. Partitions newly discovered during a query start at earliest.

Default value: "latest" for streaming, "earliest" for batch

startingOffsetsByTimestamp

Type: String Query Type: streaming and batch

A JSON string specifying a starting timestamp for each TopicPartition. Provide each timestamp as a long value in milliseconds since 1970-01-01 00:00:00 UTC, for example 1686444353000. See the note below for details on timestamp behavior. If Kafka doesn't return the matched offset, the behavior follows the value of the option startingOffsetsByTimestampStrategy. startingOffsetsByTimestamp takes precedence over startingOffsets.

Note: For streaming queries, this option applies only when a new query is started. Restarted streaming queries continue from the offsets defined in the query checkpoint. Partitions newly discovered during a query start at earliest.

Default value: None

startingOffsetsByTimestampStrategy

Type: String Query Type: streaming and batch

The strategy to use when the starting offset specified by timestamp (either global or per partition) doesn't match the offset that Kafka returned. The available strategies are:

* "error": fails the query.
* "latest": assigns the latest offset for these partitions so that Spark can read newer records from these partitions in later micro-batches.

Default value: "error"

startingTimestamp

Type: String Query Type: streaming and batch

A string value of the starting timestamp in milliseconds since 1970-01-01 00:00:00 UTC, for example "1686444353000". See the note below for details on timestamp behavior. If Kafka doesn't return the matched offset, the behavior follows the value of the option startingOffsetsByTimestampStrategy. startingTimestamp takes precedence over startingOffsetsByTimestamp and startingOffsets.

Note: For streaming queries, this option applies only when a new query is started. Restarted streaming queries continue from the offsets defined in the query checkpoint. Partitions newly discovered during a query start at earliest.

Default value: None
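The offset options above can be combined as in the following sketch. It assumes a topic named events with exactly two partitions (0 and 1). The JSON layout of topic, then partition, then offset follows the Apache Spark Kafka integration documentation, and the streaming table name is hypothetical.

```sql
-- Bounded batch read; assumes the topic `events` has exactly two partitions (0 and 1).
SELECT `partition`, `offset`, value::string AS value
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe => 'events',
  -- Partition 0 starts at offset 100; partition 1 starts at earliest (-2).
  startingOffsets => '{"events":{"0":100,"1":-2}}',
  -- Partition 0 stops at offset 200; partition 1 stops at latest (-1).
  endingOffsets => '{"events":{"0":200,"1":-1}}'
);

-- Rate-limited streaming ingestion; the table name is hypothetical.
CREATE OR REFRESH STREAMING TABLE catalog.schema.throttled_events AS
  SELECT value::string AS value, timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',    -- applies only when the query first starts
    maxOffsetsPerTrigger => 10000     -- split proportionally across partitions
  );
```

The batch query uses -2 only in startingOffsets and -1 only in endingOffsets, matching the restrictions stated for each option.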

Note

The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If Kafka doesn't return a matched offset, the behavior varies across options; check the description of each option.

Spark passes the timestamp information to KafkaConsumer.offsetsForTimes without interpreting or reasoning about the value. For more details on KafkaConsumer.offsetsForTimes, refer to its documentation. The meaning of the timestamp can also vary with the Kafka configuration (log.message.timestamp.type). For details, see the Apache Kafka documentation.
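To make the timestamp options concrete, the sketch below first converts a UTC time to epoch milliseconds and then uses the result in two batch queries. The one-hour window and the two-partition layout of the events topic are assumptions made for illustration.

```sql
-- Convert a UTC wall-clock time to milliseconds since the epoch.
SELECT unix_millis(TIMESTAMP'2023-06-11 00:45:53Z');  -- 1686444353000

-- Global timestamps: read a one-hour window.
SELECT `partition`, `offset`, timestamp, value::string AS value
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe => 'events',
  startingTimestamp => '1686444353000',
  endingTimestamp => '1686447953000'   -- start + 3,600,000 ms
);

-- Per-partition timestamps; assumes `events` has exactly partitions 0 and 1.
SELECT `partition`, `offset`, timestamp, value::string AS value
FROM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe => 'events',
  startingOffsetsByTimestamp => '{"events":{"0":1686444353000,"1":1686444353000}}',
  -- Fall back to the latest offset instead of failing when Kafka returns no match.
  startingOffsetsByTimestampStrategy => 'latest'
);
```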