共用方式為

从 SFTP 服务器引入文件

重要

SFTP 连接器为 公共预览版

了解如何使用 Lakeflow Connect 从 SFTP 服务器引入文件。 SFTP 连接器扩展了自动加载功能,通过 Unity 目录治理从 SFTP 服务器提供安全的增量摄取功能。

主要功能

SFTP 连接器提供以下内容:

  • 私钥和基于密码的身份验证。
  • 增量文件引入和处理,并提供一次性保证。
  • 自动架构推理、演变和数据救援。
  • 用于安全引入和凭据的 Unity 目录治理。
  • 宽文件格式支持:JSON、、、CSVXMLPARQUETAVROTEXT、和BINARYFILEORC
  • 内置支持模式和通配符匹配,以轻松定位数据子集。
  • 支持所有计算类型,包括 Lakeflow Spark 声明性管道、Databricks SQL、无服务器架构和经典架构,并且要求 Databricks Runtime 17.3 及以上版本。

在您开始之前

若要创建连接和引入管道,必须具有以下各项:

  • 一个启用了 Unity Catalog 的工作区。
  • CREATE CONNECTION 特权。
  • 使用版本为 17.3 或更高的 Databricks Runtime 的计算。

配置 SFTP

首先,确认 Databricks 群集环境可以访问源 SFTP 服务器:

  • 确保远程服务器在配置了您工作区的 VPC 中可用。
  • 确保您的 SSH 规则允许 Databricks VPC 的 IP 范围(如果您使用的是经典计算)或稳定的 IP(如果您使用的是无服务器计算)。
  • 在经典计算平面中,连同负载均衡器、NAT 网关、Internet 网关或等效网关一起设置一个稳定的 IP 地址,并将其与部署 Azure Databricks 计算的子网连接。 这样,计算资源就可以共享可在 SFTP 服务器端列入允许列表的稳定公共 IP 地址。 有关配置网络设置的说明,请参阅 对等虚拟网络

创建连接

创建用于存储 SFTP 凭据的 Unity 目录连接。 必须具有 CREATE CONNECTION 权限。

连接器支持以下身份验证方法:

  • PEM 私钥
  • 基于密码的身份验证

Databricks 建议使用 PEM 私钥身份验证。 Databricks 还建议在源 SFTP 服务器上使用最低权限的凭据(例如,只读访问权限受限的非根用户)。

创建管道时,连接器会尝试自动查找可以使用的连接,并且该连接与主机匹配。 如果有多个匹配的连接,连接器将选择成功连接到主机的第一个连接。 但是,Databricks 建议显式指定用户。 这可确保连接器不会为具有主机访问权限的另一个用户选择连接。

目录资源管理器

  1. 在 Azure Databricks 工作区中,单击“目录”>“外部数据”>“连接”。

  2. 单击“ 创建连接”。

  3. “设置连接向导”的“连接基础知识”页上,输入唯一的连接名称

  4. 对于 连接类型 ,请选择 SFTP

  5. 对于 身份验证类型,请选择 PEM 私钥

  6. 单击 “下一步”

  7. “身份验证 ”页上,对于 “主机”,输入外国服务器的主机名。

  8. 对于 用户,输入用于访问外部实例的用户标识。

  9. 单击 “下一步”

  10. “连接详细信息 ”页上,输入 PEM 格式的私钥。 此外,请提供密钥口令(如果适用)。

  11. 如果要跳过主机密钥指纹检查,请取消选择 “强制实施主机密钥指纹”。

    选择此选项后,仅当服务器的公钥与预期的 SHA-256 指纹匹配时,连接才会继续。 当禁用时,无论匹配情况如何,连接仍会继续。 在禁用此功能之前,请与网络管理员联系。

  12. 如果选中 “强制实施主机密钥指纹 ”,请输入 SFTP 服务器指纹。

    可以从服务器管理员或使用 CLI 命令检索指纹。 还可以按 “测试”并创建连接>测试。 生成的错误消息提供指纹。 例如:

    ECDSA key fingerprint is SHA256:XXX/YYY

  13. 单击“ 测试”并创建连接

  14. 如果连接成功,请单击“ 创建”。

SQL

-- Create a connection using a username and SSH private key.
CREATE CONNECTION my_sftp_connection
TYPE sftp
OPTIONS (
  host 'my.sftpserver.com',
  -- The following credentials can also be used in-line, but Databricks recommends
  -- accessing them using the secrets scope.
  user secret('my_secret_scope','my_sftp_username'),
  pem_private_key secret('my_secret_scope','my_sftp_private_key'),
  -- Port for the host
  port '22',
  -- Passphrase for the private key (optional).
  pem_key_passphrase secret('my_secret_scope','my_sftp_private_key_passphrase'),
  -- SFTP server fingerprint. You can retrieve this from your server administrator or using CLI commands.
  key_fingerprint 'SHA256:ASampleFingerprintValueZy...',
);

基于密码的身份验证

目录资源管理器

  1. 在 Azure Databricks 工作区中,单击“目录”>“外部数据”>“连接”。

  2. 单击“ 创建连接”。

  3. “设置连接向导”的“连接基础知识”页上,输入唯一的连接名称

  4. 对于 连接类型 ,请选择 SFTP

  5. 对于 身份验证类型,请选择 “用户名和密码”。

  6. 单击 “下一步”

  7. “身份验证 ”页上,对于 “主机”,输入外国服务器的主机名。

  8. 对于 用户,输入用于访问外部实例的用户标识。

  9. 对于 “密码”,请输入外部实例的密码。

  10. 单击 “下一步”

  11. 如果要跳过主机密钥指纹检查,请取消选择 “强制实施主机密钥指纹”。

    选择此选项后,仅当服务器的公钥与预期的 SHA-256 指纹匹配时,连接才会继续。 当禁用时,无论匹配情况如何,连接仍会继续。 在禁用此功能之前,请与网络管理员联系。

  12. 如果选中 “强制实施主机密钥指纹 ”,请输入 SFTP 服务器指纹。

    可以从服务器管理员或使用 CLI 命令检索指纹。 还可以按 “测试”并创建连接>测试。 生成的错误消息提供指纹。 例如:

    ECDSA key fingerprint is SHA256:XXX/YYY

  13. 单击“ 测试”并创建连接

  14. 如果连接成功,请单击“ 创建”。

SQL

-- Create a connection using a username and password.
CREATE CONNECTION my_sftp_connection
TYPE sftp
OPTIONS (
  host 'my.sftpserver.com',
  user secret('my_secret_scope','my_sftp_username'),
  password secret('my_secret_scope','my_sftp_password'),
  -- Port for the host.
  port '22',
  -- SFTP server fingerprint. You can retrieve this from your server administrator or using CLI commands.
  key_fingerprint 'SHA256:ASampleFingerprintValueZy...',
);

从 SFTP 服务器读取文件

以下示例演示如何使用自动加载程序流式处理功能从 SFTP 服务器读取文件。 有关自动加载程序使用情况的详细信息,请参阅 常见数据加载模式

# Run the Auto Loader job to ingest all existing data in the SFTP server.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.schemaLocation", "<path to store schema information>") # This is a cloud storage path
  .option("cloudFiles.format", "csv") # Or other format supported by Auto Loader
  # Specify the absolute path on the SFTP server starting from the root /.
  # Example: /home/<username>/data/files or /uploads/csv_files
  .load("sftp://<username>@<host>:<port>/<absolute_path_to_files>")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "<path to store checkpoint information>") # This is a cloud storage path.
  .trigger(availableNow = True)
  .table("<table name>"))
df.awaitTermination()

以下示例演示如何在 Lakeflow Spark 声明性管道中使用自动加载程序从 SFTP 服务器读取文件:

Python

from pyspark import pipelines as dp

@dp.table
def sftp_bronze_table():
  return (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv") # Or other format supported by Auto Loader
    # Specify the absolute path on the SFTP server starting from the root /.
    # Example: /home/username/data/files or /uploads/csv_files
    .load("sftp://<username>@<host>:<port>/<absolute_path_to_files>")))

SQL

CREATE OR REFRESH STREAMING TABLE sftp_bronze_table
AS SELECT * FROM STREAM read_files(
  "sftp://<username>@<host>:<port>/<absolute_path_to_files>",
  format => "csv"
)

配置 自动加载程序选项。 除以下选项外,支持所有选项:

  • cloudFiles.useNotifications
  • cloudFiles.useManagedFileEvents
  • cloudFiles.cleanSource
  • 特定于云的选项

局限性

  • 其他引入图面(包括 COPY INTOspark.readdbutils.ls)不支持 SFTP。
  • 不支持写回到 SFTP 服务器。
  • 不支持自动加载器 cleanSource (在引入后删除或归档源文件)。
  • 不支持 FTP 协议。

FAQ

查找有关 SFTP 连接器的常见问题的解答。

如何使用通配符或文件名模式选择要引入的文件?

SFTP 连接器基于标准 Auto Loader 框架构建,用于从 SFTP 服务器读取数据。 这意味着兼容所有 Auto Loader 选项。 对于文件名模式和通配符,请使用 pathGlobFilterfileNamePattern 选项。 请参阅 自动加载程序选项

SFTP 连接器是否可以引入加密文件? (是否支持 PGP?)

连接器不会在传输过程中解密,但你可以将加密文件以二进制文件的形式引入,并在引入后对其进行解密。

如何处理不兼容的私钥格式?

仅支持 PEM 格式。 可以通过执行以下作之一,以 PEM 格式生成私钥:

  • (选项 1)以标准 PEM 格式创建新的 RSA 密钥:

    % ssh-keygen -t rsa -m pem
    
  • (选项 2)将现有的 OpenSSH 格式密钥转换为 PEM 格式:

    % ssh-keygen -p -m pem -f /path/to/key  # This updates the key file.
    

其他资源