什么是自动加载程序文件通知模式?
在文件通知模式下,自动加载程序可以自动设置从输入目录中订阅文件事件的通知服务和队列服务。 可以使用文件通知来缩放自动加载程序,以每小时引入数百万个文件。 使用大型输入目录或大量文件时,文件通知模式相对于目录列表模式而言具有更高的性能和可伸缩性,但需要额外的云权限。
可以随时在文件通知和目录列表之间切换,并且只要数据处理保证,你仍然可以精确地维护。
警告
文件通知模式不支持更改自动加载程序的源路径。 如果使用文件通知模式并更改了路径,则可能无法引入目录更新时新目录中已存在的文件。
自动加载程序文件通知模式中使用的云资源
重要
需要提升的权限才能自动为文件通知模式配置云基础结构。 请联系云管理员或工作区管理员。详见:
如果将 cloudFiles.useNotifications
选项设置为 true
并提供创建云资源所需的权限时,自动加载程序可以自动为你设置文件通知。 此外,还可能需要提供附加选项,以进行创建这些资源所需的自动加载程序授权。
下表汇总了自动加载程序创建的资源。
云存储 | 订阅服务 | 队列服务 | 前缀 * | 限制 ** |
---|---|---|---|---|
AWS S3 | AWS SNS | AWS SQS | databricks-auto-ingest | 每个 S3 存储桶 100 个 |
ADLS Gen2 | Azure 事件网格 | Azure Queue Storage | databricks | 每存储帐户 500 |
Azure Blob 存储 | Azure 事件网格 | Azure Queue Storage | databricks | 每存储帐户 500 |
- 自动加载程序使用此前缀命名资源。
** 可以启动多少个并发文件通知管道
如果需要为给定存储帐户运行超过有限数量的文件通知管道,则可以:
- 利用 AWS Lambda 或 Azure Functions 之类的服务,将来自单个队列(此队列侦听整个容器或 Bucket)的通知扇出,放入特定于目录的队列
文件通知事件
当文件上传到 S3 存储桶时,AWS S3 都会提供一个 ObjectCreated
事件,无论该文件的上传方式是通过 put 上传还是通过多部分上传。
ADLS Gen2 为 Gen2 容器中显示的文件提供不同的事件通知。
- 自动加载程序会侦听
FlushWithClose
事件,以便处理某个文件。 - 自动加载程序流支持用于发现文件的
RenameFile
操作。RenameFile
操作需要向存储系统发送的 API 请求,以便获取重命名的文件的大小。 - 使用 Databricks Runtime 9.0 及更高版本创建的自动加载程序流支持
RenameDirectory
操作,以便发现文件。RenameDirectory
操作需要向存储系统发送的 API 请求,以便列出重命名的目录的内容。
注意
云提供商不保证在极少数情况下 100% 交付所有文件事件,也不对文件事件的延迟提供严格的 SLA。 Databricks 建议你使用自动加载程序触发定期回填,方法是使用 cloudFiles.backfillInterval
选项来保证在给定的 SLA 中发现所有文件(如果需要满足数据完整性的要求)。 触发定期回填不会导致重复。
为 ADLS Gen2 和 Azure Blob 存储配置文件通知所需的权限
你必须具有对输入目录的读取权限。 请参阅 Azure Blob 存储。
若要使用文件通知模式,必须提供用于设置和访问事件通知服务的身份验证凭据。 在 Databricks Runtime 8.1 及更高版本中,只需要用于身份验证的服务主体。 对于 Databricks Runtime 8.0 及更低版本,则必须提供一个服务主体和一个连接字符串。
服务主体 - 使用 Azure 内置角色
以客户端 ID 和客户端密码的形式创建 Microsoft Entra ID(前 Azure Active Directory)应用和服务主体。
为此应用分配输入路径所在的存储帐户的以下角色:
- 参与者:此角色用于设置存储帐户中的资源,例如队列和事件订阅。
- 存储队列数据参与者:此角色用于执行队列操作,例如检索和删除队列中的消息。 仅当在没有连接字符串的情况下提供服务主体时,Databricks Runtime 8.1 及更高版本中才需要此角色。
为此应用分配相关资源组的以下角色:
- EventGrid EventSubscription 参与者:此角色用于执行事件网格订阅操作,例如创建或列出事件订阅。
有关详细信息,请参阅使用 Azure 门户分配 Azure 角色。
服务主体 - 使用自定义角色
如果担心上述角色所需的执行权限,则可以创建一个至少具有以下权限的自定义角色,下面以 Azure 角色 JSON 格式列出:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
然后,可以将此自定义角色分配给应用。
有关详细信息,请参阅使用 Azure 门户分配 Azure 角色。
连接字符串
自动加载程序需要使用连接字符串对 Azure 队列存储操作(例如,创建队列以及从队列中读取和删除消息)进行身份验证。 队列是在输入目录路径所在的同一存储帐户中创建的。 你可以在帐户密钥或共享访问签名 (SAS) 中找到连接字符串。
如果使用 Databricks Runtime 8.1 或更高版本,则不需要连接字符串。
如果使用 Databricks Runtime 8.0 或更低版本,则必须提供一个连接字符串,用于对 Azure 队列存储操作(例如创建队列以及在队列中检索和删除消息)进行身份验证。 队列是在输入目录路径所在的同一存储帐户中创建的。 你可以在帐户密钥或共享访问签名 (SAS) 中找到连接字符串。 配置 SAS 令牌时,必须提供以下权限:
排查常见错误
错误:
java.lang.RuntimeException: Failed to create event grid subscription.
如果你第一次运行自动加载程序时看到此错误消息,可能是事件网格未在 Azure 订阅中注册为资源提供程序。 若要在 Azure 门户中注册它,请执行以下操作:
- 转到你的订阅。
- 单击“设置”部分的“资源提供程序”。
- 注册提供程序
Microsoft.EventGrid
。
错误:
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
如果你第一次运行自动加载程序时看到此错误消息,请确保已向你的事件网格和存储帐户的服务主体授予了“参与者”角色。
为 AWS S3 配置文件通知所需的权限
你必须具有对输入目录的读取权限。 有关更多详细信息,请参阅 S3 连接详细信息。
若要使用文件通知模式,请将以下 JSON 策略文档附加到 IAM 用户或角色。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
其中:
<bucket-name>
:流将在其中读取文件的 S3 存储桶名称,例如,auto-logs
。 可以使用*
作为通配符,例如databricks-*-logs
。 若要找出 DBFS 路径的底层 S3 存储桶,可以通过运行%fs mounts
列出笔记本中的所有 DBFS 装入点。<region>
:S3 存储桶所在的 AWS 区域,例如us-west-2
。 如果你不想要指定区域,请使用*
。<account-number>
:拥有 S3 存储桶的 AWS 帐号,例如123456789012
。 如果你不想要指定帐号,请使用*
。
SQS 和 SNS ARN 规范中的字符串 databricks-auto-ingest-*
是 cloudFiles
源在创建 SQS 和 SNS 服务时使用的名称前缀。 由于 Azure Databricks 会在流的初始运行期间设置通知服务,因此你可以在初始运行后(例如,停止并重启流)使用权限降低的策略。
注意
上述策略仅涉及设置文件通知服务(即 S3 存储桶通知、SNS 和 SQS 服务)所需的权限,并假设你已经拥有对 S3 存储桶的读取访问权限。 如果你需要添加 S3 只读权限,请在 JSON 文档的 DatabricksAutoLoaderSetup
语句的 Action
列表中添加以下内容:
s3:ListBucket
s3:GetObject
初始设置后权限减少
上文中所述的资源设置权限仅在流的初始运行期间才需要。 首次运行后,可以切换到以下权限降低的 IAM 策略。
重要
权限降低后,无法在出现故障时(例如,SQS 队列被意外删除)启动新的流式处理查询或重新创建资源;也无法使用云资源管理 API 来列出或拆卸资源。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
手动配置或管理文件通知资源
特权用户可以手动配置或管理文件通知资源。
- 通过云提供商手动设置文件通知服务,并手动指定队列标识符。 有关更多详细信息,请参阅文件通知选项。
- 使用 Scala API 创建或管理通知和队列服务,如以下示例所示:
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
使用 setUpNotificationServices(<resource-suffix>)
创建名为 <prefix>-<resource-suffix>
的队列和订阅(前缀取决于自动加载程序文件通知模式中使用的云资源中总结的存储系统)。 如果已存在具有相同名称的资源,Azure Databricks 会重用已存在的资源,而不是创建新资源。 此函数返回一个队列标识符,可以使用文件通知选项中的标识符将该标识符传递给 cloudFiles
源。 这使得 cloudFiles
源用户拥有的权限少于创建资源的用户的权限。
只有调用 setUpNotificationServices
时才需要提供 newManager
的 "path"
选项;对于 listNotificationServices
或 tearDownNotificationServices
,则不需要提供。 这是你运行流式处理查询时使用的同一 path
。
以下矩阵指出了每种类型的存储在哪个 Databricks 运行时中支持哪些 API 方法:
云存储 | 安装程序 API | 列出 API | 拆解 API |
---|---|---|---|
AWS S3 | 所有版本 | 所有版本 | 所有版本 |
ADLS Gen2 | 所有版本 | 所有版本 | 所有版本 |
Azure Blob 存储 | 所有版本 | 所有版本 | 所有版本 |