什么是自动加载程序文件通知模式?

在文件通知模式下,自动加载程序可以自动设置从输入目录中订阅文件事件的通知服务和队列服务。 可以使用文件通知来缩放自动加载程序,以每小时引入数百万个文件。 使用大型输入目录或大量文件时,文件通知模式相对于目录列表模式而言具有更高的性能和可伸缩性,但需要额外的云权限。

可以随时在文件通知和目录列表之间切换,并且只要数据处理保证,你仍然可以精确地维护。

注意

Azure 高级存储帐户不支持文件通知模式,因为高级帐户不支持队列存储。

警告

文件通知模式不支持更改自动加载程序的源路径。 如果使用文件通知模式并更改了路径,则可能无法引入目录更新时新目录中已存在的文件。

自动加载程序文件通知模式中使用的云资源

重要

需要提升的权限才能自动为文件通知模式配置云基础结构。 请联系云管理员或工作区管理员。详见:

如果将 cloudFiles.useNotifications 选项设置为 true 并提供创建云资源所需的权限时,自动加载程序可以自动为你设置文件通知。 此外,还可能需要提供附加选项,以进行创建这些资源所需的自动加载程序授权。

下表汇总了自动加载程序创建的资源。

云存储 订阅服务 队列服务 前缀 * 限制 **
AWS S3 AWS SNS AWS SQS databricks-auto-ingest 每个 S3 存储桶 100 个
ADLS Gen2 Azure 事件网格 Azure Queue Storage databricks 每存储帐户 500
Azure Blob 存储 Azure 事件网格 Azure Queue Storage databricks 每存储帐户 500
  • 自动加载程序使用此前缀命名资源。

** 可以启动多少个并发文件通知管道

如果需要为给定存储帐户运行超过有限数量的文件通知管道,则可以:

  • 利用 AWS Lambda 或 Azure Functions 之类的服务,将来自单个队列(此队列侦听整个容器或 Bucket)的通知扇出,放入特定于目录的队列

文件通知事件

当文件上传到 S3 存储桶时,AWS S3 都会提供一个 ObjectCreated 事件,无论该文件的上传方式是通过 put 上传还是通过多部分上传。

ADLS Gen2 为 Gen2 容器中显示的文件提供不同的事件通知。

  • 自动加载程序会侦听 FlushWithClose 事件,以便处理某个文件。
  • 自动加载程序流支持用于发现文件的 RenameFile 操作。 RenameFile 操作需要向存储系统发送的 API 请求,以便获取重命名的文件的大小。
  • 使用 Databricks Runtime 9.0 及更高版本创建的自动加载程序流支持 RenameDirectory 操作,以便发现文件。 RenameDirectory 操作需要向存储系统发送的 API 请求,以便列出重命名的目录的内容。

注意

云提供商不保证在极少数情况下 100% 交付所有文件事件,也不对文件事件的延迟提供严格的 SLA。 Databricks 建议你使用自动加载程序触发定期回填,方法是使用 cloudFiles.backfillInterval 选项来保证在给定的 SLA 中发现所有文件(如果需要满足数据完整性的要求)。 触发定期回填不会导致重复。

为 ADLS Gen2 和 Azure Blob 存储配置文件通知所需的权限

你必须具有对输入目录的读取权限。 请参阅 Azure Blob 存储

若要使用文件通知模式,必须提供用于设置和访问事件通知服务的身份验证凭据。 只需创建用于身份验证的服务主体。

  • 服务主体 - 使用 Azure 内置角色

    以客户端 ID 和客户端密码的形式创建 Microsoft Entra ID(前 Azure Active Directory)应用和服务主体

    为此应用分配输入路径所在的存储帐户的以下角色:

    • 参与者:此角色用于设置存储帐户中的资源,例如队列和事件订阅。
    • 存储队列数据参与者:此角色用于执行队列操作,例如检索和删除队列中的消息。 仅当在没有连接字符串的情况下提供服务主体时,才需要此角色。

    为此应用分配相关资源组的以下角色:

    有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

  • 服务主体 - 使用自定义角色

    如果担心上述角色所需的执行权限,则可以创建一个至少具有以下权限的自定义角色,下面以 Azure 角色 JSON 格式列出:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    然后,可以将此自定义角色分配给应用。

    有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

