使用 Azure Databricks 查询 Amazon Redshift
可以使用 Azure Databricks 从 Amazon Redshift 读取和写入表。
重要
本文所述的配置为试验性配置。 试验性功能按原样提供,Databricks 不会通过客户技术支持为它提供支持。 为了获得完整的查询联合支持,应改为使用 Lakehouse 联合身份验证,这使 Azure Databricks 用户能够利用 Unity Catalog 语法和数据治理工具。
Databricks Redshift 数据源使用 Amazon S3 高效地将数据传入和传出 Redshift,并使用 JDBC 在 Redshift 上自动触发相应的 COPY
和 UNLOAD
命令。
注意
在 Databricks Runtime 11.3 LTS 及更高版本中,Databricks Runtime 包含 Redshift JDBC 驱动程序,可对格式选项使用 redshift
关键字进行访问。 有关每个 Databricks Runtime 中包含的驱动程序版本,请参阅 Databricks Runtime 发行说明版本和兼容性。 用户提供的驱动程序仍然受支持,且优先于捆绑的 JDBC 驱动程序。
在 Databricks Runtime 10.4 LTS 及更低版本中,需要手动安装 Redshift JDBC 驱动程序,并且查询应该为该格式使用驱动程序 (com.databricks.spark.redshift
)。 请参阅 Redshift 驱动程序安装。
使用情况
以下示例演示如何连接 Redshift 驱动程序。 如果使用的是 PostgreSQL JDBC 驱动程序,请替换 url
参数值。
配置 AWS 凭据后,可以将数据源与 Python、SQL、R 或 Scala 中的 Spark 数据源 API 结合使用。
重要
Unity Catalog 中定义的外部位置不支持 tempdir
位置。
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
在 Databricks Runtime 10.4 LTS 及更低版本上使用 SQL 读取数据:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
在 Databricks Runtime 11.3 LTS 及更高版本上使用 SQL 读取数据:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
使用 SQL 写入数据:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
SQL API 仅支持创建新表,不支持覆盖或追加。
R
在 Databricks Runtime 10.4 LTS 及更低版本上使用 R 读取数据:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
在 Databricks Runtime 11.3 LTS 及更高版本上使用 R 读取数据:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
有关使用 Redshift 的建议
查询执行可能会将大量数据提取到 S3。 如果你计划对 Redshift 中的同一数据执行多个查询,Databricks 建议使用 Delta Lake 保存提取的数据。
配置
向 S3 和 Redshift 进行身份验证
数据源涉及多个网络连接,如下图所示:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
数据源在向/从 Redshift 传输数据时读取数据并写入 S3。 因此,它需要对 S3 存储桶具有读写访问权限的 AWS 凭据(使用 tempdir
配置参数指定)。
注意
数据源不会清理在 S3 中创建的临时文件。 因此,建议使用具有对象生命周期配置的专用临时 S3 存储桶,确保在指定的到期期限后自动删除临时文件。 有关如何加密这些文件的详细信息,请参阅本文档的加密部分。 不能将 Unity Catalog中定义的外部位置用作 tempdir
位置。
以下部分介绍每个连接的身份验证配置选项:
Spark 驱动程序到 Redshift
Spark 驱动程序使用用户名和密码通过 JDBC 连接到 Redshift。 Redshift 不支持使用 IAM 角色来对此连接进行身份验证。 默认情况下,此连接使用 SSL 加密;有关详细信息,请参阅加密。
Spark 到 S3
S3 充当中介,用于在 Redshift 中进行读取或写入操作时存储批量数据。 Spark 使用 Hadoop FileSystem 接口并直接使用 Amazon Java SDK 的 S3 客户端连接到 S3。
注意
不能使用 DBFS 装载来配置对 S3 for Redshift 的访问。
在 Hadoop conf 中设置密钥:可以使用 Hadoop 配置属性指定 AWS 密钥。 如果
tempdir
配置指向s3a://
文件系统,则可以在 Hadoop XML 配置文件中设置fs.s3a.access.key
和fs.s3a.secret.key
属性,或调用sc.hadoopConfiguration.set()
以配置 Spark 的全局 Hadoop 配置。 如果使用s3n://
文件系统,则可以提供旧版配置键,如以下示例所示。Scala
例如,如果使用
s3a
文件系统,请添加:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
对于旧版
s3n
文件系统,请添加:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
下面的命令依赖于某些 Spark 内部组件,但它应该适用于所有 PySpark 版本,将来不太可能更改:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift 到 S3
将 forward_spark_s3_credentials
选项设置为 true
,以自动将 Spark 用于通过 JDBC 连接到 S3 的 AWS 密钥凭据转发到 Redshift。 JDBC 查询嵌入这些凭据,因此 Databricks 强烈建议启用 JDBC 连接的 SSL 加密。
加密
保护 JDBC:除非 JDBC URL 中存在任何与 SSL 相关的设置,否则数据源默认启用 SSL 加密并验证 Redshift 服务器是否可信(即
sslmode=verify-full
)。 因此,首次需要服务器证书时会自动从 Amazon 服务器下载该证书。 如果失败,则使用预捆绑的证书文件作为备用。 这适用于 Redshift 和 PostgreSQL JDBC 驱动程序。如果此功能有任何问题,或者只想禁用 SSL,可以在
DataFrameReader
或DataFrameWriter
上调用.option("autoenablessl", "false")
。如果想指定与 SSL 相关的自定义设置,可以按照 Redshift 文档中的说明进行操作:在 Java 中使用 SSL 和服务器证书和 JDBC 驱动程序配置选项 JDBC
url
中与数据源一起使用的任何与 SSL 相关的选项都具有高优先级(即不会触发自动配置)。加密 S3 中存储的 UNLOAD 数据(从 Redshift 读取时存储的数据):根据有关将数据转存到 S3 的 Redshift 文档,“UNLOAD 使用 Amazon S3 服务器端加密 (SSE-S3) 自动加密数据文件。”
Redshift 还支持使用自定义密钥进行客户端加密(请参阅:转存加密的数据文件),但数据源无法指定所需的对称密钥。
加密 S3 中存储的 COPY 数据(写入 Redshift 时存储的数据):根据有关 从 Amazon S3 加载加密的数据文件的 Redshift 文档:
可以使用 COPY
命令加载通过服务器端加密(使用 AWS 管理的加密密钥 (SSE-S3 或 SSE-KMS))、客户端加密或同时通过这两种加密上传到 Amazon S3 的数据文件。 COPY 不支持使用客户提供的密钥 (SSE-C) 进行 Amazon S3 服务器端加密。
参数
Spark SQL 中提供的参数映射或 OPTIONS 支持以下设置:
参数 | 必须 | 默认 | 说明 |
---|---|---|---|
dbtable | 是,除非指定了查询。 | 无 | 要从 Redshift 中创建或读取的表。 将数据保存回 Redshift 时,需要此参数。 |
查询 | 是,除非指定了 dbtable。 | 无 | 要从 Redshift 中读取的查询。 |
user | 否 | 无 | Redshift 用户名。 必须与密码选项一起使用。 仅当 URL 中未传递用户和密码时才能使用,同时传递两者将导致错误。 当用户名包含需要转义的特殊字符时,请使用此参数。 |
password | 否 | 无 | Redshift 密码。 必须与 user 选项一起使用。 仅当 URL 中未传递用户和密码时才能使用;同时传递两者将导致错误。 当密码包含需要转义的特殊字符时,请使用此参数。 |
url | 是 | 无 | 格式的 JDBC URLjdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password> subprotocol 可以是 postgresql 或 redshift ,具体取决于已加载的 JDBC 驱动程序。 一个 Redshift 兼容的驱动程序必须位于类路径上,并匹配此 URL。 host 和 port 应指向 Redshift 主节点,因此必须将安全组和/或 VPC 配置为允许从驱动程序应用程序进行访问。database 标识 Redshift 数据库名称 user ,并且 password 是访问数据库的凭据,该数据库必须嵌入到 JDBC 的此 URL 中,并且用户帐户应具有引用表所需的权限。 |
search_path | 否 | 无 | 在 Redshift 中设置架构搜索路径。 将使用 SET search_path to 命令进行设置。 应该是要在其中搜索表的逗号分隔的架构名称列表。 请参阅 search_path 的 Redshift 文档。 |
aws_iam_role | 仅当使用 IAM 角色进行授权时。 | 无 | 附加到 Redshift 群集的 IAM Redshift COPY/UNLOAD 操作角色完全指定的 ARN,例如 arn:aws:iam::123456789000:role/<redshift-iam-role> 。 |
forward_spark_s3_credentials | 否 | false |
如果 true ,数据源会自动发现 Spark 用于连接到 S3 的凭据,并通过 JDBC 将这些凭据转发到 Redshift。 这些凭据作为 JDBC 查询的一部分发送,因此强烈建议在使用此选项时启用 JDBC 连接的 SSL 加密。 |
temporary_aws_access_key_id | 否 | 无 | AWS 访问密钥必须具有 S3 存储桶的写入权限。 |
temporary_aws_secret_access_key | 否 | 无 | 与提供的访问密钥对应的 AWS 机密访问密钥。 |
temporary_aws_session_token | 否 | 无 | 与提供的访问密钥对应的 AWS 会话令牌。 |
tempdir | 是 | 无 | Amazon S3 中的可写位置,用于存放在读取时卸载的数据以及在写入时加载到 Redshift 中的 Avro 数据。 如果正在将 Redshift 数据源作为常规 ETL 管道的一部分用于 Spark,则可以在存储桶上设置生命周期策略并将存储桶用作此数据的临时位置。 不能将 Unity Catalog 中定义的外部位置用作 tempdir 位置。 |
jdbcdriver | 否 | 取决于 JDBC URL 的子协议。 | 要使用的 JDBC 驱动程序的类名。 此类必须位于类路径中。 在大多数情况下无需指定此选项,因为相应的驱动程序类名会由 JDBC URL 的子协议自动确定。 |
diststyle | 否 | EVEN |
创建表时要使用的 Redshift 分发方式。 可以是 EVEN 、KEY 或 ALL 中的一个(请参阅 Redshift 文档)。 使用 KEY 时,还必须使用 distkey 选项设置分发密钥。 |
distkey | 否,除非使用 DISTSTYLE KEY |
无 | 创建表时表中用作分发密钥的列的名称。 |
sortkeyspec | 否 | 无 | 完整的 Redshift 排序键 定义。 示例包括: * SORTKEY(my_sort_column) * COMPOUND SORTKEY(sort_col_1, sort_col_2) * INTERLEAVED SORTKEY(sort_col_1, sort_col_2) |
usestagingtable(已弃用) | 否 | true |
将此已弃用的选项设置为 false 会导致覆盖操作的目标表在写入开始时立即被删除,使覆盖操作成为非原子操作,并减少目标表的可用性。 这可能会减少覆盖的临时磁盘空间要求。由于设置 usestagingtable=false 操作可能会造成数据丢失或不可用,因此弃用该操作,而是要求您手动删除目标表。 |
description | 否 | 无 | 表的说明。 将使用 SQL COMMENT 命令进行设置,并且应在大多数查询工具中显示。 另请参阅用于设置各个列的说明的 description 元数据。 |
preactions | 否 | 无 | 加载 COPY 命令之前要执行的 SQL 命令的 ; 分隔列表。 在加载新数据之前,在此处运行某些 DELETE 命令或类似的命令可能很有用。 如果命令包含 %s ,则表名称在执行前进行格式化(如果使用临时表)。请注意,如果这些命令失败,则会将其视为错误并引发异常。 如果使用临时表,则会还原更改,如果预操作失败,备份表将还原。 |
postactions | 否 | 无 | 加载数据时成功 COPY 后要执行的 SQL 命令的 ; 分隔列表。 在加载新数据时,在此处运行某些 GRANT 命令或类似的命令可能很有用。 如果命令包含 %s ,则表名称在执行前进行格式化(如果使用临时表)。请注意,如果这些命令失败,则会将其视为错误并引发异常。 如果使用临时表,则会还原更改,如果发布操作失败,备份表将还原。 |
extracopyoptions | 否 | 无 | 加载数据时追加到 Redshift COPY 命令的额外选项列表,例如TRUNCATECOLUMNS 或 MAXERROR n (有关其他选项,请参阅 Redshift 文档)。由于这些选项追加到 COPY 命令末尾,因此只能使用命令末尾有意义的选项,但应涵盖大多数可能的用例。 |
tempformat | 否 | AVRO |
写入 Redshift 时,在 S3 中保存临时文件的格式。 默认为 AVRO ;对于 CSV 和 gzip 压缩的 CSV,其他允许的值分别为 CSV 和 CSV GZIP 。在加载 CSV 时,Redshift 的速度比加载 Avro 文件时要快得多,因此在写入 Redshift 时,使用临时格式可能会提高性能。 |
csvnullstring | 否 | @NULL@ |
使用 CSV 临时格式时要为 null 写入的字符串值。 此值应不显示在实际数据中。 |
csvseparator | 否 | , |
在将 tempformat 设置为 CSV 或CSV GZIP 。 它必须是有效的 ASCII 字符,例如“, ”或“\| ”。 |
csvignoreleadingwhitespace | 否 | true |
设置为 true 时,在以下情况下,则会在写入期间从值中删除前导空格:tempformat 设置为 CSV 或 CSV GZIP 。 否则,将保留空格。 |
csvignoretrailingwhitespace | 否 | true |
设置为 true 时,在以下情况下,则会在写入期间从值中删除尾随空格:tempformat 设置为 CSV 或 CSV GZIP 。 否则,将保留空格。 |
infer_timestamp_ntz_type | 否 | false |
如果 true ,则类型 Redshift TIMESTAMP 的值在读取期间被解释为 TimestampNTZType (不含时区的时间戳)。 否则,所有时间戳都将被解释为 TimestampType ,无论底层 Redshift 表的类型如何。 |
其他配置选项
配置字符串列的最大大小
创建 Redshift 表时,默认行为是为字符串列创建 TEXT
列。 Redshift 将 TEXT
列存储为 VARCHAR(256)
,因此这些列的最大大小为 256 个字符(源)。
若要支持更大的列,可以使用 maxlength
列元数据字段来指定单个字符串列的最大长度。 这对于以下目标也很有用:通过声明最大长度小于默认值的列来优化内存节省性能。
注意
由于 Spark 中的限制,SQL 和 R 语言 API 不支持列元数据修改。
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
下面是使用 Spark 的 Scala API 更新多个列的元数据字段的示例:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
设置自定义列类型
如果需要手动设置列类型,可以使用 redshift_type
列元数据。 例如,如果要覆盖 Spark SQL Schema -> Redshift SQL
类型匹配程序以分配用户定义的列类型,可以执行以下操作:
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
配置列编码
创建表时,使用 encoding
列元数据字段为每列指定压缩编码(有关可用编码,请参阅 Amazon 文档)。
在列上设置说明
Redshift 允许列附加应在大多数查询工具中显示的说明(使用 COMMENT
命令)。 可以设置 description
列元数据字段来指定各个列的说明。
查询下推到 Redshift
Spark 优化器将以下运算符下推到 Redshift:
Filter
Project
Sort
Limit
Aggregation
Join
在 Project
和 Filter
中,它支持以下表达式:
- 大多数布尔逻辑运算符
- 比较
- 基本算术运算
- 数值和字符串强制转换
- 大多数字符串函数
- 标量子查询(如果它们可以完全下推到 Redshift)。
注意
此下推不支持对日期和时间戳进行操作的表达式。
在 Aggregation
中,它支持以下聚合函数:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
适用时与 DISTINCT
子句结合使用。
在 Join
中,它支持以下类型的联接:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- 由优化器重写为
Join
的子查询,例如WHERE EXISTS
、WHERE NOT EXISTS
注意
联接下推不支持 FULL OUTER JOIN
。
在使用 LIMIT
的查询中,下推可能最适用。 SELECT * FROM large_redshift_table LIMIT 10
等查询可能耗时很长,因为整个表将首先作为中间结果转存到 S3。 下推时,在 Redshift 中执行 LIMIT
。 在具有聚合的查询中,将聚合下推到 Redshift 也有助于减少需传输的数据量。
默认启用查询下推到 Redshift。 可通过将 spark.databricks.redshift.pushdown
设置为 false
来将其禁用。 即使禁用,Spark 仍会将筛选器下推到 Redshift 并在其中执列消除。
Redshift 驱动程序安装
Redshift 数据源还需要与 Redshift 兼容的 JDBC 驱动程序。 由于 Redshift 基于 PostgreSQL 数据库系统,可以使用 Databricks Runtime 附带的 PostgreSQL JDBC 驱动程序或 Amazon 推荐的 Redshift JDBC 驱动程序。 PostgreSQL JDBC 驱动程序无需安装即可使用。 Databricks Runtime 发行说明中列出了每个 Databricks Runtime 版本中附带的 PostgreSQL JDBC 驱动程序的版本。
若要手动安装 Redshift JDBC 驱动程序,请执行以下操作:
注意
Databricks 建议使用最新版本的 Redshift JDBC 驱动程序。 版本低于 1.2.41 的 Redshift JDBC 驱动程序存在以下限制:
- 在 SQL 查询中使用
where
子句时,1.2.16 版驱动程序返回空数据。 - 版本低于 1.2.41 的驱动程序可能会返回无效结果,因为列的可为 Null 性未被报告为“未知”,而是被错误地报告为“不可为 Null”。
事务性保证
本部分介绍 Spark 的 Redshift 数据源的事务性保证。
Redshift 和 S3 属性的一般背景知识
有关 Redshift 事务性保证的一般信息,请参阅 Redshift 文档中的管理并发写入操作一章。 简而言之,Redshift 根据 Redshift BEGIN 命令的文档提供可序列化隔离:
[尽管] 可以使用四个事务隔离级别中的任意一个,Amazon Redshift 仍将所有隔离级别都处理为可序列化的。
根据 Redshift 文档:
Amazon Redshift 支持默认的自动提交行为,其中每个单独执行的 SQL 命令单独提交。
因此,COPY
和 UNLOAD
等单独的命令是原子性的和事务性的,而显式的 BEGIN
和 END
只需强制执行多个命令或查询的原子性。
在 Redshift 中读取和写入时,数据源会在 S3 中读取和写入数据。 Spark 和 Redshift 都生成分区输出并存储在 S3 中的多个文件中。 根据 Amazon S3 数据一致性模型文档,S3 存储桶列表操作是最终一致的,因此文件必须为特定长度,以避免因这种最终一致性而导致数据丢失或不完整。
Spark Redshift 数据源的保证
追加到现有表
将行插入 Redshift 时,数据源使用 COPY 命令并指定清单来防止某些最终一致的 S3 操作。 因此,spark-redshift
追加到与常规 Redshift COPY
命令具有相同原子性和事务性属性的现有表中。
新建表 (SaveMode.CreateIfNotExists
)
新建表是一个两步式过程,包括依次使用 CREATE TABLE
命令和 COPY 命令来追加初始行集。 这两个操作都在同一事务中执行。
覆盖现有表
默认情况下,数据源使用事务来执行覆盖,具体方法是删除目标表、新建空表并向其追加行。
如果已弃用的 usestagingtable
设置设为 false
,则数据源会先提交 DELETE TABLE
命令,然后将行追加到新表,这会牺牲覆盖操作的原子性,但可减少 Redshift 在覆盖期间所需的过渡内存量。
查询 Redshift 表
查询使用 Redshift UNLOAD 命令执行查询并将其结果保存到 S3,并且使用清单来防止某些最终一致的 S3 操作。 因此,Spark 的 Redshift 数据源的查询应与常规 Redshift 查询具有相同的一致性属性。
常见问题和解决方法
S3 存储桶和 Redshift 群集位于不同的 AWS 区域
默认情况下,如果 S3 存储桶和 Redshift 群集位于不同的 AWS 区域,则 S3 <-> Redshift 副本不起作用。
如果在 S3 存储桶位于不同区域时尝试读取 Redshift 表,则可能会看到如下错误:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
同样,尝试使用不同区域中的 S3 存储桶写入 Redshift 可能会导致以下错误:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
写入:Redshift COPY 命令支持显式指定 S3 存储桶区域,因此可以通过将
region 'the-region-name'
添加到extracopyoptions
设置来使对 Redshift 的写入在这些情况下正常生效。 例如,对于“cn-north-1
(北京)”区域中的 Bucket 和 Scala API,请使用以下命令:.option("extracopyoptions", "region 'cn-north-1'")
也可以使用
awsregion
设置:.option("awsregion", "cn-north-1")
读取:Redshift UNLOAD 命令还支持显式指定 S3 存储桶区域。 可以通过将区域添加到
awsregion
设置来使读取正常生效:.option("awsregion", "cn-north-1")
在 JDBC url 中使用带有特殊字符的密码时出现身份验证错误
如果 JDBC url 中包含用户名和密码,且密码包含特殊字符,例如 ;
、?
或 &
,则可能会显示以下异常:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
该异常是由于 JDBC 驱动程序无法正确转义用户名或密码中的特殊字符而造成的。 确保使用相应的 DataFrame 选项 user
和 password
指定用户名和密码。 有关详细信息,请参阅参数。
即使已完成相应的 Redshift 操作,长时间运行的 Spark 查询也会无限期挂起
如果从/向 Redshift 读取或写入大量数据,Spark 查询可能会无限期挂起,即使 AWS Redshift 监视页面显示相应的 LOAD
或 UNLOAD
操作已完成且群集处于空闲状态。 这种情况是因为 Redshift 与 Spark 之间的连接超时而造成的。为避免这种情况,请确保启用 tcpKeepAlive
JDBC 标记并将 TCPKeepAliveMinutes
设置为较小的值(例如 1)。
有关详细信息,请参阅 Amazon Redshift JDBC 驱动程序配置。
包含时区语义的时间戳
读取数据时,Redshift TIMESTAMP
和 TIMESTAMPTZ
数据类型都映射到 Spark TimestampType
,并且值会转换为协调世界时 (UTC) 并存储为 UTC 时间戳。 对于 Redshift TIMESTAMP
,假定为本地时区,因为值没有任何时区信息。 将数据写入 Redshift 表时,Spark TimestampType
被映射到 Redshift TIMESTAMP
数据类型。
迁移指南
数据源现在要求在将 Spark S3 凭据转发到 Redshift 之前显式设置 forward_spark_s3_credentials
。 如果使用 aws_iam_role
或 temporary_aws_*
身份验证机制,此更改不会产生任何影响。 但如果使用旧的默认机制,则现在必须将 forward_spark_s3_credentials
显式设置为 true
才能继续使用之前的 Redshift 到 S3 的身份验证机制。 有关三种身份验证机制及其安全权衡的详细信息,请参阅本文档的向 S3 和 Redshift 进行身份验证部分。