Compartir a través de

Azure Cosmos DB for Apache Cassandra 中的更改源

适用对象: Cassandra

Azure Cosmos DB for Apache Cassandra 中的更改源支持通过 Cassandra 查询语言 (CQL) 中的查询谓词提供。 使用这些谓词条件可以查询更改源 API。 应用程序可以使用 CQL 中必需的主键(也称为分区键)来获取对表所做的更改。 然后,可以根据结果采取进一步的措施。 对表中的行所做的更改将按照其修改时间顺序捕获,而排序顺序是按分区键捕获。

以下示例演示如何使用 .NET 获取 API for Cassandra 密钥空间表中所有行上的更改源。 直接在 CQL 中使用谓词 COSMOS_CHANGEFEED_START_TIME(),以从指定的开始时间(在本例中为当前日期时间)查询更改源中的项。 可以在此处(对于 C#)和此处(对于 Java)下载完整示例。

在每个迭代中,查询将使用分页状态从上次读取更改的时间点恢复。 可以看到,新的更改不断地流式传输到密钥空间中的表。 我们将会看到对已插入或更新的行所做的更改。 目前,不支持使用 API for Cassandra 中的更改源来监视删除操作。

注意

在删除集合后重新使用令牌,然后使用相同的名称重新创建它会导致错误。 建议在创建新集合并重用集合名称时将 pageState 设置为 null。

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

若要按主键获取对单个行所做的更改,可以在查询中添加主键。 以下示例演示如何跟踪指定了“user_id = 1”的行的更改。

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

当前限制

使用 API for Cassandra 的更改源时,以下限制适用:

  • 目前支持插入和更新操作。 尚不支持删除操作。 一种解决方法是,对正在删除的行添加一个软标记。 例如,在名为“deleted”的行中添加一个字段,并将其设置为“true”。
  • 与在核心 API for NoSQL 中一样,上次更新将会保留;对实体的中间性更新将不可用。

错误处理。

使用 API for Cassandra 中的更改源时,支持以下错误代码和消息:

  • HTTP 错误代码 429 - 当更改源的速率受到限制时,它会返回一个空页。

后续步骤