Databricks 使用 Unity 目录卷提供批量引入功能,使用户能够将数据集传入和传出本地文件(如 CSV 文件)。 请参阅什么是 Unity Catalog 卷?。
本文介绍了如何使用版本 3 及更高版本的 Databricks JDBC 驱动程序,来管理卷中的文件,以及读取和写入卷中的数据流。
启用卷操作
要启用 Unity Catalog 卷操作,请将连接属性 VolumeOperationAllowedLocalPaths 设置为允许卷操作的本地路径的逗号分隔列表。 请参阅 “其他功能属性”
必须启用 Unity Catalog 才能使用此功能。 使用 Databricks UI 提供类似的功能。 请参阅将文件上传到 Unity Catalog 卷。
Unity Catalog 引入命令是 SQL 语句。 以下示例演示 PUT、GET 和 REMOVE 操作。
上传本地文件
要将本地文件 /tmp/test.csv 上传到 Unity Catalog 卷路径 /Volumes/main/default/e2etests/file1.csv,请使用 PUT 操作:
PUT '/tmp/test.csv' INTO '/Volumes/main/default/e2etests/file1.csv' OVERWRITE
下载文件
若要将文件从 Unity Catalog 卷路径 /Volumes/main/default/e2etests/file1.csv 下载到本地文件 /tmp/test.csv,请使用 GET 操作:
GET '/Volumes/main/default/e2etests/file1.csv' TO '/tmp/test.csv'
删除文件
若要删除具有 Unity Catalog 卷路径/Volumes/main/default/e2etests/file1.csv的文件,请使用 REMOVE 操作。
REMOVE '/Volumes/main/default/e2etests/file1.csv'
使用流读取/写入数据
JDBC 驱动程序通过提供接口 IDatabricksVolumeClient支持流式传输数据,以便从 Unity 目录卷读取和写入数据。 有关可用 API,请参阅 IDatabricksVolumeClient 参考 。
可以使用 IDatabricksVolumeClient 工厂实用工具初始化 DatabricksVolumeClientFactory:
import com.databricks.jdbc.api.impl.volume.DatabricksVolumeClientFactory;
import com.databricks.jdbc.api.volume.IDatabricksVolumeClient;
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
将文件从流写入到卷中
// Create the connection and enable the volume operations.
Connection connection = DriverManager.getConnection(url, prop);
connection.setClientInfo("enableVolumeOperations", "1");
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);
// Upload the file stream to UC Volume path
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
volumeClient.putObject(catalog, schema, volume, objectPath, inputStream, contentLength, true /* overwrite */);
以流的形式读取卷文件
import org.apache.http.entity.InputStreamEntity;
// Create the connection and enable the volume operations.
Connection connection = DriverManager.getConnection(url, prop);
connection.setClientInfo("enableVolumeOperations", "1");
File file = new File("/tmp/test.csv");
FileInputStream fileInputStream = new FileInputStream(file);
// Upload the file stream to UC Volume path
IDatabricksVolumeClient volumeClient = DatabricksVolumeClientFactory.getVolumeClient(Connection conn);
InputStreamEntity volumeDataStream = volumeClient.getObject(catalog, schema, volume, objectPath);