从 Snowflake 读取和写入数据
Azure Databricks 在 Databricks Runtime 中提供了一个 Snowflake 连接器,以支持通过 Snowflake 读取和写入数据。
重要
本文所述的配置为试验性配置。 试验性功能按原样提供,Databricks 不会通过客户技术支持为它提供支持。 为了获得完整的查询联合支持,应改为使用 Lakehouse 联合身份验证,这使 Azure Databricks 用户能够利用 Unity Catalog 语法和数据治理工具。
可以配置与 Snowflake 的连接,然后查询数据。 在开始之前,请检查群集运行的 Databricks Runtime 版本。 以下代码提供了 Python、SQL 和 Scala 中的示例语法。
# The following example applies to Databricks Runtime 11.3 LTS and above.
snowflake_table = (spark.read
.format("snowflake")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 443 if not specified.
.option("user", "username")
.option("password", "password")
.option("sfWarehouse", "warehouse_name")
.option("database", "database_name")
.option("schema", "schema_name") # Optional - will use default schema "public" if not specified.
.option("dbtable", "table_name")
.load()
)
# The following example applies to Databricks Runtime 10.4 and below.
snowflake_table = (spark.read
.format("snowflake")
.option("dbtable", table_name)
.option("sfUrl", database_host_url)
.option("sfUser", username)
.option("sfPassword", password)
.option("sfDatabase", database_name)
.option("sfSchema", schema_name)
.option("sfWarehouse", warehouse_name)
.load()
)
/* The following example applies to Databricks Runtime 11.3 LTS and above. */
DROP TABLE IF EXISTS snowflake_table;
CREATE TABLE snowflake_table
USING snowflake
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 443 if not specified. */
user '<username>',
password '<password>',
sfWarehouse '<warehouse_name>',
database '<database-name>',
schema '<schema-name>', /* Optional - will use default schema "public" if not specified. */
dbtable '<table-name>'
);
SELECT * FROM snowflake_table;
/* The following example applies to Databricks Runtime 10.4 LTS and below. */
DROP TABLE IF EXISTS snowflake_table;
CREATE TABLE snowflake_table
USING snowflake
OPTIONS (
dbtable '<table-name>',
sfUrl '<database-host-url>',
sfUser '<username>',
sfPassword '<password>',
sfDatabase '<database-name>',
sfSchema '<schema-name>',
sfWarehouse '<warehouse-name>'
);
SELECT * FROM snowflake_table;
# The following example applies to Databricks Runtime 11.3 LTS and above.
val snowflake_table = spark.read
.format("snowflake")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 443 if not specified. */
.option("user", "username")
.option("password", "password")
.option("sfWarehouse", "warehouse_name")
.option("database", "database_name")
.option("schema", "schema_name") /* Optional - will use default schema "public" if not specified. */
.option("dbtable", "table_name")
.load()
# The following example applies to Databricks Runtime 10.4 and below.
val snowflake_table = spark.read
.format("snowflake")
.option("dbtable", table_name)
.option("sfUrl", database_host_url)
.option("sfUser", username)
.option("sfPassword", password)
.option("sfDatabase", database_name)
.option("sfSchema", schema_name)
.option("sfWarehouse", warehouse_name)
.load()
以下笔记本提供有关如何将数据写入到 Snowflake 或从 Snowflake 读取数据的简单示例。 有关详细信息,请参阅适用于 Spark 的 Snowflake 连接器。
提示
使用笔记本中演示的机密,避免在笔记本中公开 Snowflake 用户名和密码。
以下笔记本介绍如何使用适用于 Spark 的 Snowflake 连接器的最佳做法。 它将数据写入到 Snowflake,使用 Snowflake 进行一些基本的数据操作,训练 Azure Databricks 中的机器学习模型,并将结果写回 Snowflake。
适用于 Spark 的 Snowflake 连接器不遵循要写入的表中的列的顺序;必须显式指定数据帧和 Snowflake 列之间的映射。 若要指定此映射,请使用 columnmap 参数。
Snowflak 将所有 INTEGER
类型都表示为 NUMBER
,这可能会导致在将数据写入到 Snowflak 并从中读取数据时数据类型发生更改。 例如,在写入 Snowflak 时可以将 INTEGER
数据转换为 DECIMAL
,因为 INTEGER
和 DECIMAL
在 Snowflak 中是等效的(请参阅 Snowflak 数字数据类型)。
默认情况下,Snowflak 使用大写字段,这意味着表架构将转换为大写字母。