Azure Cosmos DB API for Cassandra 中的更改源

适用于: Cassandra API

Azure Cosmos DB API for Cassandra 中的更改源支持通过 Cassandra 查询语言 (CQL) 中的查询谓词提供。 使用这些谓词条件可以查询更改源 API。 应用程序可以使用 CQL 中必需的主键(也称为分区键)来获取对表所做的更改。 然后,可以根据结果采取进一步的措施。 对表中的行所做的更改将按照其修改时间顺序捕获,而排序顺序是按分区键捕获。

以下示例演示如何使用 .NET 获取 Cassandra API 密钥空间表中所有行上的更改源。 直接在 CQL 中使用谓词 COSMOS_CHANGEFEED_START_TIME(),以从指定的开始时间(在本例中为当前日期时间)查询更改源中的项。 可以在此处(对于 C#)和此处(对于 Java)下载完整示例。

在每个迭代中,查询将使用分页状态从上次读取更改的时间点恢复。 可以看到,新的更改不断地流式传输到密钥空间中的表。 我们将会看到对已插入或更新的行所做的更改。 目前不支持使用 Cassandra API 中的更改源来监视删除操作。

Session cassandraSession = utils.getSession();

try {
	  DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
	   LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
	   String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
   			+ dtf.format(now)+ "'";

	 byte[] token=null; 
	 System.out.println(query); 
	 while(true)
	 {
		 SimpleStatement st=new  SimpleStatement(query);
		 st.setFetchSize(100);
		 if(token!=null)
			 st.setPagingStateUnsafe(token);

		 ResultSet result=cassandraSession.execute(st) ;
		 token=result.getExecutionInfo().getPagingState().toBytes();

		 for(Row row:result)
		 {
			 System.out.println(row.getString("user_name"));
		 }
	 }

} finally {
    utils.close();
    LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
}

若要按主键获取对单个行所做的更改,可以在查询中添加主键。 以下示例演示如何跟踪指定了“user_id = 1”的行的更改。

String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
       			+ dtf.format(now)+ "'";
SimpleStatement st=new  SimpleStatement(query);

当前限制

使用 Cassandra API 的更改源时,以下限制适用:

  • 目前支持插入和更新操作。 尚不支持删除操作。 一种解决方法是,对正在删除的行添加一个软标记。 例如,在名为“deleted”的行中添加一个字段,并将其设置为“true”。
  • 与在核心 SQL API 中一样,上次更新将会保留;对实体的中间性更新将不可用。

错误处理。

使用 Cassandra API 中的更改源时,支持以下错误代码和消息:

  • HTTP 错误代码 429 - 当更改源的速率受到限制时,它会返回一个空页。

后续步骤