排队引入命令用例(预览版)

适用于:✅Azure 数据资源管理器

排队引入命令允许你在引入历史数据之前测试数据引入过程并修复问题。 本文介绍如何使用队列摄取命令优化历史数据摄取。 执行以下步骤以优化历史数据排队处理:

  1. 列出文件夹中的 Blob
  2. 引入文件夹
  3. 跟踪引入状态
  4. 筛选要引入的排队文件
  5. 捕获创建时间
  6. 引入 20 个文件
  7. 跟踪后续引入状态
  8. 执行完整引入
  9. 取消引入

注释

排队引入命令在数据引入 URI 终结点 https://ingest-<YourClusterName><Region>.kusto.chinacloudapi.cn 上运行。

列出文件夹中的 Blob

为了更好地了解历史数据,您最多从 Azure Blob 存储容器中列出 10 个 Blob。

.list blobs (
    "https://<BlobStorageLocation>/<FolderName>;managed_identity=system"
)
MaxFiles=10

输出

BlobUri 以字节为单位的大小 捕获的变量
<https:// BlobStorageLocation>/<FolderName>/part-100.parquet 7,429,062 {}
<https://<BlobStorageLocation</>FolderName/part-101.parquet 262,610 {}
<https://>BlobStorageLocation</>FolderName/part-102.parquet 6,154,166 {}
<https://>BlobStorageLocation</>FolderName/part-103.parquet 7,460,408 {}
<https:// BlobStorageLocation>/<FolderName>/part-104.parquet 6,154,166 {}
<https://BlobStorageLocation>/<FolderName>/part-105.parquet 7,441,587 {}
<https://Blob存储位置>/<文件夹名称>/part-106.parquet 1,087,425 {}
<https://<BlobStorageLocation</>FolderName/part-107.parquet 6,238,357 {}
<https:// BlobStorageLocation>/<FolderName>/part-208.csv 7,460,408 {}
<https://BlobStorageLocation>/<FolderName>/part-109.parquet 6,338,148 {}

现在可以验证 Blob 是否是要引入的正确 Blob。

引入文件夹

接下来,将 10 个 parquet 文件排队,以引入到 Logs 数据库的 TestDatabase 表中,并为引入启用跟踪。

.ingest-from-storage-queued into table database('TestDatabase').Logs
EnableTracking=true
with (format='parquet')
<|
    .list blobs (
        "https://<BlobStorageLocation>/<FolderName>;managed_identity=system"
    )
    MaxFiles=10

输出

IngestionOperationId ClientRequestId OperationInfo
00001111;11112222;00001111-aaaa-2222-bbbb-3333cccc4444 Kusto.Web.KWE,查询;11112222;11112222;22223333-bbbb-3333-cccc-4444cccc5555 .show queued ingestion operations “00001111;11112222;00001111-aaaa-2222-bbbb-3333cccc4444”

然后使用包含 OperationInfoIngestionOperationId跟踪引入状态

然后,它 CancelationInfo包括 IngestionOperationId该作,用于 取消引入作

跟踪引入状态

运行 .show queued ingestion operations 命令来检查引入是否已完成,或者是否存在任何错误。

.show queued ingestion operations "00001111;11112222;00001111-aaaa-2222-bbbb-3333cccc4444"

输出

IngestionOperationId 开始时间 上次更新时间 国家 已发现 进行中 已引入 已失败 已取消 样本失败原因 数据库
00001111;11112222;00001111-aaaa-2222-bbbb-3333cccc4444 2025-03-19 14:57:41.0000000 2025-01-10 15:15:04.0000000 完成 10 0 10 0 0 TestDatabase 日志

如果 State 不是 Completed,您可以再次运行 .show queued ingestion operations。 再次运行此命令可以监视引入的 Blob 的数量增加情况,直到 State 更改为 Completed。 如有必要,还可以取消引入

筛选要引入的排队文件

检查引入结果后,再次尝试列出要引入的 Blob。 这一次添加了 parquet 后缀,以确保只引入 parquet 文件。

.list blobs (
    "https://<BlobStorageLocation>/<FolderName>;managed_identity=system"
)
Suffix="parquet"
MaxFiles=10

输出

