使用输入和输出绑定将 Azure Functions 与 Azure 数据资源管理器集成(预览版)
重要
此连接器可用于 Microsoft Fabric 中的实时智能。 使用本文中的说明时,请注意以下例外情况:
- 如果需要,请按照创建 KQL 数据库中的说明创建数据库。
- 如果需要,请按照创建空表中的说明创建表。
- 使用复制 URI 中的说明获取查询或引入 URI。
- 运行 KQL 查询集中的查询。
借助 Azure Functions,可以按计划或以响应某个事件的方式在云中运行无服务器代码。 借助适用于 Azure Functions 的 Azure 数据资源管理器输入和输出绑定,可以将 Azure 数据资源管理器集成到工作流中,以引入数据并针对群集运行查询。
先决条件
尝试与示例项目集成
如何使用适用于 Azure Functions 的 Azure 数据资源管理器绑定
有关如何使用适用于 Azure Functions 的 Azure 数据资源管理器绑定的信息,请参阅以下主题:
- 适用于 Azure Functions 的 Azure 数据资源管理器绑定概述
- 适用于 Azure Functions 的 Azure 数据资源管理器输入绑定
- 适用于 Azure Functions 的 Azure 数据资源管理器输出绑定
使用适用于 Azure Functions 的 Azure 数据资源管理器绑定的方案
以下部分介绍一些使用适用于 Azure Functions 的 Azure 数据资源管理器绑定的常见方案。
输入绑定
输入绑定运行 Kusto 查询语言 (KQL) 查询或 KQL 函数(可以选择带有参数),并将输出返回给函数。
以下部分介绍如何在一些常见方案中使用输入绑定。
方案 1:用于从群集查询数据的 HTTP 终结点
使用输入绑定适用于需要通过 REST API 公开 Azure 数据资源管理器数据的情况。 在此方案中,使用 Azure Functions HTTP 触发器来查询群集中的数据。 在需要为外部应用程序或服务提供对 Azure 数据资源管理器数据的编程访问的情况下,此方案特别有用。 通过 REST API 公开数据,无需直接连接到群集,应用程序即可轻松使用数据。
该代码定义具有 HTTP 触发器和 Azure 数据资源管理器输入绑定的函数。 输入绑定指定要针对 productsdb 数据库中的 Products 表运行的查询。 该函数将 productId 列用作谓词,而此谓词作为参数进行传递。
{
[FunctionName("GetProduct")]
public static async Task<IActionResult> RunAsync(
[HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
HttpRequest req,
[Kusto(Database:"productsdb" ,
KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
KqlParameters = "@productId={productId}",
Connection = "KustoConnectionString")]
IAsyncEnumerable<Product> products)
{
IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
var productList = new List<Product>();
while (await enumerator.MoveNextAsync())
{
productList.Add(enumerator.Current);
}
await enumerator.DisposeAsync();
return new OkObjectResult(productList);
}
}
然后可以如下所示调用该函数:
curl https://myfunctionapp.chinacloudsites.cn/api/getproducts/1
方案 2:用于从群集导出数据的计划触发器
以下方案适用于需要按基于时间的计划导出数据的情况。
该代码定义具有计时器触发器的函数,用于将 productsdb 数据库中的销售数据聚合导出到 Azure Blob 存储中的 CSV 文件。
public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
[Kusto(ConnectionStringSetting = "KustoConnectionString",
DatabaseName = "productsdb",
Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
// Write the query results to a CSV file
using (var stream = new MemoryStream())
using (var writer = new StreamWriter(stream))
using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
{
csv.WriteRecords(queryResults);
writer.Flush();
stream.Position = 0;
await outputBlob.UploadFromStreamAsync(stream);
}
}
输出绑定
输出绑定采用一行或多行,并将其插入 Azure 数据资源管理器表。
以下部分介绍如何在一些常见方案中使用输出绑定。
方案 1:用于将数据引入群集的 HTTP 终结点
以下方案适用于需要处理传入 HTTP 请求并将其引入群集的情况。 通过使用输出绑定,可以将来自请求的传入数据写入 Azure 数据资源管理器表。
该代码定义具有 HTTP 触发器和 Azure 数据资源管理器输出绑定的函数。 此函数采用 HTTP 请求正文中的 JSON 有效负载,并将其写入 productsdb 数据库中的 products 表。
public static IActionResult Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
HttpRequest req, ILogger log,
[Kusto(Database:"productsdb" ,
TableName ="products" ,
Connection = "KustoConnectionString")] out Product product)
{
log.LogInformation($"AddProduct function started");
string body = new StreamReader(req.Body).ReadToEnd();
product = JsonConvert.DeserializeObject<Product>(body);
string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
product.Name, product.ProductID, product.Cost);
log.LogInformation("Ingested product {}", productString);
return new CreatedResult($"/api/addproductuni", product);
}
然后可以如下所示调用该函数:
curl -X POST https://myfunctionapp.chinacloudsites.cn/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'
方案 2:从 RabbitMQ 或 Azure 上支持的其他消息系统引入数据
以下方案适用于需要将来自消息系统的数据引入群集的情况。 通过使用输出绑定,可以将来自消息系统的传入数据引入 Azure 数据资源管理器表。
该代码定义具有消息(JSON 格式的数据,通过 RabbitMQ 触发器传入,引入到 productsdb 数据库的 products 表)的函数。
public class QueueTrigger
{
[FunctionName("QueueTriggerBinding")]
[return: Kusto(Database: "productsdb",
TableName = "products",
Connection = "KustoConnectionString")]
public static Product Run(
[RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
ILogger log)
{
log.LogInformation($"Dequeued product {product.ProductID}");
return product;
}
}
有关函数的详细信息,请参阅 Azure Functions 文档。 Azure 数据资源管理器扩展位于: