Use Apache Spark to read and write Apache HBase data

Apache HBase is typically queried either with its low-level API (scans, gets, and puts) or with SQL syntax through Apache Phoenix. Apache also provides the Apache Spark HBase Connector, which is a convenient and efficient alternative for querying and modifying data stored in HBase.

Prerequisites

  • Two separate HDInsight clusters deployed in the same virtual network: one HBase cluster, and one Spark cluster with at least Spark 2.1 (HDInsight 3.6) installed. For more information, see Create Linux-based clusters in HDInsight using the Azure portal.

  • The URI scheme for your clusters' primary storage. This scheme is wasb:// for Azure Blob Storage and abfs:// for Azure Data Lake Storage Gen2. If secure transfer is enabled for Blob Storage, the URI is wasbs://. See also secure transfer.

Overall process

The high-level process for enabling your Spark cluster to query your HBase cluster is as follows:

  1. Prepare some sample data in HBase.
  2. Acquire the hbase-site.xml file from your HBase cluster configuration folder (/etc/hbase/conf), and place a copy of it in your Spark 2 configuration folder (/etc/spark2/conf). (Optional: use the script provided by the HDInsight team to automate this process.)
  3. Run spark-shell, referencing the Spark HBase Connector by its Maven coordinates in the packages option.
  4. Define a catalog that maps the schema from Spark to HBase.
  5. Interact with the HBase data using either the RDD or DataFrame APIs.

Prepare sample data in Apache HBase

In this step, you create and populate a table in Apache HBase that you can then query using Spark.

  1. Use the ssh command to connect to your HBase cluster. Edit the following command by replacing HBASECLUSTER with the name of your HBase cluster, and then enter the command:

    ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.cn
    
  2. Use the hbase shell command to start the HBase interactive shell. Enter the following command in your SSH connection:

    hbase shell
    
  3. Use the create command to create an HBase table with two column families. Enter the following command:

    create 'Contacts', 'Personal', 'Office'
    
  4. Use the put command to insert values at a specified column in a specified row in a particular table. Enter the following commands:

    put 'Contacts', '1000', 'Personal:Name', 'John Dole'
    put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
    put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
    put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
    put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
    put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
    put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
    
  5. Use the exit command to stop the HBase interactive shell. Enter the following command:

    exit
    

Run scripts to set up the connection between clusters

To set up communication between the clusters, follow the steps below to run two scripts on your clusters. These scripts automate the process of file copying described in the 'Set up communication manually' section below.

  • The script you run from the HBase cluster uploads hbase-site.xml and HBase IP-mapping information to the default storage attached to your Spark cluster.
  • The script you run from the Spark cluster sets up two cron jobs that run two helper scripts periodically:
    1. HBase cron job: downloads new hbase-site.xml files and the HBase IP mapping from the Spark default storage account to the local node.
    2. Spark cron job: checks whether a Spark scaling occurred and whether the cluster is secure. If so, it edits /etc/hosts to include the locally stored HBase IP mapping.

NOTE: Before proceeding, make sure you have added the Spark cluster's storage account to your HBase cluster as a secondary storage account. Make sure you run the scripts in the order indicated below.

  1. Use Script Action on your HBase cluster to apply the changes, with the following considerations:

    Property          Value
    Bash script URI   https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-hbase.sh
    Node type(s)      Region
    Parameters        -s SECONDARYS_STORAGE_URL
    Persisted         yes

    • SECONDARYS_STORAGE_URL is the URL of the Spark-side default storage. Parameter example: -s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net
  2. Use Script Action on your Spark cluster to apply the changes, with the following considerations:

    Property          Value
    Bash script URI   https://hdiconfigactions.blob.core.windows.net/hbasesparkconnectorscript/connector-spark.sh
    Node type(s)      Head, Worker, Zookeeper
    Parameters        -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional)
    Persisted         yes

    • You can specify how often you want this cluster to automatically check for updates. Default: -s "*/1 * * * *" -h 0 (in this example, the Spark cron runs every minute, while the HBase cron doesn't run).
    • Since the HBase cron isn't set up by default, you need to rerun this script whenever you scale your HBase cluster. If your HBase cluster scales often, you may choose to set up the HBase cron job automatically. For example, -h "*/30 * * * *" configures the script to perform checks every 30 minutes. This runs the HBase cron schedule periodically, automating the download of new HBase information from the common storage account to the local node.

Set up communication manually (optional, if the script provided in the step above fails)

NOTE: These steps need to be performed every time one of the clusters undergoes a scaling activity.

  1. Copy hbase-site.xml from local storage to the root of your Spark cluster's default storage. Edit the command below to reflect your configuration:

    Syntax value              New value
    URI scheme                Modify to reflect your storage. The syntax below is for Blob storage with secure transfer enabled.
    SPARK_STORAGE_CONTAINER   Replace with the default storage container name used for the Spark cluster.
    SPARK_STORAGE_ACCOUNT     Replace with the default storage account name used for the Spark cluster.

    Then, from your open SSH session to the HBase cluster, enter the command:

    hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.chinacloudapi.cn/

    Then exit your SSH connection to your HBase cluster:

    exit
  2. Connect to the head node of your Spark cluster using SSH. Edit the command below by replacing SPARKCLUSTER with the name of your Spark cluster, and then enter the command:

    ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.cn

  3. Enter the command below to copy hbase-site.xml from your Spark cluster's default storage to the Spark 2 configuration folder on the cluster's local storage:

    sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
    

Run the Spark shell referencing the Spark HBase Connector

After you complete the preceding step, you should be able to run the Spark shell, referencing the appropriate version of the Spark HBase Connector. To find the most recent appropriate Spark HBase Connector core version for your cluster scenario, see the SHC Core repository.

As an example, the following table lists two versions and the corresponding commands the HDInsight team currently uses. You can use the same versions for your clusters if your versions of HBase and Spark are the same as indicated in the table.

  1. In your open SSH session to the Spark cluster, enter the following command to start a Spark shell:

    Spark version   HDI HBase version     SHC version        Command
    2.1             HDI 3.6 (HBase 1.1)   1.1.1-2.1-s_2.11   spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
    2.4             HDI 4.0 (HBase 2.0)   1.1.0.3.1.2.2-1    spark-shell --packages com.hortonworks.shc:shc-core:1.1.0.3.1.2.2-1 --repositories http://repo.hortonworks.com/content/groups/public/
  2. Keep this Spark shell instance open and continue to Define a catalog and query. If you don't find the jars that correspond to your versions in the SHC Core repository, continue reading.

You can build the jars directly from the spark-hbase-connector GitHub branch. For example, if you are running Spark 2.3 and HBase 1.1, complete these steps:

  1. Clone the repo:

    git clone https://github.com/hortonworks-spark/shc
    
  2. Go to branch-2.3:

    git checkout branch-2.3
    
  3. Build from the branch (creates a .jar file):

    mvn clean package -DskipTests
    
  4. Run the following command (be sure to change the .jar name to correspond to the .jar file you built):

    spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar,/usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar
    
  5. Keep this Spark shell instance open and continue to the next section.

Define a catalog and query

In this step, you define a catalog object that maps the schema from Apache Spark to Apache HBase.

  1. In your open Spark shell, enter the following import statements:

    import org.apache.spark.sql.{SQLContext, _}
    import org.apache.spark.sql.execution.datasources.hbase._
    import org.apache.spark.{SparkConf, SparkContext}
    import spark.sqlContext.implicits._
    
  2. Enter the command below to define a catalog for the Contacts table you created in HBase:

    def catalog = s"""{
        |"table":{"namespace":"default", "name":"Contacts"},
        |"rowkey":"key",
        |"columns":{
        |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
        |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
        |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
        |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
        |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
        |}
    |}""".stripMargin
    

    The code:

    1. Defines a catalog schema for the HBase table named Contacts.
    2. Identifies the rowkey as key, and maps the column names used in Spark to the column family, column name, and column type used in HBase.
    3. Defines the rowkey in detail as a named column (rowkey), which has the specific column family cf of rowkey.
  3. Enter the command below to define a method that provides a DataFrame around your Contacts table in HBase:

    def withCatalog(cat: String): DataFrame = {
        spark.sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog->cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
     }
    
  4. Create an instance of the DataFrame:

    val df = withCatalog(catalog)
    
  5. Query the DataFrame:

    df.show()
    
  6. You should see two rows of data:

    +------+--------------------+--------------+-------------+--------------+
    |rowkey|       officeAddress|   officePhone| personalName| personalPhone|
    +------+--------------------+--------------+-------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
    |  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+-------------+--------------+
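    The DataFrame returned by withCatalog supports the full DataFrame API, so you can also filter and project columns without SQL. A minimal sketch, assuming the catalog defined above (the name "Calvin Raji" is one of the sample rows):

    // Filter by a column value and project two columns using the DataFrame API.
    df.filter($"personalName" === "Calvin Raji").select("personalName", "officePhone").show()

    This keeps only the row whose Personal:Name cell matches, relying on the $"..." column syntax made available by the implicits imported earlier.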
    
  7. Register a temporary table so you can query the HBase table using Spark SQL:

    df.createTempView("contacts")
    
  8. Issue a SQL query against the contacts table:

    spark.sqlContext.sql("select personalName, officeAddress from contacts").show
    
  9. You should see results like these:

    +-------------+--------------------+
    | personalName|       officeAddress|
    +-------------+--------------------+
    |    John Dole|1111 San Gabriel Dr.|
    |  Calvin Raji|5415 San Gabriel Dr.|
    +-------------+--------------------+
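    Queries against the temporary view can also aggregate. As a further illustrative sketch, a simple row count over the same view:

    spark.sqlContext.sql("select count(personalName) from contacts").show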
    

Insert new data

  1. To insert a new Contact record, define a ContactRecord class:

    case class ContactRecord(
        rowkey: String,
        officeAddress: String,
        officePhone: String,
        personalName: String,
        personalPhone: String
        )
    
  2. Create an instance of ContactRecord and put it in an array:

    val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")
    
    var newData = new Array[ContactRecord](1)
    newData(0) = newContact
    
  3. Save the array of new data to HBase:

    sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
    
  4. Examine the results:

    df.show()
    
  5. You should see output like this:

    +------+--------------------+--------------+------------+--------------+
    |rowkey|       officeAddress|   officePhone|personalName| personalPhone|
    +------+--------------------+--------------+------------+--------------+
    |  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
    | 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
    |  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
    +------+--------------------+--------------+------------+--------------+
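    Because HBase treats a write to an existing rowkey as an update of that row's stored cells, you can modify a record by saving a ContactRecord that reuses the rowkey. A minimal sketch, assuming the catalog and ContactRecord class defined above (the phone number 674-555-0199 is an illustrative value, not from the sample data):

    // Reusing rowkey 16891 overwrites the stored cell values for that row.
    val updatedContact = ContactRecord("16891", "40 Ellis St.", "674-555-0199", "John Jackson", "230-555-0194")
    sc.parallelize(Seq(updatedContact)).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog)).format("org.apache.spark.sql.execution.datasources.hbase").save()

    Running df.show() again should then show the updated office phone for rowkey 16891.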
    
  6. Close the Spark shell by entering the following command:

    :q
    

Next steps