使用 Databricks JDBC 驱动程序 (OSS) 管理卷中的文件

Databricks 使用 Unity 目录卷提供批量引入功能,使用户能够将数据集传入和传出本地文件(如 CSV 文件)。 请参阅什么是 Unity Catalog 卷?

本文介绍如何使用 Databricks JDBC 驱动程序 (OSS) 管理卷中的文件,以及如何从卷和向卷读取和写入流。

启用卷操作

要启用 Unity Catalog 卷操作,请将连接属性 VolumeOperationAllowedLocalPaths 设置为允许卷操作的本地路径的逗号分隔列表。 请参阅 “其他功能属性”

必须启用 Unity Catalog 才能使用此功能。 使用 Databricks UI 提供类似的功能。 请参阅将文件上传到 Unity Catalog 卷

Unity Catalog 引入命令是 SQL 语句。 以下示例演示 PUT、GET 和 REMOVE 操作。

上传本地文件

要将本地文件 /tmp/test.csv 上传到 Unity Catalog 卷路径 /Volumes/main/default/e2etests/file1.csv,请使用 PUT 操作:

  PUT '/tmp/test.csv' INTO '/Volumes/main/default/e2etests/file1.csv' OVERWRITE

下载文件

若要将文件从 Unity Catalog 卷路径 /Volumes/main/default/e2etests/file1.csv 下载到本地文件 /tmp/test.csv,请使用 GET 操作:

  GET '/Volumes/main/default/e2etests/file1.csv' TO '/tmp/test.csv'

删除文件

若要删除具有 Unity Catalog 卷路径/Volumes/main/default/e2etests/file1.csv的文件,请使用 REMOVE 操作。

  REMOVE '/Volumes/main/default/e2etests/file1.csv'

使用流读取/写入数据

JDBC 驱动程序通过提供接口 IDatabricksVolumeClient支持流式传输数据,以便从 Unity 目录卷读取和写入数据。 有关可用 API,请参阅 IDatabricksVolumeClient 参考

可以使用 IDatabricksVolumeClient 工厂实用工具初始化 DatabricksVolumeClientFactory

import com.databricks.jdbc.api.impl.volume.DatabricksVolumeClientFactory;
import com.databricks.jdbc.api.volume.IDatabricksVolumeClient;

IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);

将文件从流写入到卷中

Connection connection = DriverManager.getConnection(url, prop);
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);

// Upload the file stream to UC Volume path
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
volumeClient.putObject(catalog, schema, volume, objectPath, inputStream, contentLength, true /* overwrite */);

以流的形式读取卷文件

import org.apache.http.entity.InputStreamEntity;

Connection connection = DriverManager.getConnection(url, prop);
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);

// Upload the file stream to UC Volume path
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
InputStreamEntity volumeDataStream = volumeClient.getObject(catalog, schema, volume, objectPath);

IDatabricksVolumeClient 接口

prefixExists
boolean prefixExists(String catalog, String schema, String volume, String prefix, boolean caseSensitive) throws SQLException
确定 Unity 目录卷中是否存在特定的前缀(类似于文件夹的结构)。 前缀必须是文件名的一部分。
参数:
  • catalog - 云存储的目录名称。
  • schema - 云存储的架构名称。
  • volume - 云存储的 Unity 目录卷名称。
  • prefix - 用于检查存在的前缀,以及以卷的根目录为起点的相对路径。
  • caseSensitive - 检查是否应该区分大小写。

返回:
一个布尔值,指示前缀是否存在。
objectExists
boolean objectExists(String catalog, String schema, String volume, String objectPath, boolean caseSensitive) throws SQLException
确定 Unity 目录卷中是否存在特定对象(文件)。 对象必须与文件名完全匹配。
参数:
  • catalog - 云存储的目录名称。
  • schema - 云存储的架构名称。
  • volume - 云存储的 Unity 目录卷名称。
  • objectPath - 以卷的根目录为起点的对象(文件)的路径,用于检查是否在卷中存在(在任何子文件夹中)。
  • caseSensitive - 一个布尔值,指示检查是否应区分大小写。

