适用于:✅Azure 数据资源管理器
排队引入命令允许你在引入历史数据之前测试数据引入过程并修复问题。 本文介绍如何使用队列摄取命令优化历史数据摄取。 执行以下步骤以优化历史数据排队处理:
注释
排队引入命令在数据引入 URI 终结点 https://ingest-<YourClusterName><Region>.kusto.chinacloudapi.cn
上运行。
列出文件夹中的 Blob
为了更好地了解历史数据,您最多从 Azure Blob 存储容器中列出 10 个 Blob。
.list blobs (
"https://<BlobStorageLocation>/<FolderName>;managed_identity=system"
)
MaxFiles=10
输出
BlobUri | 以字节为单位的大小 | 捕获的变量 |
---|---|---|
<https:// BlobStorageLocation>/<FolderName>/part-100.parquet | 7,429,062 | {} |
<https://<BlobStorageLocation</>FolderName/part-101.parquet | 262,610 | {} |
<https://>BlobStorageLocation</>FolderName/part-102.parquet | 6,154,166 | {} |
<https://>BlobStorageLocation</>FolderName/part-103.parquet | 7,460,408 | {} |
<https:// BlobStorageLocation>/<FolderName>/part-104.parquet | 6,154,166 | {} |
<https://BlobStorageLocation>/<FolderName>/part-105.parquet | 7,441,587 | {} |
<https://Blob存储位置>/<文件夹名称>/part-106.parquet | 1,087,425 | {} |
<https://<BlobStorageLocation</>FolderName/part-107.parquet | 6,238,357 | {} |
<https:// BlobStorageLocation>/<FolderName>/part-208.csv | 7,460,408 | {} |
<https://BlobStorageLocation>/<FolderName>/part-109.parquet | 6,338,148 | {} |
现在可以验证 Blob 是否是要引入的正确 Blob。
引入文件夹
接下来,将 10 个 parquet 文件排队,以引入到 Logs
数据库的 TestDatabase
表中,并为引入启用跟踪。
.ingest-from-storage-queued into table database('TestDatabase').Logs
EnableTracking=true
with (format='parquet')
<|
.list blobs (
"https://<BlobStorageLocation>/<FolderName>;managed_identity=system"
)
MaxFiles=10
输出
IngestionOperationId | ClientRequestId | OperationInfo |
---|---|---|
00001111;11112222;00001111-aaaa-2222-bbbb-3333cccc4444 | Kusto.Web.KWE,查询;11112222;11112222;22223333-bbbb-3333-cccc-4444cccc5555 | .show queued ingestion operations “00001111;11112222;00001111-aaaa-2222-bbbb-3333cccc4444” |
然后使用包含 OperationInfo
的 IngestionOperationId
来跟踪引入状态。
然后,它 CancelationInfo
包括 IngestionOperationId
该作,用于 取消引入作。
跟踪引入状态
运行 .show queued ingestion operations
命令来检查引入是否已完成,或者是否存在任何错误。
.show queued ingestion operations "00001111;11112222;00001111-aaaa-2222-bbbb-3333cccc4444"
输出
IngestionOperationId | 开始时间 | 上次更新时间 | 国家 | 已发现 | 进行中 | 已引入 | 已失败 | 已取消 | 样本失败原因 | 数据库 | 表 |
---|---|---|---|---|---|---|---|---|---|---|---|
00001111;11112222;00001111-aaaa-2222-bbbb-3333cccc4444 | 2025-03-19 14:57:41.0000000 | 2025-01-10 15:15:04.0000000 | 完成 | 10 | 0 | 10 | 0 | 0 | TestDatabase | 日志 |
如果 State
不是 Completed
,您可以再次运行 .show queued ingestion operations
。 再次运行此命令可以监视引入的 Blob 的数量增加情况,直到 State
更改为 Completed
。 如有必要,还可以取消引入。
筛选要引入的排队文件
检查引入结果后,再次尝试列出要引入的 Blob。 这一次添加了 parquet 后缀,以确保只引入 parquet 文件。
.list blobs (
"https://<BlobStorageLocation>/<FolderName>;managed_identity=system"
)
Suffix="parquet"
MaxFiles=10
输出
BlobUri | 以字节为单位的大小 | 捕获的变量 |
---|---|---|
<https:// BlobStorageLocation>/<FolderName>/part-100.parquet | 7,429,062 | {} |
<https://<BlobStorageLocation</>FolderName/part-101.parquet | 262,610 | {} |
<https://>BlobStorageLocation</>FolderName/part-102.parquet | 6,154,166 | {} |
<https://>BlobStorageLocation</>FolderName/part-103.parquet | 7,460,408 | {} |
<https:// BlobStorageLocation>/<FolderName>/part-104.parquet | 6,154,166 | {} |
<https://BlobStorageLocation>/<FolderName>/part-105.parquet | 7,441,587 | {} |
<https://Blob存储位置>/<文件夹名称>/part-106.parquet | 1,087,425 | {} |
<https://<BlobStorageLocation</>FolderName/part-107.parquet | 6,238,357 | {} |
<https://<BlobStorageLocation</>FolderName/part-108.parquet | 7,460,408 | {} |
<https://BlobStorageLocation>/<FolderName>/part-109.parquet | 6,338,148 | {} |
捕获创建时间
添加了一种路径格式以记录创建时间。
.list blobs (
"https://<BlobStorageLocation>/<FolderName>;managed_identity=system"
)
Suffix="parquet"
MaxFiles=10
PathFormat=("output/03/Year=" datetime_pattern("yyyy'/Month='MM'/Day='dd", creationTime) "/")
输出
BlobUri | 以字节为单位的大小 | 捕获的变量 |
---|---|---|
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-100.parquet | 7,429,062 | {“creationTime”: “03/20/2025 00:00:00”} |
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-101.parquet | 262,610 | {“creationTime”: “03/20/2025 00:00:00”} |
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-102.parquet | 6,154,166 | {“creationTime”: “03/20/2025 00:00:00”} |
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-103.parquet | 7,460,408 | {“creationTime”: “03/20/2025 00:00:00”} |
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-104.parquet | 6,154,166 | {“creationTime”: “03/20/2025 00:00:00”} |
<https://>BlobStorageLocation</>FolderName/output/03/Year=2025/Month=03/Day=20/Hour=00/part-105.parquet | 7,441,587 | {“creationTime”: “03/20/2025 00:00:00”} |
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-106.parquet | 1,087,425 | {“creationTime”: “03/20/2025 00:00:00”} |
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-107.parquet | 6,238,357 | {“creationTime”: “03/20/2025 00:00:00”} |
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-108.parquet | 7,460,408 | {“creationTime”: “03/20/2025 00:00:00”} |
<https:// BlobStorageLocation>/<FolderName>/output/03/Year=2025/Month=03/Day=20/Hour=00/part-109.parquet | 6,338,148 | {“creationTime”: “03/20/2025 00:00:00”} |
CapturedVariables
列日期与 BlobUri
列中指定的日期匹配。
引入 20 个文件
现在,从 Azure Blob 存储容器引入 20 个采用 parquet 格式的文件及其创建时间。
.ingest-from-storage-queued into table database('TestDatabase').Logs
EnableTracking=true
with (format='parquet')
<|
.list blobs (
"https://<BlobStorageLocation>/<FolderName>;managed_identity=system"
)
Suffix="parquet"
MaxFiles=20
PathFormat=("output/03/Year=" datetime_pattern("yyyy'/Month='MM'/Day='dd", creationTime) "/")
输出
IngestionOperationId | ClientRequestId | OperationInfo | CancelationInfo |
---|---|---|---|
22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555 | Kusto.Web.KWE,查询;22223333;22223333;33334444-dddd-4444-eeee-5555eeee5555 | .show queued ingestion operations “22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555” | .cancel 排队引入作“22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555” |
然后使用 OperationInfo
来跟踪引入状态。
跟踪后续引入状态
运行 .show queued ingestion operations
命令来检查此引入是否存在任何问题。
.show queued ingestion operations "22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555"
输出
IngestionOperationId | 开始时间 | 上次更新时间 | 国家 | 已发现 | 进行中 | 已取消 | 已引入 | 已失败 | 已取消 | 样本失败原因 | 数据库 | 表 |
---|---|---|---|---|---|---|---|---|---|---|---|---|
22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555 | 2025-02-20 14:57:41.0000000 | 进行中 | 10 | 10 | 0 | 0 | 0 | TestDatabase | 日志 |
运行 .show extents
命令以检查是否使用早期日期创建区块,以确保数据完整性和历史准确性。
.show table Logs extents
MinCreatedOn
和 MaxCreatedOn
值应显示数据创建时间,而不是数据引入时间。 有关这些返回值的详细信息,请参阅 .show extents。
如有必要,可以取消引入。
执行完整引入
通过对示例运行排队引入命令,你发现了引入可能遇到的问题。 修复它们后,即可引入所有历史数据,并等待完整的引入完成。
取消引入
在引入过程中,可以随时取消排队引入。
.cancel queued ingestion operation '22223333;22223333;11110000-bbbb-2222-cccc-4444dddd5555'
输出
IngestionOperationId | 开始时间 | 上次更新时间 | 国家 | 已发现 | 待处理 | 已取消 | 已引入 | 已失败 | 样本失败原因 | 数据库 | 表 |
---|---|---|---|---|---|---|---|---|---|---|---|
00001111;11112222;00001111-aaaa-2222-bbbb-3333cccc4444 | 2025-03-20 15:03:11.0000000 | 已取消 | 10 | 10 | 0 | 0 | 0 | TestDatabase | 日志 |
然后可以回滚引入、修复问题并重新运行引入。