使用逻辑应用将数据从 Log Analytics 工作区导出到存储帐户

本文介绍一种使用 Azure 逻辑应用从 Azure Monitor 中的 Log Analytics 工作区查询数据并将其发送到 Azure 存储的方法。 如果需要导出 Azure Monitor 日志数据用于审核和合规性场景或允许其他服务检索此数据,请使用此过程。

其他导出方法

本文讨论的方法介绍了使用逻辑应用从日志查询进行计划导出。 针对特定场景导出数据的其他选项包括:

概述

此过程使用 Azure Monitor 日志连接器,它可让你从逻辑应用运行日志查询,并将其输出用于工作流中的其他操作。 此过程中使用 Azure Blob 存储连接器将查询输出发送到存储。

Screenshot that shows a Logic Apps overview.

从 Log Analytics 工作区导出数据时,请限制逻辑应用工作流处理的数据量。 筛选和聚合查询中的日志数据,以减少所需数据。 例如,如需导出登录事件,应筛选所需事件并仅投影必填字段。 例如:

SecurityEvent
| where EventID == 4624 or EventID == 4625
| project TimeGenerated , Account , AccountType , Computer

按计划导出数据时,请在查询中使用 ingestion_time() 函数,确保不会错过延迟到达的数据。 如果数据由于网络或平台问题而延迟,使用引入时间可确保将数据包含在下一次逻辑应用执行中。 有关示例,请参阅逻辑应用过程部分中的“添加 Azure Monitor 日志操作”步骤。

先决条件

在开始此过程之前,必须完成以下先决条件:

  • Log Analytics 工作区:创建逻辑应用的用户必须至少具有工作区的读取权限。
  • 存储帐户:存储帐户不必与 Log Analytics 工作区位于同一订阅中。 创建逻辑应用的用户必须具有存储帐户的写入权限。

连接器限制

Azure Monitor 中的 Log Analytics 工作区和日志查询均为多租户服务,其中具有可保护和隔离客户以及保持服务质量的限制。 查询大量数据时,请考虑以下限制,这些限制可能会影响配置逻辑应用定期和日志查询的方式:

  • 日志查询返回的行数不能超过 500,000。
  • 日志查询返回的字节数不能超过 64,000,000。
  • 日志查询的运行时间不能超过 10 分钟。
  • Log Analytics 连接器限制为每分钟 100 个调用。

逻辑应用过程

以下部分将引导你完成该过程。

在存储帐户中创建容器

使用创建容器中的过程将容器添加到存储帐户中,以保存导出的数据。 本文中使用的容器名称是 loganalytics-data,但是你可以使用任何名称。

创建逻辑应用工作流

  1. 在 Azure 门户中转到“逻辑应用”,然后选择“添加”。 请选择“订阅”、“资源组”和“区域”来存储新的逻辑应用。 然后,为它指定一个唯一名称。 可以打开“Log Analytics”设置来收集有关运行时数据和事件的信息,如设置 Azure Monitor 日志和收集 Azure 逻辑应用的诊断数据中所述。 使用 Azure Monitor 日志连接器不需要此设置。

    Screenshot that shows creating a logic app.

  2. 选择“查看 + 创建”,然后选择“创建”。 部署完成后,请选择“转到资源”来打开“逻辑应用设计器”。

为工作流创建触发器

在“从通用触发器开始”下,选择“重复”。 此设置会创建一个按固定间隔自动运行的逻辑应用工作流。 在操作的“频率”框中,请选择“日”。 在“间隔”框中,输入“1”即可每天运行一次工作流。

Screenshot that shows a Recurrence action.

添加 Azure Monitor 日志操作

通过 Azure Monitor 日志操作,你可以指定要运行的查询。 此示例中使用的日志查询针对每小时定期进行了优化。 它收集在特定执行时间引入的数据。 例如,如果工作流在 4:35 运行,则时间范围为 3:00-4:00。 如果将逻辑应用更改为以其他频率运行,那么也需要更改查询。 例如,如果将定期设置为每天运行,请将查询中的 startTime 设置为 startofday(make_datetime(year,month,day,0,0))