返回:
一个布尔值,指示对象是否存在。
volumeExists
boolean volumeExists(String catalog, String schema, String volumeName, boolean caseSensitive) throws SQLException
确定给定目录和架构中是否存在特定卷。 卷必须与卷名称完全匹配。
参数:
  • catalog - 云存储的目录名称。
  • schema - 云存储的架构名称。
  • volumeName - 要检查是否存在的卷的名称。
  • caseSensitive 一个布尔值,指示检查是否应区分大小写。

返回:
一个布尔值,用于指示卷是否存在。
listObjects
List<String> listObjects(String catalog, String schema, String volume, String prefix, boolean caseSensitive) throws SQLException
返回 Unity 目录卷中以指定前缀开头的所有文件名的列表。 前缀必须是以卷的根目录为起点的文件路径的一部分。
参数:
  • catalog - 云存储的目录名称。
  • schema - 云存储的架构名称。
  • volume - 云存储的 UC 卷名称。
  • prefix - 要列出的文件名的前缀。 这包括以卷的根目录为起点的相对路径。
  • caseSensitive - 一个布尔值,指示检查是否应区分大小写。

返回:
一个字符串列表,指示以指定前缀开头的文件名。
getObject(文件)
boolean getObject(String catalog, String schema, String volume, String objectPath, String localPath) throws SQLException
从 Unity 目录卷检索对象(文件),并将其存储在指定的本地路径中。
参数:
  • catalog - 云存储的目录名称。
  • schema - 云存储的架构名称。
  • volume - 云存储的 UC 卷名称。
  • objectPath - 以卷的根目录为起点的对象(文件)的路径。
  • localPath - 要存储检索到数据的本地路径。

返回:
表示GET操作状态的布尔值。
getObject(流)
InputStreamEntity getObject(String catalog, String schema, String volume, String objectPath) throws SQLException
从 Unity Catalog 卷检索对象作为输入流。
参数:
  • catalog - 云存储的目录名称。
  • schema - 云存储的架构名称。
  • volume - 云存储的 UC 卷名称。
  • objectPath - 以卷的根目录为起点的对象(文件)的路径。

返回:
输入流实体的实例。
putObject(文件)
boolean putObject(String catalog, String schema, String volume, String objectPath, String localPath, boolean toOverwrite) throws SQLException
将数据从本地路径上传到 Unity 目录卷中的指定路径。
参数:
  • catalog - 云存储的目录名称。
  • schema - 云存储的架构名称。
  • volume - 云存储的 UC 卷名称。
  • objectPath 上传对象(文件)的目标路径,以卷的根目录为起点。
  • localPath 要从中上传数据的本地路径。
  • toOverwrite 一个布尔值,指示是否覆盖对象(如果对象已存在)。

返回:
一个布尔值,指示PUT操作的状态。
putObject(流)
boolean putObject(String catalog, String schema, String volume, String objectPath, InputStream inputStream, long contentLength, boolean toOverwrite) throws SQLException
将数据从输入流上传到 Unity 目录卷中的指定路径。
参数:
  • catalog - 云存储的目录名称。
  • schema - 云存储的架构名称。
  • volume - 云存储的 UC 卷名称。
  • objectPath - 上传对象(文件)的目标路径,以卷的根目录为起点。
  • inputStream - 要从中上传数据的输入流。
  • contentLength - 输入流的长度。
  • toOverwrite 一个布尔值,指示是否覆盖对象(如果对象已存在)。

返回:
一个布尔值,表示PUT操作的状态。
deleteObject # 删除对象
boolean deleteObject(String catalog, String schema, String volume, String objectPath) throws SQLException
从 Unity 目录卷中的指定路径中删除对象。
参数:
  • catalog - 云存储的目录名称。
  • schema - 云存储的架构名称。
  • volume - 云存储的 UC 卷名称。
  • objectPath - 以卷的根目录为起点的要删除的对象(文件)的路径。

返回:
一个布尔值,指示 DELETE 操作的状态。