具有 Azure 队列存储(旧版)的 Azure Blob 存储文件源
重要
本文档已过时,将来可能不会更新。 本内容中提及的产品、服务或技术不再受支持。 请参阅什么是自动加载程序?。
ABS-AQS 连接器提供优化的文件源,使用 Azure 队列存储 (AQS) 查找写入到 Azure Blob 存储 (ABS) 容器的新文件,而无需重复列出所有文件。 这有两个优点:
- 延迟较低:无需列出 ABS 上嵌套的目录结构,此列出操作速度很慢且会消耗大量资源。
- 成本较低:不再向 ABS 发出成本很高的 LIST API 请求。
注意
ABS-AQS 源在使用事件时会从 AQS 队列中删除消息。 如果希望其他管道使用此队列中的消息,请为经过优化的读取器设置单独的 AQS 队列。 你可以设置多个可发布到不同队列的事件网格订阅。
使用 ABS-AQS 文件源
若要使用 ABS-AQS 文件源,必须执行以下操作:
使用 Azure 事件网格订阅设置 ABS 事件通知,并将其路由到 AQS。 参阅对 Blob 存储事件做出反应。
指定
fileFormat
和queueUrl
选项以及架构。 例如: 。spark.readStream \ .format("abs-aqs") \ .option("fileFormat", "json") \ .option("queueName", ...) \ .option("connectionString", ...) \ .schema(...) \ .load()
使用 Azure 队列存储和 Blob 存储进行身份验证
若要使用 Azure 队列存储和 Blob 存储进行身份验证,请使用共享访问签名 (SAS) 令牌或存储帐户密钥。 必须为在其中部署了队列的存储帐户提供一个连接字符串,其中包含你的存储帐户的 SAS 令牌或访问密钥。 有关详细信息,请参阅配置 Azure 存储连接字符串。
你还需要提供对 Azure Blob 存储容器的访问权限。 有关如何配置对 Azure Blob 存储容器访问权限的信息,请参阅连接到 Azure Data Lake Storage Gen2 和 Blob 存储。
注意
强烈建议你使用机密来提供连接字符串。
配置
选项 | 类型 | 默认 | 说明 |
---|---|---|---|
allowOverwrites | 布尔 | true |
是否应重新处理重写的 blob。 |
connectionString | 字符串 | 无(必需参数) | 用来访问队列的连接字符串。 |
fetchParallelism | Integer | 1 | 从队列服务中提取消息时要使用的线程数。 |
fileFormat | 字符串 | 无(必需参数) | 文件的格式,例如 parquet 、json 、csv 、text ,等等。 |
ignoreFileDeletion | 布尔 | false |
如果你进行了生命周期配置或手动删除了源文件,则必须将此选项设置为 true 。 |
maxFileAge | Integer | 604800 | 确定将文件通知作为状态存储多长时间(以秒为单位)以防止重复处理。 |
pathRewrites | JSON 字符串。 | "{}" |
如果使用装入点,则可以使用装入点重写 container@storageAccount/key 路径的前缀。 只能重写前缀。 例如,对于配置 {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"} ,路径 wasbs://myContainer@myStorageAccount.blob.core.chinacloudapi.cn/path/2017/08/fileA.json 被重写为dbfs:/mnt/data-warehouse/2017/08/fileA.json 。 |
queueFetchInterval | 持续时间字符串,例如,2m 表示 2 分钟。 |
"5s" |
当队列为空时,在两次提取之间等待的时长。 Azure 按照向 AQS 发出的 API 请求收费。 因此,如果数据不是频繁到达,则可以将此值设置为较长的持续时间。 只要队列不为空,我们就会连续提取。 如果每 5 分钟创建一次新文件,则可能需要设置较高的 queueFetchInterval 以降低 AQS 成本。 |
queueName | 字符串 | 无(必需参数) | AQS 队列的名称。 |
如果在驱动程序日志中看到类似于 Fetched 0 new events and 3 old events.
的大量消息,则在这种情况下,你通常会看到比新事件更多的旧事件,应缩短流的触发间隔。
如果要从 Blob 存储中的某个位置使用文件,而这些文件可能会在处理之前被删除,则可以设置以下配置以忽略错误并继续处理:
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")
常见问题 (FAQ)
如果 ignoreFileDeletion
为 False(默认值),并且对象已被删除,这是否会导致整个管道故障?
是的,如果收到一个事件,指出该文件已被删除,则会导致整个管道故障。
应当如何设置 maxFileAge
?
Azure 队列存储提供“至少一次”消息传送语义,因此,我们需要保留状态以进行重复数据删除。 maxFileAge
的默认设置为 7 天,这等于队列中消息的最大 TTL。