다음을 통해 공유

使用 Databricks JDBC 驱动程序来管理存储卷中的文件

Databricks 使用 Unity 目录卷提供批量引入功能,使用户能够将数据集传入和传出本地文件(如 CSV 文件)。 请参阅什么是 Unity Catalog 卷?

本文介绍了如何使用版本 3 及更高版本的 Databricks JDBC 驱动程序,来管理卷中的文件,以及读取和写入卷中的数据流。

启用卷操作

要启用 Unity Catalog 卷操作,请将连接属性 VolumeOperationAllowedLocalPaths 设置为允许卷操作的本地路径的逗号分隔列表。 请参阅 “其他功能属性”

必须启用 Unity Catalog 才能使用此功能。 使用 Databricks UI 提供类似的功能。 请参阅将文件上传到 Unity Catalog 卷

Unity Catalog 引入命令是 SQL 语句。 以下示例演示 PUT、GET 和 REMOVE 操作。

上传本地文件

要将本地文件 /tmp/test.csv 上传到 Unity Catalog 卷路径 /Volumes/main/default/e2etests/file1.csv,请使用 PUT 操作:

  PUT '/tmp/test.csv' INTO '/Volumes/main/default/e2etests/file1.csv' OVERWRITE

下载文件

若要将文件从 Unity Catalog 卷路径 /Volumes/main/default/e2etests/file1.csv 下载到本地文件 /tmp/test.csv,请使用 GET 操作:

  GET '/Volumes/main/default/e2etests/file1.csv' TO '/tmp/test.csv'

删除文件

若要删除具有 Unity Catalog 卷路径/Volumes/main/default/e2etests/file1.csv的文件,请使用 REMOVE 操作。

  REMOVE '/Volumes/main/default/e2etests/file1.csv'

使用流读取/写入数据

JDBC 驱动程序通过提供接口 IDatabricksVolumeClient支持流式传输数据,以便从 Unity 目录卷读取和写入数据。 有关可用 API,请参阅 IDatabricksVolumeClient 参考

可以使用 IDatabricksVolumeClient 工厂实用工具初始化 DatabricksVolumeClientFactory

import com.databricks.jdbc.api.impl.volume.DatabricksVolumeClientFactory;
import com.databricks.jdbc.api.volume.IDatabricksVolumeClient;

IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);

将文件从流写入到卷中

// Create the connection and enable the volume operations.
Connection connection = DriverManager.getConnection(url, prop);
connection.setClientInfo("enableVolumeOperations", "1");
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);

// Upload the file stream to UC Volume path
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
volumeClient.putObject(catalog, schema, volume, objectPath, inputStream, contentLength, true /* overwrite */);

以流的形式读取卷文件

import org.apache.http.entity.InputStreamEntity;

// Create the connection and enable the volume operations.
Connection connection = DriverManager.getConnection(url, prop);
connection.setClientInfo("enableVolumeOperations", "1");
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);

// Upload the file stream to UC Volume path
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
InputStreamEntity volumeDataStream = volumeClient.getObject(catalog, schema, volume, objectPath);