通常使用 Apache HBase 的低级别 API(扫描、获取和放置)或者通过 Apache Phoenix 使用 SQL 语法来查询 Apache HBase。 Apache 还提供 Apache Spark HBase 连接器。 对于查询和修改 HBase 存储的数据,使用该连接器是一种便捷高效的替代方式。
部署在同一虚拟网络中的两个单独的 HDInsight 群集。 一个HBase 和一个至少安装了 Spark 2.1 (HDInsight 3.6) 的 Spark。 有关详细信息,请参阅使用 Azure 门户在 HDInsight 中创建基于 Linux 的群集。
群集主存储的 URI 方案。 对于 Azure Blob 存储,此方案为 wasb://;对于 Azure Data Lake Storage Gen2,此方案为
abfs://
。 如果为 Blob 存储启用了安全传输,则 URI 将为wasbs://
。 另请参阅安全传输。
让 Spark 群集能够查询 HBase 群集的主要过程如下所示:
- 在 HBase 中准备一些示例数据。
- 从 HBase 群集配置文件夹 (/etc/hbase/conf) 获取 hbase-site.xml 文件,并将 hbase-site.xml 的副本放入 Spark 2 配置文件夹 (/etc/spark2/conf)。 (可选:使用 HDInsight 团队提供的脚本来自动执行此过程)
- 运行
spark-shell
,在packages
中按 Maven 坐标来引用 Spark HBase 连接器。 - 定义将架构从 Spark 映射到 HBase 的目录。
- 使用 RDD 或 DataFrame API 与 HBase 数据进行交互。
此步骤中,将在 Apache HBase 中创建并填充一个表,然后可使用 Spark 对其进行查询。
使用
ssh
命令连接到 HBase 群集。 编辑命令,将HBASECLUSTER
替换为 HBase 群集的名称,然后输入该命令:ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.cn
使用
hbase shell
命令启动 HBase 交互式 shell。 在 SSH 连接中输入以下命令。hbase shell
使用
create
命令创建包含双列系列的 HBase 表。 输入以下命令:create 'Contacts', 'Personal', 'Office'
使用
put
命令将指定列中的值插入特定表中的指定行。 输入以下命令:put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
使用
exit
命令停止 HBase 交互式 shell。 输入以下命令:exit
若要设置群集之间的通信,请执行相关步骤,在群集上运行两个脚本。 这些脚本将自动执行“手动设置通信”部分所述的文件复制过程。
- 从 HBase 群集运行的脚本会将
hbase-site.xml
和 HBase IP 映射信息上传到 Spark 群集所附加的默认存储。 - 从 Spark 群集运行的脚本设置两个 cron 作业,以定期运行两个帮助器脚本:
- HBase cron 作业 - 将新的
hbase-site.xml
文件和 HBase IP 映射从 Spark 默认存储帐户下载到本地节点 - Spark cron 作业 - 检查是否发生了 Spark 缩放以及群集是否安全。 如果是,则编辑
/etc/hosts
以包含本地存储的 HBase IP 映射
- HBase cron 作业 - 将新的
注意:在继续之前,请确保已将 Spark 群集的存储帐户作为辅助存储帐户添加到了 HBase 群集。 请确保按所示顺序运行脚本。
在 HBase 群集上使用脚本操作以应用更改(考虑以下因素):
properties Value Bash 脚本 URI https://hdiconfigactions2.blob.core.chinacloudapi.cn/hbasesparkconnect/connector-hbase.sh
节点类型 区域 parameters -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
持久化 是 SECONDARYS_STORAGE_URL
是 Spark 端默认存储的 URL。 参数示例:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.chinacloudapi.cn -d "securehadooprc"
在 Spark 群集上使用脚本操作以应用更改(考虑以下因素):
properties Value Bash 脚本 URI https://hdiconfigactions2.blob.core.chinacloudapi.cn/hbasesparkconnect/connector-spark.sh
节点类型 头、辅助角色、Zookeeper 参数 -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
持久化 是 - 可以指定希望此群集自动检查更新的频率。 默认值:-s "*/1 * * * *" -h 0(在此示例中,Spark cron 每分钟运行一次,而 HBase cron 不运行)
- 由于默认情况下未设置 HBase cron,因此在对 HBase 群集执行缩放时需要重新运行此脚本。 如果 HBase 群集经常缩放,可以选择自动设置 HBase cron 作业。 例如:
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
将脚本配置为每 30 分钟执行一次检查。 这样将会定期运行 HBase cron 计划,以自动将公共存储帐户上的新 HBase 信息下载到本地节点。
注意
这些脚本仅适用于 HDI 5.0 和 HDI 5.1 群集。
注意: 每当其中一个群集经历缩放活动时,都需要执行这些步骤。
将 hbase-site.xml 从本地存储复制到 Spark 群集默认存储所在的根目录。 编辑命令以反映配置。 然后,在与 HBase 群集建立的 SSH 会话中输入该命令:
语法值 新值 URI 方案 修改此值以反映存储。 语法适用于启用了安全传输的 Blob 存储。 SPARK_STORAGE_CONTAINER
替换为 Spark 群集使用的默认存储容器名称。 SPARK_STORAGE_ACCOUNT
替换为 Spark 群集使用的默认存储帐户名称。 hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.chinacloudapi.cn/
然后退出与 HBase 群集的 ssh 连接。
exit
使用 SSH 连接到 Spark 集群的头节点。 编辑命令,将
SPARKCLUSTER
替换为 Spark 群集的名称,然后输入该命令:ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.cn
输入命令,将
hbase-site.xml
从 Spark 群集的默认存储复制到群集本地存储上的 Spark 2 配置文件夹中:sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
完成上述步骤后,你应该能够运行 Spark shell,并引用相应版本的 Spark HBase 连接器。
例如,下表列出了 HDInsight 团队当前使用的两个版本和相应的命令。 如果 HBase 和 Spark 的版本与表中指示的版本相同,则可以为群集使用相同的版本。
在与 Spark 群集建立的 SSH 会话中,输入以下命令以启动 Spark shell:
Spark 版本 HDI HBase 版本 SHC 版本 Command 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
保持此 Spark shell 实例处于打开状态,并继续定义目录和查询。 如果在 SHC Core 存储库中找不到与版本相对应的 jar,请继续阅读。
对于 Spark 和 HBase 版本的后续组合,这些项目不再在上述存储库中发布。 可以直接从 spark-hbase-connector GitHub 分支生成 jar。 例如,如果运行的是 Spark 2.4 和 HBase 2.1,请完成以下步骤:
克隆存储库:
git clone https://github.com/hortonworks-spark/shc
转到 branch-2.4:
git checkout branch-2.4
从分支生成(创建 .jar 文件):
mvn clean package -DskipTests
运行以下命令(请确保更改与所生成的 .jar 文件相对应的 .jar 名称):
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
保持此 Spark shell 实例处于打开状态,并继续执行下一部分。
在此步骤中,定义一个将架构从 Apache Spark 映射到 Apache HBase 的目录对象。
在打开的 Spark Shell 中,输入以下
import
语句:import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
输入以下命令,以定义在 HBase 中创建的 Contacts 表的目录:
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
代码:
- 定义名为
Contacts
的 HBase 表的目录架构。 - 将 rowkey 标识为
key
,并将 Spark 中使用的列名映射到 HBase 中使用的列族、列名和列类型。 - 将 Rowkey 定义为具有
rowkey
的特定列族cf
的命名列 (rowkey
)。
- 定义名为
输入命令,以定义一个在 HBase 中提供围绕
Contacts
表的 DataFrame 的方法:def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
创建 DataFrame 的实例:
val df = withCatalog(catalog)
查询 DataFrame:
df.show()
应看到如下两行数据:
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
注册一个临时表,以便使用 Spark SQL 查询 HBase 表:
df.createTempView("contacts")
针对
contacts
表发出 SQL 查询:spark.sqlContext.sql("select personalName, officeAddress from contacts").show
应看到如下结果:
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
若要插入新的 Contact 记录,请定义
ContactRecord
类:case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
创建
ContactRecord
的实例并将其放在一个数组中:val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
将新数据数组保存到 HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
检查结果:
df.show()
应看到如下输出:
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
通过输入以下命令关闭 spark shell:
:q