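Apache Phoenix is an open-source, massively parallel relational database layer built on Apache HBase. Phoenix lets you run SQL-like queries against HBase through SSH tools such as SQLLine. Phoenix also provides an HTTP server called Phoenix Query Server (PQS), which serves thin clients and supports two transport mechanisms for client communication: JSON and Protocol Buffers. Protocol Buffers is the default mechanism and is more efficient than JSON.
This article describes how to use the PQS REST SDK to create tables, upsert rows individually and in bulk, and select data with SQL statements. The examples use the Microsoft .NET driver for Apache Phoenix Query Server. This SDK is built on Apache Calcite's Avatica API, which uses Protocol Buffers exclusively as its serialization format.
For more information, see the Apache Calcite Avatica Protocol Buffers Reference.
The Microsoft .NET driver for Apache Phoenix Query Server is provided as a NuGet package. You can install it from the Visual Studio NuGet Package Manager Console with the following command: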
Install-Package Microsoft.Phoenix.Client
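To begin using the library, instantiate a new PhoenixClient object, passing in a ClusterCredentials object that contains the Uri of your cluster together with the cluster's Apache Hadoop user name and password.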
var credentials = new ClusterCredentials(new Uri("https://CLUSTERNAME.azurehdinsight.cn/"), "USERNAME", "PASSWORD");
var client = new PhoenixClient(credentials);
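Replace CLUSTERNAME with your HDInsight HBase cluster name, and replace USERNAME and PASSWORD with the Hadoop credentials specified when the cluster was created. The default Hadoop user name is admin.
To send one or more requests to PQS, include a unique connection identifier that associates the requests with a connection.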
string connId = Guid.NewGuid().ToString();
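Each example first calls the OpenConnectionRequestAsync method, passing in the unique connection identifier. Next, define ConnectionProperties and RequestOptions, and pass those objects and the generated connection identifier to the ConnectionSyncRequestAsync method. The PQS ConnectionSyncRequest object helps ensure that the client and the server have a consistent view of the database properties.
To call ConnectionSyncRequestAsync, pass in a ConnectionProperties object.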
ConnectionProperties connProperties = new ConnectionProperties
{
    HasAutoCommit = true,
    AutoCommit = true,
    HasReadOnly = true,
    ReadOnly = false,
    TransactionIsolation = 0,
    Catalog = "",
    Schema = "",
    IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
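Here are some properties of interest:
Property | Description |
---|---|
AutoCommit | A boolean that indicates whether autoCommit is enabled for Phoenix transactions. |
ReadOnly | A boolean that indicates whether the connection is read-only. |
TransactionIsolation | An integer that indicates the transaction isolation level as defined by the JDBC specification; see the following table. |
Catalog | The name of the catalog to use when fetching connection properties. |
Schema | The name of the schema to use when fetching connection properties. |
IsDirty | A boolean that indicates whether the properties have been changed. |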
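Here are the TransactionIsolation values:
Isolation value | Description |
---|---|
0 | Transactions are not supported. |
1 | Dirty reads, non-repeatable reads, and phantom reads may occur. |
2 | Dirty reads are prevented, but non-repeatable reads and phantom reads may occur. |
4 | Dirty reads and non-repeatable reads are prevented, but phantom reads may occur. |
8 | Dirty reads, non-repeatable reads, and phantom reads are all prevented. |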
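The integers in this table match the TRANSACTION_* constants defined on java.sql.Connection in JDBC. As a readability aid, they can be given names in client code. The following is a minimal sketch under that assumption; JdbcIsolationLevel and IsolationLevels.Describe are hypothetical names introduced here for illustration and are not part of Microsoft.Phoenix.Client.

```csharp
using System;

// Hypothetical helper enum, not part of the Phoenix client library.
// Member values follow the JDBC java.sql.Connection TRANSACTION_* constants.
public enum JdbcIsolationLevel
{
    None = 0,             // TRANSACTION_NONE
    ReadUncommitted = 1,  // TRANSACTION_READ_UNCOMMITTED
    ReadCommitted = 2,    // TRANSACTION_READ_COMMITTED
    RepeatableRead = 4,   // TRANSACTION_REPEATABLE_READ
    Serializable = 8      // TRANSACTION_SERIALIZABLE
}

public static class IsolationLevels
{
    // Returns the enum name for a raw isolation value, or throws if the
    // value is not one of the five levels listed in the table.
    public static string Describe(int value)
    {
        if (!Enum.IsDefined(typeof(JdbcIsolationLevel), value))
        {
            throw new ArgumentOutOfRangeException(nameof(value), value, "Unknown isolation level.");
        }
        return ((JdbcIsolationLevel)value).ToString();
    }
}
```

With an enum like this, the bare 0 assigned to TransactionIsolation in the examples could be written as a cast of JdbcIsolationLevel.None, adjusted to whatever numeric type the property actually uses.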
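Like a relational database, HBase stores data in tables. Phoenix uses standard SQL queries to create new tables, defining the primary key and column types at the same time.
This example and all later examples use the PhoenixClient object that was instantiated earlier, when the library was first set up.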
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
// You can set certain request options, such as timeout in milliseconds:
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
CreateStatementResponse createStatementResponse = null;
OpenConnectionResponse openConnResponse = null;
try
{
    // Opening connection
    var info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);
    // Create the statement (awaited rather than blocking on .Result)
    createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
    // Create the table if it does not exist
    string sql = "CREATE TABLE IF NOT EXISTS Customers (Id varchar(20) PRIMARY KEY, FirstName varchar(50), " +
        "LastName varchar(100), StateProvince char(2), Email varchar(255), Phone varchar(15))";
    await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    Console.WriteLine("Table \"Customers\" created.");
}
catch (Exception e)
{
    Console.WriteLine(e);
    throw;
}
finally
{
    // Close the statement first, then the connection
    if (createStatementResponse != null)
    {
        await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
        createStatementResponse = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
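The preceding example creates a new table named Customers using the IF NOT EXISTS option. The CreateStatementRequestAsync call creates a new statement in the Avatica (PQS) server. The finally block closes the statement and the connection represented by the returned CreateStatementResponse and OpenConnectionResponse objects.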
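The comments in the example describe gateway mode: a request sent to hbasephoenix<N>/ is forwarded to PQS on worker node N. A minimal sketch of building that relative endpoint for a given node, and of rotating requests across nodes, might look like the following. GatewayEndpoints, ForWorker, and RoundRobin are hypothetical names, and the worker-node count is an assumption you would supply for your own cluster.

```csharp
using System;

// Hypothetical helper, not part of Microsoft.Phoenix.Client.
public static class GatewayEndpoints
{
    // Builds the relative endpoint for PQS on the given worker node,
    // following the hbasephoenix<N>/ pattern from the example comments.
    public static string ForWorker(int workerIndex)
    {
        if (workerIndex < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(workerIndex));
        }
        return $"hbasephoenix{workerIndex}/";
    }

    // Picks an endpoint for the given request number by cycling through
    // workerCount nodes (an assumed value supplied by the caller).
    public static string RoundRobin(long requestNumber, int workerCount)
    {
        if (requestNumber < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(requestNumber));
        }
        if (workerCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(workerCount));
        }
        return ForWorker((int)(requestNumber % workerCount));
    }
}
```

The result of GatewayEndpoints.ForWorker(0) is the same "hbasephoenix0/" string that the example assigns to options.AlternativeEndpoint.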
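The following example inserts rows one at a time. It references a List<string> collection that contains the abbreviations of American states and territories: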
var states = new List<string> { "AL", "AK", "AS", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FM", "FL", "GA", "GU", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MH", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "MP", "OH", "OK", "OR", "PW", "PA", "PR", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VI", "VA", "WA", "WV", "WI", "WY" };
The table's StateProvince column value is used in a later select operation.
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
StatementHandle statementHandle = null;
try
{
    // Opening connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);
    // Prepare the parameterized upsert once and reuse it for every row
    string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
    PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, 100, options);
    statementHandle = prepareResponse.Statement;
    var r = new Random();
    // Insert 300 rows
    for (int i = 0; i < 300; i++)
    {
        var list = new pbc::RepeatedField<TypedValue>();
        var id = new TypedValue
        {
            StringValue = "id" + i,
            Type = Rep.String
        };
        var firstName = new TypedValue
        {
            StringValue = "first" + i,
            Type = Rep.String
        };
        var lastName = new TypedValue
        {
            StringValue = "last" + i,
            Type = Rep.String
        };
        var state = new TypedValue
        {
            // Random.Next excludes its upper bound, so this covers the whole list
            StringValue = states[r.Next(states.Count)],
            Type = Rep.String
        };
        var email = new TypedValue
        {
            // Interpolate the loop counter so each row gets its own address
            StringValue = $"email{i}@junkemail.com",
            Type = Rep.String
        };
        var phone = new TypedValue
        {
            StringValue = $"555-229-341{i.ToString().Substring(0, 1)}",
            Type = Rep.String
        };
        list.Add(id);
        list.Add(firstName);
        list.Add(lastName);
        list.Add(state);
        list.Add(email);
        list.Add(phone);
        Console.WriteLine("Inserting customer " + i);
        await client.ExecuteRequestAsync(statementHandle, list, long.MaxValue, true, options);
    }
    await client.CommitRequestAsync(connId, options);
    Console.WriteLine("Upserted customer data");
}
catch (Exception ex)
{
    // Report the failure instead of silently swallowing it
    Console.WriteLine(ex);
    throw;
}
finally
{
    if (statementHandle != null)
    {
        await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
        statementHandle = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
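The structure for executing an insert statement is similar to that for creating a new table. At the end of the try block, the transaction is explicitly committed. This example executes the insert 300 times, sending one request per row. The following example shows a more efficient batch insert process.
The following code is nearly identical to the code for inserting data individually. Instead of repeatedly calling ExecuteRequestAsync with a prepared statement, this example collects the rows in UpdateBatch objects and sends them in a single call to ExecuteBatchRequestAsync.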
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
StatementHandle statementHandle = null;
try
{
    // Opening connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);
    // Prepare the parameterized upsert (awaited rather than blocking on .Result)
    string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
    PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, long.MaxValue, options);
    statementHandle = prepareResponse.Statement;
    var updates = new pbc::RepeatedField<UpdateBatch>();
    var r = new Random();
    // Build a batch of 300 rows (ids 300 through 599)
    for (int i = 300; i < 600; i++)
    {
        var list = new pbc::RepeatedField<TypedValue>();
        var id = new TypedValue
        {
            StringValue = "id" + i,
            Type = Rep.String
        };
        var firstName = new TypedValue
        {
            StringValue = "first" + i,
            Type = Rep.String
        };
        var lastName = new TypedValue
        {
            StringValue = "last" + i,
            Type = Rep.String
        };
        var state = new TypedValue
        {
            // Random.Next excludes its upper bound, so this covers the whole list
            StringValue = states[r.Next(states.Count)],
            Type = Rep.String
        };
        var email = new TypedValue
        {
            // Interpolate the loop counter so each row gets its own address
            StringValue = $"email{i}@junkemail.com",
            Type = Rep.String
        };
        var phone = new TypedValue
        {
            StringValue = $"555-229-341{i.ToString().Substring(0, 1)}",
            Type = Rep.String
        };
        list.Add(id);
        list.Add(firstName);
        list.Add(lastName);
        list.Add(state);
        list.Add(email);
        list.Add(phone);
        var batch = new UpdateBatch
        {
            ParameterValues = list
        };
        updates.Add(batch);
        Console.WriteLine($"Added customer {i} to batch");
    }
    // Send all 300 rows in a single request
    await client.ExecuteBatchRequestAsync(connId, statementHandle.Id, updates, options);
    Console.WriteLine("Batch upserted customer data");
}
catch (Exception ex)
{
    // Report the failure instead of silently swallowing it
    Console.WriteLine(ex);
    throw;
}
finally
{
    // Close the prepared statement first, then the connection
    if (statementHandle != null)
    {
        await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
        statementHandle = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
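In one test environment, individually inserting 300 new records took almost 2 minutes. In contrast, inserting 300 records as a batch took only 6 seconds, roughly a 20-fold difference.
The following example shows how to reuse one connection to run multiple queries:
- Select all records, then fetch the remaining records after the default maximum of 100 is returned.
- Use a total row count select statement to retrieve a single scalar result.
- Run a select statement that returns the total number of customers per state or territory.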
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
CreateStatementResponse createStatementResponse = null;
try
{
    // Opening connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);
    // One statement is created and reused for all three queries
    createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
    // Running query 1
    string sql = "SELECT * FROM Customers";
    ExecuteResponse executeResponse = await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    pbc::RepeatedField<Row> rows = executeResponse.Results[0].FirstFrame.Rows;
    // Loop through all of the returned rows and display the first two columns
    for (int i = 0; i < rows.Count; i++)
    {
        Row row = rows[i];
        Console.WriteLine(row.Value[0].ScalarValue.StringValue + " " + row.Value[1].ScalarValue.StringValue);
    }
    // 100 is hard-coded on the server side as the default firstframe size
    Console.WriteLine("");
    Console.WriteLine($"Number of rows: {rows.Count}");
    // FetchRequestAsync is called to get any remaining rows; offset is not used, simply set to 0
    // When FetchResponse.Frame.Done is true, all rows were fetched
    FetchResponse fetchResponse = await client.FetchRequestAsync(connId, createStatementResponse.StatementId, 0, int.MaxValue, options);
    Console.WriteLine($"Frame row count: {fetchResponse.Frame.Rows.Count}");
    Console.WriteLine($"Fetch response is done: {fetchResponse.Frame.Done}");
    Console.WriteLine("");
    // Running query 2
    string sql2 = "select count(*) from Customers";
    ExecuteResponse countResponse = await client.PrepareAndExecuteRequestAsync(connId, sql2, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    long count = countResponse.Results[0].FirstFrame.Rows[0].Value[0].ScalarValue.NumberValue;
    Console.WriteLine($"Total customer records: {count}");
    Console.WriteLine("");
    // Running query 3
    string sql3 = "select StateProvince, count(*) as Number from Customers group by StateProvince order by Number desc";
    ExecuteResponse groupByResponse = await client.PrepareAndExecuteRequestAsync(connId, sql3, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    pbc::RepeatedField<Row> stateRows = groupByResponse.Results[0].FirstFrame.Rows;
    for (int i = 0; i < stateRows.Count; i++)
    {
        Row row = stateRows[i];
        Console.WriteLine(row.Value[0].ScalarValue.StringValue + ": " + row.Value[1].ScalarValue.NumberValue);
    }
}
catch (Exception ex)
{
    // Report the failure instead of silently swallowing it
    Console.WriteLine(ex);
    throw;
}
finally
{
    // Close the statement that was actually created, then the connection
    if (createStatementResponse != null)
    {
        await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
        createStatementResponse = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
The output of the select statements should look like the following:
id0 first0
id1 first1
id10 first10
id100 first100
id101 first101
id102 first102
. . .
id185 first185
id186 first186
id187 first187
id188 first188
Number of rows: 100
Frame row count: 500
Fetch response is done: True
Total customer records: 600
NJ: 21
CA: 19
GU: 17
NC: 16
IN: 16
MA: 16
AZ: 16
ME: 16
IL: 15
OR: 15
. . .
MO: 10
HI: 10
GA: 10
DC: 9
NM: 9
MD: 9
MP: 9
SC: 7
AR: 7
MH: 6
FM: 5
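The select example calls FetchRequestAsync once, with a frame size large enough to return every remaining row in a single response. For larger tables, a smaller frame size with repeated fetches until Done is true may be preferable. The sketch below shows only the shape of that loop. Page<T>, Paging.ReadAllAsync, and the fetchNext delegate are hypothetical stand-ins for the real client calls; in practice the delegate would wrap FetchRequestAsync and map the Rows and Done members of the returned frame.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for one frame of results: some rows plus a flag
// that says whether the server has finished sending rows.
public sealed class Page<T>
{
    public Page(IReadOnlyList<T> rows, bool done)
    {
        Rows = rows;
        Done = done;
    }

    public IReadOnlyList<T> Rows { get; }
    public bool Done { get; }
}

public static class Paging
{
    // Starts from the first page and keeps calling fetchNext until a page
    // reports Done, accumulating every row along the way.
    public static async Task<List<T>> ReadAllAsync<T>(Page<T> first, Func<Task<Page<T>>> fetchNext)
    {
        var all = new List<T>(first.Rows);
        Page<T> current = first;
        while (!current.Done)
        {
            current = await fetchNext();
            all.AddRange(current.Rows);
        }
        return all;
    }
}
```

Keeping the loop separate from the client calls makes it straightforward to test with in-memory pages before pointing it at a live cluster.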