将优化的 Azure Blob 存储文件源与 Azure 队列存储配合使用Optimized Azure Blob storage file source with Azure Queue Storage

Databricks ABS-AQS 连接器使用 Azure 队列存储 (AQS) 提供优化的文件源,允许你查找写入到 Azure Blob 存储 (ABS) 容器的新文件,而无需重复列出所有文件。The Databricks ABS-AQS connector uses Azure Queue Storage (AQS) to provide an optimized file source that lets you find new files written to an Azure Blob storage (ABS) container without repeatedly listing all of the files. 这有两个主要优点:This provides two major advantages:

  • 延迟较低:无需列出 ABS 上嵌套的目录结构,此列出操作速度很慢且会消耗大量资源。Lower latency: no need to list nested directory structures on ABS, which is slow and resource intensive.
  • 成本较低:不再向 ABS 发出成本很高的 LIST API 请求。Lower costs: no more costly LIST API requests made to ABS.


  • 当事件被确认时,ABS-AQS 源将从 AQS 队列中删除消息。The ABS-AQS source deletes messages from the AQS queue as events are acknowledged. 如果要让其他管道使用同一队列中的内容,请为经过优化的读取器设置单独的 AQS 队列。If you want to have other pipelines consuming from the same queue, set up a separate AQS queue for the optimized reader. 你可以设置多个可发布到不同队列的事件网格订阅。You can set up multiple Event Grid Subscriptions to publish to different queues.
  • ABS-AQS 源仅支持加载存储在 Azure Blob 存储中的文件。The ABS-AQS source supports only loading files stored on Azure Blob storage.

使用 ABS-AQS 文件源Use the ABS-AQS file source

若要使用 ABS-AQS 文件源,必须执行以下操作:To use the ABS-AQS file source you must:

  • 使用 Azure 事件网格订阅设置 ABS 事件通知,并将其路由到 AQS。Set up ABS event notifications by leveraging Azure Event Grid Subscriptions and route them to AQS. 参阅对 Blob 存储事件做出反应See Reacting to Blob storage events.

  • 指定 fileFormatqueueUrl 选项以及架构。Specify the fileFormat and queueUrl options and a schema. 例如: 。For example:

    spark.readStream \
      .format("abs-aqs") \
      .option("fileFormat", "json") \
      .option("queueName", ...) \
      .option("connectionString", ...) \
      .schema(...) \

使用 Azure 队列存储和 Blob 存储进行身份验证Authenticate with Azure Queue Storage and Blob storage

若要使用 Azure 队列存储和 Blob 存储进行身份验证,需要使用共享访问签名 (SAS) 令牌或存储帐户密钥。In order to authenticate with Azure Queue Storage and Blob storage, you will need to use Shared Access Signature (SAS) tokens or storage account keys. 需要为在其中部署了队列的存储帐户提供一个连接字符串,其中会包含你的存储帐户的 SAS 令牌或访问密钥。You will need to provide a connection string for the storage account where your queue is deployed which will contain either your SAS token or access keys to your storage account. 有关详细信息,请参阅配置 Azure 存储连接字符串Refer to Configure Azure Storage connection strings for more information.

你还需要提供对 Azure Blob 存储容器的访问权限。You will also need to provide access to your Azure Blob storage containers. 若要了解如何配置对 Azure Blob 存储容器的访问权限,请参阅 Azure Blob 存储Refer to Azure Blob storage for information on how to configure access to your Azure Blob storage container.


强烈建议你使用机密来提供连接字符串。We strongly recommend that you use Secrets for providing your connection strings.


选项Option 类型Type 默认Default 说明Description
queueNamequeueName 字符串String 无(必需参数)None (required param) AQS 队列的名称。The name of the AQS queue.
fileFormatfileFormat 字符串String 无(必需参数)None (required param) 文件的格式,例如 parquetjsoncsvtext,等等。The format of the files such as parquet, json, csv, text, and so on.
connectionStringconnectionString 字符串String 无(必需参数)None (required param) 用来访问队列的连接字符串The connection string to access your queue.
queueFetchIntervalqueueFetchInterval 持续时间字符串,例如,2m 表示 2 分钟。A duration string, for example, 2m for 2 minutes. "5s" 当队列为空时,在两次提取之间等待的时长。How long to wait in between fetches if the queue is empty. Azure 按照向 AQS 发出的 API 请求收费。Azure charges per API request to AQS. 因此,如果数据不是频繁到达,则可以将此值设置为较长的持续时间。Therefore if data isn’t arriving frequently, this value can be set to a long duration. 只要队列不为空,我们就会连续提取。As long as the queue is not empty, we will fetch continuously. 如果每 5 分钟创建一次新文件,则可能需要设置较高的 queueFetchInterval 以降低 AQS 成本。If new files are created every 5 minutes, you might want to set a high queueFetchInterval to reduce AQS costs.
pathRewritespathRewrites JSON 字符串。A JSON string. "{}" 如果使用装入点,则可以使用装入点重写 container@storageAccount/key 路径的前缀。If you use mount points, you can rewrite the prefix of the container@storageAccount/key path with the mount point. 只能重写前缀。Only prefixes can be rewritten. 例如,对于配置 {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"},路径 wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json 被重写为For example, for the configuration {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"}, the path wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json is rewritten to
ignoreFileDeletionignoreFileDeletion 布尔Boolean false 如果你进行了生命周期配置或手动删除了源文件,则必须将此选项设置为 trueIf you have lifecycle configurations or you delete the source files manually, you must set this option to true.
maxFileAgemaxFileAge IntegerInteger 604800604800 确定将文件通知作为状态存储多长时间(以秒为单位)以防止重复处理。Determines how long (in seconds) file notifications are stored as state to prevent duplicate processing.
allowOverwritesallowOverwrites 布尔Boolean true 是否应重新处理重写的 blob。Whether a blob that gets overwritten should be reprocessed.

如果在驱动程序日志中看到类似于 Fetched 0 new events and 3 old events. 的大量消息,则在这种情况下,你通常会看到比新事件更多的旧事件,应缩短流的触发间隔。If you observe a lot of messages in the driver logs that look like Fetched 0 new events and 3 old events., where you tend to observe a lot more old events than new, you should reduce the trigger interval of your stream.

如果要从 Blob 存储中的某个位置使用文件,而这些文件可能会在处理之前被删除,则可以设置以下配置以忽略错误并继续处理:If you are consuming files from a location on Blob storage where you expect that some files may be deleted before they can be processed, you can set the following configuration to ignore the error and continue processing:

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

常见问题 (FAQ)Frequently asked questions (FAQ)

如果 ignoreFileDeletion 为 False(默认值),并且对象已被删除,这是否会导致整个管道故障?If ignoreFileDeletion is False (default) and the object has been deleted, will it fail the whole pipeline?

是的,如果收到一个事件,指出该文件已被删除,则会导致整个管道故障。Yes, if we receive an event stating that the file was deleted, it will fail the whole pipeline.

应当如何设置 maxFileAgeHow should I set maxFileAge?

Azure 队列存储提供“至少一次”消息传送语义,因此,我们需要保留状态以进行重复数据删除。Azure Queue Storage provides at-least-once message delivery semantics, therefore we need to keep state for deduplication. maxFileAge 的默认设置为 7 天,这等于队列中消息的最大 TTL。The default setting for maxFileAge is 7 days, which is equal to the maximum TTL of a message in the queue.