具有 Azure 队列存储(旧版)的 Azure Blob 存储文件源

重要

本文档已过时,将来可能不会更新。 本内容中提及的产品、服务或技术不再受支持。 请参阅什么是自动加载程序?

ABS-AQS 连接器提供优化的文件源,使用 Azure 队列存储 (AQS) 查找写入到 Azure Blob 存储 (ABS) 容器的新文件,而无需重复列出所有文件。 这有两个优点:

  • 延迟较低:无需列出 ABS 上嵌套的目录结构,此列出操作速度很慢且会消耗大量资源。
  • 成本较低:不再向 ABS 发出成本很高的 LIST API 请求。

注意

ABS-AQS 源在使用事件时会从 AQS 队列中删除消息。 如果希望其他管道使用此队列中的消息,请为经过优化的读取器设置单独的 AQS 队列。 你可以设置多个可发布到不同队列的事件网格订阅。

使用 ABS-AQS 文件源

若要使用 ABS-AQS 文件源,必须执行以下操作:

  • 使用 Azure 事件网格订阅设置 ABS 事件通知,并将其路由到 AQS。 参阅对 Blob 存储事件做出反应

  • 指定 fileFormatqueueUrl 选项以及架构。 例如: 。

    spark.readStream \
      .format("abs-aqs") \
      .option("fileFormat", "json") \
      .option("queueName", ...) \
      .option("connectionString", ...) \
      .schema(...) \
      .load()
    

使用 Azure 队列存储和 Blob 存储进行身份验证

若要使用 Azure 队列存储和 Blob 存储进行身份验证,请使用共享访问签名 (SAS) 令牌或存储帐户密钥。 必须为在其中部署了队列的存储帐户提供一个连接字符串,其中包含你的存储帐户的 SAS 令牌或访问密钥。 有关详细信息,请参阅配置 Azure 存储连接字符串

你还需要提供对 Azure Blob 存储容器的访问权限。 有关如何配置对 Azure Blob 存储容器访问权限的信息,请参阅连接到 Azure Data Lake Storage Gen2 和 Blob 存储

注意

强烈建议你使用机密来提供连接字符串。

配置

选项 类型 默认 说明
allowOverwrites 布尔 true 是否应重新处理重写的 blob。
connectionString 字符串 无(必需参数) 用来访问队列的连接字符串
fetchParallelism Integer 1 从队列服务中提取消息时要使用的线程数。
fileFormat 字符串 无(必需参数) 文件的格式,例如 parquetjsoncsvtext,等等。
ignoreFileDeletion 布尔 false 如果你进行了生命周期配置或手动删除了源文件,则必须将此选项设置为 true
maxFileAge Integer 604800 确定将文件通知作为状态存储多长时间(以秒为单位)以防止重复处理。
pathRewrites JSON 字符串。 "{}" 如果使用装入点,则可以使用装入点重写 container@storageAccount/key 路径的前缀。 只能重写前缀。 例如,对于配置 {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"},路径 wasbs://myContainer@myStorageAccount.blob.core.chinacloudapi.cn/path/2017/08/fileA.json 被重写为
dbfs:/mnt/data-warehouse/2017/08/fileA.json
queueFetchInterval 持续时间字符串,例如,2m 表示 2 分钟。 "5s" 当队列为空时,在两次提取之间等待的时长。 Azure 按照向 AQS 发出的 API 请求收费。 因此,如果数据不是频繁到达,则可以将此值设置为较长的持续时间。 只要队列不为空,我们就会连续提取。 如果每 5 分钟创建一次新文件,则可能需要设置较高的 queueFetchInterval 以降低 AQS 成本。
queueName 字符串 无(必需参数) AQS 队列的名称。

如果在驱动程序日志中看到类似于 Fetched 0 new events and 3 old events. 的大量消息,则在这种情况下,你通常会看到比新事件更多的旧事件,应缩短流的触发间隔。

如果要从 Blob 存储中的某个位置使用文件,而这些文件可能会在处理之前被删除,则可以设置以下配置以忽略错误并继续处理:

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

常见问题 (FAQ)

如果 ignoreFileDeletion 为 False(默认值),并且对象已被删除,这是否会导致整个管道故障?

是的,如果收到一个事件,指出该文件已被删除,则会导致整个管道故障。

应当如何设置 maxFileAge

Azure 队列存储提供“至少一次”消息传送语义,因此,我们需要保留状态以进行重复数据删除。 maxFileAge 的默认设置为 7 天,这等于队列中消息的最大 TTL。