Apache Phoenix Query Server REST SDK

Apache Phoenix is an open source, massively parallel relational database layer on top of Apache HBase. Phoenix lets you run SQL-like queries against HBase through tools such as SQLLine, typically over an SSH connection to the cluster. Phoenix also provides an HTTP server called Phoenix Query Server (PQS), which thin clients connect to. PQS supports two transport mechanisms for client communication: JSON and Protocol Buffers. Protocol Buffers is the default mechanism and offers more efficient communication than JSON.

This article describes how to use the PQS REST SDK to create tables, upsert rows individually and in bulk, and select data using SQL statements. The examples use the Microsoft .NET driver for Apache Phoenix Query Server. This SDK is built on Apache Calcite's Avatica APIs and uses Protocol Buffers exclusively as its serialization format.

For more information, see Apache Calcite Avatica Protocol Buffers Reference.

Install the SDK

The Microsoft .NET driver for Apache Phoenix Query Server is provided as a NuGet package, which you can install from the Visual Studio NuGet Package Manager Console with the following command:

Install-Package Microsoft.Phoenix.Client

Instantiate a new PhoenixClient object

To begin using the library, instantiate a new PhoenixClient object, passing in ClusterCredentials that contain the Uri of your cluster and the cluster's Apache Hadoop user name and password.

// The client object is reused by all of the following examples
var credentials = new ClusterCredentials(new Uri("https://CLUSTERNAME.azurehdinsight.cn/"), "USERNAME", "PASSWORD");
PhoenixClient client = new PhoenixClient(credentials);

Replace CLUSTERNAME with your HDInsight HBase cluster name, and USERNAME and PASSWORD with the Hadoop credentials specified when the cluster was created. The default Hadoop user name is admin.
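The later examples reach PQS through the HDInsight gateway at https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/. The following sketch shows how the cluster URI passed to ClusterCredentials relates to that gateway path. The helper names ClusterUri and PqsGatewayUri and the cluster name mycluster are hypothetical. The SDK itself receives only the relative path, through RequestOptions.AlternativeEndpoint.

```csharp
using System;

// Hypothetical helpers: build the cluster URI and the PQS gateway URI
// (https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/) for a cluster name.
Uri ClusterUri(string clusterName) =>
    new Uri($"https://{clusterName}.azurehdinsight.cn/");

Uri PqsGatewayUri(string clusterName, int workerIndex)
{
    if (workerIndex < 0)
        throw new ArgumentOutOfRangeException(nameof(workerIndex));
    // Resolve the relative path, the same form used for RequestOptions.AlternativeEndpoint.
    return new Uri(ClusterUri(clusterName), $"hbasephoenix{workerIndex}/");
}

Console.WriteLine(ClusterUri("mycluster"));        // base URI for ClusterCredentials
Console.WriteLine(PqsGatewayUri("mycluster", 0));  // requests forwarded to PQS on workernode0
```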

Generate a unique connection identifier

To send one or more requests to PQS, include a unique connection identifier that associates the request(s) with the connection.

string connId = Guid.NewGuid().ToString();
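This minimal illustration does not use the SDK. Each call to Guid.NewGuid().ToString() yields a new 36-character identifier, so every logical connection gets its own ID. The NewConnectionId helper name is hypothetical.

```csharp
using System;

// Hypothetical helper: one new identifier per logical connection. The same string is
// then passed to every request that belongs to that connection.
string NewConnectionId() => Guid.NewGuid().ToString();

string connA = NewConnectionId();
string connB = NewConnectionId();

// Guid.ToString() uses the "D" format: 32 hex digits in 8-4-4-4-12 groups (36 characters).
Console.WriteLine($"{connA} ({connA.Length} characters)");
Console.WriteLine(connA != connB ? "distinct IDs" : "collision");
```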

Each example first calls the OpenConnectionRequestAsync method, passing in the unique connection identifier. Next, it defines ConnectionProperties and RequestOptions and passes those objects, along with the generated connection identifier, to the ConnectionSyncRequestAsync method. The PQS ConnectionSyncRequest helps ensure that the client and the server have a consistent view of the database properties.
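The request sequence described above follows an "open, sync, do the work, always close" pattern. The following is a hypothetical sketch of that pattern. Request only records a label instead of calling OpenConnectionRequestAsync, ConnectionSyncRequestAsync, or CloseConnectionRequestAsync, so the sketch runs without a cluster. It shows that the close step still happens when the work throws.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for SDK requests: each call just records its label.
var log = new List<string>();
Task Request(string name) { log.Add(name); return Task.CompletedTask; }

async Task WithConnectionAsync(string connId, Func<string, Task> work)
{
    await Request($"open {connId}");      // OpenConnectionRequestAsync in the SDK
    try
    {
        await Request($"sync {connId}");  // ConnectionSyncRequestAsync in the SDK
        await work(connId);
    }
    finally
    {
        // Runs whether or not the work succeeded, like the finally blocks in the examples.
        await Request($"close {connId}"); // CloseConnectionRequestAsync in the SDK
    }
}

await WithConnectionAsync("c1", id => Request($"execute {id}"));

try
{
    await WithConnectionAsync("c2", id => throw new InvalidOperationException("query failed"));
}
catch (InvalidOperationException)
{
    // Expected: the failure propagates after the connection has been closed.
}

Console.WriteLine(string.Join(Environment.NewLine, log));
```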

ConnectionSyncRequest and its ConnectionProperties

To call ConnectionSyncRequestAsync, pass in a ConnectionProperties object.

ConnectionProperties connProperties = new ConnectionProperties
{
    HasAutoCommit = true,
    AutoCommit = true,
    HasReadOnly = true,
    ReadOnly = false,
    TransactionIsolation = 0,
    Catalog = "",
    Schema = "",
    IsDirty = true
};
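// options is a RequestOptions object, for example RequestOptions.GetGatewayDefaultOptions(), as in the examples below
await client.ConnectionSyncRequestAsync(connId, connProperties, options);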

Here are some properties of interest:

Property              Description
AutoCommit            A boolean denoting whether autoCommit is enabled for Phoenix transactions.
ReadOnly              A boolean denoting whether the connection is read-only.
TransactionIsolation  An integer denoting the level of transaction isolation per the JDBC specification (see the following table).
Catalog               The name of the catalog to use when fetching connection properties.
Schema                The name of the schema to use when fetching connection properties.
IsDirty               A boolean denoting whether the properties have been altered.

Here are the TransactionIsolation values:

Isolation value  Description
0                Transactions are not supported (JDBC TRANSACTION_NONE).
1                Dirty reads, non-repeatable reads, and phantom reads may occur (TRANSACTION_READ_UNCOMMITTED).
2                Dirty reads are prevented, but non-repeatable reads and phantom reads may occur (TRANSACTION_READ_COMMITTED).
4                Dirty reads and non-repeatable reads are prevented, but phantom reads may occur (TRANSACTION_REPEATABLE_READ).
8                Dirty reads, non-repeatable reads, and phantom reads are all prevented (TRANSACTION_SERIALIZABLE).
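These values are the transaction isolation constants that java.sql.Connection defines in the JDBC specification. The following small lookup checks a value before it is placed in ConnectionProperties.TransactionIsolation. The CheckIsolation guard is hypothetical and not part of the SDK. It rejects a value such as 3 that matches none of the levels.

```csharp
using System;
using System.Collections.Generic;

// JDBC isolation constants (java.sql.Connection) keyed by their integer values.
var isolationNames = new Dictionary<int, string>
{
    [0] = "TRANSACTION_NONE",
    [1] = "TRANSACTION_READ_UNCOMMITTED",
    [2] = "TRANSACTION_READ_COMMITTED",
    [4] = "TRANSACTION_REPEATABLE_READ",
    [8] = "TRANSACTION_SERIALIZABLE",
};

// Hypothetical guard: return the level unchanged, or throw if it is not a JDBC level.
int CheckIsolation(int level) =>
    isolationNames.ContainsKey(level)
        ? level
        : throw new ArgumentOutOfRangeException(nameof(level), level, "Not a JDBC isolation level.");

// The examples in this article pass 0 (transactions not supported).
int chosen = CheckIsolation(0);
Console.WriteLine($"{chosen} = {isolationNames[chosen]}");
```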

Create a new table

Like a relational database, HBase stores data in tables. Phoenix uses standard SQL queries to create new tables, defining the primary key and column types.

This example and all later examples use the PhoenixClient object created in Instantiate a new PhoenixClient object.

string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();

// You can set certain request options, such as timeout in milliseconds:
options.TimeoutMillis = 300000;

// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
CreateStatementResponse createStatementResponse = null;
OpenConnectionResponse openConnResponse = null;

try
{
    // Opening connection
    // pbc is the protobuf collections alias: using pbc = global::Google.Protobuf.Collections;
    var info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);

    // Create the statement
    createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
    
    // Create the table if it does not exist
    string sql = "CREATE TABLE IF NOT EXISTS Customers (Id varchar(20) PRIMARY KEY, FirstName varchar(50), " +
        "LastName varchar(100), StateProvince char(2), Email varchar(255), Phone varchar(15))";
    await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);

    Console.WriteLine("Table \"Customers\" created.");
}
catch (Exception e)
{
    Console.WriteLine(e);
    throw;
}
finally
{
    if (createStatementResponse != null)
    {
        await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
        createStatementResponse = null;
    }

    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}

The preceding example creates a new table named Customers using the IF NOT EXISTS option. The CreateStatementRequestAsync call creates a new statement in the Avatica (PQS) server. The finally block closes the statement identified in the returned CreateStatementResponse, and then closes the connection opened by OpenConnectionRequestAsync.
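For illustration only, the DDL string in the preceding example can be assembled from column definitions. BuildCreateTable is a hypothetical helper, not part of the SDK. It makes the first column the primary key and reproduces the same statement text.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: builds a Phoenix CREATE TABLE IF NOT EXISTS statement from
// (name, type) pairs; the first column becomes the primary key.
string BuildCreateTable(string table, IReadOnlyList<(string Name, string Type)> columns)
{
    if (columns.Count == 0)
        throw new ArgumentException("At least one column is required.", nameof(columns));
    var defs = columns.Select((c, i) => i == 0 ? $"{c.Name} {c.Type} PRIMARY KEY" : $"{c.Name} {c.Type}");
    return $"CREATE TABLE IF NOT EXISTS {table} ({string.Join(", ", defs)})";
}

var customerColumns = new List<(string Name, string Type)>
{
    ("Id", "varchar(20)"), ("FirstName", "varchar(50)"), ("LastName", "varchar(100)"),
    ("StateProvince", "char(2)"), ("Email", "varchar(255)"), ("Phone", "varchar(15)"),
};

string ddl = BuildCreateTable("Customers", customerColumns);
Console.WriteLine(ddl);
```

The helper inserts identifiers and types verbatim, so use it only with fixed, trusted column definitions.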

Insert data individually

This example inserts rows individually, referencing a List<string> collection of US state and territory abbreviations:

var states = new List<string> { "AL", "AK", "AS", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FM", "FL", "GA", "GU", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MH", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "MP", "OH", "OK", "OR", "PW", "PA", "PR", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VI", "VA", "WA", "WV", "WI", "WY" };

The StateProvince column values are used later in a select operation that counts customers per state or territory.
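Random.Next(minValue, maxValue) treats maxValue as exclusive. As a result, the r.Next(0, 49) call in the following code draws only from the first 49 of the 59 abbreviations (AL through SD). The sketch below confirms this with a seeded Random. It also shows a hypothetical PickState helper that uses states.Count as the bound instead.

```csharp
using System;
using System.Collections.Generic;

var states = new List<string> { "AL", "AK", "AS", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FM", "FL", "GA", "GU", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MH", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "MP", "OH", "OK", "OR", "PW", "PA", "PR", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VI", "VA", "WA", "WV", "WI", "WY" };

// Random.Next(minValue, maxValue) excludes maxValue, so Next(0, 49) yields indexes 0..48
// (AL through SD); TN through WY at indexes 49..58 are never picked.
var r = new Random(42);
int maxIndexSeen = -1;
for (int n = 0; n < 10_000; n++)
    maxIndexSeen = Math.Max(maxIndexSeen, r.Next(0, 49));
Console.WriteLine($"{states.Count} abbreviations; highest index drawn by Next(0, 49): {maxIndexSeen}");

// Hypothetical helper: use the list size as the exclusive bound to draw from every entry.
string PickState(Random rng) => states[rng.Next(states.Count)];
Console.WriteLine(PickState(r));
```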

string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;

// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
StatementHandle statementHandle = null;
try
{
    // Opening connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);

    string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
    PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, 100, options);
    statementHandle = prepareResponse.Statement;
    
    var r = new Random();

    // Insert 300 rows
    for (int i = 0; i < 300; i++)
    {
        var list = new pbc.RepeatedField<TypedValue>();
        var id = new TypedValue
        {
            StringValue = "id" + i,
            Type = Rep.String
        };
        var firstName = new TypedValue
        {
            StringValue = "first" + i,
            Type = Rep.String
        };
        var lastName = new TypedValue
        {
            StringValue = "last" + i,
            Type = Rep.String
        };
        var state = new TypedValue
        {
            StringValue = states.ElementAt(r.Next(0, 49)),
            Type = Rep.String
        };
        var email = new TypedValue
        {
            StringValue = $"email{i}@junkemail.com", // row index keeps each email distinct
            Type = Rep.String
        };
        var phone = new TypedValue
        {
            StringValue = $"555-229-341{i.ToString().Substring(0,1)}",
            Type = Rep.String
        };
        list.Add(id);
        list.Add(firstName);
        list.Add(lastName);
        list.Add(state);
        list.Add(email);
        list.Add(phone);

        Console.WriteLine("Inserting customer " + i);

        await client.ExecuteRequestAsync(statementHandle, list, long.MaxValue, true, options);
    }

    await client.CommitRequestAsync(connId, options);

    Console.WriteLine("Upserted customer data");

}
catch (Exception ex)
{
    // Report the failure instead of silently swallowing it
    Console.WriteLine(ex);
    throw;
}
finally
{
    if (statementHandle != null)
    {
        await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
        statementHandle = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}

The structure for executing an insert statement is similar to creating a new table. At the end of the try block, the transaction is explicitly committed. This example executes the prepared insert statement 300 times, sending one request per row. The following example shows a more efficient batch insert process.
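You can isolate the per-row values in a pure function, which makes them easy to check without a cluster. CustomerRow is a hypothetical helper that returns the six strings in Customers column order. In the SDK, each string would be wrapped in a TypedValue with Type = Rep.String. The helper interpolates the row index ({i}) into the email so that each row gets a distinct address. Interpolating the constant {1} would give every row email1@junkemail.com.

```csharp
using System;

// Hypothetical helper: the six column values sent for row i, in Customers column order.
// pickState supplies the StateProvince value (random in the article's example).
string[] CustomerRow(int i, Func<string> pickState) => new[]
{
    "id" + i,                                    // Id (primary key)
    "first" + i,                                 // FirstName
    "last" + i,                                  // LastName
    pickState(),                                 // StateProvince
    $"email{i}@junkemail.com",                   // Email, distinct per row
    $"555-229-341{i.ToString().Substring(0, 1)}" // Phone: last digit is i's first digit
};

var row = CustomerRow(42, () => "WA");
Console.WriteLine(string.Join(", ", row));
```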

Batch insert data

The following code is nearly identical to the code for inserting data individually. It does not call ExecuteRequestAsync repeatedly with a prepared statement. Instead, it collects the parameter values in UpdateBatch objects and sends them in a single ExecuteBatchRequestAsync call.
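This example sends all 300 rows in one ExecuteBatchRequestAsync call. If you load many more rows, one option is to split the UpdateBatch entries into groups and send one batch request per group. This is an assumption, not something this sample does. The hypothetical ToBatches helper below shows only the grouping step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: split items into consecutive groups of at most batchSize.
List<List<T>> ToBatches<T>(IEnumerable<T> items, int batchSize)
{
    if (batchSize <= 0)
        throw new ArgumentOutOfRangeException(nameof(batchSize));
    var groups = new List<List<T>>();
    foreach (var item in items)
    {
        // Start a new group when there is none yet or the last one is full.
        if (groups.Count == 0 || groups[groups.Count - 1].Count == batchSize)
            groups.Add(new List<T>(batchSize));
        groups[groups.Count - 1].Add(item);
    }
    return groups;
}

// Row ids 300..599, matching the range used by the batch example.
var rowIds = Enumerable.Range(300, 300).Select(i => "id" + i);
var batches = ToBatches(rowIds, 128);
Console.WriteLine(string.Join(", ", batches.Select(b => b.Count)));
```

On .NET 6 or later, Enumerable.Chunk performs the same grouping.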

string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;

// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
CreateStatementResponse createStatementResponse = null;
try
{
    // Opening connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);

    // Creating statement
    createStatementResponse = await client.CreateStatementRequestAsync(connId, options);

    string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
    PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, long.MaxValue, options);
    var statementHandle = prepareResponse.Statement;
    var updates = new pbc.RepeatedField<UpdateBatch>();

    var r = new Random();

    // Insert 300 rows
    for (int i = 300; i < 600; i++)
    {
        var list = new pbc.RepeatedField<TypedValue>();
        var id = new TypedValue
        {
            StringValue = "id" + i,
            Type = Rep.String
        };
        var firstName = new TypedValue
        {
            StringValue = "first" + i,
            Type = Rep.String
        };
        var lastName = new TypedValue
        {
            StringValue = "last" + i,
            Type = Rep.String
        };
        var state = new TypedValue
        {
            StringValue = states.ElementAt(r.Next(0, 49)),
            Type = Rep.String
        };
        var email = new TypedValue
        {
            StringValue = $"email{i}@junkemail.com", // row index keeps each email distinct
            Type = Rep.String
        };
        var phone = new TypedValue
        {
            StringValue = $"555-229-341{i.ToString().Substring(0, 1)}",
            Type = Rep.String
        };
        list.Add(id);
        list.Add(firstName);
        list.Add(lastName);
        list.Add(state);
        list.Add(email);
        list.Add(phone);

        var batch = new UpdateBatch
        {
            ParameterValues = list
        };
        updates.Add(batch);

        Console.WriteLine($"Added customer {i} to batch");
    }

    var executeBatchResponse = await client.ExecuteBatchRequestAsync(connId, statementHandle.Id, updates, options);

    Console.WriteLine("Batch upserted customer data");

}
catch (Exception ex)
{
    // Report the failure instead of silently swallowing it
    Console.WriteLine(ex);
    throw;
}
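finally
{
    // Close the statement created with CreateStatementRequestAsync, then the connection
    if (createStatementResponse != null)
    {
        await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
        createStatementResponse = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}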

In one test environment, inserting 300 new records individually took almost 2 minutes. In contrast, inserting 300 records as a batch took only 6 seconds.
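Treating "almost 2 minutes" as roughly 120 seconds, the per-row cost in that test environment works out as follows:

```latex
\frac{120\ \text{s}}{300\ \text{rows}} \approx 0.4\ \text{s per row}, \qquad
\frac{6\ \text{s}}{300\ \text{rows}} = 0.02\ \text{s per row}, \qquad
\frac{0.4}{0.02} \approx 20
```

The batch was therefore roughly 20 times faster in that environment. Much of the difference plausibly comes from sending 300 requests instead of one. Treat the ratio as indicative rather than as a benchmark.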

Select data

This example shows how to reuse one connection to execute multiple queries:

  1. Select all records, and then fetch the remaining records after the default maximum of 100 is returned.
  2. Use a total row count select statement to retrieve a single scalar result.
  3. Execute a select statement that returns the total number of customers per state or territory.

string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();

// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
StatementHandle statementHandle = null;

try
{
    // Opening connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);
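    var createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
    // Track the statement so that the finally block closes it
    statementHandle = new StatementHandle { ConnectionId = connId, Id = createStatementResponse.StatementId };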

    string sql = "SELECT * FROM Customers";
    ExecuteResponse executeResponse = await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);

    pbc::RepeatedField<Row> rows = executeResponse.Results[0].FirstFrame.Rows;
    // Loop through all of the returned rows and display the first two columns
    for (int i = 0; i < rows.Count; i++)
    {
        Row row = rows[i];
        Console.WriteLine(row.Value[0].ScalarValue.StringValue + " " + row.Value[1].ScalarValue.StringValue);
    }
    
    // 100 is hard-coded on the server side as the default firstframe size
    // FetchRequestAsync is called to get any remaining rows
    Console.WriteLine("");
    Console.WriteLine($"Number of rows: {rows.Count}");

    // Fetch remaining rows, offset is not used, simply set to 0
    // When FetchResponse.Frame.Done is true, all rows were fetched
    FetchResponse fetchResponse = await client.FetchRequestAsync(connId, createStatementResponse.StatementId, 0, int.MaxValue, options);
    Console.WriteLine($"Frame row count: {fetchResponse.Frame.Rows.Count}");
    Console.WriteLine($"Fetch response is done: {fetchResponse.Frame.Done}");
    Console.WriteLine("");

    // Running query 2
    string sql2 = "select count(*) from Customers";
    ExecuteResponse countResponse = await client.PrepareAndExecuteRequestAsync(connId, sql2, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    long count = countResponse.Results[0].FirstFrame.Rows[0].Value[0].ScalarValue.NumberValue;

    Console.WriteLine($"Total customer records: {count}");
    Console.WriteLine("");

    // Running query 3
    string sql3 = "select StateProvince, count(*) as Number from Customers group by StateProvince order by Number desc";
    ExecuteResponse groupByResponse = await client.PrepareAndExecuteRequestAsync(connId, sql3, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);

    pbc::RepeatedField<Row> stateRows = groupByResponse.Results[0].FirstFrame.Rows;
    for (int i = 0; i < stateRows.Count; i++)
    {
        Row row = stateRows[i];
        Console.WriteLine(row.Value[0].ScalarValue.StringValue + ": " + row.Value[1].ScalarValue.NumberValue);
    }
}
catch (Exception ex)
{
    // Report the failure instead of silently swallowing it
    Console.WriteLine(ex);
    throw;
}
finally
{
    if (statementHandle != null)
    {
        await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
        statementHandle = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}

The output of the select statements should look similar to the following. The per-state counts vary from run to run because states are assigned randomly.

id0 first0
id1 first1
id10 first10
id100 first100
id101 first101
id102 first102
. . .
id185 first185
id186 first186
id187 first187
id188 first188

Number of rows: 100
Frame row count: 500
Fetch response is done: True

Total customer records: 600

NJ: 21
CA: 19
GU: 17
NC: 16
IN: 16
MA: 16
AZ: 16
ME: 16
IL: 15
OR: 15
. . . 
MO: 10
HI: 10
GA: 10
DC: 9
NM: 9
MD: 9
MP: 9
SC: 7
AR: 7
MH: 6
FM: 5
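The sample calls FetchRequestAsync once with int.MaxValue as the frame size and then checks Frame.Done. With a smaller frame size, you would keep fetching until Done is true. The following sketch simulates that loop against an in-memory stand-in for PQS and does not call the SDK. The stand-in is a hypothetical Fetch function that keeps a server-side cursor, consistent with the sample's note that the offset argument is unused.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for PQS: 600 result rows and a cursor that the
// "server" advances on each fetch (the real FetchRequestAsync offset is unused).
var table = Enumerable.Range(0, 600).Select(i => "id" + i).ToList();
int cursor = 0;

(List<string> Rows, bool Done) Fetch(int frameMaxSize)
{
    var rows = table.Skip(cursor).Take(frameMaxSize).ToList();
    cursor += rows.Count;
    return (rows, cursor >= table.Count);   // Done mirrors FetchResponse.Frame.Done
}

// The execute response already carries the first frame (100 rows by default).
var first = Fetch(100);
var all = new List<string>(first.Rows);
bool done = first.Done;
int fetchRequests = 0;

// Keep requesting frames until the server reports that the result set is exhausted.
while (!done)
{
    var frame = Fetch(250);
    fetchRequests++;
    all.AddRange(frame.Rows);
    done = frame.Done || frame.Rows.Count == 0;  // stop on an empty frame as a safeguard
}

Console.WriteLine($"{all.Count} rows after {fetchRequests} fetch requests");
```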

Next steps