系统将提示你选择一个租户,从而向工作流将用其来运行查询的帐户授予对 Log Analytics 工作区的访问权限。

  1. 选择“+ 新建步骤”,添加在完成重复操作后运行的操作。 在“选择操作”下,输入“azure monitor”。 然后,选择“Azure Monitor 日志”。

    Screenshot that shows an Azure Monitor Logs action.

  2. 选择“Azure Log Analytics - 运行查询并列出结果”。

    Screenshot that shows Azure Monitor Logs is highlighted under Choose an action.

  3. 选择 Log Analytics 工作区的“订阅”和“资源组”。 为“资源类型”选择“Log Analytics 工作区”。 然后,在“资源名称”下选择工作区名称。

  4. 将以下日志查询添加到“查询”窗口中:

    let dt = now();
    let year = datetime_part('year', dt);
    let month = datetime_part('month', dt);
    let day = datetime_part('day', dt);
     let hour = datetime_part('hour', dt);
    let startTime = make_datetime(year,month,day,hour,0)-1h;
    let endTime = startTime + 1h - 1tick;
    AzureActivity
    | where ingestion_time() between(startTime .. endTime)
    | project 
        TimeGenerated,
        BlobTime = startTime, 
        OperationName ,
        OperationNameValue ,
        Level ,
        ActivityStatus ,
        ResourceGroup ,
        SubscriptionId ,
        Category ,
        EventSubmissionTimestamp ,
        ClientIpAddress = parse_json(HTTPRequest).clientIpAddress ,
        ResourceId = _ResourceId 
    
  5. 时间范围根据 TimeGenerated 列来指定将包含在查询中的记录 。 值应大于在查询中选择的时间范围。 由于此查询未使用 TimeGenerated 列,因此“在查询中设置”选项不可用。 有关时间范围的详细信息,请参阅查询范围。 对于“时间范围”,选择“过去 4 小时” 。 此设置可确保引入时间大于 TimeGenerated 的任何记录都将包含在结果中。

    Screenshot that shows the settings for the new Azure Monitor Logs action named Run query and visualize results.

(可选)添加分析 JSON 操作

“运行查询并列出结果”操作的输出采用 JSON 格式。 可在准备进行“撰写”操作的过程中分析此数据并对其进行操作。

可以提供一个 JSON 架构用于描述预期要接收的有效负载。 设计器使用此架构分析 JSON 内容,并生成用户友好的标记来表示 JSON 内容中的属性。 然后,你可以在整个逻辑应用工作流中轻松引用和使用这些属性。

可以使用“运行查询并列出结果”步骤中的示例输出。

  1. 在逻辑应用功能区中选择“运行触发器”。 然后,选择“运行”,下载并保存输出记录。 对于上一步中的示例查询,可以使用以下示例输出:

    {
        "TimeGenerated": "2020-09-29T23:11:02.578Z",
        "BlobTime": "2020-09-29T23:00:00Z",
        "OperationName": "Returns Storage Account SAS Token",
        "OperationNameValue": "MICROSOFT.RESOURCES/DEPLOYMENTS/WRITE",
        "Level": "Informational",
        "ActivityStatus": "Started",
        "ResourceGroup": "monitoring",
        "SubscriptionId": "00000000-0000-0000-0000-000000000000",
        "Category": "Administrative",
        "EventSubmissionTimestamp": "2020-09-29T23:11:02Z",
        "ClientIpAddress": "192.168.1.100",
        "ResourceId": "/subscriptions/00000000-0000-0000-0000-000000000000/resourcegroups/monitoring/providers/microsoft.storage/storageaccounts/my-storage-account"
    }
    
  2. 选择“+ 新建步骤”,然后选择“+ 添加操作”。 在“选择操作”下,输入“json”,然后选择“分析 JSON”。

    Screenshot that shows selecting a Parse JSON operator.

  3. 选择“内容”框即可显示以前活动的值的列表。 从“运行查询并列出结果”操作中选择“正文” 。 此输出来自日志查询。

    Screenshot that shows selecting a Body.

  4. 复制之前保存的示例记录。 选择“使用示例有效负载生成架构”并粘贴它。

    Screenshot that shows parsing a JSON payload.

添加“撰写”操作

“撰写”操作获取已分析的 JSON 输出,并创建需要存储在 blob 中的对象。

  1. 选择“+ 新建步骤”,然后选择“+ 添加操作”。 在“选择操作”下,输入“撰写”。 然后,选择“撰写”操作。

    Screenshot that shows selecting a Compose action.

  2. 选择“输入”框,显示以前活动的值的列表。 从“分析 JSON”操作中选择“正文” 。 这个已分析的输出来自日志查询。

    Screenshot that shows selecting a body for a Compose action.

添加“创建 Blob”操作

“创建 Blob”操作会将撰写的 JSON 写入存储。

  1. 选择“+ 新建步骤”,然后选择“+ 添加操作”。 在“选择操作”下,输入“Blob”。 然后,选择“创建 Blob”操作。

    Screenshot that shows selecting the Create Blob action.

  2. 在“连接名称”中输入连接到存储帐户的名称。 然后,选择“文件夹路径”框中的文件夹图标,从而选择存储帐户中的容器。 选择“Blob 名称”,查看以前活动的值的列表。 选择“表达式”,然后输入与你的时间间隔匹配的表达式。 对于每小时运行一次的该查询,以下表达式将设置每个前一小时的 Blob 名称:

    subtractFromTime(formatDateTime(utcNow(),'yyyy-MM-ddTHH:00:00'), 1,'Hour')
    

    Screenshot that shows a blob expression.

  3. 选择“Blob 内容”框可显示以前活动的值的列表。 然后,在“撰写”部分中选择“输出”。

    Screenshot that shows creating a blob expression.

