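Apache Phoenix is an open-source, massively parallel relational database layer built on Apache HBase. Phoenix lets you run SQL-like queries against HBase through SSH-based tools such as SQLLine. Phoenix also provides an HTTP server called Phoenix Query Server (PQS). PQS serves thin clients and supports two transport mechanisms for client communication: JSON and Protocol Buffers. Protocol Buffers is the default mechanism and communicates more efficiently than JSON.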
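This article describes how to use the PQS REST SDK to create tables, upsert rows individually and in bulk, and select data with SQL statements. The examples use the Microsoft .NET driver for Apache Phoenix Query Server. This SDK is built on Apache Calcite's Avatica API, which uses Protocol Buffers exclusively as its serialization format.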
For more information, see the Apache Calcite Avatica Protocol Buffers Reference.
The Azure .NET driver for Apache Phoenix Query Server is provided as a NuGet package. You can install it from the Visual Studio NuGet Package Manager Console with the following command:
Install-Package Microsoft.Phoenix.Client
To start using the library, instantiate a new PhoenixClient object. Pass in a ClusterCredentials object that contains the cluster's Uri and the cluster's Apache Hadoop user name and password.
var credentials = new ClusterCredentials(new Uri("https://CLUSTERNAME.azurehdinsight.cn/"), "USERNAME", "PASSWORD");
var client = new PhoenixClient(credentials);
Replace CLUSTERNAME with your HDInsight HBase cluster name, and replace USERNAME and PASSWORD with the Hadoop credentials you specified when you created the cluster. The default Hadoop user name is admin.
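The examples in this article set `AlternativeEndpoint` to `hbasephoenix0/`, and their comments state that gateway-mode requests go to `https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/`. The following is a minimal sketch that composes that URL from a cluster name and a worker index. `BuildPqsGatewayUri` is a hypothetical helper, not part of the SDK, and it assumes the `azurehdinsight.cn` domain used throughout this article.

```csharp
using System;

// Hypothetical helper (not part of Microsoft.Phoenix.Client): builds the gateway URL
// that, per the comments in this article's examples, routes requests to the PQS
// instance on worker node N.
Uri BuildPqsGatewayUri(string clusterName, int workerIndex)
{
    if (string.IsNullOrWhiteSpace(clusterName))
        throw new ArgumentException("Cluster name is required.", nameof(clusterName));
    if (workerIndex < 0)
        throw new ArgumentOutOfRangeException(nameof(workerIndex));

    // Base URI, the same shape as the one passed to ClusterCredentials.
    var clusterUri = new Uri($"https://{clusterName}.azurehdinsight.cn/");
    // Relative path, the same shape as the AlternativeEndpoint value "hbasephoenix<N>/".
    return new Uri(clusterUri, $"hbasephoenix{workerIndex}/");
}

Console.WriteLine(BuildPqsGatewayUri("mycluster", 0));
```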
To send one or more requests to PQS, include a unique connection identifier that associates the requests with the connection.
string connId = Guid.NewGuid().ToString();
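Each example first calls the OpenConnectionRequestAsync method, passing in the unique connection identifier. Next, it defines ConnectionProperties and RequestOptions and passes these objects, along with the generated connection identifier, to the ConnectionSyncRequestAsync method. The PQS ConnectionSyncRequest object helps ensure that the client and the server have a consistent view of the database properties.
To call ConnectionSyncRequestAsync, pass in a ConnectionProperties object: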
ConnectionProperties connProperties = new ConnectionProperties
{
    HasAutoCommit = true,
    AutoCommit = true,
    HasReadOnly = true,
    ReadOnly = false,
    TransactionIsolation = 0,
    Catalog = "",
    Schema = "",
    IsDirty = true
};
// options is a RequestOptions object, created as in the examples that follow
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
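Here are some properties of interest:
| Property | Description |
|---|---|
| AutoCommit | A Boolean that indicates whether autoCommit is enabled for Phoenix transactions. |
| ReadOnly | A Boolean that indicates whether the connection is read-only. |
| TransactionIsolation | An integer that indicates the level of transaction isolation, as defined by the JDBC specification (see the following table). |
| Catalog | The name of the catalog to use when fetching connection properties. |
| Schema | The name of the schema to use when fetching connection properties. |
| IsDirty | A Boolean that indicates whether the properties have been altered. |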
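Here are the TransactionIsolation values:
| Isolation value | JDBC constant | Description |
|---|---|---|
| 0 | TRANSACTION_NONE | Transactions aren't supported. |
| 1 | TRANSACTION_READ_UNCOMMITTED | Dirty reads, non-repeatable reads, and phantom reads can occur. |
| 2 | TRANSACTION_READ_COMMITTED | Dirty reads are prevented, but non-repeatable reads and phantom reads can occur. |
| 4 | TRANSACTION_REPEATABLE_READ | Dirty reads and non-repeatable reads are prevented, but phantom reads can occur. |
| 8 | TRANSACTION_SERIALIZABLE | Dirty reads, non-repeatable reads, and phantom reads are all prevented. |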
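Only the five values in the table are valid JDBC isolation levels. As a sketch, the following code keeps them in a dictionary keyed by value and rejects anything else before it is assigned to `TransactionIsolation`. `ValidateIsolation` is a hypothetical helper, not part of the SDK.

```csharp
using System;
using System.Collections.Generic;

// JDBC java.sql.Connection isolation constants, matching the table above.
var isolationLevels = new Dictionary<int, string>
{
    [0] = "TRANSACTION_NONE",
    [1] = "TRANSACTION_READ_UNCOMMITTED",
    [2] = "TRANSACTION_READ_COMMITTED",
    [4] = "TRANSACTION_REPEATABLE_READ",
    [8] = "TRANSACTION_SERIALIZABLE",
};

// Hypothetical validation helper: returns the value unchanged if it is a JDBC
// isolation constant, and throws for anything else (for example 3).
int ValidateIsolation(int value) =>
    isolationLevels.ContainsKey(value)
        ? value
        : throw new ArgumentOutOfRangeException(nameof(value), value, "Not a JDBC isolation level.");

foreach (var (value, name) in isolationLevels)
    Console.WriteLine($"{value} = {name}");
```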
Like a relational database, HBase stores data in tables. Phoenix uses standard SQL queries to create new tables and to define their primary keys and column types.
This example and all later examples use the PhoenixClient object instantiated as shown earlier in this article.
// Requires: using pbc = global::Google.Protobuf.Collections;
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
// You can set certain request options, such as timeout in milliseconds:
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
CreateStatementResponse createStatementResponse = null;
OpenConnectionResponse openConnResponse = null;
try
{
    // Opening connection
    var info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);
    // Create the statement (await instead of blocking on .Result)
    createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
    // Create the table if it does not exist
    string sql = "CREATE TABLE IF NOT EXISTS Customers (Id varchar(20) PRIMARY KEY, FirstName varchar(50), " +
        "LastName varchar(100), StateProvince char(2), Email varchar(255), Phone varchar(15))";
    await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    Console.WriteLine("Table \"Customers\" created.");
}
catch (Exception e)
{
    Console.WriteLine(e);
    throw;
}
finally
{
    // Close the statement first, then the connection
    if (createStatementResponse != null)
    {
        await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
        createStatementResponse = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
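The previous example creates a new table named Customers by using the IF NOT EXISTS option. The CreateStatementRequestAsync call creates a new statement on the Avatica (PQS) server. The finally block closes the statement and the connection identified by the returned CreateStatementResponse and OpenConnectionResponse objects.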
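Every example in this article repeats the same open, try, finally, close pattern. One way to factor it out is a higher-order helper that opens a connection, runs a caller-supplied body, and always closes the connection. The sketch below is hypothetical and runs without a cluster: `OpenAsync` and `CloseAsync` only record to a log in place of `OpenConnectionRequestAsync` and `CloseConnectionRequestAsync`, and `WithConnectionAsync` is not an SDK method.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Hypothetical stand-ins for the SDK's open/close calls; they only record what would be sent.
Task OpenAsync(string id) { log.Add($"open {id}"); return Task.CompletedTask; }
Task CloseAsync(string id) { log.Add($"close {id}"); return Task.CompletedTask; }

// Opens a connection with a fresh identifier, runs body, and always closes the
// connection afterwards, even if body throws.
async Task WithConnectionAsync(Func<string, Task> body)
{
    string connId = Guid.NewGuid().ToString();
    await OpenAsync(connId);
    try
    {
        await body(connId);
    }
    finally
    {
        await CloseAsync(connId);
    }
}

// Successful body: open, query, close.
await WithConnectionAsync(id => { log.Add("query"); return Task.CompletedTask; });

// Failing body: the connection is closed before the exception reaches this catch.
try
{
    await WithConnectionAsync(id => throw new InvalidOperationException("query failed"));
}
catch (InvalidOperationException)
{
}

foreach (var entry in log) Console.WriteLine(entry);
```

The helper opens the connection before entering try, so a failed open is never followed by a close. The examples get the same effect by checking openConnResponse for null.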
The following example shows how to insert data one row at a time. It references a List<string> collection of US state and territory abbreviations:
var states = new List<string> { "AL", "AK", "AS", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FM", "FL", "GA", "GU", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MH", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "MP", "OH", "OK", "OR", "PW", "PA", "PR", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VI", "VA", "WA", "WV", "WI", "WY" };
The values in the table's StateProvince column are used in a later select operation.
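The upsert loop builds six TypedValue parameters per row. A minimal sketch of the same row-building logic as plain strings makes the parameter order explicit: it must match both the column order of the Customers table and the `?` placeholders in the UPSERT statement. `BuildCustomerRow` is a hypothetical helper, and the three-entry state list is shortened for the sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: the six values for one Customers row, in the column order of
// CREATE TABLE Customers (Id, FirstName, LastName, StateProvince, Email, Phone).
string[] BuildCustomerRow(int i, IReadOnlyList<string> stateList, Random random) => new[]
{
    "id" + i,
    "first" + i,
    "last" + i,
    // Random.Next(maxValue) returns 0..maxValue-1, so any entry in the list can be chosen.
    stateList[random.Next(stateList.Count)],
    $"email{i}@junkemail.com",
    // Same phone pattern as the example: last digit is the first digit of i.
    $"555-229-341{i.ToString().Substring(0, 1)}",
};

var sampleStates = new List<string> { "AL", "AK", "AS" };  // shortened for the sketch
var row = BuildCustomerRow(42, sampleStates, new Random(1));
Console.WriteLine(string.Join(", ", row));
```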
// Requires: using pbc = global::Google.Protobuf.Collections;
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
StatementHandle statementHandle = null;
try
{
    // Opening connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);
    // Prepare the statement once; each row is sent with its own parameter values
    string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
    PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, 100, options);
    statementHandle = prepareResponse.Statement;
    var r = new Random();
    // Insert 300 rows, one ExecuteRequestAsync call per row
    for (int i = 0; i < 300; i++)
    {
        var list = new pbc::RepeatedField<TypedValue>();
        var id = new TypedValue
        {
            StringValue = "id" + i,
            Type = Rep.String
        };
        var firstName = new TypedValue
        {
            StringValue = "first" + i,
            Type = Rep.String
        };
        var lastName = new TypedValue
        {
            StringValue = "last" + i,
            Type = Rep.String
        };
        var state = new TypedValue
        {
            // Pick from the whole list of states and territories
            StringValue = states[r.Next(states.Count)],
            Type = Rep.String
        };
        var email = new TypedValue
        {
            StringValue = $"email{i}@junkemail.com",
            Type = Rep.String
        };
        var phone = new TypedValue
        {
            StringValue = $"555-229-341{i.ToString().Substring(0, 1)}",
            Type = Rep.String
        };
        // Parameter order must match the column order of the Customers table
        list.Add(id);
        list.Add(firstName);
        list.Add(lastName);
        list.Add(state);
        list.Add(email);
        list.Add(phone);
        Console.WriteLine("Inserting customer " + i);
        await client.ExecuteRequestAsync(statementHandle, list, long.MaxValue, true, options);
    }
    await client.CommitRequestAsync(connId, options);
    Console.WriteLine("Upserted customer data");
}
catch (Exception ex)
{
    // Don't swallow failures silently
    Console.WriteLine(ex);
    throw;
}
finally
{
    if (statementHandle != null)
    {
        await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
        statementHandle = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
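The structure for executing an insert statement is similar to that for creating a new table. At the end of the try block, the transaction is committed explicitly. This example repeats an insert transaction 300 times, sending one request per row. The following example shows a more efficient batch insert process.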
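The following code is nearly identical to the code for inserting rows one at a time. Instead of repeatedly calling ExecuteRequestAsync with a prepared statement, this example collects the rows in UpdateBatch objects and sends them in a single call to ExecuteBatchRequestAsync.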
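If a load is larger than you want to send in one request, one option is to split the rows into fixed-size chunks and make one ExecuteBatchRequestAsync call per chunk. The sketch below shows only the chunking step, using `Enumerable.Chunk` (available in .NET 6 and later). The batch size of 128 is an arbitrary assumption; the example below sends all 300 rows in a single batch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Row indices matching the batch example (300..599).
IEnumerable<int> rows = Enumerable.Range(300, 300);

// Assumed batch size for the sketch.
const int batchSize = 128;

foreach (int[] batch in rows.Chunk(batchSize))
{
    // Each chunk would become one RepeatedField<UpdateBatch> sent in one ExecuteBatchRequestAsync call.
    Console.WriteLine($"batch of {batch.Length}: rows {batch[0]}..{batch[^1]}");
}
```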
// Requires: using pbc = global::Google.Protobuf.Collections;
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
CreateStatementResponse createStatementResponse = null;
StatementHandle statementHandle = null;
try
{
    // Opening connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);
    // Creating statement
    createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
    string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
    PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, long.MaxValue, options);
    statementHandle = prepareResponse.Statement;
    var updates = new pbc::RepeatedField<UpdateBatch>();
    var r = new Random();
    // Collect 300 rows into the batch
    for (int i = 300; i < 600; i++)
    {
        var list = new pbc::RepeatedField<TypedValue>();
        var id = new TypedValue
        {
            StringValue = "id" + i,
            Type = Rep.String
        };
        var firstName = new TypedValue
        {
            StringValue = "first" + i,
            Type = Rep.String
        };
        var lastName = new TypedValue
        {
            StringValue = "last" + i,
            Type = Rep.String
        };
        var state = new TypedValue
        {
            // Pick from the whole list of states and territories
            StringValue = states[r.Next(states.Count)],
            Type = Rep.String
        };
        var email = new TypedValue
        {
            StringValue = $"email{i}@junkemail.com",
            Type = Rep.String
        };
        var phone = new TypedValue
        {
            StringValue = $"555-229-341{i.ToString().Substring(0, 1)}",
            Type = Rep.String
        };
        list.Add(id);
        list.Add(firstName);
        list.Add(lastName);
        list.Add(state);
        list.Add(email);
        list.Add(phone);
        var batch = new UpdateBatch
        {
            ParameterValues = list
        };
        updates.Add(batch);
        Console.WriteLine($"Added customer {i} to batch");
    }
    // Send all 300 rows in a single request
    var executeBatchResponse = await client.ExecuteBatchRequestAsync(connId, statementHandle.Id, updates, options);
    Console.WriteLine("Batch upserted customer data");
}
catch (Exception ex)
{
    // Don't swallow failures silently
    Console.WriteLine(ex);
    throw;
}
finally
{
    // Close both statements, then the connection
    if (statementHandle != null)
    {
        await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
        statementHandle = null;
    }
    if (createStatementResponse != null)
    {
        await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
        createStatementResponse = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
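In one test environment, inserting 300 new records one at a time took almost 2 minutes. In contrast, inserting the same number of records as a batch took only 6 seconds.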
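Dividing those timings by the row count makes the difference concrete. This worked calculation treats "almost 2 minutes" as exactly 2 minutes, so the per-row figure for individual inserts is an upper-bound approximation.

```csharp
using System;

const int rowCount = 300;
// Timings reported above for one test environment.
TimeSpan singleInserts = TimeSpan.FromMinutes(2);   // "almost 2 minutes", rounded up
TimeSpan batchInsert = TimeSpan.FromSeconds(6);

// One ExecuteRequestAsync round trip per row vs. one ExecuteBatchRequestAsync call shared by all rows.
double perRowSingleMs = singleInserts.TotalMilliseconds / rowCount;
double perRowBatchMs = batchInsert.TotalMilliseconds / rowCount;
double speedup = singleInserts / batchInsert;

Console.WriteLine($"{perRowSingleMs} ms/row vs {perRowBatchMs} ms/row, about {speedup}x faster");
```

This prints `400 ms/row vs 20 ms/row, about 20x faster`.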
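The following example shows how to reuse one connection to run multiple queries:
- Select all records, and then fetch the remaining records after the default maximum of 100 has been returned.
- Use a total row count select statement to retrieve a single scalar result.
- Run a select statement that returns the total number of customers for each state or territory.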
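The first query relies on PQS paging: the first frame holds at most 100 rows, and further rows are retrieved with FetchRequestAsync until `Frame.Done` is true. The following in-memory sketch simulates that loop without a cluster. `Fetch` is a hypothetical stand-in for FetchRequestAsync, and the page size of 250 is an assumption. The sketch tracks the offset itself, whereas the example below passes 0 because, per its comments, the offset is not used.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the server: 600 rows, first frame capped at 100.
var allRows = Enumerable.Range(0, 600).Select(i => "id" + i).ToList();
const int firstFrameSize = 100;

// Returns up to maxRows rows starting at offset, plus a Done flag like Frame.Done.
(List<string> Rows, bool Done) Fetch(int offset, int maxRows)
{
    var page = allRows.Skip(offset).Take(maxRows).ToList();
    return (page, offset + page.Count >= allRows.Count);
}

var received = new List<string>();
var frame = Fetch(0, firstFrameSize);   // plays the role of FirstFrame
received.AddRange(frame.Rows);

// Keep fetching until the server reports Done.
while (!frame.Done)
{
    frame = Fetch(received.Count, 250); // assumed page size for the sketch
    received.AddRange(frame.Rows);
}

Console.WriteLine($"Fetched {received.Count} rows");
```

The example below makes a single FetchRequestAsync call with int.MaxValue rows, which covers the remaining rows at once. A loop like this one matters only when each fetch is capped.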
// Requires: using pbc = global::Google.Protobuf.Collections;
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.cn/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
CreateStatementResponse createStatementResponse = null;
try
{
    // Opening connection
    pbc::MapField<string, string> info = new pbc::MapField<string, string>();
    openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
    // Syncing connection
    ConnectionProperties connProperties = new ConnectionProperties
    {
        HasAutoCommit = true,
        AutoCommit = true,
        HasReadOnly = true,
        ReadOnly = false,
        TransactionIsolation = 0,
        Catalog = "",
        Schema = "",
        IsDirty = true
    };
    await client.ConnectionSyncRequestAsync(connId, connProperties, options);
    // One statement is reused for all three queries and closed in the finally block
    createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
    // Running query 1
    string sql = "SELECT * FROM Customers";
    ExecuteResponse executeResponse = await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    pbc::RepeatedField<Row> rows = executeResponse.Results[0].FirstFrame.Rows;
    // Loop through all of the returned rows and display the first two columns
    for (int i = 0; i < rows.Count; i++)
    {
        Row row = rows[i];
        Console.WriteLine(row.Value[0].ScalarValue.StringValue + " " + row.Value[1].ScalarValue.StringValue);
    }
    // 100 is hard-coded on the server side as the default firstframe size
    // FetchRequestAsync is called to get any remaining rows
    Console.WriteLine("");
    Console.WriteLine($"Number of rows: {rows.Count}");
    // Fetch remaining rows, offset is not used, simply set to 0
    // When FetchResponse.Frame.Done is true, all rows were fetched
    FetchResponse fetchResponse = await client.FetchRequestAsync(connId, createStatementResponse.StatementId, 0, int.MaxValue, options);
    Console.WriteLine($"Frame row count: {fetchResponse.Frame.Rows.Count}");
    Console.WriteLine($"Fetch response is done: {fetchResponse.Frame.Done}");
    Console.WriteLine("");
    // Running query 2
    string sql2 = "select count(*) from Customers";
    ExecuteResponse countResponse = await client.PrepareAndExecuteRequestAsync(connId, sql2, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    long count = countResponse.Results[0].FirstFrame.Rows[0].Value[0].ScalarValue.NumberValue;
    Console.WriteLine($"Total customer records: {count}");
    Console.WriteLine("");
    // Running query 3
    string sql3 = "select StateProvince, count(*) as Number from Customers group by StateProvince order by Number desc";
    ExecuteResponse groupByResponse = await client.PrepareAndExecuteRequestAsync(connId, sql3, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
    pbc::RepeatedField<Row> stateRows = groupByResponse.Results[0].FirstFrame.Rows;
    for (int i = 0; i < stateRows.Count; i++)
    {
        Row row = stateRows[i];
        Console.WriteLine(row.Value[0].ScalarValue.StringValue + ": " + row.Value[1].ScalarValue.NumberValue);
    }
}
catch (Exception ex)
{
    // Don't swallow failures silently
    Console.WriteLine(ex);
    throw;
}
finally
{
    if (createStatementResponse != null)
    {
        await client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options);
        createStatementResponse = null;
    }
    if (openConnResponse != null)
    {
        await client.CloseConnectionRequestAsync(connId, options);
        openConnResponse = null;
    }
}
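The output of the select statements should be similar to the following. The per-state counts vary from run to run because states are assigned randomly: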
id0 first0
id1 first1
id10 first10
id100 first100
id101 first101
id102 first102
. . .
id185 first185
id186 first186
id187 first187
id188 first188
Number of rows: 100
Frame row count: 500
Fetch response is done: True
Total customer records: 600
NJ: 21
CA: 19
GU: 17
NC: 16
IN: 16
MA: 16
AZ: 16
ME: 16
IL: 15
OR: 15
. . .
MO: 10
HI: 10
GA: 10
DC: 9
NM: 9
MD: 9
MP: 9
SC: 7
AR: 7
MH: 6
FM: 5
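The third query groups customers by StateProvince and sorts the groups by count. A client-side LINQ equivalent, run over a small hypothetical list of state codes rather than the Customers table, is sketched below.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sample of StateProvince values; in the article they come from the Customers table.
var stateProvinces = new List<string> { "NJ", "CA", "NJ", "GU", "CA", "NJ" };

// Client-side equivalent of:
// select StateProvince, count(*) as Number from Customers group by StateProvince order by Number desc
var counts = stateProvinces
    .GroupBy(s => s)
    .Select(g => (StateProvince: g.Key, Number: g.Count()))
    .OrderByDescending(x => x.Number)
    .ToList();

foreach (var (state, number) in counts)
    Console.WriteLine($"{state}: {number}");
```

LINQ's OrderByDescending is stable, so tied groups keep their first-seen order. SQL gives no such guarantee without an additional ORDER BY column, which is why tied states, such as the five states with 16 customers in the output above, can appear in any order.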