How to create a Java application that uses Azure Cosmos DB SQL API and change feed processor

This how-to guide walks you through a simple Java application that uses the Azure Cosmos DB SQL API to insert documents into an Azure Cosmos DB container. At the same time, the application maintains a materialized view of the container using the change feed and the change feed processor. The Java application communicates with the Azure Cosmos DB SQL API using Azure Cosmos DB Java SDK v4.

Important

This tutorial is for Azure Cosmos DB Java SDK v4 only. For more information, see the Azure Cosmos DB Java SDK v4 release notes, the Maven repository, the Azure Cosmos DB Java SDK v4 performance tips, and the Azure Cosmos DB Java SDK v4 troubleshooting guide. If you are currently using a version older than v4, see the Migrate to Azure Cosmos DB Java SDK v4 guide for help upgrading to v4.

Prerequisites

  • The URI and key for your Azure Cosmos DB account

  • Maven

  • Java 8

Background

The Azure Cosmos DB change feed provides an event-driven interface that triggers actions in response to document inserts and updates. This has many uses. For example, in applications that are both read- and write-heavy, a chief use of the change feed is to create a real-time materialized view of a container as it ingests documents. The materialized view container holds the same data but is partitioned for efficient reads, so the application is efficient for both reads and writes.
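To make the partitioning trade-off concrete, here is a minimal in-memory sketch in plain Java. It does not use the Azure Cosmos DB SDK. `PartitionedStore`, `insertAndMirror`, and the sample documents are hypothetical, and each logical partition is modeled as a map entry. The sketch mirrors writes synchronously, whereas the sample application mirrors them asynchronously through the change feed. A filter on `type` has to scan every partition of the store partitioned on `id`, but it reads a single partition of the view partitioned on `type`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a partitioned container (NOT the Azure Cosmos DB SDK):
// partition key value -> (id -> document)
class PartitionedStore {
    private final String partitionKeyField;
    private final Map<String, Map<String, Map<String, String>>> partitions = new HashMap<>();

    PartitionedStore(String partitionKeyField) {
        this.partitionKeyField = partitionKeyField;
    }

    // Upsert a document by id into the logical partition chosen by its partition key value
    void upsert(Map<String, String> doc) {
        String pk = doc.get(partitionKeyField);
        partitions.computeIfAbsent(pk, k -> new HashMap<>()).put(doc.get("id"), doc);
    }

    // Evaluate "field == value", adding matches to out; returns how many partitions were read
    int query(String field, String value, List<Map<String, String>> out) {
        if (field.equals(partitionKeyField)) {
            // Filter on the partition key: only one logical partition needs to be read
            Map<String, Map<String, String>> partition = partitions.get(value);
            if (partition != null) {
                out.addAll(partition.values());
            }
            return 1;
        }
        // Any other filter: every logical partition must be scanned
        for (Map<String, Map<String, String>> partition : partitions.values()) {
            for (Map<String, String> doc : partition.values()) {
                if (value.equals(doc.get(field))) {
                    out.add(doc);
                }
            }
        }
        return partitions.size();
    }
}

class MaterializedViewSketch {
    // Hypothetical helper: write to the source and mirror the same document into the view.
    // The sample does the mirroring asynchronously via the change feed; here it is synchronous.
    static void insertAndMirror(PartitionedStore source, PartitionedStore view, Map<String, String> doc) {
        source.upsert(doc);
        view.upsert(doc);
    }

    public static void main(String[] args) {
        PartitionedStore inventory = new PartitionedStore("id");  // like InventoryContainer
        PartitionedStore byType = new PartitionedStore("type");   // like InventoryContainer-pktype
        String[] types = {"plums", "apples", "pears"};
        for (int i = 0; i < 9; i++) {
            Map<String, String> doc = new HashMap<>();
            doc.put("id", "item-" + i);
            doc.put("type", types[i % types.length]);
            insertAndMirror(inventory, byType, doc);
        }
        List<Map<String, String>> fromSource = new ArrayList<>();
        List<Map<String, String>> fromView = new ArrayList<>();
        int scanned = inventory.query("type", "plums", fromSource);
        int read = byType.query("type", "plums", fromView);
        System.out.println("source: " + scanned + " partitions read, " + fromSource.size() + " matches");
        System.out.println("view: " + read + " partition read, " + fromView.size() + " matches");
    }
}
```

With nine items, both queries find the same three "plums" documents. The source has to read all nine partitions to find them, while the view reads only one.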

The work of managing change feed events is largely handled by the change feed processor library built into the SDK. This library can distribute change feed events among multiple workers, if desired. All you have to do is give the change feed processor a callback that handles each batch of changes.
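The following deliberately simplified sketch illustrates what distributing the work among workers means. Each partition's lease is owned by exactly one worker, so each change is handled by one owner, and adding a worker spreads the leases. The round-robin assignment, `assignLeases`, and the worker name `SampleHost_2` are assumptions for illustration only. The actual processor acquires, renews, and rebalances leases through the lease container; it does not compute a static assignment like this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration (NOT the SDK's algorithm): give each partition's lease to
// exactly one worker so every change has a single owner.
class LeaseDistributionSketch {
    static Map<String, List<String>> assignLeases(List<String> partitions, List<String> workers) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("at least one worker is required");
        }
        Map<String, List<String>> owned = new TreeMap<>();
        for (String worker : workers) {
            owned.put(worker, new ArrayList<>());
        }
        // Round-robin: partition i goes to worker (i mod number of workers)
        for (int i = 0; i < partitions.size(); i++) {
            owned.get(workers.get(i % workers.size())).add(partitions.get(i));
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("p0", "p1", "p2", "p3");
        // One worker, as in this article: it owns every lease
        System.out.println(assignLeases(partitions, Arrays.asList("SampleHost_1")));
        // A hypothetical second worker takes over half of the leases
        System.out.println(assignLeases(partitions, Arrays.asList("SampleHost_1", "SampleHost_2")));
    }
}
```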

This simple example demonstrates the change feed processor library with a single worker that creates and deletes documents in a materialized view.
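Conceptually, a single worker runs a polling loop with three steps. It reads the changes after the last checkpoint, passes them to the callback as a batch, and then records the new checkpoint in the lease. The toy below models that loop with an in-memory log per partition and an offset per lease. `ToyChangeFeedProcessor` and `pollOnce` are hypothetical names, and this is not how the SDK is implemented. A lease without a checkpoint starts at offset 0, loosely mirroring `setStartFromBeginning(true)` in the sample. In this toy, the checkpoint is written only after the callback returns. A crash mid-batch would therefore redeliver that batch, which is why a mirroring callback built on idempotent upserts is a good fit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Toy single-worker change feed processor (NOT the SDK): each partition's changes are an
// append-only list, and each lease stores the offset of the next unread change.
class ToyChangeFeedProcessor {
    private final Map<String, List<String>> feed;
    private final Map<String, Integer> leases = new HashMap<>();
    private final Consumer<List<String>> handler;

    ToyChangeFeedProcessor(Map<String, List<String>> feed, Consumer<List<String>> handler) {
        this.feed = feed;
        this.handler = handler;
    }

    // One polling pass over all partitions; returns how many changes were delivered
    int pollOnce() {
        int delivered = 0;
        for (Map.Entry<String, List<String>> partition : feed.entrySet()) {
            // A lease without a checkpoint starts at offset 0 ("from the beginning")
            int from = leases.getOrDefault(partition.getKey(), 0);
            List<String> log = partition.getValue();
            if (from < log.size()) {
                List<String> batch = new ArrayList<>(log.subList(from, log.size()));
                handler.accept(batch);                      // the user callback
                leases.put(partition.getKey(), log.size()); // checkpoint only after it returns
                delivered += batch.size();
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> feed = new TreeMap<>();
        feed.put("p0", new ArrayList<>(Arrays.asList("doc-1", "doc-2")));
        feed.put("p1", new ArrayList<>(Arrays.asList("doc-3")));
        List<String> view = new ArrayList<>();
        // The callback mirrors every change into a "materialized view" list
        ToyChangeFeedProcessor processor = new ToyChangeFeedProcessor(feed, view::addAll);
        System.out.println(processor.pollOnce()); // 3: first pass delivers every change
        System.out.println(processor.pollOnce()); // 0: nothing new since the checkpoints
        feed.get("p1").add("doc-4");
        System.out.println(processor.pollOnce()); // 1: only the new change
        System.out.println(view);                 // [doc-1, doc-2, doc-3, doc-4]
    }
}
```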

Setup

If you have not already done so, clone the app example repo:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-app-example.git

Open a terminal in the repo directory. Build the app by running:

mvn clean package

Walkthrough

  1. As a first check, you should have an Azure Cosmos DB account. Open the Azure portal in your browser, go to your Azure Cosmos DB account, and in the left pane navigate to Data Explorer.

    Screenshot: Azure Cosmos DB account

  2. Run the app in the terminal using the following command:

    mvn exec:java -Dexec.mainClass="com.azure.cosmos.workedappexample.SampleGroceryStore" -DACCOUNT_HOST="your-account-uri" -DACCOUNT_KEY="your-account-key" -Dexec.cleanupDaemonThreads=false
    
  3. Press Enter when you see:

    Press enter to create the grocery store inventory system...
    

    Then return to Data Explorer in the Azure portal in your browser. You will see that a database named GroceryStoreDatabase has been added with three empty containers:

    • InventoryContainer - The inventory record for our example grocery store, partitioned on item id, which is a UUID.
    • InventoryContainer-pktype - A materialized view of the inventory record, optimized for queries over item type.
    • InventoryContainer-leases - A leases container is always needed for change feed; leases track the app's progress in reading the change feed.

    Screenshot: empty containers

  4. In the terminal, you should now see the prompt:

    Press enter to start creating the materialized view...
    

    Press Enter. The following block of code now runs and initializes the change feed processor on another thread:

    Java SDK V4 (Maven com.azure::azure-cosmos) Async API

    
    // isProcessorRunning is assumed to be an AtomicBoolean field of the sample class
    changeFeedProcessorInstance = getChangeFeedProcessor("SampleHost_1", feedContainer, leaseContainer);
    changeFeedProcessorInstance.start()
        // Run start() on Reactor's bounded elastic scheduler (Schedulers.elastic() is deprecated)
        .subscribeOn(Schedulers.boundedElastic())
        .doOnSuccess(aVoid -> isProcessorRunning.set(true))
        .subscribe();
    
    // Busy-wait until the change feed processor has started (never exits if start() fails)
    while (!isProcessorRunning.get());
    

    "SampleHost_1" is the name of the change feed processor worker. changeFeedProcessorInstance.start() is what actually starts the change feed processor.

    Return to Data Explorer in the Azure portal. Under the InventoryContainer-leases container, select Items to see its contents. You will see that the change feed processor has populated the lease container. In other words, the processor has assigned the SampleHost_1 worker a lease on some partitions of InventoryContainer.

    Screenshot: leases

  5. Press Enter again in the terminal. This inserts 10 documents into InventoryContainer. Each document insertion appears in the change feed as JSON. The following callback code handles these events by mirroring the JSON documents into a materialized view:

    Java SDK V4 (Maven com.azure::azure-cosmos) Async API

    
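    import com.azure.cosmos.ChangeFeedProcessor;
    import com.azure.cosmos.ChangeFeedProcessorBuilder;
    import com.azure.cosmos.CosmosAsyncContainer;
    import com.azure.cosmos.models.ChangeFeedProcessorOptions;
    import com.fasterxml.jackson.databind.JsonNode;
    import java.time.Duration;
    import java.util.List;
    
    public static ChangeFeedProcessor getChangeFeedProcessor(String hostName, CosmosAsyncContainer feedContainer, CosmosAsyncContainer leaseContainer) {
        ChangeFeedProcessorOptions cfOptions = new ChangeFeedProcessorOptions();
        // Check the feed container for new changes every 100 ms
        cfOptions.setFeedPollDelay(Duration.ofMillis(100));
        // With no existing lease checkpoints, read the change feed from the beginning
        cfOptions.setStartFromBeginning(true);
        return new ChangeFeedProcessorBuilder()
            .options(cfOptions)
            .hostName(hostName)
            .feedContainer(feedContainer)
            .leaseContainer(leaseContainer)
            .handleChanges((List<JsonNode> docs) -> {
                for (JsonNode document : docs) {
                    // Duplicate each document update from the feed container into the materialized view container
                    updateInventoryTypeMaterializedView(document);
                }
            })
            .buildChangeFeedProcessor();
    }
    
    private static void updateInventoryTypeMaterializedView(JsonNode document) {
        // typeContainer is the sample's CosmosAsyncContainer for InventoryContainer-pktype.
        // subscribe() is fire-and-forget: a failed upsert is not retried or handled here.
        typeContainer.upsertItem(document).subscribe();
    }
    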
  6. Let the code run for 5-10 seconds. Then return to Data Explorer in the Azure portal and navigate to InventoryContainer > Items. You should see that items are being inserted into the inventory container. Note the partition key (id).

    Screenshot: source container

  7. Now, in Data Explorer, navigate to InventoryContainer-pktype > Items. This is the materialized view: the items in this container mirror InventoryContainer because the change feed processor inserted them programmatically. Note the partition key (type). This materialized view is therefore optimized for queries that filter on type. The same queries would be inefficient on InventoryContainer because it is partitioned on id.

    Screenshot: Data Explorer page for the Azure Cosmos DB account, with Items selected.

  8. We're going to delete a document from both InventoryContainer and InventoryContainer-pktype using a single upsertItem() call. First, take a look at Data Explorer in the Azure portal. We'll delete the document for which /type == "plums"; it is circled in red below.

    Screenshot: Data Explorer page for the Azure Cosmos DB account, with a specific item ID selected.

    Press Enter again to call the function deleteDocument() in the example code. This function, shown below, upserts a new version of the document with /ttl == 5, which sets the document's time-to-live (TTL) to 5 seconds.

    Java SDK V4 (Maven com.azure::azure-cosmos) Async API

    
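    import com.azure.cosmos.models.CosmosItemRequestOptions;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    
    public static void deleteDocument() {
        // Build a new version of the "plums" document. idToDelete and feedContainer are
        // fields of the sample class; idToDelete holds the id of the document to remove.
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode document = mapper.createObjectNode();
        document.put("id", String.valueOf(idToDelete));
        document.put("brand", "Jerry's");
        document.put("type", "plums");
        document.put("quantity", "50");
        // Item-level time-to-live, in seconds
        document.put("ttl", 5);
    
        // Upsert into InventoryContainer (partitioned on id); the change feed processor
        // then mirrors this version, ttl included, into InventoryContainer-pktype
        feedContainer.upsertItem(document, new CosmosItemRequestOptions()).block();
    }
    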

    The change feed feedPollDelay is set to 100 ms, so the change feed processor responds to this update almost instantly and calls updateInventoryTypeMaterializedView(), shown above. That call upserts the new document, with its 5-second TTL, into InventoryContainer-pktype.

    As a result, after about 5 seconds the document expires and is deleted from both containers. Item-level TTL takes effect only when time-to-live is enabled on the container.

    This workaround is necessary because the change feed only issues events for item inserts and updates, not for item deletions.

  9. Press Enter one more time to close the program and clean up its resources.
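The TTL workaround in step 8 can be modeled on a logical clock to show why it works where a plain delete would not. This is a toy in-memory sketch, not the SDK. `Container`, `Change`, `mirror`, and the clock are hypothetical, and a `ttl` of `-1` stands for "never expires", as it does for an item's `ttl` in Azure Cosmos DB. A hard delete in the source produces no change feed entry, so the view keeps a stale copy. An upsert that carries a `ttl`, by contrast, is mirrored like any other change, so both copies expire.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT the SDK) of why the sample "deletes" by upserting a document with a ttl.
class TtlDeleteSketch {
    // A change feed entry: the upserted item's id and its ttl in seconds (-1 = never expires)
    static final class Change {
        final String id;
        final long ttl;
        Change(String id, long ttl) { this.id = id; this.ttl = ttl; }
    }

    // A container with per-item TTL, driven by a logical clock in seconds
    static final class Container {
        private final Map<String, Long> expiresAt = new HashMap<>();
        final List<Change> changeFeed = new ArrayList<>();

        void upsert(String id, long ttl, long now) {
            expiresAt.put(id, ttl < 0 ? Long.MAX_VALUE : now + ttl);
            changeFeed.add(new Change(id, ttl)); // inserts and updates appear in the change feed
        }

        void delete(String id) {
            expiresAt.remove(id);                // a delete adds nothing to the change feed
        }

        void expire(long now) {
            expiresAt.values().removeIf(t -> t <= now); // TTL expiry adds nothing either
        }

        boolean contains(String id) { return expiresAt.containsKey(id); }
    }

    // Mirror changes after checkpoint `from` into the view (ttl included); returns the new checkpoint
    static int mirror(Container source, Container view, int from, long now) {
        for (int i = from; i < source.changeFeed.size(); i++) {
            Change c = source.changeFeed.get(i);
            view.upsert(c.id, c.ttl, now);
        }
        return source.changeFeed.size();
    }

    public static void main(String[] args) {
        Container inventory = new Container();
        Container byType = new Container();
        long now = 0;

        // A hard delete never reaches the view: the view keeps a stale copy
        inventory.upsert("pears-1", -1, now);
        int checkpoint = mirror(inventory, byType, 0, now);
        inventory.delete("pears-1");
        checkpoint = mirror(inventory, byType, checkpoint, now);
        System.out.println(inventory.contains("pears-1") + " " + byType.contains("pears-1"));

        // Upserting with ttl = 5 does reach the view, so both copies expire
        inventory.upsert("plums-1", 5, now);
        checkpoint = mirror(inventory, byType, checkpoint, now);
        now += 5;
        inventory.expire(now);
        byType.expire(now);
        System.out.println(inventory.contains("plums-1") + " " + byType.contains("plums-1"));
    }
}
```

Running `main` should print `false true` and then `false false`. The first line shows the hard-deleted item surviving in the view; the second shows the item with a 5-second TTL gone from both containers. In the real service, the mirrored copy is written slightly after the source item, so it also expires slightly later.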