BlobUri 以字节为单位的大小 捕获的变量
<https:// BlobStorageLocation>/<FolderName>/part-100.parquet 7,429,062 {}
<https://<BlobStorageLocation</>FolderName/part-101.parquet 262,610 {}
<https://>BlobStorageLocation</>FolderName/part-102.parquet 6,154,166 {}
<https://>BlobStorageLocation</>FolderName/part-103.parquet 7,460,408 {}
<https:// BlobStorageLocation>/<FolderName>/part-104.parquet 6,154,166 {}
<https://BlobStorageLocation>/<FolderName>/part-105.parquet 7,441,587 {}
<https://Blob存储位置>/<文件夹名称>/part-106.parquet 1,087,425 {}
<https://<BlobStorageLocation</>FolderName/part-107.parquet 6,238,357 {}
<https://<BlobStorageLocation</>FolderName/part-108.parquet 7,460,408 {}
<https://BlobStorageLocation>/<FolderName>/part-109.parquet 6,338,148 {}

捕获创建时间

添加了一种路径格式以记录创建时间。

.list blobs (
    "https://<BlobStorageLocation>/<FolderName>;managed_identity=system"
)
Suffix="parquet"
MaxFiles=10
PathFormat=("output/03/Year=" datetime_pattern("yyyy'/Month='MM'/Day='dd", creationTime) "/")

输出

BlobUri 以字节为单位的大小 捕获的变量
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-100.parquet 7,429,062 {“creationTime”: “03/20/2025 00:00:00”}
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-101.parquet 262,610 {“creationTime”: “03/20/2025 00:00:00”}
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-102.parquet 6,154,166 {“creationTime”: “03/20/2025 00:00:00”}
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-103.parquet 7,460,408 {“creationTime”: “03/20/2025 00:00:00”}
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-104.parquet 6,154,166 {“creationTime”: “03/20/2025 00:00:00”}
<https://>BlobStorageLocation</>FolderName/output/03/Year=2025/Month=03/Day=20/Hour=00/part-105.parquet 7,441,587 {“creationTime”: “03/20/2025 00:00:00”}
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-106.parquet 1,087,425 {“creationTime”: “03/20/2025 00:00:00”}
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-107.parquet 6,238,357 {“creationTime”: “03/20/2025 00:00:00”}
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-108.parquet 7,460,408 {“creationTime”: “03/20/2025 00:00:00”}
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-109.parquet 6,338,148 {“creationTime”: “03/20/2025 00:00:00”}

CapturedVariables 列日期与 BlobUri 列中指定的日期匹配。

引入 20 个文件

现在,从 Azure Blob 存储容器引入 20 个采用 parquet 格式的文件及其创建时间。

.ingest-from-storage-queued into table database('TestDatabase').Logs
EnableTracking=true
with (format='parquet')
<|
    .list blobs (
        "https://<BlobStorageLocation>/<FolderName>;managed_identity=system"
    )
    Suffix="parquet"
    MaxFiles=20
    PathFormat=("output/03/Year=" datetime_pattern("yyyy'/Month='MM'/Day='dd", creationTime) "/")

输出

IngestionOperationId ClientRequestId OperationInfo CancelationInfo
22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555 Kusto.Web.KWE,查询;22223333;22223333;33334444-dddd-4444-eeee-5555eeee5555 .show queued ingestion operations “22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555” .cancel 排队引入作“22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555”

然后使用 OperationInfo跟踪引入状态

跟踪后续引入状态

运行 .show queued ingestion operations 命令来检查此引入是否存在任何问题。

.show queued ingestion operations "22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555"

输出

IngestionOperationId 开始时间 上次更新时间 国家 已发现 进行中 已取消 已引入 已失败 已取消 样本失败原因 数据库
22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555 2025-02-20 14:57:41.0000000 进行中 10 10 0 0 0 TestDatabase 日志

运行 .show extents 命令以检查是否使用早期日期创建区块,以确保数据完整性和历史准确性。

.show table Logs extents

MinCreatedOnMaxCreatedOn 值应显示数据创建时间,而不是数据引入时间。 有关这些返回值的详细信息,请参阅 .show extents

如有必要,可以取消引入

执行完整引入

通过对示例运行排队引入命令,你发现了引入可能遇到的问题。 修复它们后,即可引入所有历史数据,并等待完整的引入完成。

取消引入

在引入过程中,可以随时取消排队引入。

.cancel queued ingestion operation '22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555'

输出

IngestionOperationId 开始时间 上次更新时间 国家 已发现 待处理 已取消 已引入 已失败 样本失败原因 数据库
00001111;11112222;00001111-aaaa-2222-bbbb-3333cccc4444 2025-03-20 15:03:11.0000000 已取消 10 10 0 0 0 TestDatabase 日志

然后可以回滚引入、修复问题并重新运行引入。