适用于 Java 的 Micrometer 指标
适用范围: NoSQL
适用于 Azure Cosmos DB 的 Java SDK 使用 Micrometer 工具在常用的可观测性系统(如 Prometheus)中实现客户端指标。 本文包含用于将指标抓取到 Prometheus 中的说明和代码片段,取自此示例。 此处介绍了由 SDK 提供的指标的完整列表。
使用来自 Prometheus 的指标
可以从此处下载 prometheus。 若要在适用于 Azure Cosmos DB 的 Java SDK 中通过 Prometheus 使用 Micrometer 指标,请首先确保已导入注册表和客户端所需的库:
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
<version>0.5.0</version>
</dependency>
在应用程序中,向遥测配置提供 prometheus 注册表。请注意,可以设置各种诊断阈值,这有助于将使用的指标限制为最感兴趣的指标:
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.examples.prometheus.async;
import com.azure.cosmos.*;
import com.azure.cosmos.examples.common.AccountSettings;
import com.azure.cosmos.examples.common.Family;
import com.azure.cosmos.models.*;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.fasterxml.jackson.databind.JsonNode;
import com.sun.net.httpserver.HttpServer;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static com.azure.cosmos.examples.common.Profile.generateDocs;
public class CosmosClientMetricsQuickStartAsync {
private CosmosAsyncClient client;
private final String databaseName = "ClientMetricsPrometheusDB";
private final String containerName = "Container";
private CosmosAsyncDatabase database;
private CosmosAsyncContainer container;
private static AtomicInteger number_docs_inserted = new AtomicInteger(0);
private static AtomicInteger write_request_count = new AtomicInteger(0);
private static AtomicInteger read_request_count = new AtomicInteger(0);
public static final int NUMBER_OF_DOCS = 5000;
public ArrayList<JsonNode> docs;
private final static Logger logger = LoggerFactory.getLogger(CosmosClientMetricsQuickStartAsync.class);
public void close() {
client.close();
}
public static void main(String[] args) {
CosmosClientMetricsQuickStartAsync quickStart = new CosmosClientMetricsQuickStartAsync();
try {
logger.info("Starting ASYNC main");
quickStart.clientMetricsPrometheusDemo();
logger.info("Demo complete, please hold while resources are released");
} finally {
logger.info("Shutting down");
quickStart.shutdown();
}
}
private void clientMetricsPrometheusDemo() {
logger.info("Using Azure Cosmos DB endpoint: {}", AccountSettings.HOST);
// <ClientMetricsConfig>
//prometheus meter registry
PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
//provide the prometheus registry to the telemetry config
CosmosClientTelemetryConfig telemetryConfig = new CosmosClientTelemetryConfig()
.diagnosticsThresholds(
new CosmosDiagnosticsThresholds()
// Any requests that violate (are lower than) any of the below thresholds that are set
// will not appear in "request-level" metrics (those with "rntbd" or "gw" in their name).
// The "operation-level" metrics (those with "ops" in their name) will still be collected.
// Use this to reduce noise in the amount of metrics collected.
.setRequestChargeThreshold(10)
.setNonPointOperationLatencyThreshold(Duration.ofDays(10))
.setPointOperationLatencyThreshold(Duration.ofDays(10))
)
// Uncomment below to apply sampling to help further tune client-side resource consumption related to metrics.
// The sampling rate can be modified after Azure Cosmos DB Client initialization – so the sampling rate can be
// modified without any restarts being necessary.
//.sampleDiagnostics(0.25)
.clientCorrelationId("samplePrometheusMetrics001")
.metricsOptions(new CosmosMicrometerMetricsOptions().meterRegistry(prometheusRegistry)
//.configureDefaultTagNames(CosmosMetricTagName.PARTITION_KEY_RANGE_ID)
.applyDiagnosticThresholdsForTransportLevelMeters(true)
);
// </ClientMetricsConfig>
// Start local HttpServer server to expose the meter registry metrics to Prometheus.
// When adding this endpoint to prometheus.yml, add the domain name and port to "targets".
// For example, if prometheus is running on the same server as this app, you can add localhost:8080:
// - targets: ["localhost:9090", "localhost:8080"]
// download and install prometheus from here: https://prometheus.io/download/
// <PrometheusTargetServer>
try {
HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
server.createContext("/metrics", httpExchange -> {
String response = prometheusRegistry.scrape();
int i = 1;
httpExchange.sendResponseHeaders(200, response.getBytes().length);
try (OutputStream os = httpExchange.getResponseBody()) {
os.write(response.getBytes());
}
});
new Thread(server::start).start();
} catch (IOException e) {
throw new RuntimeException(e);
}
// </PrometheusTargetServer>
// <CosmosClient>
// Create async client
client = new CosmosClientBuilder()
.endpoint(AccountSettings.HOST)
.key(AccountSettings.MASTER_KEY)
.clientTelemetryConfig(telemetryConfig)
.consistencyLevel(ConsistencyLevel.SESSION) //make sure we can read our own writes
.contentResponseOnWriteEnabled(true)
.buildAsyncClient();
// </CosmosClient>
try {
createDatabaseIfNotExists();
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
createContainerIfNotExists();
} catch (Exception e) {
throw new RuntimeException(e);
}
docs = generateDocs(NUMBER_OF_DOCS);
// None of the rntbd / request-level metrics for point operations will show as they violate one the thresholds set (minimum 10 RUs).
createManyDocuments();
readManyDocuments();
// The rntbd / request-level metrics for the below query will show as it exceeds 10 RUs.
// If you comment out the below, no rntbd / request-level metrics at all will be collected due to the thresholds set.
queryAllDocuments();
}
// Database create
private void createDatabaseIfNotExists() throws Exception {
logger.info("Creating database {} if not exists", databaseName);
// Create database if not exists
Mono<CosmosDatabaseResponse> databaseResponseMono = client.createDatabaseIfNotExists(databaseName);
CosmosDatabaseResponse cosmosDatabaseResponse = databaseResponseMono.block();
CosmosDiagnostics diagnostics = cosmosDatabaseResponse.getDiagnostics();
logger.info("Create database diagnostics : {}", diagnostics);
database = client.getDatabase(cosmosDatabaseResponse.getProperties().getId());
logger.info("Done.");
}
// Container create
private void createContainerIfNotExists() throws Exception {
logger.info("Creating container {} if not exists", containerName);
// Create container if not exists
CosmosContainerProperties containerProperties =
new CosmosContainerProperties(containerName, "/id");
// Provision throughput
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(10000);
// Create container
Mono<CosmosContainerResponse> containerResponseMono = database.createContainerIfNotExists(containerProperties,
throughputProperties);
CosmosContainerResponse cosmosContainerResponse = containerResponseMono.block();
CosmosDiagnostics diagnostics = cosmosContainerResponse.getDiagnostics();
logger.info("Create container diagnostics : {}", diagnostics);
container = database.getContainer(cosmosContainerResponse.getProperties().getId());
logger.info("Done.");
}
private void createManyDocuments() {
Flux.fromIterable(docs).flatMap(doc -> container.createItem(doc)
)
.flatMap(itemResponse -> {
if (itemResponse.getStatusCode() == 201) {
number_docs_inserted.getAndIncrement();
write_request_count.incrementAndGet();
} else
logger.info("WARNING insert status code {} != 201" + itemResponse.getStatusCode());
return Mono.empty();
})
.onErrorContinue((throwable, o) -> {
logger.info(
"Exception in create docs. e: {}", throwable.getMessage(), throwable
);
}).blockLast();
logger.info("Number of successful write requests: " + write_request_count);
}
private void readManyDocuments() {
// collect the ids that were generated when writing the data.
List<String> list = new ArrayList<String>();
for (final JsonNode doc : docs) {
list.add(doc.get("id").asText());
}
final long startTime = System.currentTimeMillis();
Flux.fromIterable(list)
.flatMap(id -> container.readItem(id, new PartitionKey(id), JsonNode.class))
.flatMap(itemResponse -> {
if (itemResponse.getStatusCode() == 200) {
read_request_count.getAndIncrement();
} else
logger.info("WARNING insert status code {} != 200" + itemResponse.getStatusCode());
return Mono.empty();
})
.onErrorContinue((throwable, o) -> {
logger.info(
"Exception in create docs. e: {}", throwable.getMessage(), throwable
);
}).blockLast();
logger.info("Number of successful read requests: " + read_request_count);
}
private void queryAllDocuments() {
int preferredPageSize = number_docs_inserted.get(); // We'll use this later
CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
// Set populate query metrics to get metrics around query executions
queryOptions.setQueryMetricsEnabled(true);
CosmosPagedFlux<Family> pagedFluxResponse = container.queryItems(
"SELECT * FROM c", queryOptions, Family.class);
try {
pagedFluxResponse.byPage(preferredPageSize).flatMap(fluxResponse -> {
logger.info("Got a page of query result with " +
fluxResponse.getResults().size() + " items(s)"
+ " and request charge of " + fluxResponse.getRequestCharge());
return Flux.empty();
}).blockLast();
} catch(Exception err) {
if (err instanceof CosmosException) {
//Client-specific errors
CosmosException cerr = (CosmosException) err;
cerr.printStackTrace();
logger.error(String.format("Query failed with %s\n", cerr));
} else {
//General errors
err.printStackTrace();
}
}
}
// Database delete
private void deleteDatabase() throws Exception {
logger.info("Last step: delete database {} by ID", databaseName);
// Delete database
CosmosDatabaseResponse dbResp =
client.getDatabase(databaseName).delete(new CosmosDatabaseRequestOptions()).block();
logger.info("Status code for database delete: {}", dbResp.getStatusCode());
logger.info("Done.");
}
// Cleanup before close
private void shutdown() {
try {
//Clean shutdown
deleteDatabase();
} catch (Exception err) {
logger.error("Deleting Cosmos DB resources failed, will still attempt to close the client", err);
}
//client.close();
logger.info("Done with sample.");
}
}
启动本地 HttpServer 服务器,以向 Prometheus 公开计量注册表指标:
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.examples.prometheus.async;
import com.azure.cosmos.*;
import com.azure.cosmos.examples.common.AccountSettings;
import com.azure.cosmos.examples.common.Family;
import com.azure.cosmos.models.*;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.fasterxml.jackson.databind.JsonNode;
import com.sun.net.httpserver.HttpServer;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static com.azure.cosmos.examples.common.Profile.generateDocs;
public class CosmosClientMetricsQuickStartAsync {
private CosmosAsyncClient client;
private final String databaseName = "ClientMetricsPrometheusDB";
private final String containerName = "Container";
private CosmosAsyncDatabase database;
private CosmosAsyncContainer container;
private static AtomicInteger number_docs_inserted = new AtomicInteger(0);
private static AtomicInteger write_request_count = new AtomicInteger(0);
private static AtomicInteger read_request_count = new AtomicInteger(0);
public static final int NUMBER_OF_DOCS = 5000;
public ArrayList<JsonNode> docs;
private final static Logger logger = LoggerFactory.getLogger(CosmosClientMetricsQuickStartAsync.class);
public void close() {
client.close();
}
public static void main(String[] args) {
CosmosClientMetricsQuickStartAsync quickStart = new CosmosClientMetricsQuickStartAsync();
try {
logger.info("Starting ASYNC main");
quickStart.clientMetricsPrometheusDemo();
logger.info("Demo complete, please hold while resources are released");
} finally {
logger.info("Shutting down");
quickStart.shutdown();
}
}
private void clientMetricsPrometheusDemo() {
logger.info("Using Azure Cosmos DB endpoint: {}", AccountSettings.HOST);
// <ClientMetricsConfig>
//prometheus meter registry
PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
//provide the prometheus registry to the telemetry config
CosmosClientTelemetryConfig telemetryConfig = new CosmosClientTelemetryConfig()
.diagnosticsThresholds(
new CosmosDiagnosticsThresholds()
// Any requests that violate (are lower than) any of the below thresholds that are set
// will not appear in "request-level" metrics (those with "rntbd" or "gw" in their name).
// The "operation-level" metrics (those with "ops" in their name) will still be collected.
// Use this to reduce noise in the amount of metrics collected.
.setRequestChargeThreshold(10)
.setNonPointOperationLatencyThreshold(Duration.ofDays(10))
.setPointOperationLatencyThreshold(Duration.ofDays(10))
)
// Uncomment below to apply sampling to help further tune client-side resource consumption related to metrics.
// The sampling rate can be modified after Azure Cosmos DB Client initialization – so the sampling rate can be
// modified without any restarts being necessary.
//.sampleDiagnostics(0.25)
.clientCorrelationId("samplePrometheusMetrics001")
.metricsOptions(new CosmosMicrometerMetricsOptions().meterRegistry(prometheusRegistry)
//.configureDefaultTagNames(CosmosMetricTagName.PARTITION_KEY_RANGE_ID)
.applyDiagnosticThresholdsForTransportLevelMeters(true)
);
// </ClientMetricsConfig>
// Start local HttpServer server to expose the meter registry metrics to Prometheus.
// When adding this endpoint to prometheus.yml, add the domain name and port to "targets".
// For example, if prometheus is running on the same server as this app, you can add localhost:8080:
// - targets: ["localhost:9090", "localhost:8080"]
// download and install prometheus from here: https://prometheus.io/download/
// <PrometheusTargetServer>
try {
HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
server.createContext("/metrics", httpExchange -> {
String response = prometheusRegistry.scrape();
int i = 1;
httpExchange.sendResponseHeaders(200, response.getBytes().length);
try (OutputStream os = httpExchange.getResponseBody()) {
os.write(response.getBytes());
}
});
new Thread(server::start).start();
} catch (IOException e) {
throw new RuntimeException(e);
}
// </PrometheusTargetServer>
// <CosmosClient>
// Create async client
client = new CosmosClientBuilder()
.endpoint(AccountSettings.HOST)
.key(AccountSettings.MASTER_KEY)
.clientTelemetryConfig(telemetryConfig)
.consistencyLevel(ConsistencyLevel.SESSION) //make sure we can read our own writes
.contentResponseOnWriteEnabled(true)
.buildAsyncClient();
// </CosmosClient>
try {
createDatabaseIfNotExists();
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
createContainerIfNotExists();
} catch (Exception e) {
throw new RuntimeException(e);
}
docs = generateDocs(NUMBER_OF_DOCS);
// None of the rntbd / request-level metrics for point operations will show as they violate one the thresholds set (minimum 10 RUs).
createManyDocuments();
readManyDocuments();
// The rntbd / request-level metrics for the below query will show as it exceeds 10 RUs.
// If you comment out the below, no rntbd / request-level metrics at all will be collected due to the thresholds set.
queryAllDocuments();
}
// Database create
private void createDatabaseIfNotExists() throws Exception {
logger.info("Creating database {} if not exists", databaseName);
// Create database if not exists
Mono<CosmosDatabaseResponse> databaseResponseMono = client.createDatabaseIfNotExists(databaseName);
CosmosDatabaseResponse cosmosDatabaseResponse = databaseResponseMono.block();
CosmosDiagnostics diagnostics = cosmosDatabaseResponse.getDiagnostics();
logger.info("Create database diagnostics : {}", diagnostics);
database = client.getDatabase(cosmosDatabaseResponse.getProperties().getId());
logger.info("Done.");
}
// Container create
private void createContainerIfNotExists() throws Exception {
logger.info("Creating container {} if not exists", containerName);
// Create container if not exists
CosmosContainerProperties containerProperties =
new CosmosContainerProperties(containerName, "/id");
// Provision throughput
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(10000);
// Create container
Mono<CosmosContainerResponse> containerResponseMono = database.createContainerIfNotExists(containerProperties,
throughputProperties);
CosmosContainerResponse cosmosContainerResponse = containerResponseMono.block();
CosmosDiagnostics diagnostics = cosmosContainerResponse.getDiagnostics();
logger.info("Create container diagnostics : {}", diagnostics);
container = database.getContainer(cosmosContainerResponse.getProperties().getId());
logger.info("Done.");
}
private void createManyDocuments() {
Flux.fromIterable(docs).flatMap(doc -> container.createItem(doc)
)
.flatMap(itemResponse -> {
if (itemResponse.getStatusCode() == 201) {
number_docs_inserted.getAndIncrement();
write_request_count.incrementAndGet();
} else
logger.info("WARNING insert status code {} != 201" + itemResponse.getStatusCode());
return Mono.empty();
})
.onErrorContinue((throwable, o) -> {
logger.info(
"Exception in create docs. e: {}", throwable.getMessage(), throwable
);
}).blockLast();
logger.info("Number of successful write requests: " + write_request_count);
}
private void readManyDocuments() {
// collect the ids that were generated when writing the data.
List<String> list = new ArrayList<String>();
for (final JsonNode doc : docs) {
list.add(doc.get("id").asText());
}
final long startTime = System.currentTimeMillis();
Flux.fromIterable(list)
.flatMap(id -> container.readItem(id, new PartitionKey(id), JsonNode.class))
.flatMap(itemResponse -> {
if (itemResponse.getStatusCode() == 200) {
read_request_count.getAndIncrement();
} else
logger.info("WARNING insert status code {} != 200" + itemResponse.getStatusCode());
return Mono.empty();
})
.onErrorContinue((throwable, o) -> {
logger.info(
"Exception in create docs. e: {}", throwable.getMessage(), throwable
);
}).blockLast();
logger.info("Number of successful read requests: " + read_request_count);
}
private void queryAllDocuments() {
int preferredPageSize = number_docs_inserted.get(); // We'll use this later
CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
// Set populate query metrics to get metrics around query executions
queryOptions.setQueryMetricsEnabled(true);
CosmosPagedFlux<Family> pagedFluxResponse = container.queryItems(
"SELECT * FROM c", queryOptions, Family.class);
try {
pagedFluxResponse.byPage(preferredPageSize).flatMap(fluxResponse -> {
logger.info("Got a page of query result with " +
fluxResponse.getResults().size() + " items(s)"
+ " and request charge of " + fluxResponse.getRequestCharge());
return Flux.empty();
}).blockLast();
} catch(Exception err) {
if (err instanceof CosmosException) {
//Client-specific errors
CosmosException cerr = (CosmosException) err;
cerr.printStackTrace();
logger.error(String.format("Query failed with %s\n", cerr));
} else {
//General errors
err.printStackTrace();
}
}
}
// Database delete
private void deleteDatabase() throws Exception {
logger.info("Last step: delete database {} by ID", databaseName);
// Delete database
CosmosDatabaseResponse dbResp =
client.getDatabase(databaseName).delete(new CosmosDatabaseRequestOptions()).block();
logger.info("Status code for database delete: {}", dbResp.getStatusCode());
logger.info("Done.");
}
// Cleanup before close
private void shutdown() {
try {
//Clean shutdown
deleteDatabase();
} catch (Exception err) {
logger.error("Deleting Cosmos DB resources failed, will still attempt to close the client", err);
}
//client.close();
logger.info("Done with sample.");
}
}
确保在创建 CosmosClient
时传递 clientTelemetryConfig
:
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.examples.prometheus.async;
import com.azure.cosmos.*;
import com.azure.cosmos.examples.common.AccountSettings;
import com.azure.cosmos.examples.common.Family;
import com.azure.cosmos.models.*;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.fasterxml.jackson.databind.JsonNode;
import com.sun.net.httpserver.HttpServer;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static com.azure.cosmos.examples.common.Profile.generateDocs;
public class CosmosClientMetricsQuickStartAsync {
private CosmosAsyncClient client;
private final String databaseName = "ClientMetricsPrometheusDB";
private final String containerName = "Container";
private CosmosAsyncDatabase database;
private CosmosAsyncContainer container;
private static AtomicInteger number_docs_inserted = new AtomicInteger(0);
private static AtomicInteger write_request_count = new AtomicInteger(0);
private static AtomicInteger read_request_count = new AtomicInteger(0);
public static final int NUMBER_OF_DOCS = 5000;
public ArrayList<JsonNode> docs;
private final static Logger logger = LoggerFactory.getLogger(CosmosClientMetricsQuickStartAsync.class);
public void close() {
client.close();
}
public static void main(String[] args) {
CosmosClientMetricsQuickStartAsync quickStart = new CosmosClientMetricsQuickStartAsync();
try {
logger.info("Starting ASYNC main");
quickStart.clientMetricsPrometheusDemo();
logger.info("Demo complete, please hold while resources are released");
} finally {
logger.info("Shutting down");
quickStart.shutdown();
}
}
private void clientMetricsPrometheusDemo() {
logger.info("Using Azure Cosmos DB endpoint: {}", AccountSettings.HOST);
// <ClientMetricsConfig>
//prometheus meter registry
PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
//provide the prometheus registry to the telemetry config
CosmosClientTelemetryConfig telemetryConfig = new CosmosClientTelemetryConfig()
.diagnosticsThresholds(
new CosmosDiagnosticsThresholds()
// Any requests that violate (are lower than) any of the below thresholds that are set
// will not appear in "request-level" metrics (those with "rntbd" or "gw" in their name).
// The "operation-level" metrics (those with "ops" in their name) will still be collected.
// Use this to reduce noise in the amount of metrics collected.
.setRequestChargeThreshold(10)
.setNonPointOperationLatencyThreshold(Duration.ofDays(10))
.setPointOperationLatencyThreshold(Duration.ofDays(10))
)
// Uncomment below to apply sampling to help further tune client-side resource consumption related to metrics.
// The sampling rate can be modified after Azure Cosmos DB Client initialization – so the sampling rate can be
// modified without any restarts being necessary.
//.sampleDiagnostics(0.25)
.clientCorrelationId("samplePrometheusMetrics001")
.metricsOptions(new CosmosMicrometerMetricsOptions().meterRegistry(prometheusRegistry)
//.configureDefaultTagNames(CosmosMetricTagName.PARTITION_KEY_RANGE_ID)
.applyDiagnosticThresholdsForTransportLevelMeters(true)
);
// </ClientMetricsConfig>
// Start local HttpServer server to expose the meter registry metrics to Prometheus.
// When adding this endpoint to prometheus.yml, add the domain name and port to "targets".
// For example, if prometheus is running on the same server as this app, you can add localhost:8080:
// - targets: ["localhost:9090", "localhost:8080"]
// download and install prometheus from here: https://prometheus.io/download/
// <PrometheusTargetServer>
try {
HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
server.createContext("/metrics", httpExchange -> {
String response = prometheusRegistry.scrape();
int i = 1;
httpExchange.sendResponseHeaders(200, response.getBytes().length);
try (OutputStream os = httpExchange.getResponseBody()) {
os.write(response.getBytes());
}
});
new Thread(server::start).start();
} catch (IOException e) {
throw new RuntimeException(e);
}
// </PrometheusTargetServer>
// <CosmosClient>
// Create async client
client = new CosmosClientBuilder()
.endpoint(AccountSettings.HOST)
.key(AccountSettings.MASTER_KEY)
.clientTelemetryConfig(telemetryConfig)
.consistencyLevel(ConsistencyLevel.SESSION) //make sure we can read our own writes
.contentResponseOnWriteEnabled(true)
.buildAsyncClient();
// </CosmosClient>
try {
createDatabaseIfNotExists();
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
createContainerIfNotExists();
} catch (Exception e) {
throw new RuntimeException(e);
}
docs = generateDocs(NUMBER_OF_DOCS);
// None of the rntbd / request-level metrics for point operations will show as they violate one the thresholds set (minimum 10 RUs).
createManyDocuments();
readManyDocuments();
// The rntbd / request-level metrics for the below query will show as it exceeds 10 RUs.
// If you comment out the below, no rntbd / request-level metrics at all will be collected due to the thresholds set.
queryAllDocuments();
}
// Database create
private void createDatabaseIfNotExists() throws Exception {
logger.info("Creating database {} if not exists", databaseName);
// Create database if not exists
Mono<CosmosDatabaseResponse> databaseResponseMono = client.createDatabaseIfNotExists(databaseName);
CosmosDatabaseResponse cosmosDatabaseResponse = databaseResponseMono.block();
CosmosDiagnostics diagnostics = cosmosDatabaseResponse.getDiagnostics();
logger.info("Create database diagnostics : {}", diagnostics);
database = client.getDatabase(cosmosDatabaseResponse.getProperties().getId());
logger.info("Done.");
}
// Container create
private void createContainerIfNotExists() throws Exception {
logger.info("Creating container {} if not exists", containerName);
// Create container if not exists
CosmosContainerProperties containerProperties =
new CosmosContainerProperties(containerName, "/id");
// Provision throughput
ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(10000);
// Create container
Mono<CosmosContainerResponse> containerResponseMono = database.createContainerIfNotExists(containerProperties,
throughputProperties);
CosmosContainerResponse cosmosContainerResponse = containerResponseMono.block();
CosmosDiagnostics diagnostics = cosmosContainerResponse.getDiagnostics();
logger.info("Create container diagnostics : {}", diagnostics);
container = database.getContainer(cosmosContainerResponse.getProperties().getId());
logger.info("Done.");
}
private void createManyDocuments() {
Flux.fromIterable(docs).flatMap(doc -> container.createItem(doc)
)
.flatMap(itemResponse -> {
if (itemResponse.getStatusCode() == 201) {
number_docs_inserted.getAndIncrement();
write_request_count.incrementAndGet();
} else
logger.info("WARNING insert status code {} != 201" + itemResponse.getStatusCode());
return Mono.empty();
})
.onErrorContinue((throwable, o) -> {
logger.info(
"Exception in create docs. e: {}", throwable.getMessage(), throwable
);
}).blockLast();
logger.info("Number of successful write requests: " + write_request_count);
}
private void readManyDocuments() {
// collect the ids that were generated when writing the data.
List<String> list = new ArrayList<String>();
for (final JsonNode doc : docs) {
list.add(doc.get("id").asText());
}
final long startTime = System.currentTimeMillis();
Flux.fromIterable(list)
.flatMap(id -> container.readItem(id, new PartitionKey(id), JsonNode.class))
.flatMap(itemResponse -> {
if (itemResponse.getStatusCode() == 200) {
read_request_count.getAndIncrement();
} else
logger.info("WARNING insert status code {} != 200" + itemResponse.getStatusCode());
return Mono.empty();
})
.onErrorContinue((throwable, o) -> {
logger.info(
"Exception in create docs. e: {}", throwable.getMessage(), throwable
);
}).blockLast();
logger.info("Number of successful read requests: " + read_request_count);
}
private void queryAllDocuments() {
int preferredPageSize = number_docs_inserted.get(); // We'll use this later
CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
// Set populate query metrics to get metrics around query executions
queryOptions.setQueryMetricsEnabled(true);
CosmosPagedFlux<Family> pagedFluxResponse = container.queryItems(
"SELECT * FROM c", queryOptions, Family.class);
try {
pagedFluxResponse.byPage(preferredPageSize).flatMap(fluxResponse -> {
logger.info("Got a page of query result with " +
fluxResponse.getResults().size() + " items(s)"
+ " and request charge of " + fluxResponse.getRequestCharge());
return Flux.empty();
}).blockLast();
} catch(Exception err) {
if (err instanceof CosmosException) {
//Client-specific errors
CosmosException cerr = (CosmosException) err;
cerr.printStackTrace();
logger.error(String.format("Query failed with %s\n", cerr));
} else {
//General errors
err.printStackTrace();
}
}
}
// Database delete
private void deleteDatabase() throws Exception {
logger.info("Last step: delete database {} by ID", databaseName);
// Delete database
CosmosDatabaseResponse dbResp =
client.getDatabase(databaseName).delete(new CosmosDatabaseRequestOptions()).block();
logger.info("Status code for database delete: {}", dbResp.getStatusCode());
logger.info("Done.");
}
// Cleanup before close
private void shutdown() {
try {
//Clean shutdown
deleteDatabase();
} catch (Exception err) {
logger.error("Deleting Cosmos DB resources failed, will still attempt to close the client", err);
}
//client.close();
logger.info("Done with sample.");
}
}
将应用程序客户端的终结点添加到 prometheus.yml
时,在“targets”中添加域名和端口。 例如,如果 prometheus 与应用客户端运行在同一服务器上,则可以按如下所示添加 localhost:8080
到 targets
:
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: "prometheus"
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ["localhost:9090", "localhost:8080"]
现在,可以使用来自 Prometheus 的指标: