Azure Blob storage file source with Azure Queue Storage (legacy)
Important
This documentation has been retired and might not be updated. The products, services, or technologies mentioned in this content are no longer supported. See What is Auto Loader?.
The ABS-AQS connector provides an optimized file source that uses Azure Queue Storage (AQS) to find new files written to an Azure Blob storage (ABS) container without repeatedly listing all of the files. This provides two advantages:
- Lower latency: no need to list nested directory structures on ABS, which is slow and resource intensive.
- Lower costs: no more costly LIST API requests made to ABS.
Note
The ABS-AQS source deletes messages from the AQS queue as it consumes events. If you want other pipelines to consume messages from this queue, set up a separate AQS queue for the optimized reader. You can set up multiple Event Grid Subscriptions to publish to different queues.
Use the ABS-AQS file source
To use the ABS-AQS file source, you must:

1. Set up ABS event notifications using Azure Event Grid subscriptions and route them to AQS. See Reacting to Blob storage events.
2. Specify the `fileFormat` and `queueName` options and a schema (a sketch of starting the resulting stream follows these steps). For example:

   ```python
   spark.readStream \
     .format("abs-aqs") \
     .option("fileFormat", "json") \
     .option("queueName", ...) \
     .option("connectionString", ...) \
     .schema(...) \
     .load()
   ```
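`load()` returns a streaming DataFrame that you start like any other Structured Streaming query. A minimal sketch; the sink format, checkpoint location, and output path below are illustrative placeholders, not values from this article:

```python
# "df" is the streaming DataFrame returned by load() above.
# Checkpointing lets the query track its progress across restarts.
query = df.writeStream \
  .format("delta") \
  .option("checkpointLocation", "/mnt/checkpoints/abs-aqs-demo") \
  .start("/mnt/data-warehouse/events")
```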
Authenticate with Azure Queue Storage and Blob storage
To authenticate with Azure Queue Storage and Blob storage, use Shared Access Signature (SAS) tokens or storage account keys. Provide a connection string for the storage account where your queue is deployed; the connection string must contain either your SAS token or your storage account access keys. For more information, see Configure Azure Storage connection strings.
You must also provide access to your Azure Blob storage containers. See Connect to Azure Data Lake Storage Gen2 and Blob Storage for information on how to configure access to your Azure Blob storage container.
Note
We strongly recommend that you use Secrets to provide your connection strings.
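A minimal sketch of reading the connection string from a Databricks secret scope rather than hardcoding it; the scope and key names ("storage", "aqs-connection-string"), queue name, and schema are placeholders, not values defined in this article:

```python
# Fetch the connection string from a secret scope (placeholder scope/key).
connection_string = dbutils.secrets.get(scope="storage", key="aqs-connection-string")

df = spark.readStream \
  .format("abs-aqs") \
  .option("fileFormat", "json") \
  .option("queueName", "my-events-queue") \
  .option("connectionString", connection_string) \
  .schema("id LONG, body STRING, eventTime TIMESTAMP") \
  .load()
```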
Configuration
Option | Type | Default | Description |
---|---|---|---|
`allowOverwrites` | Boolean | `true` | Whether a blob that gets overwritten should be reprocessed. |
`connectionString` | String | None (required param) | The connection string to access your queue. |
`fetchParallelism` | Integer | 1 | Number of threads to use when fetching messages from the queueing service. |
`fileFormat` | String | None (required param) | The format of the files, such as `parquet`, `json`, `csv`, `text`, and so on. |
`ignoreFileDeletion` | Boolean | `false` | If you have lifecycle configurations or you delete the source files manually, you must set this option to `true`. |
`maxFileAge` | Integer | 604800 | Determines how long (in seconds) file notifications are stored as state to prevent duplicate processing. |
`pathRewrites` | A JSON string | `"{}"` | If you use mount points, you can rewrite the prefix of the `container@storageAccount/key` path with the mount point. Only prefixes can be rewritten. For example, for the configuration `{"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"}`, the path `wasbs://myContainer@myStorageAccount.blob.core.chinacloudapi.cn/path/2017/08/fileA.json` is rewritten to `dbfs:/mnt/data-warehouse/2017/08/fileA.json`. |
`queueFetchInterval` | A duration string, for example, `2m` for 2 minutes | `"5s"` | How long to wait between fetches if the queue is empty. Azure charges per API request to AQS, so if data isn't arriving frequently, this value can be set to a long duration. As long as the queue is not empty, we will fetch continuously. If new files are created every 5 minutes, you might want to set a high `queueFetchInterval` to reduce AQS costs. |
`queueName` | String | None (required param) | The name of the AQS queue. |
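A sketch combining several of the optional settings from the table above; the queue name, secret scope, schema, and mount path are illustrative placeholders:

```python
# Placeholder secret scope and key; see the secrets example earlier.
connection_string = dbutils.secrets.get(scope="storage", key="aqs-connection-string")

df = spark.readStream \
  .format("abs-aqs") \
  .option("fileFormat", "parquet") \
  .option("queueName", "my-events-queue") \
  .option("connectionString", connection_string) \
  .option("fetchParallelism", 4) \
  .option("queueFetchInterval", "2m") \
  .option("ignoreFileDeletion", "true") \
  .option("pathRewrites", '{"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"}') \
  .schema("id LONG, body STRING, eventTime TIMESTAMP") \
  .load()
```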
If you observe many messages in the driver logs that look like `Fetched 0 new events and 3 old events.`, where you tend to see far more old events than new, you should reduce the trigger interval of your stream.
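In Structured Streaming, you control this with the trigger on the write side. A minimal sketch, assuming a Delta sink and placeholder paths:

```python
# Shorten the micro-batch trigger so the queue is drained more often and
# fewer already-seen ("old") events accumulate between batches.
query = df.writeStream \
  .format("delta") \
  .option("checkpointLocation", "/mnt/checkpoints/abs-aqs-demo") \
  .trigger(processingTime="10 seconds") \
  .start("/mnt/data-warehouse/events")
```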
If you are consuming files from a location on Blob storage where you expect that some files may be deleted before they can be processed, you can set the following configuration to ignore the error and continue processing:
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")
Frequently asked questions (FAQ)
**If `ignoreFileDeletion` is False (default) and the object has been deleted, will it fail the whole pipeline?**

Yes, if we receive an event stating that the file was deleted, it will fail the whole pipeline.
**How should I set `maxFileAge`?**

Azure Queue Storage provides at-least-once message delivery semantics, therefore we need to keep state for deduplication. The default setting for `maxFileAge` is 604800 seconds (7 days), which is equal to the maximum TTL of a message in the queue.
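As a sketch, you could shorten the deduplication window when your queue's message TTL is also shorter; the reasoning about duplicates below is an inference from the at-least-once semantics described above, and all names and paths are placeholders:

```python
# Keep notification state for 2 days (172800 seconds). Only safe if the
# queue's message TTL is no longer than this window; a message redelivered
# after maxFileAge expires could otherwise be processed twice.
connection_string = dbutils.secrets.get(scope="storage", key="aqs-connection-string")

df = spark.readStream \
  .format("abs-aqs") \
  .option("fileFormat", "json") \
  .option("queueName", "my-events-queue") \
  .option("connectionString", connection_string) \
  .option("maxFileAge", 172800) \
  .schema("id LONG, body STRING, eventTime TIMESTAMP") \
  .load()
```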