使用 Python 通过异步计划复制 Blob
本文介绍了如何使用适用于 Python 的 Azure 存储客户端库通过异步计划复制 Blob。 你可以从同一存储帐户中的源、不同存储帐户中的源或从通过 HTTP GET 请求在给定 URL 上检索的任何可访问对象复制 Blob。 还可以中止挂起的复制操作。
本文中介绍的客户端库方法将使用 Copy Blob REST API 操作,并且可以在想要基于异步计划执行复制时使用。 对于要将数据移动到存储帐户且具有源对象 URL 的大多数复制方案,请参阅使用 Python 从源对象 URL 复制 Blob。
先决条件
- Azure 订阅 - 创建试用订阅。
- Azure 存储帐户 - 创建存储帐户
- Python 3.8+
设置你的环境
如果没有现有项目,请查看本部分,其中介绍如何设置项目来使用适用于 Python 的 Azure Blob 存储客户端库。 有关更多详细信息,请参阅 Azure Blob 存储和 Python 入门。
要使用本文中的代码示例,请按照以下步骤设置项目。
安装包
使用 pip install
安装以下包:
pip install azure-storage-blob azure-identity
添加 import 语句
添加以下 import
语句:
import datetime
from azure.identity import DefaultAzureCredential
from azure.storage.blob import (
BlobServiceClient,
BlobClient,
BlobLeaseClient,
BlobSasPermissions,
generate_blob_sas
)
授权
授权机制必须具有执行复制操作或中止挂起的复制的必要权限。 对于使用 Microsoft Entra ID的授权(建议),最小特权 Azure RBAC 内置角色因多种因素而异。 若要了解详细信息,请参阅复制 Blob (REST API) 或 中止复制 Blob (REST API) 的授权指导。
创建客户端对象
若要将应用连接到 Blob 存储,请创建 BlobServiceClient 的实例。 以下示例演示如何使用 DefaultAzureCredential
创建客户端对象进行授权:
# TODO: Replace <storage-account-name> with your actual storage account name
account_url = "https://<storage-account-name>.blob.core.chinacloudapi.cn"
credential = DefaultAzureCredential()
# Create the BlobServiceClient object
blob_service_client = BlobServiceClient(account_url, credential=credential)
还可以为特定容器或 Blob 创建客户端对象,不管是直接创建还是通过 BlobServiceClient
对象创建。 要详细了解如何创建和管理客户端对象,请参阅 创建和管理与数据资源交互的客户端对象。
关于基于异步计划复制 Blob
Copy Blob
操作可以异步完成,并且是在尽最大努力的基础上执行的,这意味着该操作不一定会立即开始或在指定的时间范围内完成。 复制操作是在后台安排的,并在服务器具有可用资源时执行。 如果复制发生在同一存储帐户内,则操作可以同步完成。
Copy Blob
操作可以执行以下任何操作:
- 将源 Blob 复制到具有不同名称的目标 Blob。 目标 Blob 可以是相同类型(块、追加或页类型)的现有 Blob,也可以是复制操作创建的新 Blob。
- 将源 Blob 复制到具有相同名称的目标 Blob,从而替换目标 Blob。 此类复制操作会移除所有未提交的块,并覆盖目标 Blob 的元数据。
- 将 Azure 文件服务中的源文件复制到目标 Blob。 目标 Blob 可以是现有的块 Blob,也可以是复制操作创建的新块 Blob。 不支持从文件复制到页 Blob 或追加 Blob。
- 将快照复制到其基本 Blob 上。 通过将快照提升到基本 Blob 的位置,可还原早期版本的 Blob。
- 将快照复制到具有不同名称的目标 Blob。 生成的目标 Blob 是可写的 Blob,而不是快照。
复制操作的源 Blob 可以是以下类型之一:块 Blob、追加 Blob、页 Blob、Blob 快照或 Blob 版本。 该复制操作会始终复制整个源 Blob 或文件。 不支持复制字节范围或块集。
如果目标 Blob 已存在,则其类型必须与源 Blob 相同,并且系统将会覆盖现有目标 Blob。 进行复制操作时无法修改目标 Blob,并且目标 Blob 只能有一个未完成的复制操作。
若要详细了解 Copy Blob
操作,包括有关属性、索引标记、元数据和计费的信息,请参阅 Copy Blob 注解。
基于异步计划复制 Blob
本部分概述了适用于 Python 的 Azure 存储客户端库提供的基于异步计划执行复制操作的方法。
以下方法将会封装 Copy Blob REST API 操作,并开始从源 Blob 异步复制数据:
start_copy_from_url
返回包含 copy_status 和 copy_id 的字典。 如果复制已同步完成,则 copy_status 属性为“success”;如果复制已异步启动,则该属性为“pending”。
在 Azure 中从源复制 blob
如果要在同一存储帐户中复制 Blob,操作可以同步完成。 可以通过 Microsoft Entra ID、共享访问签名 (SAS) 或帐户密钥来授权对源 Blob 的访问。 有关可以用作替代方法的同步复制操作,请参阅使用 Python 从源对象 URL 复制 Blob。
如果复制源是其他存储帐户中的 Blob,则操作可以异步完成。 源 Blob 必须是公共的或通过 SAS 令牌获得授权。 SAS 令牌需要包含读取 ('r') 权限。 若要详细了解 SAS 令牌,请参阅使用共享访问签名委派权限。
以下示例演示使用异步计划从其他存储帐户复制源 Blob 的方案。 在此示例中,我们将使用追加的用户委派 SAS 令牌创建源 Blob URL。 此示例演示了如何使用客户端库生成 SAS 令牌,但你也可以提供自己的令牌。 此示例还演示了如何在复制操作期间租用源 Blob,以防止有人从其他客户端对该 Blob 进行更改。 Copy Blob
操作会在复制操作开始时保存源 Blob 的 ETag
值。 如果 ETag
值在复制操作完成之前发生了更改,则操作将失败。
def copy_from_source_in_azure_async(self, source_blob: BlobClient, destination_blob: BlobClient, blob_service_client: BlobServiceClient):
# Lease the source blob during copy to prevent other clients from modifying it
lease = BlobLeaseClient(client=source_blob)
sas_token = self.generate_user_delegation_sas(blob_service_client=blob_service_client, source_blob=source_blob)
source_blob_sas_url = source_blob.url + "?" + sas_token
# Create an infinite lease by passing -1 as the lease duration
lease.acquire(lease_duration=-1)
# Start the copy operation - specify False for the requires_sync parameter
copy_operation = dict()
copy_operation = destination_blob.start_copy_from_url(source_url=source_blob_sas_url, requires_sync=False)
# If start_copy_from_url returns copy_status of 'pending', the operation has been started asynchronously
# You can optionally add logic here to wait for the copy operation to complete
# Release the lease on the source blob
lease.break_lease()
def generate_user_delegation_sas(self, blob_service_client: BlobServiceClient, source_blob: BlobClient):
# Get a user delegation key
delegation_key_start_time = datetime.datetime.now(datetime.timezone.utc)
delegation_key_expiry_time = delegation_key_start_time + datetime.timedelta(hours=1)
key = blob_service_client.get_user_delegation_key(
key_start_time=delegation_key_start_time,
key_expiry_time=delegation_key_expiry_time
)
# Create a SAS token that's valid for one hour, as an example
sas_token = generate_blob_sas(
account_name=blob_service_client.account_name,
container_name=source_blob.container_name,
blob_name=source_blob.blob_name,
account_key=None,
user_delegation_key=key,
permission=BlobSasPermissions(read=True),
expiry=datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1),
start=datetime.datetime.now(datetime.timezone.utc)
)
return sas_token
注意
用户委派 SAS 令牌提供了更高的安全性,因为它们是使用 Microsoft Entra 凭据而不是帐户密钥进行签名的。 若要创建用户委托 SAS 令牌,Microsoft Entra 安全主体需要适当的权限。 有关授权要求,请参阅获取用户委派密钥。
从 Azure 外部的源复制 Blob
你可以对可通过 HTTP GET 请求在给定 URL 上检索的任何源对象执行复制操作,包括 Azure 外部的可访问对象。 以下示例演示了从可访问的源对象 URL 复制 Blob 的方案。
def copy_from_external_source_async(self, source_url: str, destination_blob: BlobClient):
# Start the copy operation - specify False for the requires_sync parameter
copy_operation = dict()
copy_operation = destination_blob.start_copy_from_url(source_url=source_url, requires_sync=False)
# If start_copy_from_url returns copy_status of 'pending', the operation has been started asynchronously
# You can optionally add logic here to wait for the copy operation to complete
检查复制操作的状态
若要检查异步 Copy Blob
操作的状态,可以轮询 get_blob_properties 方法并检查复制状态。
以下代码示例演示了如何检查挂起的复制操作的状态:
def check_copy_status(self, destination_blob: BlobClient):
# Get the copy status from the destination blob properties
copy_status = destination_blob.get_blob_properties().copy.status
return copy_status
中止复制操作
中止挂起的 Copy Blob
操作会导致目标 Blob 长度为零。 但是,目标 Blob 的元数据将包含从源 Blob 复制的新值,或者在复制操作过程中显式设置的新值。 若要在复制之前保留原始元数据,请在调用一个复制方法之前创建目标 Blob 的快照。
要中止挂起的复制操作,请调用以下某个操作:
此方法会包装中止复制 Blob REST API 操作,这会取消挂起的 Copy Blob
操作。 以下代码示例演示了如何中止挂起的 Copy Blob
操作:
def abort_copy(self, destination_blob: BlobClient):
# Get the copy operation details from the destination blob properties
copy_status = destination_blob.get_blob_properties().copy.status
copy_id = destination_blob.get_blob_properties().copy.id
# Check the copy status and abort if pending
if copy_status == 'pending':
destination_blob.abort_copy(copy_id)
print(f"Copy operation {copy_id} has been aborted")
资源
若要详细了解如何使用适用于 Python 的 Azure Blob 存储客户端库来通过异步计划复制 Blob,请查看以下资源。
代码示例
REST API 操作
Azure SDK for Python 包含基于 Azure REST API 而生成的库,允许你通过熟悉的 Python 范例与 REST API 操作进行交互。 本文中介绍的客户端库方法使用以下 REST API 操作:
客户端库资源
相关内容
- 本文是 Python 版 Blob 存储开发人员指南的一部分。 若要了解详细信息,请参阅生成 Python 应用中的开发人员指南文章的完整列表。