测试工作流

若要测试工作流,请选择“运行”。 如果工作流存在错误,会在有问题的步骤中进行指示。 可以查看执行并钻取到每个步骤,通过查看输入和输出来进行故障调查。 如有必要,请参阅在 Azure 逻辑应用中排查和诊断工作流故障

Screenshot that shows Runs history.

查看存储中的日志

转到 Azure 门户中的“存储帐户”菜单,然后选择你的存储帐户。 选择“Blob”磁贴。 然后,选择在“创建 Blob”操作中指定的容器。 选择一个 Blob,然后选择“编辑 Blob”。

Screenshot that shows blob data.

逻辑应用模板

模板中不包含可选的分析 JSON 步骤

{
    "definition": {
        "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
        "actions": {
            "Compose": {
                "inputs": "@body('Run_query_and_list_results')",
                "runAfter": {
                    "Run_query_and_list_results": [
                        "Succeeded"
                    ]
                },
                "type": "Compose"
            },
            "Create_blob_(V2)": {
                "inputs": {
                    "body": "@outputs('Compose')",
                    "headers": {
                        "ReadFileMetadataFromServer": true
                    },
                    "host": {
                        "connection": {
                            "name": "@parameters('$connections')['azureblob']['connectionId']"
                        }
                    },
                    "method": "post",
                    "path": "/v2/datasets/@{encodeURIComponent(encodeURIComponent('AccountNameFromSettings'))}/files",
                    "queries": {
                        "folderPath": "/logicappexport",
                        "name": "@{utcNow()}",
                        "queryParametersSingleEncoded": true
                    }
                },
                "runAfter": {
                    "Compose": [
                        "Succeeded"
                    ]
                },
                "runtimeConfiguration": {
                    "contentTransfer": {
                        "transferMode": "Chunked"
                    }
                },
                "type": "ApiConnection"
            },
            "Run_query_and_list_results": {
                "inputs": {
                    "body": "let dt = now();\nlet year = datetime_part('year', dt);\nlet month = datetime_part('month', dt);\nlet day = datetime_part('day', dt);\n let hour = datetime_part('hour', dt);\nlet startTime = make_datetime(year,month,day,hour,0)-1h;\nlet endTime = startTime + 1h - 1tick;\nAzureActivity\n| where ingestion_time() between(startTime .. endTime)\n| project \n    TimeGenerated,\n    BlobTime = startTime, \n    OperationName ,\n    OperationNameValue ,\n    Level ,\n    ActivityStatus ,\n    ResourceGroup ,\n    SubscriptionId ,\n    Category ,\n    EventSubmissionTimestamp ,\n    ClientIpAddress = parse_json(HTTPRequest).clientIpAddress ,\n    ResourceId = _ResourceId ",
                    "host": {
                        "connection": {
                            "name": "@parameters('$connections')['azuremonitorlogs']['connectionId']"
                        }
                    },
                    "method": "post",
                    "path": "/queryData",
                    "queries": {
                        "resourcegroups": "resource-group-name",
                        "resourcename": "workspace-name",
                        "resourcetype": "Log Analytics Workspace",
                        "subscriptions": "workspace-subscription-id",
                        "timerange": "Set in query"
                    }
                },
                "runAfter": {},
                "type": "ApiConnection"
            }
        },
        "contentVersion": "1.0.0.0",
        "outputs": {},
        "parameters": {
            "$connections": {
                "defaultValue": {},
                "type": "Object"
            }
        },
        "triggers": {
            "Recurrence": {
                "evaluatedRecurrence": {
                    "frequency": "Day",
                    "interval": 1
                },
                "recurrence": {
                    "frequency": "Day",
                    "interval": 1
                },
                "type": "Recurrence"
            }
        }
    },
    "parameters": {
        "$connections": {
            "value": {
                "azureblob": {
                    "connectionId": "/subscriptions/logic-app-subscription-id/resourceGroups/logic-app-resource-group-name/providers/Microsoft.Web/connections/blob-connection-name",
                    "connectionName": "blob-connection-name",
                    "id": "/subscriptions/logic-app-subscription-id/providers/Microsoft.Web/locations/canadacentral/managedApis/azureblob"
                },
                "azuremonitorlogs": {
                    "connectionId": "/subscriptions/blob-connection-name/resourceGroups/logic-app-resource-group-name/providers/Microsoft.Web/connections/azure-monitor-logs-connection-name",
                    "connectionName": "azure-monitor-logs-connection-name",
                    "id": "/subscriptions/blob-connection-name/providers/Microsoft.Web/locations/canadacentral/managedApis/azuremonitorlogs"
                }
            }
        }
    }
}

后续步骤