从 Snowflake 读取和写入数据

Azure Databricks 在 Databricks Runtime 中提供了一个 Snowflake 连接器,以支持通过 Snowflake 读取和写入数据。

重要

本文所述的配置为试验性配置。 试验性功能按原样提供,Databricks 不会通过客户技术支持为它提供支持。 为了获得完整的查询联合支持,应改为使用 Lakehouse 联合身份验证,这使 Azure Databricks 用户能够利用 Unity Catalog 语法和数据治理工具。

在 Azure Databricks 中查询 Snowflake 表

可以配置与 Snowflake 的连接,然后查询数据。 在开始之前,请检查群集运行的 Databricks Runtime 版本。 以下代码提供了 Python、SQL 和 Scala 中的示例语法。

Python


# The following example applies to Databricks Runtime 11.3 LTS and above.

snowflake_table = (spark.read
  .format("snowflake")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 443 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("sfWarehouse", "warehouse_name")
  .option("database", "database_name")
  .option("schema", "schema_name") # Optional - will use default schema "public" if not specified.
  .option("dbtable", "table_name")
  .load()
)

# The following example applies to Databricks Runtime 10.4 and below.

snowflake_table = (spark.read
  .format("snowflake")
  .option("dbtable", table_name)
  .option("sfUrl", database_host_url)
  .option("sfUser", username)
  .option("sfPassword", password)
  .option("sfDatabase", database_name)
  .option("sfSchema", schema_name)
  .option("sfWarehouse", warehouse_name)
  .load()
)

SQL


/* The following example applies to Databricks Runtime 11.3 LTS and above. */

DROP TABLE IF EXISTS snowflake_table;
CREATE TABLE snowflake_table
USING snowflake
OPTIONS (
    host '<hostname>',
    port '<port>', /* Optional - will use default port 443 if not specified. */
    user '<username>',
    password '<password>',
    sfWarehouse '<warehouse_name>',
    database '<database-name>',
    schema '<schema-name>', /* Optional - will use default schema "public" if not specified. */
    dbtable '<table-name>'
);
SELECT * FROM snowflake_table;

/* The following example applies to Databricks Runtime 10.4 LTS and below. */

DROP TABLE IF EXISTS snowflake_table;
CREATE TABLE snowflake_table
USING snowflake
OPTIONS (
    dbtable '<table-name>',
    sfUrl '<database-host-url>',
    sfUser '<username>',
    sfPassword '<password>',
    sfDatabase '<database-name>',
    sfSchema '<schema-name>',
    sfWarehouse '<warehouse-name>'
);
SELECT * FROM snowflake_table;

Scala


# The following example applies to Databricks Runtime 11.3 LTS and above.

val snowflake_table = spark.read
  .format("snowflake")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 443 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("sfWarehouse", "warehouse_name")
  .option("database", "database_name")
  .option("schema", "schema_name") /* Optional - will use default schema "public" if not specified. */
  .option("dbtable", "table_name")
  .load()

# The following example applies to Databricks Runtime 10.4 and below.

val snowflake_table = spark.read
  .format("snowflake")
  .option("dbtable", table_name)
  .option("sfUrl", database_host_url)
  .option("sfUser", username)
  .option("sfPassword", password)
  .option("sfDatabase", database_name)
  .option("sfSchema", schema_name)
  .option("sfWarehouse", warehouse_name)
  .load()

笔记本示例:适用于 Spark 的 Snowflake 连接器

以下笔记本提供有关如何将数据写入到 Snowflake 或从 Snowflake 读取数据的简单示例。 有关详细信息,请参阅适用于 Spark 的 Snowflake 连接器

提示

使用笔记本中演示的机密,避免在笔记本中公开 Snowflake 用户名和密码。

Snowflake Python 笔记本

获取笔记本

笔记本示例:将模型训练结果保存到 Snowflake

以下笔记本介绍如何使用适用于 Spark 的 Snowflake 连接器的最佳做法。 它将数据写入到 Snowflake,使用 Snowflake 进行一些基本的数据操作,训练 Azure Databricks 中的机器学习模型,并将结果写回 Snowflake。

在 Snowflake 笔记本中存储 ML 训练结果

获取笔记本

常见问题 (FAQ)

为什么我的 Spark 数据帧列在 Snowflake 中的顺序不相同?

适用于 Spark 的 Snowflake 连接器不遵循要写入的表中的列的顺序;必须显式指定数据帧和 Snowflake 列之间的映射。 若要指定此映射,请使用 columnmap 参数

为什么写入 Snowflake 的 INTEGER 数据作为 DECIMAL 读回?

Snowflak 将所有 INTEGER 类型都表示为 NUMBER,这可能会导致在将数据写入到 Snowflak 并从中读取数据时数据类型发生更改。 例如,在写入 Snowflak 时可以将 INTEGER 数据转换为 DECIMAL,因为 INTEGERDECIMAL 在 Snowflak 中是等效的(请参阅 Snowflak 数字数据类型)。

为什么 Snowflak 表架构中的字段总是大写的?

默认情况下,Snowflak 使用大写字段,这意味着表架构将转换为大写字母。