Apache Spark operations supported by Hive Warehouse Connector in Azure HDInsight

This article shows Spark-based operations supported by Hive Warehouse Connector (HWC). All examples shown below are executed through the Apache Spark shell.

Prerequisite

Complete the Hive Warehouse Connector setup steps.

Getting started

To start a spark-shell session, do the following steps:

  1. Use the ssh command to connect to your Apache Spark cluster. Edit the command below by replacing CLUSTERNAME with the name of your cluster, and then enter the command:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.cn
    
  2. From your ssh session, execute the following command to note the hive-warehouse-connector-assembly version:

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Edit the code below with the hive-warehouse-connector-assembly version identified above. Then execute the command to start the Spark shell:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. After starting the spark-shell, a Hive Warehouse Connector instance can be started using the following commands:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
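    You can optionally confirm that the session can reach Hive before moving on. The following line is a minimal sanity check, assuming the showDatabases() helper exposed by the HWC session; it lists the databases visible to the connector:

    hive.showDatabases().show()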
    

Creating Spark DataFrames using Hive queries

The results of all queries using the HWC library are returned as a DataFrame. The following example demonstrates how to create a basic Hive query.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

The results of the query are Spark DataFrames, which can be used with Spark libraries like MLlib and Spark SQL.
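For example, a DataFrame returned by HWC can be registered as a temporary view and queried with Spark SQL. The following is a minimal sketch that reuses the df created above; the view name colorado_rows is only an illustrative choice:

// Register the HWC result as a temporary view so Spark SQL can query it.
df.createOrReplaceTempView("colorado_rows")
// Run an ordinary Spark SQL aggregation over the view.
spark.sql("select devicemake, count(*) as hits from colorado_rows where state = 'Colorado' group by devicemake").show()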

Writing out Spark DataFrames to Hive tables

Spark doesn't natively support writing to Hive's managed ACID tables. However, using HWC, you can write out any DataFrame to a Hive table. You can see this functionality at work in the following example:

  1. Create a table called sampletable_colorado and specify its columns using the following command:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Filter the table hivesampletable where the column state equals Colorado. This Hive query returns a Spark DataFrame, which is then saved to the Hive table sampletable_colorado using the write function.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. View the results with the following command:

    hive.table("sampletable_colorado").show()
    

    (Screenshot: Hive Warehouse Connector showing the Hive table.)
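The same write path also works for DataFrames created directly in Spark rather than read from Hive. The following is a minimal sketch that is not part of the original walkthrough: the table name sampletable_mini and the sample rows are hypothetical, and the schema is kept to two columns so the in-memory DataFrame matches the table exactly:

import spark.implicits._

// Hypothetical two-column table used only for this illustration.
hive.createTable("sampletable_mini").column("clientid", "string").column("state", "string").create()

// Small in-memory DataFrame whose schema matches the table above.
val localDf = Seq(("1001", "Colorado"), ("1002", "Utah")).toDF("clientid", "state")

// Append the local rows through the HWC data source.
localDf.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table", "sampletable_mini").save()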

Structured streaming writes

Using Hive Warehouse Connector, you can use Spark Structured Streaming to write data into Hive tables.

Important

Structured streaming writes are not supported in ESP enabled Spark 4.0 clusters.

Follow the steps below to ingest data from a Spark stream on localhost port 9999 into a Hive table via the Hive Warehouse Connector.

  1. From your open Spark shell, begin a Spark stream with the following command:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Generate data for the Spark stream that you created by doing the following steps:

    1. Open a second SSH session on the same Spark cluster.
    2. At the command prompt, type nc -lk 9999. This command uses the netcat utility to send data from the command line to the specified port.
  3. Return to the first SSH session and create a new Hive table to hold the streaming data. At the spark-shell, enter the following command:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Then write the streaming data to the newly created table using the following command:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Important

    The metastoreUri and database options must currently be set manually due to a known issue in Apache Spark. For more information about this issue, see SPARK-25460.

  5. Return to the second SSH session and enter the following values:

    foo
    HiveSpark
    bar
    
  6. Return to the first SSH session and note the brief activity. Use the following command to view the data:

    hive.table("stream_table").show()
    

Use Ctrl + C to stop netcat on the second SSH session. Use :q to exit spark-shell on the first SSH session.

Next steps