다음을 통해 공유

从 Snowflake 读取和写入数据

Azure Databricks 在 Databricks Runtime 中提供了一个 Snowflake 连接器,以支持通过 Snowflake 读取和写入数据。

重要

旧查询联合文档已停用,可能不会更新。 此内容中提到的配置未经 Databricks 正式认可或测试。 如果 Lakehouse 联邦 支持源数据库,Databricks 建议改用它。

在 Azure Databricks 中查询 Snowflake 表

可以配置与 Snowflake 的连接,然后查询数据。 在开始之前,请检查群集运行的 Databricks Runtime 版本。 以下代码提供了 Python、SQL 和 Scala 中的示例语法。

Python


# The following example applies to Databricks Runtime 11.3 LTS and above.

snowflake_table = (spark.read
  .format("snowflake")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 443 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("sfWarehouse", "warehouse_name")
  .option("database", "database_name")
  .option("schema", "schema_name") # Optional - will use default schema "public" if not specified.
  .option("dbtable", "table_name")
  .load()
)

# The following example applies to Databricks Runtime 10.4 and below.

snowflake_table = (spark.read
  .format("snowflake")
  .option("dbtable", table_name)
  .option("sfUrl", database_host_url)
  .option("sfUser", username)
  .option("sfPassword", password)
  .option("sfDatabase", database_name)
  .option("sfSchema", schema_name)
  .option("sfWarehouse", warehouse_name)
  .load()
)

SQL


/* The following example applies to Databricks Runtime 11.3 LTS and above. */

DROP TABLE IF EXISTS snowflake_table;
CREATE TABLE snowflake_table
USING snowflake
OPTIONS (
    host '<hostname>',
    port '<port>', /* Optional - will use default port 443 if not specified. */
    user '<username>',
    password '<password>',
    sfWarehouse '<warehouse_name>',
    database '<database-name>',
    schema '<schema-name>', /* Optional - will use default schema "public" if not specified. */
    dbtable '<table-name>'
);
SELECT * FROM snowflake_table;

/* The following example applies to Databricks Runtime 10.4 LTS and below. */

DROP TABLE IF EXISTS snowflake_table;
CREATE TABLE snowflake_table
USING snowflake
OPTIONS (
    dbtable '<table-name>',
    sfUrl '<database-host-url>',
    sfUser '<username>',
    sfPassword '<password>',
    sfDatabase '<database-name>',
    sfSchema '<schema-name>',
    sfWarehouse '<warehouse-name>'
);
SELECT * FROM snowflake_table;

Scala(编程语言)


# The following example applies to Databricks Runtime 11.3 LTS and above.

val snowflake_table = spark.read
  .format("snowflake")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 443 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("sfWarehouse", "warehouse_name")
  .option("database", "database_name")
  .option("schema", "schema_name") /* Optional - will use default schema "public" if not specified. */
  .option("dbtable", "table_name")
  .load()

# The following example applies to Databricks Runtime 10.4 and below.

val snowflake_table = spark.read
  .format("snowflake")
  .option("dbtable", table_name)
  .option("sfUrl", database_host_url)
  .option("sfUser", username)
  .option("sfPassword", password)
  .option("sfDatabase", database_name)
  .option("sfSchema", schema_name)
  .option("sfWarehouse", warehouse_name)
  .load()

笔记本示例:适用于 Spark 的 Snowflake 连接器

以下笔记本提供有关如何将数据写入到 Snowflake 或从 Snowflake 读取数据的简单示例。 有关更多详细信息,请参阅 Spark 的 Snowflake 连接器

小窍门

避免在笔记本中公开 Snowflake 用户名和密码,应使用笔记本中展示的 机密功能。

Snowflake Python 笔记本

获取笔记本

笔记本示例:将模型训练结果保存到 Snowflake

以下笔记本介绍如何使用适用于 Spark 的 Snowflake 连接器的最佳做法。 它将数据写入到 Snowflake,使用 Snowflake 进行一些基本的数据操作,训练 Azure Databricks 中的机器学习模型,并将结果写回 Snowflake。

在 Snowflake 笔记本中存储 ML 训练结果

获取笔记本

常见问题 (FAQ)

为什么我的 Spark 数据帧列在 Snowflake 中的顺序不相同?

适用于 Spark 的 Snowflake 连接器不遵循要写入的表中的列的顺序;必须显式指定数据帧和 Snowflake 列之间的映射。 若要指定此映射,请使用 columnmap 参数

为什么向 Snowflake 写入的 INTEGER 数据作为 DECIMAL 读回?

Snowflak 将所有 INTEGER 类型都表示为 NUMBER,这可能会导致在将数据写入到 Snowflak 并从中读取数据时数据类型发生更改。 例如,写入 Snowflake 时,可以将 INTEGER 数据转换为 DECIMAL ,因为 INTEGERDECIMAL 在 Snowflake 中是语义等效的(请参阅 Snowflake 数值数据类型)。

为什么 Snowflak 表架构中的字段总是大写的?

默认情况下,Snowflak 使用大写字段,这意味着表架构将转换为大写字母。