Compartilhar via

Azure Cosmos DB for Apache Cassandra 中的更改馈送

适用对象: 卡珊德拉

重要

你是否正在寻找一种数据库解决方案,以应对需要高扩展性、99.999% 可用性服务级别协议(SLA)、即时自动扩展和跨多个区域的自动故障转移的场景? 请考虑使用 Azure Cosmos DB for NoSQL

Azure Cosmos DB for Apache Cassandra 中支持更改馈送功能,该功能通过 Cassandra 查询语言 (CQL) 中的查询谓词提供。 您可以使用这些谓词条件查询更改提取 API。 应用程序可以使用 CQL 中必需的主键(也称为分区键)来获取对表所做的更改。 然后,可以根据结果采取进一步的措施。 对表中行的更改将按照其修改时间顺序及每个分区键的排序顺序捕获。

以下示例演示如何使用 .NET 获取 Cassandra 键空间的表中所有行的变更提要。 直接在 CQL 中使用谓词 COSMOS_CHANGEFEED_START_TIME() 用于从指定的开始时间(在本例中为当前日期时间)查询变更流中的条目。 可以在此处(对于 C#)和此处(对于 Java)下载完整示例。

在每次迭代中,查询将利用分页状态从上次更改读取点继续。 可以看到,新的更改不断地流式传输到密钥空间中的表。 我们将会看到对已插入或更新的行所做的更改。 目前,API for Cassandra 尚不支持通过变更提要监视删除操作。

注释

在删除集合后重新使用令牌,然后使用相同的名称重新创建它会导致错误。 建议在创建新集合并重用集合名称时将 pageState 设置为 null。

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

若要通过主键获取单行的更改,可以在查询中添加主键。 以下示例演示如何跟踪指定了“user_id = 1”的行的更改。

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

当前限制

在使用 Cassandra 的 API 进行变更日志时,以下限制适用:

  • 目前支持插入和更新操作。 尚不支持删除操作。 一种解决方法是,对正在删除的行添加一个软标记。 例如,在名为“deleted”的行中添加一个字段,并将其设置为“true”。
  • 与在核心 API for NoSQL 中一样,上次更新将会保留;对实体的中间性更新将不可用。

错误处理

使用 Cassandra API 中的更改提要时,支持以下错误代码和消息:

  • HTTP 错误代码 429 - 当更改源的速率受到限制时,它会返回一个空页。

后续步骤