使用 Python 通过异步计划复制 Blob

本文介绍了如何使用适用于 Python 的 Azure 存储客户端库通过异步计划复制 Blob。 你可以从同一存储帐户中的源、不同存储帐户中的源或从通过 HTTP GET 请求在给定 URL 上检索的任何可访问对象复制 Blob。 还可以中止挂起的复制操作。

本文中介绍的客户端库方法将使用 Copy Blob REST API 操作,并且可以在想要基于异步计划执行复制时使用。 对于要将数据移动到存储帐户且具有源对象 URL 的大多数复制方案,请参阅使用 Python 从源对象 URL 复制 Blob

先决条件

设置你的环境

如果没有现有项目,请查看本部分,其中介绍如何设置项目来使用适用于 Python 的 Azure Blob 存储客户端库。 有关更多详细信息,请参阅 Azure Blob 存储和 Python 入门

要使用本文中的代码示例,请按照以下步骤设置项目。

安装包

使用 pip install 安装以下包:

pip install azure-storage-blob azure-identity

添加 import 语句

添加以下 import 语句:

import datetime
from azure.identity import DefaultAzureCredential
from azure.storage.blob import (
    BlobServiceClient,
    BlobClient,
    BlobLeaseClient,
    BlobSasPermissions,
    generate_blob_sas
)

授权

授权机制必须具有执行复制操作或中止挂起的复制的必要权限。 对于使用 Microsoft Entra ID的授权(建议),最小特权 Azure RBAC 内置角色因多种因素而异。 若要了解详细信息,请参阅复制 Blob (REST API)中止复制 Blob (REST API) 的授权指导。

创建客户端对象

若要将应用连接到 Blob 存储,请创建 BlobServiceClient 的实例。 以下示例演示如何使用 DefaultAzureCredential 创建客户端对象进行授权:

# TODO: Replace <storage-account-name> with your actual storage account name
account_url = "https://<storage-account-name>.blob.core.chinacloudapi.cn"
credential = DefaultAzureCredential()

# Create the BlobServiceClient object
blob_service_client = BlobServiceClient(account_url, credential=credential)

还可以为特定容器Blob 创建客户端对象,不管是直接创建还是通过 BlobServiceClient 对象创建。 要详细了解如何创建和管理客户端对象,请参阅 创建和管理与数据资源交互的客户端对象

关于基于异步计划复制 Blob

Copy Blob 操作可以异步完成,并且是在尽最大努力的基础上执行的,这意味着该操作不一定会立即开始或在指定的时间范围内完成。 复制操作是在后台安排的,并在服务器具有可用资源时执行。 如果复制发生在同一存储帐户内,则操作可以同步完成。

Copy Blob 操作可以执行以下任何操作:

  • 将源 Blob 复制到具有不同名称的目标 Blob。 目标 Blob 可以是相同类型(块、追加或页类型)的现有 Blob,也可以是复制操作创建的新 Blob。
  • 将源 Blob 复制到具有相同名称的目标 Blob,从而替换目标 Blob。 此类复制操作会移除所有未提交的块,并覆盖目标 Blob 的元数据。
  • 将 Azure 文件服务中的源文件复制到目标 Blob。 目标 Blob 可以是现有的块 Blob,也可以是复制操作创建的新块 Blob。 不支持从文件复制到页 Blob 或追加 Blob。
  • 将快照复制到其基本 Blob 上。 通过将快照提升到基本 Blob 的位置,可还原早期版本的 Blob。
  • 将快照复制到具有不同名称的目标 Blob。 生成的目标 Blob 是可写的 Blob,而不是快照。

复制操作的源 Blob 可以是以下类型之一:块 Blob、追加 Blob、页 Blob、Blob 快照或 Blob 版本。 该复制操作会始终复制整个源 Blob 或文件。 不支持复制字节范围或块集。

如果目标 Blob 已存在,则其类型必须与源 Blob 相同,并且系统将会覆盖现有目标 Blob。 进行复制操作时无法修改目标 Blob,并且目标 Blob 只能有一个未完成的复制操作。

若要详细了解 Copy Blob 操作,包括有关属性、索引标记、元数据和计费的信息,请参阅 Copy Blob 注解

基于异步计划复制 Blob

本部分概述了适用于 Python 的 Azure 存储客户端库提供的基于异步计划执行复制操作的方法。

以下方法将会封装 Copy Blob REST API 操作,并开始从源 Blob 异步复制数据:

start_copy_from_url 返回包含 copy_status 和 copy_id 的字典。 如果复制已同步完成,则 copy_status 属性为“success”;如果复制已异步启动,则该属性为“pending”。

在 Azure 中从源复制 blob

如果要在同一存储帐户中复制 Blob,操作可以同步完成。 可以通过 Microsoft Entra ID、共享访问签名 (SAS) 或帐户密钥来授权对源 Blob 的访问。 有关可以用作替代方法的同步复制操作,请参阅使用 Python 从源对象 URL 复制 Blob

如果复制源是其他存储帐户中的 Blob,则操作可以异步完成。 源 Blob 必须是公共的或通过 SAS 令牌获得授权。 SAS 令牌需要包含读取 ('r') 权限。 若要详细了解 SAS 令牌,请参阅使用共享访问签名委派权限

