This article shows you how to ingest JSON formatted data into an Azure Data Explorer database. You'll start with simple examples of raw and mapped JSON, continue to multi-lined JSON, and then tackle more complex JSON schemas containing arrays and dictionaries. The examples detail the process of ingesting JSON formatted data using Kusto Query Language (KQL), C#, or Python.
The Kusto Query Language ingest control commands are executed directly against the engine endpoint. In production scenarios, ingestion is executed against the Data Management service using client libraries or data connections. For a walk-through of ingesting data with these client libraries, read Ingest data using the Azure Data Explorer Python library and Ingest data using the Azure Data Explorer .NET Standard SDK.
Azure Data Explorer supports two JSON file formats:
json: Line-separated JSON. Each line in the input data has exactly one JSON record.
multijson: Multi-lined JSON. The parser ignores the line separators and reads a record from the previous position to the end of a valid JSON.
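The difference between the two formats can be illustrated with a small Python sketch. It is not the service's parser; it only mimics the described behavior with the standard `json` module, and the sample strings are hypothetical data rather than content of the sample files.

```python
import json


def read_json_lines(text):
    """'json' format: every non-empty line holds exactly one record."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def read_multijson(text):
    """'multijson' format: line breaks are ignored; each record runs from
    the previous position to the end of the next valid JSON value."""
    decoder = json.JSONDecoder()
    records, pos = [], 0
    while pos < len(text):
        # Skip whitespace (including newlines) between records.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        record, pos = decoder.raw_decode(text, pos)
        records.append(record)
    return records


# Hypothetical sample data (an assumption of this sketch).
LINE_SEPARATED = (
    '{"deviceId": "a", "temperature": 31.0}\n'
    '{"deviceId": "b", "temperature": 29.5}\n'
)
MULTI_LINED = (
    '{\n  "deviceId": "a",\n  "temperature": 31.0\n}\n'
    '{\n  "deviceId": "b",\n  "temperature": 29.5\n}\n'
)

print(read_json_lines(LINE_SEPARATED))
print(read_multijson(MULTI_LINED))
```

Reading the indented sample line by line fails, because a single line such as `{` is not a complete record. That is why indented files call for the multijson format.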
Ingest and map JSON formatted data
Ingestion of JSON formatted data requires you to specify the format using an ingestion property. Ingestion of JSON data also requires mapping, which maps a JSON source entry to its target column. When ingesting data, use the IngestionMapping property with either its ingestionMappingReference ingestion property (for a pre-defined mapping) or its IngestionMappings property. This article uses the ingestionMappingReference ingestion property, which refers to a mapping that is pre-defined on the table used for ingestion. In the examples below, we'll start by ingesting JSON records as raw data to a single-column table. Then we'll use the mapping to ingest each property to its mapped column.
Simple JSON example
The following example is a simple JSON, with a flat structure. The data has temperature and humidity information, collected by several devices. Each record is marked with an ID and timestamp.
In this example, you ingest JSON records as raw data to a single-column table. The data manipulation, using queries and an update policy, is done after the data is ingested.
In the Add cluster dialog box, enter your cluster URL in the form https://<ClusterName>.<Region>.kusto.chinacloudapi.cn/, then select Add.
Paste in the following command, and select Run to create the table.
.create table RawEvents (Event: dynamic)
This query creates a table with a single Event column of a dynamic data type.
This command creates a mapping, and maps the JSON root path $ to the Event column.
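As a hedged illustration of what such a mapping command could look like, the following Python sketch builds the text of a `.create table ... ingestion json mapping` control command. The helper name `create_json_mapping_command` is hypothetical, and the generated text is a sketch to verify against your cluster rather than the article's exact command.

```python
import json


def create_json_mapping_command(table, mapping_name, column_paths):
    """Build a '.create table ... ingestion json mapping' command string.

    column_paths maps each target column name to the JSON path that feeds it.
    Paths containing single quotes would need extra escaping, which this
    sketch does not handle.
    """
    mapping = [
        {"column": column, "Properties": {"path": path}}
        for column, path in column_paths.items()
    ]
    mapping_json = json.dumps(mapping, separators=(",", ":"))
    return (
        f".create table {table} ingestion json mapping "
        f"'{mapping_name}' '{mapping_json}'"
    )


# Map the JSON root path $ to the Event column of RawEvents.
RAW_MAPPING_COMMAND = create_json_mapping_command(
    "RawEvents", "RawEventMapping", {"Event": "$"}
)
print(RAW_MAPPING_COMMAND)
```

The resulting string can be pasted into the query window or sent through a client library as a control command.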
Ingest data into the RawEvents table.
.ingest into table RawEvents h'https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D' with (format=json, jsonMappingReference=RawEventMapping)
Use C# to ingest data in raw JSON format.
Create the RawEvents table.
// database, user, password, and tenantId are assumed to be defined elsewhere.
var kustoUri = "https://<ClusterName>.<Region>.kusto.chinacloudapi.cn:443/";
var kustoConnectionStringBuilder =
    new KustoConnectionStringBuilder(kustoUri)
    {
        FederatedSecurity = true,
        InitialCatalog = database,
        UserID = user,
        Password = password,
        Authority = tenantId
    };
var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder);
var table = "RawEvents";
var command =
    CslCommandGenerator.GenerateTableCreateCommand(
        table,
        new[]
        {
            // A single dynamic column named Event, matching the KQL table definition.
            Tuple.Create("Event", "System.Object"),
        });
kustoClient.ExecuteControlCommand(command);
Create the JSON mapping.
var tableMapping = "RawEventMapping";
var command =
    CslCommandGenerator.GenerateTableMappingCreateCommand(
        Data.Ingestion.IngestionMappingKind.Json,
        table,
        tableMapping,
        new[]
        {
            // Map the JSON root path $ to the Event column.
            new ColumnMapping
            {
                ColumnName = "Event",
                Properties = new Dictionary<string, string>() { { "path", "$" } }
            }
        });
kustoClient.ExecuteControlCommand(command);
This command creates a mapping, and maps the JSON root path $ to the Event column.
Ingest data into the RawEvents table.
var ingestUri = "https://ingest-<ClusterName>.<Region>.kusto.chinacloudapi.cn:443/";
var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";
var ingestConnectionStringBuilder =
    new KustoConnectionStringBuilder(ingestUri)
    {
        FederatedSecurity = true,
        InitialCatalog = database,
        UserID = user,
        Password = password,
        Authority = tenantId
    };
var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties =
    new KustoQueuedIngestionProperties(database, table)
    {
        Format = DataSourceFormat.json,
        IngestionMapping = new IngestionMapping()
        {
            IngestionMappingReference = tableMapping
        }
    };
// Queue the blob for ingestion; await the call so that failures surface.
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Note
Data is aggregated according to the batching policy, resulting in a latency of a few minutes.
Use Python to ingest data in raw JSON format.
Create a new table, with a similar schema to the JSON input data. We'll use this table for all the following examples and ingest commands.
In this mapping, as defined by the table schema, the timestamp entries will be ingested to the column Time as datetime data types.
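To make the effect of a flat mapping concrete, here is a minimal Python sketch that applies `$.property` paths to a single record and converts each value to the type of its target column. The paths mirror those of FlatEventMapping. The converters, the helper `map_record`, and the sample record are assumptions of this sketch and not part of the service.

```python
from datetime import datetime

# Column name -> (JSON path, converter). The converters stand in for the
# column types of the Events table (datetime, string, double).
FLAT_EVENT_MAPPING = {
    "Time": ("$.timestamp", datetime.fromisoformat),
    "Device": ("$.deviceId", str),
    "MessageId": ("$.messageId", str),
    "Temperature": ("$.temperature", float),
    "Humidity": ("$.humidity", float),
}


def map_record(record, mapping):
    """Apply simple '$.property' paths to one flat JSON record."""
    row = {}
    for column, (path, convert) in mapping.items():
        if not path.startswith("$."):
            raise ValueError(f"unsupported path: {path}")
        value = record.get(path[2:])
        # A missing property yields an empty (None) cell in this sketch.
        row[column] = convert(value) if value is not None else None
    return row


# Hypothetical record, shaped like the flat structure described above.
SAMPLE_RECORD = {
    "timestamp": "2019-05-02 15:23:50",
    "deviceId": "device-01",
    "messageId": "message-01",
    "temperature": 31.0,
    "humidity": 18.5,
}

ROW = map_record(SAMPLE_RECORD, FLAT_EVENT_MAPPING)
print(ROW)
```

The timestamp string ends up as a datetime value in the Time column because the target column's type drives the conversion, which is the behavior the mapping description refers to.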
Ingest data into the Events table.
.ingest into table Events h'https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D' with (format=json, jsonMappingReference=FlatEventMapping)
The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
Create a new table, with a similar schema to the JSON input data. We'll use this table for all the following examples and ingest commands.
var tableMapping = "FlatEventMapping";
var command =
    CslCommandGenerator.GenerateTableMappingCreateCommand(
        Data.Ingestion.IngestionMappingKind.Json,
        "Events", // The mapping is created on the Events table.
        tableMapping,
        new[]
        {
            new ColumnMapping() { ColumnName = "Time", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.timestamp" } } },
            new ColumnMapping() { ColumnName = "Device", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.deviceId" } } },
            new ColumnMapping() { ColumnName = "MessageId", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.messageId" } } },
            new ColumnMapping() { ColumnName = "Temperature", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.temperature" } } },
            new ColumnMapping() { ColumnName = "Humidity", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.humidity" } } },
        });
kustoClient.ExecuteControlCommand(command);
In this mapping, as defined by the table schema, the timestamp entries will be ingested to the column Time as datetime data types.
Ingest data into the Events table.
// table is assumed to hold "Events" at this point.
var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/simple.json?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";
var properties =
    new KustoQueuedIngestionProperties(database, table)
    {
        Format = DataSourceFormat.json,
        IngestionMapping = new IngestionMapping()
        {
            IngestionMappingReference = tableMapping
        }
    };
await ingestClient.IngestFromStorageAsync(blobPath, properties);
The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
Create a new table, with a similar schema to the JSON input data. We'll use this table for all the following examples and ingest commands.
The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
Ingest multi-lined JSON records
In this example, you ingest multi-lined JSON records. Each JSON property is mapped to a single column in the table. The file 'multilined.json' has a few indented JSON records. The format multijson tells the engine to read records by the JSON structure.
Ingest data into the Events table.
.ingest into table Events (h'https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/multilined.json?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Ingest data into the Events table.
// table is assumed to hold "Events" at this point.
var tableMapping = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/multilined.json?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";
var properties =
    new KustoQueuedIngestionProperties(database, table)
    {
        // multijson tells the engine to read records by the JSON structure.
        Format = DataSourceFormat.multijson,
        IngestionMapping = new IngestionMapping()
        {
            IngestionMappingReference = tableMapping
        }
    };
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Ingest data into the Events table.
Ingest JSON records containing arrays
Array data types are an ordered collection of values. Ingestion of a JSON array is done by an update policy. The JSON is ingested as-is to an intermediate table. An update policy runs a pre-defined function on the RawEvents table, reingesting the results to the target table. We will ingest data with the following structure:
Create an update policy function that uses the mv-expand operator to expand the collection of records so that each value in the collection receives a separate row. We'll use table RawEvents as a source table and Events as a target table.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
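The row expansion performed by this function can be emulated in plain Python. The sketch below assumes, as the function's `mv-expand records = Event` clause suggests, that each Event value holds an array of record objects. The helper name and the sample data are hypothetical.

```python
from datetime import datetime


def expand_event_records(raw_events):
    """Emulate EventRecordsExpand: one output row per element of each Event array."""
    rows = []
    for raw_row in raw_events:
        for record in raw_row["Event"]:  # mv-expand records = Event
            rows.append({
                "Time": datetime.fromisoformat(record["timestamp"]),  # todatetime
                "Device": str(record["deviceId"]),                    # tostring
                "MessageId": str(record["messageId"]),                # tostring
                "Temperature": float(record["temperature"]),          # todouble
                "Humidity": float(record["humidity"]),                # todouble
            })
    return rows


# Hypothetical contents of the RawEvents table: one row holding two records.
RAW_EVENTS = [
    {"Event": [
        {"timestamp": "2019-05-02 15:23:50", "deviceId": "device-01",
         "messageId": "message-01", "temperature": 31.0, "humidity": 18.5},
        {"timestamp": "2019-05-02 15:24:50", "deviceId": "device-02",
         "messageId": "message-02", "temperature": 29.0, "humidity": 20.0},
    ]},
]

EXPANDED_ROWS = expand_event_records(RAW_EVENTS)
for row in EXPANDED_ROWS:
    print(row)
```

A single raw row containing two records produces two rows whose columns line up with the Events table.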
The schema of the function's output must match the schema of the target table. Use the getschema operator to review the schema.
EventRecordsExpand() | getschema
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest the results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
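As a rough sketch of these two steps, the Python snippet below assembles the text of an `.alter table ... policy update` command and an `.alter-merge table ... policy retention` command. The helper names are hypothetical, and the policy fields shown (Source, Query, IsEnabled) are a minimal set, so check the generated text against your cluster before relying on it.

```python
import json


def update_policy_command(target_table, source_table, function_name):
    """Build the command that attaches an update policy to the target table."""
    policy = [{
        "Source": source_table,
        "Query": f"{function_name}()",
        "IsEnabled": True,
    }]
    return f".alter table {target_table} policy update @'{json.dumps(policy)}'"


def zero_retention_command(table):
    """Build the command that sets the soft-delete period of a table to zero."""
    return f".alter-merge table {table} policy retention softdelete = 0s"


UPDATE_POLICY_COMMAND = update_policy_command("Events", "RawEvents", "EventRecordsExpand")
RETENTION_COMMAND = zero_retention_command("RawEvents")
print(UPDATE_POLICY_COMMAND)
print(RETENTION_COMMAND)
```

The update policy goes on the target table (Events), while the zero-retention policy goes on the intermediate table (RawEvents).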
Ingest data into the RawEvents table.
.ingest into table RawEvents (h'https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/array.json?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Review data in the Events table.
Events
Create an update function that uses the mv-expand operator to expand the collection of records so that each value in the collection receives a separate row. We'll use table RawEvents as a source table and Events as a target table.
var command =
CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false);
kustoClient.ExecuteControlCommand(command);
Note
The schema of the function's output must match the schema of the target table.
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest its results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
Ingest data into the RawEvents table.
var table = "RawEvents";
var tableMapping = "RawEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/array.json?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";
var properties =
    new KustoQueuedIngestionProperties(database, table)
    {
        Format = DataSourceFormat.multijson,
        IngestionMapping = new IngestionMapping()
        {
            IngestionMappingReference = tableMapping
        }
    };
// The update policy on Events runs once this data lands in RawEvents.
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Review data in the Events table.
Create an update function that uses the mv-expand operator to expand the collection of records so that each value in the collection receives a separate row. We'll use table RawEvents as a source table and Events as a target table.
from azure.kusto.data.helpers import dataframe_from_result_table

# KUSTO_CLIENT (a KustoClient) and DATABASE are assumed to be defined earlier.
CREATE_FUNCTION_COMMAND = '''.create function EventRecordsExpand() {
    RawEvents
    | mv-expand records = Event
    | project
        Time = todatetime(records["timestamp"]),
        Device = tostring(records["deviceId"]),
        MessageId = tostring(records["messageId"]),
        Temperature = todouble(records["temperature"]),
        Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Note
The schema of the function's output must match the schema of the target table.
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest its results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
Ingest JSON records containing dictionaries
Dictionary structured JSON contains key-value pairs. JSON records undergo ingestion mapping using a logical expression in the JsonPath. You can ingest data with the following structure:
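The logical expression used in these mappings has the form `$.event[?(@.Key == 'timestamp')]`: it selects the entries of the `event` array whose `Key` field equals a given name. The Python sketch below emulates only that selection step. The `Value` field name and the sample document are assumptions of this sketch, and how the service turns the selected entry into a column value is not modeled here.

```python
def select_by_key(document, key):
    """Emulate the filter $.event[?(@.Key == key)]: return the matching entries."""
    return [entry for entry in document["event"] if entry.get("Key") == key]


# Hypothetical dictionary-structured record (field name "Value" is assumed).
DOCUMENT = {
    "event": [
        {"Key": "timestamp", "Value": "2019-05-02 15:23:50"},
        {"Key": "deviceId", "Value": "device-01"},
        {"Key": "messageId", "Value": "message-01"},
        {"Key": "temperature", "Value": 31.0},
        {"Key": "humidity", "Value": 18.5},
    ]
}

for name in ("timestamp", "deviceId", "temperature"):
    print(name, select_by_key(DOCUMENT, name))
```

Each target column gets its own filter expression, so the order of the pairs inside the `event` array does not matter.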
Ingest data into the Events table.
.ingest into table Events h'https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/dictionary.json?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D' with (format=multijson, jsonMappingReference=KeyValueEventMapping)
Create a JSON mapping.
var tableName = "Events";
var tableMapping = "KeyValueEventMapping";
var command =
    CslCommandGenerator.GenerateTableMappingCreateCommand(
        Data.Ingestion.IngestionMappingKind.Json,
        tableName,
        tableMapping,
        new[]
        {
            // Each path selects the entry of the event array whose Key matches the column's source name.
            new ColumnMapping() { ColumnName = "Time", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.event[?(@.Key == 'timestamp')]" } } },
            new ColumnMapping() { ColumnName = "Device", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.event[?(@.Key == 'deviceId')]" } } },
            new ColumnMapping() { ColumnName = "MessageId", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.event[?(@.Key == 'messageId')]" } } },
            new ColumnMapping() { ColumnName = "Temperature", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.event[?(@.Key == 'temperature')]" } } },
            new ColumnMapping() { ColumnName = "Humidity", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.event[?(@.Key == 'humidity')]" } } },
        });
kustoClient.ExecuteControlCommand(command);
Ingest data into the Events table.
var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/jsonsamplefiles/dictionary.json?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";
var properties =
    new KustoQueuedIngestionProperties(database, tableName) // tableName is "Events"
    {
        Format = DataSourceFormat.multijson,
        IngestionMapping = new IngestionMapping()
        {
            IngestionMappingReference = tableMapping
        }
    };
await ingestClient.IngestFromStorageAsync(blobPath, properties);