read_pulsar
streaming table-valued function
Applies to: Databricks SQL, Databricks Runtime 14.1 and above
Important
This feature is in Public Preview.
Returns a table with records read from Pulsar.
This table-valued function supports only streaming queries, not batch queries.
Syntax
read_pulsar ( { option_key => option_value } [, ...] )
Arguments
This function requires named parameter invocation for the option keys.
The options serviceUrl and topic are mandatory.
The descriptions of the arguments are brief here. See the Structured Streaming Pulsar documentation for extended descriptions.
Option | Type | Default | Description |
---|---|---|---|
serviceUrl | STRING | Mandatory | The URI of the Pulsar service. |
topic | STRING | Mandatory | The topic to read from. |
predefinedSubscription | STRING | None | The predefined subscription name used by the connector to track Spark application progress. |
subscriptionPrefix | STRING | None | A prefix used by the connector to generate a random subscription to track Spark application progress. |
pollTimeoutMs | LONG | 120000 | The timeout for reading messages from Pulsar in milliseconds. |
failOnDataLoss | BOOLEAN | true | Controls whether to fail a query when data is lost (for example, topics are deleted, or messages are deleted because of retention policy). |
startingOffsets | STRING | latest | The start point when a query is started: earliest, latest, or a JSON string that specifies a specific offset. If latest, the reader reads the newest records after it starts running. If earliest, the reader reads from the earliest offset. |
startingTime | STRING | None | When specified, the Pulsar source reads messages starting from the position of the specified startingTime. |
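As an illustration of the named-parameter syntax with the optional arguments above, the following sketch overrides a few defaults. The table name, broker address, subscription prefix, and chosen values are hypothetical, and the literal forms used for the LONG and BOOLEAN options are an assumption based on the types listed in the table.

```sql
-- Hypothetical example: all names and values are placeholders, not recommendations.
CREATE STREAMING TABLE testing.pulsar_tuned AS
  SELECT * FROM STREAM read_pulsar(
    serviceUrl         => 'pulsar://broker.example.com:6650',
    topic              => 'my-topic',
    subscriptionPrefix => 'my-app',    -- the connector generates a random subscription from this prefix
    startingOffsets    => 'earliest',  -- default is 'latest'
    pollTimeoutMs      => 60000,       -- default is 120000
    failOnDataLoss     => false        -- default is true
  );
```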
The following arguments are used for authentication of the Pulsar client:
Option | Type | Default | Description |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | None | Name of the authentication plugin. |
pulsarClientAuthParams | STRING | None | Parameters for the authentication plugin. |
pulsarClientUseKeyStoreTls | STRING | None | Whether to use KeyStore for TLS authentication. |
pulsarClientTlsTrustStoreType | STRING | None | TrustStore file type for TLS authentication. |
pulsarClientTlsTrustStorePath | STRING | None | TrustStore file path for TLS authentication. |
pulsarClientTlsTrustStorePassword | STRING | None | TrustStore password for TLS authentication. |
The following arguments are used for configuration and authentication of Pulsar admission control. Pulsar admin configuration is required only when admission control is enabled (that is, when maxBytesPerTrigger is set):
Option | Type | Default | Description |
---|---|---|---|
maxBytesPerTrigger | BIGINT | None | A soft limit on the maximum number of bytes to process per microbatch. If this is specified, adminUrl also needs to be specified. |
adminUrl | STRING | None | The Pulsar serviceHttpUrl configuration. Only needed when maxBytesPerTrigger is specified. |
pulsarAdminAuthPlugin | STRING | None | Name of the authentication plugin. |
pulsarAdminAuthParams | STRING | None | Parameters for the authentication plugin. |
pulsarClientUseKeyStoreTls | STRING | None | Whether to use KeyStore for TLS authentication. |
pulsarAdminTlsTrustStoreType | STRING | None | TrustStore file type for TLS authentication. |
pulsarAdminTlsTrustStorePath | STRING | None | TrustStore file path for TLS authentication. |
pulsarAdminTlsTrustStorePassword | STRING | None | TrustStore password for TLS authentication. |
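A minimal sketch of enabling admission control follows: it sets maxBytesPerTrigger together with adminUrl, as the table above requires. The table name, the admin HTTP address and port, and the byte limit are hypothetical placeholders. Any admin authentication options that a secured cluster needs are omitted here.

```sql
-- Hypothetical example: the admin URL and the byte limit are placeholders.
CREATE STREAMING TABLE testing.pulsar_rate_limited AS
  SELECT * FROM STREAM read_pulsar(
    serviceUrl         => 'pulsar://broker.example.com:6650',
    topic              => 'my-topic',
    maxBytesPerTrigger => 10485760,                         -- soft limit of about 10 MiB per microbatch
    adminUrl           => 'http://broker.example.com:8080'  -- needed because maxBytesPerTrigger is set
  );
```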
Returns
A table of Pulsar records with the following schema.
__key STRING NOT NULL
: Pulsar message key.
value BINARY NOT NULL
: Pulsar message value.
Note: For topics with Avro or JSON schema, instead of loading content into a binary value field, the content will be expanded to preserve the field names and field types of the Pulsar topic.
__topic STRING NOT NULL
: Pulsar topic name.
__messageId BINARY NOT NULL
: Pulsar message id.
__publishTime TIMESTAMP NOT NULL
: Pulsar message publish time.
__eventTime TIMESTAMP NOT NULL
: Pulsar message event time.
__messageProperties MAP<STRING, STRING>
: Pulsar message properties.
Examples
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');
-- Streaming ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic',
      pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
      pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
    );
The data can now be queried from testing.streaming_table for further analysis.
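For instance, a follow-up query over the ingested rows might look like the sketch below. It assumes the table was created with SELECT * as in the examples above, so the metadata columns from the Returns section are present. The aggregation itself is only illustrative.

```sql
-- Illustrative query: count ingested messages per topic and find the latest publish time.
SELECT
  __topic,
  count(*)           AS message_count,
  max(__publishTime) AS latest_publish_time
FROM testing.streaming_table
GROUP BY __topic;
```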