自动加载程序权限

排查常见错误

错误:

java.lang.RuntimeException: Failed to create event grid subscription.

如果你第一次运行自动加载程序时看到此错误消息,可能是事件网格未在 Azure 订阅中注册为资源提供程序。 若要在 Azure 门户中注册它,请执行以下操作:

  1. 转到你的订阅。
  2. 单击“设置”部分的“资源提供程序”。
  3. 注册提供程序 Microsoft.EventGrid

错误:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

如果你第一次运行自动加载程序时看到此错误消息,请确保已向你的事件网格和存储帐户的服务主体授予了“参与者”角色。

为 AWS S3 配置文件通知所需的权限

你必须具有对输入目录的读取权限。 有关更多详细信息,请参阅 S3 连接详细信息

若要使用文件通知模式,请将以下 JSON 策略文档附加到 IAM 用户或角色。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

其中:

  • <bucket-name>:流将在其中读取文件的 S3 存储桶名称,例如,auto-logs。 可以使用 * 作为通配符,例如 databricks-*-logs。 若要找出 DBFS 路径的底层 S3 存储桶,可以通过运行 %fs mounts 列出笔记本中的所有 DBFS 装入点。
  • <region>:S3 存储桶所在的 AWS 区域,例如 cn-north-2。 如果你不想要指定区域,请使用 *
  • <account-number>:拥有 S3 存储桶的 AWS 帐号,例如 123456789012。 如果你不想要指定帐号,请使用 *

SQS 和 SNS ARN 规范中的字符串 databricks-auto-ingest-*cloudFiles 源在创建 SQS 和 SNS 服务时使用的名称前缀。 由于 Azure Databricks 会在流的初始运行期间设置通知服务,因此你可以在初始运行后(例如,停止并重启流)使用权限降低的策略。

注意

上述策略仅涉及设置文件通知服务(即 S3 存储桶通知、SNS 和 SQS 服务)所需的权限,并假设你已经拥有对 S3 存储桶的读取访问权限。 如果你需要添加 S3 只读权限,请在 JSON 文档的 DatabricksAutoLoaderSetup 语句的 Action 列表中添加以下内容:

  • s3:ListBucket
  • s3:GetObject

初始设置后权限减少

上文中所述的资源设置权限仅在流的初始运行期间才需要。 首次运行后,可以切换到以下权限降低的 IAM 策略。

重要

权限降低后,无法在出现故障时(例如,SQS 队列被意外删除)启动新的流式处理查询或重新创建资源;也无法使用云资源管理 API 来列出或拆卸资源。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

手动配置或管理文件通知资源

特权用户可以手动配置或管理文件通知资源。

  • 通过云提供商手动设置文件通知服务,并手动指定队列标识符。 有关更多详细信息,请参阅文件通知选项
  • 使用 Scala API 创建或管理通知和队列服务,如以下示例所示:

注意

必须要具有适当权限才能配置或修改云基础结构。 请参阅 AzureS3GCS 的权限文档。

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

使用 setUpNotificationServices(<resource-suffix>) 创建名为 <prefix>-<resource-suffix> 的队列和订阅(前缀取决于自动加载程序文件通知模式中使用的云资源中总结的存储系统)。 如果已存在具有相同名称的资源,Azure Databricks 会重用已存在的资源,而不是创建新资源。 此函数返回一个队列标识符,可以使用文件通知选项中的标识符将该标识符传递给 cloudFiles 源。 这使得 cloudFiles 源用户拥有的权限少于创建资源的用户的权限。

只有调用 setUpNotificationServices 时才需要提供 newManager"path" 选项;对于 listNotificationServicestearDownNotificationServices,则不需要提供。 这是你运行流式处理查询时使用的同一 path

以下矩阵指出了每种类型的存储在哪个 Databricks 运行时中支持哪些 API 方法:

云存储 安装程序 API 列出 API 拆解 API
AWS S3 所有版本 所有版本 所有版本
ADLS Gen2 所有版本 所有版本 所有版本
Azure Blob 存储 所有版本 所有版本 所有版本