以下示例演示使用异步计划从其他存储帐户复制源 Blob 的方案。 在此示例中,我们将使用追加的用户委派 SAS 令牌创建源 Blob URL。 此示例演示了如何使用客户端库生成 SAS 令牌,但你也可以提供自己的令牌。 此示例还演示了如何在复制操作期间租用源 Blob,以防止有人从其他客户端对该 Blob 进行更改。 Copy Blob 操作会在复制操作开始时保存源 Blob 的 ETag 值。 如果 ETag 值在复制操作完成之前发生了更改,则操作将失败。

def copy_from_source_in_azure_async(self, source_blob: BlobClient, destination_blob: BlobClient, blob_service_client: BlobServiceClient):
    # Lease the source blob during copy to prevent other clients from modifying it
    lease = BlobLeaseClient(client=source_blob)

    sas_token = self.generate_user_delegation_sas(blob_service_client=blob_service_client, source_blob=source_blob)
    source_blob_sas_url = source_blob.url + "?" + sas_token

    # Create an infinite lease by passing -1 as the lease duration
    lease.acquire(lease_duration=-1)

    # Start the copy operation - specify False for the requires_sync parameter
    copy_operation = dict()
    copy_operation = destination_blob.start_copy_from_url(source_url=source_blob_sas_url, requires_sync=False)
    
    # If start_copy_from_url returns copy_status of 'pending', the operation has been started asynchronously
    # You can optionally add logic here to wait for the copy operation to complete

    # Release the lease on the source blob
    lease.break_lease()

def generate_user_delegation_sas(self, blob_service_client: BlobServiceClient, source_blob: BlobClient):
    # Get a user delegation key
    delegation_key_start_time = datetime.datetime.now(datetime.timezone.utc)
    delegation_key_expiry_time = delegation_key_start_time + datetime.timedelta(hours=1)
    key = blob_service_client.get_user_delegation_key(
        key_start_time=delegation_key_start_time,
        key_expiry_time=delegation_key_expiry_time
    )

    # Create a SAS token that's valid for one hour, as an example
    sas_token = generate_blob_sas(
        account_name=blob_service_client.account_name,
        container_name=source_blob.container_name,
        blob_name=source_blob.blob_name,
        account_key=None,
        user_delegation_key=key,
        permission=BlobSasPermissions(read=True),
        expiry=datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1),
        start=datetime.datetime.now(datetime.timezone.utc)
    )

    return sas_token

注意

用户委派 SAS 令牌提供了更高的安全性,因为它们是使用 Microsoft Entra 凭据而不是帐户密钥进行签名的。 若要创建用户委托 SAS 令牌,Microsoft Entra 安全主体需要适当的权限。 有关授权要求,请参阅获取用户委派密钥

从 Azure 外部的源复制 Blob

你可以对可通过 HTTP GET 请求在给定 URL 上检索的任何源对象执行复制操作,包括 Azure 外部的可访问对象。 以下示例演示了从可访问的源对象 URL 复制 Blob 的方案。

def copy_from_external_source_async(self, source_url: str, destination_blob: BlobClient):
    # Start the copy operation - specify False for the requires_sync parameter
    copy_operation = dict()
    copy_operation = destination_blob.start_copy_from_url(source_url=source_url, requires_sync=False)
    
    # If start_copy_from_url returns copy_status of 'pending', the operation has been started asynchronously
    # You can optionally add logic here to wait for the copy operation to complete

检查复制操作的状态

若要检查异步 Copy Blob 操作的状态,可以轮询 get_blob_properties 方法并检查复制状态。

以下代码示例演示了如何检查挂起的复制操作的状态:

def check_copy_status(self, destination_blob: BlobClient):
    # Get the copy status from the destination blob properties
    copy_status = destination_blob.get_blob_properties().copy.status

    return copy_status

中止复制操作

中止挂起的 Copy Blob 操作会导致目标 Blob 长度为零。 但是,目标 Blob 的元数据将包含从源 Blob 复制的新值,或者在复制操作过程中显式设置的新值。 若要在复制之前保留原始元数据,请在调用一个复制方法之前创建目标 Blob 的快照。

要中止挂起的复制操作,请调用以下某个操作:

此方法会包装中止复制 Blob REST API 操作,这会取消挂起的 Copy Blob 操作。 以下代码示例演示了如何中止挂起的 Copy Blob 操作:

def abort_copy(self, destination_blob: BlobClient):
    # Get the copy operation details from the destination blob properties
    copy_status = destination_blob.get_blob_properties().copy.status
    copy_id = destination_blob.get_blob_properties().copy.id

    # Check the copy status and abort if pending
    if copy_status == 'pending':
        destination_blob.abort_copy(copy_id)
        print(f"Copy operation {copy_id} has been aborted")

资源

若要详细了解如何使用适用于 Python 的 Azure Blob 存储客户端库来通过异步计划复制 Blob,请查看以下资源。

代码示例

REST API 操作

Azure SDK for Python 包含基于 Azure REST API 而生成的库,允许你通过熟悉的 Python 范例与 REST API 操作进行交互。 本文中介绍的客户端库方法使用以下 REST API 操作:

客户端库资源

  • 本文是 Python 版 Blob 存储开发人员指南的一部分。 若要了解详细信息,请参阅生成 Python 应用中的开发人员指南文章的完整列表。