教程:使用 Pandas 在 Synapse Analytics 的无服务器 Apache Spark 池中读取/写入 Azure Data Lake Storage Gen2 数据
了解如何使用 Azure Synapse Analytics 中的无服务器 Apache Spark 池,使用 Pandas 将数据读取/写入到 Azure Data Lake Storage Gen2 (ADLS)。 本教程中的示例演示了如何在 Synapse 中通过 Pandas 读取 csv 数据,以及 excel 和 parquet 文件。
本教程介绍以下操作:
- 使用 Pandas 在 Spark 会话中读取/写入 ADLS Gen2 数据。
如果没有 Azure 订阅,请在开始前创建一个试用帐户。
先决条件
Azure Synapse Analytics 工作区,其中 Azure Data Lake Storage Gen2 存储帐户配置为默认存储(或主存储)。 你需要成为所使用的 Data Lake Storage Gen2 文件系统的存储 Blob 数据参与者。
Azure Synapse Analytics 工作区中的无服务器 Apache Spark 池。 有关详细信息,请参阅在 Azure Synapse 中创建 Spark 池。
配置辅助 Azure Data Lake Storage Gen2 账户(不是 Synapse 工作区的默认账户)。 你需要成为所使用的 Data Lake Storage Gen2 文件系统的存储 Blob 数据参与者。
在 Azure Synapse Analytics 中创建链接服务,它定义了该服务的连接信息。 在本教程中,你将添加 Azure Synapse Analytics 和 Azure Data Lake Storage Gen2 链接服务。
- 打开 Azure Synapse Studio 并选择“管理”选项卡。
- 在“外部连接”下,选择“链接服务”。
- 若要添加链接服务,请选择“新建”。
- 从列表中选择“Azure Data Lake Storage Gen2”磁贴,然后选择“继续”。
- 输入你的身份验证凭据。 帐户密钥、服务主体 (SP)、凭据和托管服务标识 (MSI) 是当前支持的身份验证类型。 在选择其进行身份验证之前,请确保在存储上为 SP 和 MSI 分配了存储 Blob 数据参与者。 测试连接以验证你的凭据是否正确。 选择创建。
重要
注意
- Azure Synapse Analytics 中的 Python 3.8 和 Spark3 无服务器 Apache Spark 池支持 Pandas 功能。
- 支持以下版本:pandas 1.2.3、fsspec 2021.10.0、adlfs 0.7.7
- 能够支持 Azure Data Lake Storage Gen2 URI (abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/file_path)。
登录到 Azure 门户
登录到 Azure 门户。
将数据读取/写入到 Synapse 工作区的默认 ADLS 存储帐户
Pandas 可以通过直接指定文件路径来读取/写入 ADLS 数据。
运行以下代码。
注意
在运行此脚本之前,请更新其中的文件 URL。
#Read data file from URI of default Azure Data Lake Storage Gen2
import pandas
#read csv file
df = pandas.read_csv('abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/file_path')
print(df)
#write csv file
data = pandas.DataFrame({'Name':['A', 'B', 'C', 'D'], 'ID':[20, 21, 19, 18]})
data.to_csv('abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/file_path')
使用辅助 ADLS 帐户读取/写入数据
Pandas 可以读取/写入辅助 ADLS 帐户数据:
- 使用链接服务(身份验证选项 - 存储帐户密钥、服务主体、托管服务标识和凭据)。
- 使用存储选项直接传递客户端 ID & 机密、SAS 密钥、存储帐户密钥和连接字符串。
使用链接服务
运行以下代码。
注意
在运行之前,请更新此脚本中的文件 URL 和链接服务名称。
#Read data file from URI of secondary Azure Data Lake Storage Gen2
import pandas
#read data file
df = pandas.read_csv('abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/ file_path', storage_options = {'linked_service' : 'linked_service_name'})
print(df)
#write data file
data = pandas.DataFrame({'Name':['A', 'B', 'C', 'D'], 'ID':[20, 21, 19, 18]})
data.to_csv('abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/file_path', storage_options = {'linked_service' : 'linked_service_name'})
使用存储选项直接传递客户端 ID & 机密、SAS 密钥、存储帐户密钥和连接字符串。
运行以下代码。
注意
在运行之前,请更新此脚本中的文件 URL 和 storage_options。
#Read data file from URI of secondary Azure Data Lake Storage Gen2
import pandas
#read data file
df = pandas.read_csv('abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/ file_path', storage_options = {'account_key' : 'account_key_value'})
## or storage_options = {'sas_token' : 'sas_token_value'}
## or storage_options = {'connection_string' : 'connection_string_value'}
## or storage_options = {'tenant_id': 'tenant_id_value', 'client_id' : 'client_id_value', 'client_secret': 'client_secret_value'}
print(df)
#write data file
data = pandas.DataFrame({'Name':['A', 'B', 'C', 'D'], 'ID':[20, 21, 19, 18]})
data.to_csv('abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/file_path', storage_options = {'account_key' : 'account_key_value'})
## or storage_options = {'sas_token' : 'sas_token_value'}
## or storage_options = {'connection_string' : 'connection_string_value'}
## or storage_options = {'tenant_id': 'tenant_id_value', 'client_id' : 'client_id_value', 'client_secret': 'client_secret_value'}
读取/写入 parquet 文件的示例
运行以下代码。
注意
在运行此脚本之前,请更新其中的文件 URL。
import pandas
#read parquet file
df = pandas.read_parquet('abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/ parquet_file_path')
print(df)
#write parquet file
df.to_parquet('abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/ parquet_file_path')
读取/写入 excel 文件的示例
运行以下代码。
注意
在运行此脚本之前,请更新其中的文件 URL。
import pandas
#read excel file
df = pandas.read_excel('abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/ excel_file_path')
print(df)
#write excel file
df.to_excel('abfs[s]://file_system_name@account_name.dfs.core.chinacloudapi.cn/excel_file_path')