重要
SFTP 连接器为 公共预览版。
了解如何使用 Lakeflow Connect 从 SFTP 服务器引入文件。 SFTP 连接器扩展了自动加载功能,通过 Unity 目录治理从 SFTP 服务器提供安全的增量摄取功能。
主要功能
SFTP 连接器提供以下内容:
- 私钥和基于密码的身份验证。
- 增量文件引入和处理,并提供一次性保证。
- 自动架构推理、演变和数据救援。
- 用于安全引入和凭据的 Unity 目录治理。
- 宽文件格式支持:
JSON、、、CSV、XMLPARQUETAVRO、TEXT、和BINARYFILEORC。 - 内置支持模式和通配符匹配,以轻松定位数据子集。
- 支持所有计算类型,包括 Lakeflow Spark 声明性管道、Databricks SQL、无服务器架构和经典架构,并且要求 Databricks Runtime 17.3 及以上版本。
在您开始之前
若要创建连接和引入管道,必须具有以下各项:
- 一个启用了 Unity Catalog 的工作区。
-
CREATE CONNECTION特权。 - 使用版本为 17.3 或更高的 Databricks Runtime 的计算。
配置 SFTP
首先,确认 Databricks 群集环境可以访问源 SFTP 服务器:
- 确保远程服务器在配置了您工作区的 VPC 中可用。
- 确保您的 SSH 规则允许 Databricks VPC 的 IP 范围(如果您使用的是经典计算)或稳定的 IP(如果您使用的是无服务器计算)。
- 在经典计算平面中,连同负载均衡器、NAT 网关、Internet 网关或等效网关一起设置一个稳定的 IP 地址,并将其与部署 Azure Databricks 计算的子网连接。 这样,计算资源就可以共享可在 SFTP 服务器端列入允许列表的稳定公共 IP 地址。 有关配置网络设置的说明,请参阅 对等虚拟网络。
创建连接
创建用于存储 SFTP 凭据的 Unity 目录连接。 必须具有 CREATE CONNECTION 权限。
连接器支持以下身份验证方法:
- PEM 私钥
- 基于密码的身份验证
Databricks 建议使用 PEM 私钥身份验证。 Databricks 还建议在源 SFTP 服务器上使用最低权限的凭据(例如,只读访问权限受限的非根用户)。
创建管道时,连接器会尝试自动查找可以使用的连接,并且该连接与主机匹配。 如果有多个匹配的连接,连接器将选择成功连接到主机的第一个连接。 但是,Databricks 建议显式指定用户。 这可确保连接器不会为具有主机访问权限的另一个用户选择连接。
PEM 私钥(建议)
目录资源管理器
在 Azure Databricks 工作区中,单击“目录”>“外部数据”>“连接”。
单击“ 创建连接”。
在“设置连接向导”的“连接基础知识”页上,输入唯一的连接名称。
对于 连接类型 ,请选择 SFTP。
对于 身份验证类型,请选择 PEM 私钥。
单击 “下一步” 。
在 “身份验证 ”页上,对于 “主机”,输入外国服务器的主机名。
对于 用户,输入用于访问外部实例的用户标识。
单击 “下一步” 。
在 “连接详细信息 ”页上,输入 PEM 格式的私钥。 此外,请提供密钥口令(如果适用)。
如果要跳过主机密钥指纹检查,请取消选择 “强制实施主机密钥指纹”。
选择此选项后,仅当服务器的公钥与预期的 SHA-256 指纹匹配时,连接才会继续。 当禁用时,无论匹配情况如何,连接仍会继续。 在禁用此功能之前,请与网络管理员联系。
如果选中 “强制实施主机密钥指纹 ”,请输入 SFTP 服务器指纹。
可以从服务器管理员或使用 CLI 命令检索指纹。 还可以按 “测试”并创建连接>测试。 生成的错误消息提供指纹。 例如:
ECDSA key fingerprint is SHA256:XXX/YYY单击“ 测试”并创建连接。
如果连接成功,请单击“ 创建”。
SQL
-- Create a connection using a username and SSH private key.
CREATE CONNECTION my_sftp_connection
TYPE sftp
OPTIONS (
host 'my.sftpserver.com',
-- The following credentials can also be used in-line, but Databricks recommends
-- accessing them using the secrets scope.
user secret('my_secret_scope','my_sftp_username'),
pem_private_key secret('my_secret_scope','my_sftp_private_key'),
-- Port for the host
port '22',
-- Passphrase for the private key (optional).
pem_key_passphrase secret('my_secret_scope','my_sftp_private_key_passphrase'),
-- SFTP server fingerprint. You can retrieve this from your server administrator or using CLI commands.
key_fingerprint 'SHA256:ASampleFingerprintValueZy...',
);
基于密码的身份验证
目录资源管理器
在 Azure Databricks 工作区中,单击“目录”>“外部数据”>“连接”。
单击“ 创建连接”。
在“设置连接向导”的“连接基础知识”页上,输入唯一的连接名称。
对于 连接类型 ,请选择 SFTP。
对于 身份验证类型,请选择 “用户名和密码”。
单击 “下一步” 。
在 “身份验证 ”页上,对于 “主机”,输入外国服务器的主机名。
对于 用户,输入用于访问外部实例的用户标识。
对于 “密码”,请输入外部实例的密码。
单击 “下一步” 。
如果要跳过主机密钥指纹检查,请取消选择 “强制实施主机密钥指纹”。
选择此选项后,仅当服务器的公钥与预期的 SHA-256 指纹匹配时,连接才会继续。 当禁用时,无论匹配情况如何,连接仍会继续。 在禁用此功能之前,请与网络管理员联系。
如果选中 “强制实施主机密钥指纹 ”,请输入 SFTP 服务器指纹。
可以从服务器管理员或使用 CLI 命令检索指纹。 还可以按 “测试”并创建连接>测试。 生成的错误消息提供指纹。 例如:
ECDSA key fingerprint is SHA256:XXX/YYY单击“ 测试”并创建连接。
如果连接成功,请单击“ 创建”。
SQL
-- Create a connection using a username and password.
CREATE CONNECTION my_sftp_connection
TYPE sftp
OPTIONS (
host 'my.sftpserver.com',
user secret('my_secret_scope','my_sftp_username'),
password secret('my_secret_scope','my_sftp_password'),
-- Port for the host.
port '22',
-- SFTP server fingerprint. You can retrieve this from your server administrator or using CLI commands.
key_fingerprint 'SHA256:ASampleFingerprintValueZy...',
);
从 SFTP 服务器读取文件
以下示例演示如何使用自动加载程序流式处理功能从 SFTP 服务器读取文件。 有关自动加载程序使用情况的详细信息,请参阅 常见数据加载模式。
# Run the Auto Loader job to ingest all existing data in the SFTP server.
df = (spark.readStream.format("cloudFiles")
.option("cloudFiles.schemaLocation", "<path to store schema information>") # This is a cloud storage path
.option("cloudFiles.format", "csv") # Or other format supported by Auto Loader
# Specify the absolute path on the SFTP server starting from the root /.
# Example: /home/<username>/data/files or /uploads/csv_files
.load("sftp://<username>@<host>:<port>/<absolute_path_to_files>")
.writeStream
.format("delta")
.option("checkpointLocation", "<path to store checkpoint information>") # This is a cloud storage path.
.trigger(availableNow = True)
.table("<table name>"))
df.awaitTermination()
以下示例演示如何在 Lakeflow Spark 声明性管道中使用自动加载程序从 SFTP 服务器读取文件:
Python
from pyspark import pipelines as dp
@dp.table
def sftp_bronze_table():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv") # Or other format supported by Auto Loader
# Specify the absolute path on the SFTP server starting from the root /.
# Example: /home/username/data/files or /uploads/csv_files
.load("sftp://<username>@<host>:<port>/<absolute_path_to_files>")))
SQL
CREATE OR REFRESH STREAMING TABLE sftp_bronze_table
AS SELECT * FROM STREAM read_files(
"sftp://<username>@<host>:<port>/<absolute_path_to_files>",
format => "csv"
)
配置 自动加载程序选项。 除以下选项外,支持所有选项:
cloudFiles.useNotificationscloudFiles.useManagedFileEventscloudFiles.cleanSource- 特定于云的选项
局限性
- 其他引入图面(包括
COPY INTO,spark.read和dbutils.ls)不支持 SFTP。 - 不支持写回到 SFTP 服务器。
- 不支持自动加载器
cleanSource(在引入后删除或归档源文件)。 - 不支持 FTP 协议。
FAQ
查找有关 SFTP 连接器的常见问题的解答。
如何使用通配符或文件名模式选择要引入的文件?
SFTP 连接器基于标准 Auto Loader 框架构建,用于从 SFTP 服务器读取数据。 这意味着兼容所有 Auto Loader 选项。 对于文件名模式和通配符,请使用 pathGlobFilter 或 fileNamePattern 选项。 请参阅 自动加载程序选项。
SFTP 连接器是否可以引入加密文件? (是否支持 PGP?)
连接器不会在传输过程中解密,但你可以将加密文件以二进制文件的形式引入,并在引入后对其进行解密。
如何处理不兼容的私钥格式?
仅支持 PEM 格式。 可以通过执行以下作之一,以 PEM 格式生成私钥:
(选项 1)以标准 PEM 格式创建新的 RSA 密钥:
% ssh-keygen -t rsa -m pem(选项 2)将现有的 OpenSSH 格式密钥转换为 PEM 格式:
% ssh-keygen -p -m pem -f /path/to/key # This updates the key file.