使用 Python 管理 Azure Data Lake Storage Gen2 中的目录和文件

本文介绍了如何使用 Python 在具有分层命名空间的存储帐户中创建和管理目录与文件。

若要了解如何获取、设置和更新目录与文件的访问控制列表 (ACL),请参阅使用 Python 管理 Azure Data Lake Storage Gen2 中的 ACL

包 (PyPi) | 示例 | API 参考 | 提供反馈

先决条件

设置项目

本部分逐步指导你准备一个项目,使其与适用于 Python 的 Azure Data Lake Storage 客户端库配合使用。

从项目目录中,使用 pip install 命令安装 Azure Data Lake Storage 和 Azure 标识客户端库的包。 与 Azure 服务的无密码连接需要 azure-identity 包。

pip install azure-storage-file-datalake azure-identity

然后打开代码文件并添加必要的 import 语句。 在此示例中,我们将以下内容添加到 .py 文件:

import os
from azure.storage.filedatalake import (
    DataLakeServiceClient,
    DataLakeDirectoryClient,
    FileSystemClient
)
from azure.identity import DefaultAzureCredential

授权访问并连接到数据资源

若要使用本文中的代码示例,需创建一个表示存储帐户的授权 DataLakeServiceClient 实例。 可以使用 Microsoft Entra ID、帐户访问密钥或共享访问签名 (SAS) 来授权 DataLakeServiceClient 对象。

可以使用适用于 Python 的 Azure 标识客户端库,通过 Microsoft Entra ID 对应用程序进行验证。

创建 DataLakeServiceClient 类的实例并传入 DefaultAzureCredential 对象。

def get_service_client_token_credential(self, account_name) -> DataLakeServiceClient:
    account_url = f"https://{account_name}.dfs.core.chinacloudapi.cn"
    token_credential = DefaultAzureCredential()

    service_client = DataLakeServiceClient(account_url, credential=token_credential)

    return service_client

要详细了解如何使用 DefaultAzureCredential 授权访问数据,请参阅概述:使用 Azure SDK 向 Azure 验证 Python 应用的身份

创建容器

容器充当文件的文件系统。 可以使用以下方法创建容器:

下面的代码示例创建一个容器,并返回一个 FileSystemClient 对象供以后使用:

def create_file_system(self, service_client: DataLakeServiceClient, file_system_name: str) -> FileSystemClient:
    file_system_client = service_client.create_file_system(file_system=file_system_name)

    return file_system_client

创建目录

可以使用以下方法在容器中创建目录引用:

下面的代码示例将目录添加到容器,并返回一个 DataLakeDirectoryClient 对象供以后使用:

def create_directory(self, file_system_client: FileSystemClient, directory_name: str) -> DataLakeDirectoryClient:
    directory_client = file_system_client.create_directory(directory_name)

    return directory_client

重命名或移动目录

可以使用以下方法重命名或移动目录:

new_name 参数中使用新目录名称传递路径。 值必须具有以下格式:{filesystem}/{directory}/{subdirectory}。

以下代码示例说明了如何重命名子目录:

def rename_directory(self, directory_client: DataLakeDirectoryClient, new_dir_name: str):
    directory_client.rename_directory(
        new_name=f"{directory_client.file_system_name}/{new_dir_name}")

将文件上传到目录

可以使用以下方法将内容上传到新的或现有的文件:

下面的代码示例演示如何使用 upload_data 方法将文件上传到目录:

def upload_file_to_directory(self, directory_client: DataLakeDirectoryClient, local_path: str, file_name: str):
    file_client = directory_client.get_file_client(file_name)

    with open(file=os.path.join(local_path, file_name), mode="rb") as data:
        file_client.upload_data(data, overwrite=True)

可以使用此方法创建内容并将其上传到新文件,也可以将 overwrite 参数设置为 True,以覆盖现有文件。

将数据追加到文件

可以使用以下方法上传要追加到文件的数据:

下面的代码示例演示如何使用以下步骤将数据追加到文件的末尾:

  • 创建一个 DataLakeFileClient 对象来表示正在使用的文件资源。
  • 使用 append_data 方法将数据上传到文件。
  • 通过调用 flush_data 方法完成上传,将以前上传的数据写入文件。
def append_data_to_file(self, directory_client: DataLakeDirectoryClient, file_name: str):
    file_client = directory_client.get_file_client(file_name)
    file_size = file_client.get_file_properties().size
    
    data = b"Data to append to end of file"
    file_client.append_data(data, offset=file_size, length=len(data))

    file_client.flush_data(file_size + len(data))

使用此方法时,数据只能追加到文件,并且每个请求的操作限制为 4000 MiB。

从目录下载

下面的代码示例演示如何使用以下步骤将文件从目录下载到本地文件:

  • 创建一个 DataLakeFileClient 对象来表示要下载的文件。
  • 打开用于写入的本地文件。
  • 调用 DataLakeFileClient.download_file,以便从文件读取字节,然后将这些字节写入本地文件。
def download_file_from_directory(self, directory_client: DataLakeDirectoryClient, local_path: str, file_name: str):
    file_client = directory_client.get_file_client(file_name)

    with open(file=os.path.join(local_path, file_name), mode="wb") as local_file:
        download = file_client.download_file()
        local_file.write(download.readall())
        local_file.close()

列出目录内容

可以通过使用以下方法并枚举结果来列出目录内容:

枚举结果中的路径可能会在提取值时向服务发出多个请求。

下面的代码示例输出目录中的每个子目录和文件的路径。

def list_directory_contents(self, file_system_client: FileSystemClient, directory_name: str):
    paths = file_system_client.get_paths(path=directory_name)

    for path in paths:
        print(path.name + '\n')

删除目录

可以使用以下方法删除目录:

以下代码示例说明了如何删除目录:

def delete_directory(self, directory_client: DataLakeDirectoryClient):
    directory_client.delete_directory()

另请参阅