다음을 통해 공유

使用 Azure API 管理、事件中心和 Moesif 监视您的 API

适用于:所有 API 管理层级

API 管理服务提供许多功能来增强发送到 HTTP API 的 HTTP 请求的处理。 但是,请求和响应都是暂时性存在的。 发出请求,并通过 API 管理服务流向后端 API。 API 将处理该请求,然后将响应返回给 API 使用者。 API 管理服务保留要在 Azure 门户仪表板中显示的有关 API 的一些重要统计信息,但除此之外不显示详细信息。

在 API 管理服务中使用 log-to-eventhub 策略,可将请求和响应的详细信息发送到 Azure 事件中心。 出于多种原因,你可能希望从发送到 API 的 HTTP 消息生成事件。 示例包括更新审核线索、使用情况分析、异常警报和第三方集成。

本文演示如何捕获整个 HTTP 请求和响应消息,将其发送到事件中心,然后将该消息中继到提供 HTTP 日志记录和监视服务的第三方服务。

为什么从 API 管理服务发送?

可以编写可以插入 HTTP API 框架的 HTTP 中间件来捕获 HTTP 请求和响应,并将其馈送到日志记录和监视系统中。 此方法的缺点是 HTTP 中间件需要集成到后端 API 中,并且必须与 API 平台匹配。 如果有多个 API,每个 API 都必须部署中间件。 后端 API 无法升级往往是有原因的。

使用 Azure API 管理服务来与日志记录基础结构集成提供了一个集中的、平台独立的解决方案。 它还可缩放,部分原因是 Azure API 管理的 异地复制 功能。

为什么发送到事件中心?

问:为什么创建特定于 Azure 事件中心的策略是合理的? 你可能想在很多不同的位置记录请求。 为何不能直接将请求发送到最终目标? 这是一个选项。 但是,从 API 管理服务发出日志记录请求时,必须考虑日志记录消息如何影响 API 的性能。 增加系统组件的可用实例或使用异地复制功能,可以处理逐渐增加的负载。 但是,如果日志记录基础结构在低负载状态下开始变慢,则短期的流量高峰可能导致请求延迟。

Azure 事件中心旨在引入大量数据,处理事件的能力远远高于大多数 API 处理的 HTTP 请求数。 在某种意义上,事件中心充当 API 管理服务与基础结构之间的高级缓冲区,可存储和处理消息。 这可以确保 API 性能不受日志记录基础结构的影响。

将数据传递到事件中心后,会留存数据并等待事件中心的使用者进行处理。 事件中心不关注数据的处理方式,只确保成功送达数据。

事件中心可将事件流式传输到多个使用者组。 这样,事件便可由不同的系统处理。 这支持许多集成方案,而不会在 API 管理服务中对 API 请求的处理造成更多延迟,因为只需要生成一个事件。

用于发送应用程序/HTTP 消息的策略

事件中心接受简单字符串形式的事件数据。 该字符串的内容由你确定。 若要打包 HTTP 请求并将其发送到 Azure 事件中心,需要使用请求或响应信息设置字符串的格式。 在这种情况下,如果有可以重复使用的现有格式,则可能不必编写自己的分析代码。 最初,可以考虑使用 HAR 发送 HTTP 请求和响应。 但是,这种格式最适合用于存储 JSON 格式的一连串 HTTP 请求。 其中包含许多必需的元素,使得通过网络传递 HTTP 消息的方案增加了不必要的复杂性。

另一种选择是使用 HTTP 规范 application/http 中所述的媒体类型。 此媒体类型使用与实际用于在网络上传输 HTTP 消息的格式同样的格式,但整个消息可以置于另一个 HTTP 请求的正文中。 在本例中,我们直接使用正文作为发送到事件中心(Event Hubs)的消息。 Microsoft ASP .NET Web API 2.2 客户端库中有一个分析器可以分析此格式并将其转换为本机 HttpRequestMessageHttpResponseMessage 对象,相当方便。

若要创建此消息,需要在 Azure API 管理中使用基于 C# 的策略表达式。 下面是向 Azure 事件中心发送 HTTP 请求消息的策略。

<log-to-eventhub logger-id="myapilogger" partition-id="0">
@{
   var requestLine = string.Format("{0} {1} HTTP/1.1\r\n",
                                               context.Request.Method,
                                               context.Request.Url.Path + context.Request.Url.QueryString);

   var body = context.Request.Body?.As<string>(true);
   if (body != null && body.Length > 1024)
   {
       body = body.Substring(0, 1024);
   }

   var headers = context.Request.Headers
                          .Where(h => h.Key != "Authorization" && h.Key != "Ocp-Apim-Subscription-Key")
                          .Select(h => string.Format("{0}: {1}", h.Key, String.Join(", ", h.Value)))
                          .ToArray<string>();

   var headerString = (headers.Any()) ? string.Join("\r\n", headers) + "\r\n" : string.Empty;

   return "request:"   + context.Variables["message-id"] + "\n"
                       + requestLine + headerString + "\r\n" + body;
}
</log-to-eventhub>

策略声明

此策略表达式有一些需要说明的特性。 该 log-to-eventhub 策略具有一个名为 logger-id 的属性,该属性指代在 API 管理服务中创建的日志记录器的名称。 有关如何在 API 管理服务中设置事件中心记录器的详细信息,请参阅文档 “如何在 Azure API 管理中将事件记录到 Azure 事件中心”。 第二个属性是一个可选参数,指明事件中心要在哪个分区中存储消息。 事件中心使用分区实现可伸缩性,并且需要至少两个分区。 只保证一个分区内的消息依次传递。 如果未指明 Azure 事件中心要在哪个分区中放置消息,它将使用循环算法来分配负载。 但是,这可能会导致某些消息无序处理。

分区

为了确保消息按顺序传送给使用者,并利用分区的负载分布功能,我们可以将 HTTP 请求消息发送到一个分区,并将 HTTP 响应消息发送到第二个分区。 这可确保均匀的负载分布,并可以保证按顺序使用所有请求和所有响应。 响应可以在相应请求之前使用,但这不是问题,因为我们有不同的机制将请求与响应相关联,我们知道请求始终在响应之前出现。

HTTP 有效负载

生成 requestLine后,检查是否应截断请求正文。 请求正文被截断成只有 1024 个字符。 这可以增加;但是,单个事件中心消息限制为 256 KB,因此,某些 HTTP 消息正文可能不适合单个消息。 执行日志记录和分析时,可以仅从 HTTP 请求行和标头派生大量信息。 此外,许多 API 请求仅返回小型正文,因此截断过大的正文所导致的信息价值损失相对较小,相较于为了保留所有正文内容而产生的传输、处理和存储成本的降低来说,这种损失是可以接受的。

处理正文的最后一点说明是,我们需要将true传递给As<string>()方法,因为我们正在读取正文内容,同时也希望允许后端 API 阅读正文。 将 true 传递给此方法后,正文会缓冲,以便进行第二次读取。 如果你有一个上传大型文件的 API 或采用长轮询的方式,这很重要。 在这些情况下,最好避免阅读正文。

HTTP 标头

HTTP 标头可以转换为采用简单键/值对格式的消息格式。 我们选择去除某些安全敏感字段,以避免不必要的凭据信息泄露。 API 密钥和其他凭据不太可能用于分析。 如果想要分析用户及其使用的特定产品,我们可以从 context 对象获取该对象并将其添加到消息中。

消息元数据

创建要发送到事件中心的完整消息时,首行实际上不是 application/http 消息的一部分。 第一行是附加的元数据,其中包括消息是请求消息还是响应消息,以及使响应与请求相互关联的消息 ID。 使用如下所示的另一个策略可以创建消息 ID:

<set-variable name="message-id" value="@(Guid.NewGuid())" />

我们可以创建请求消息,将其存储在变量中,直到返回响应,然后将请求和响应作为单个消息发送。 但是,通过独立发送请求和响应,并使用 message-id 来关联两者,我们可以在消息大小方面获得更大的灵活性,能够在维护消息顺序的同时利用多个分区,并加快请求到达日志记录仪表板的速度。 此外,在某些情况下,从不将有效响应发送到事件中心(可能是由于 API 管理服务中的严重请求错误),但我们仍然有请求记录。

用于发送响应 HTTP 消息的策略看起来与请求非常类似。完整的策略配置如下所示:

<policies>
  <inbound>
      <set-variable name="message-id" value="@(Guid.NewGuid())" />
      <log-to-eventhub logger-id="myapilogger" partition-id="0">
      @{
          var requestLine = string.Format("{0} {1} HTTP/1.1\r\n",
                                                      context.Request.Method,
                                                      context.Request.Url.Path + context.Request.Url.QueryString);

          var body = context.Request.Body?.As<string>(true);
          if (body != null && body.Length > 1024)
          {
              body = body.Substring(0, 1024);
          }

          var headers = context.Request.Headers
                               .Where(h => h.Key != "Authorization" && h.Key != "Ocp-Apim-Subscription-Key")
                               .Select(h => string.Format("{0}: {1}", h.Key, String.Join(", ", h.Value)))
                               .ToArray<string>();

          var headerString = (headers.Any()) ? string.Join("\r\n", headers) + "\r\n" : string.Empty;

          return "request:"   + context.Variables["message-id"] + "\n"
                              + requestLine + headerString + "\r\n" + body;
      }
  </log-to-eventhub>
  </inbound>
  <backend>
      <forward-request follow-redirects="true" />
  </backend>
  <outbound>
      <log-to-eventhub logger-id="myapilogger" partition-id="1">
      @{
          var statusLine = string.Format("HTTP/1.1 {0} {1}\r\n",
                                              context.Response.StatusCode,
                                              context.Response.StatusReason);

          var body = context.Response.Body?.As<string>(true);
          if (body != null && body.Length > 1024)
          {
              body = body.Substring(0, 1024);
          }

          var headers = context.Response.Headers
                                          .Select(h => string.Format("{0}: {1}", h.Key, String.Join(", ", h.Value)))
                                          .ToArray<string>();

          var headerString = (headers.Any()) ? string.Join("\r\n", headers) + "\r\n" : string.Empty;

          return "response:"  + context.Variables["message-id"] + "\n"
                              + statusLine + headerString + "\r\n" + body;
     }
  </log-to-eventhub>
  </outbound>
</policies>

set-variable 策略创建了一个值,该值可供 log-to-eventhub 部分中的策略和 <inbound> 部分访问。

从事件中心接收事件

使用 AMQP 协议可从 Azure 事件中心接收事件。 Microsoft 服务总线团队提供了客户端库来方便使用事件。 支持两种不同的方法:一种是成为直接使用者,另一种是使用 EventProcessorHost 类。 可以在 事件中心示例存储库中找到这两种方法的示例。 差异的简要说明:Direct Consumer 为你提供完全的控制,而 EventProcessorHost 为你处理部分事务,但对你处理这些事件的方式做出某些假设。

EventProcessorHost

在此示例中,我们为简单起见使用 EventProcessorHost,但是对于此特定方案,它可能不是最佳选择。 EventProcessorHost 努力确保用户无需担心特定事件处理器类中发生线程问题。 但是,在我们的方案中,我们将消息转换为另一种格式,并使用异步方法将其传递给另一个服务。 无需更新共享状态,因此没有线程问题的风险。 在大多数情况下,EventProcessorHost 可能是最佳选择,当然也是更方便的选项。

IEventProcessor

使用 EventProcessorHost 时的核心概念是创建包含 IEventProcessor 方法的 ProcessEventAsync 接口的实现。 下面是该方法的本质:

async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{

    foreach (EventData eventData in messages)
    {
        _Logger.LogInfo(string.Format("Event received from partition: {0} - {1}", context.Lease.PartitionId,eventData.PartitionKey));

        try
        {
            var httpMessage = HttpMessage.Parse(eventData.GetBodyStream());
            await _MessageContentProcessor.ProcessHttpMessage(httpMessage);
        }
        catch (Exception ex)
        {
            _Logger.LogError(ex.Message);
        }
    }
    ... checkpointing code snipped ...
}

EventData 对象的列表被传递到方法中,我们迭代该列表。 每个方法的字节将分析成 HttpMessage 对象,该对象将传递到 IHttpMessageProcessor 的实例。

HttpMessage

HttpMessage 实例包含三个数据片段:

public class HttpMessage
{
    public Guid MessageId { get; set; }
    public bool IsRequest { get; set; }
    public HttpRequestMessage HttpRequestMessage { get; set; }
    public HttpResponseMessage HttpResponseMessage { get; set; }

... parsing code snipped ...

}

HttpMessage 实例包含一个用于将 HTTP 请求连接到相应 HTTP 响应的 MessageId GUID,以及一个用于确定对象是否包含 HttpRequestMessage 和 HttpResponseMessage 实例的布尔值。 通过使用来自System.Net.Http的内置HTTP类,我能够利用application/http中包含的System.Net.Http.Formatting解析代码。

IHttpMessageProcessor

然后,HttpMessage 实例将转发到 IHttpMessageProcessor 的实现,这是为了分离从 Azure 事件中心接收和解释事件以及实际处理事件而创建的接口。

转发 HTTP 消息

对于此示例,我们决定将 HTTP 请求推送到 Moesif API 分析。 Moesif 是一种基于云的服务,专门负责 HTTP 分析和调试。 他们有一个免费层,所以很容易尝试。 Moesif 允许我们实时查看通过 API 管理服务的 HTTP 请求。

IHttpMessageProcessor 实现如下所示:

public class MoesifHttpMessageProcessor : IHttpMessageProcessor
{
    private readonly string RequestTimeName = "MoRequestTime";
    private MoesifApiClient _MoesifClient;
    private ILogger _Logger;
    private string _SessionTokenKey;
    private string _ApiVersion;
    public MoesifHttpMessageProcessor(ILogger logger)
    {
        var appId = Environment.GetEnvironmentVariable("APIMEVENTS-MOESIF-APP-ID", EnvironmentVariableTarget.Process);
        _MoesifClient = new MoesifApiClient(appId);
        _SessionTokenKey = Environment.GetEnvironmentVariable("APIMEVENTS-MOESIF-SESSION-TOKEN", EnvironmentVariableTarget.Process);
        _ApiVersion = Environment.GetEnvironmentVariable("APIMEVENTS-MOESIF-API-VERSION", EnvironmentVariableTarget.Process);
        _Logger = logger;
    }

    public async Task ProcessHttpMessage(HttpMessage message)
    {
        if (message.IsRequest)
        {
            message.HttpRequestMessage.Properties.Add(RequestTimeName, DateTime.UtcNow);
            return;
        }

        EventRequestModel moesifRequest = new EventRequestModel()
        {
            Time = (DateTime) message.HttpRequestMessage.Properties[RequestTimeName],
            Uri = message.HttpRequestMessage.RequestUri.OriginalString,
            Verb = message.HttpRequestMessage.Method.ToString(),
            Headers = ToHeaders(message.HttpRequestMessage.Headers),
            ApiVersion = _ApiVersion,
            IpAddress = null,
            Body = message.HttpRequestMessage.Content != null ? System.Convert.ToBase64String(await message.HttpRequestMessage.Content.ReadAsByteArrayAsync()) : null,
            TransferEncoding = "base64"
        };

        EventResponseModel moesifResponse = new EventResponseModel()
        {
            Time = DateTime.UtcNow,
            Status = (int) message.HttpResponseMessage.StatusCode,
            IpAddress = Environment.MachineName,
            Headers = ToHeaders(message.HttpResponseMessage.Headers),
            Body = message.HttpResponseMessage.Content != null ? System.Convert.ToBase64String(await message.HttpResponseMessage.Content.ReadAsByteArrayAsync()) : null,
            TransferEncoding = "base64"
        };

        Dictionary<string, string> metadata = new Dictionary<string, string>();
        metadata.Add("ApimMessageId", message.MessageId.ToString());

        EventModel moesifEvent = new EventModel()
        {
            Request = moesifRequest,
            Response = moesifResponse,
            SessionToken = _SessionTokenKey != null ? message.HttpRequestMessage.Headers.GetValues(_SessionTokenKey).FirstOrDefault() : null,
            Tags = null,
            UserId = null,
            Metadata = metadata
        };

        Dictionary<string, string> response = await _MoesifClient.Api.CreateEventAsync(moesifEvent);

        _Logger.LogDebug("Message forwarded to Moesif");
    }

    private static Dictionary<string, string> ToHeaders(HttpHeaders headers)
    {
        IEnumerable<KeyValuePair<string, IEnumerable<string>>> enumerable = headers.GetEnumerator().ToEnumerable();
        return enumerable.ToDictionary(p => p.Key, p => p.Value.GetEnumerator()
                                                         .ToEnumerable()
                                                         .ToList()
                                                         .Aggregate((i, j) => i + ", " + j));
    }
}

MoesifHttpMessageProcessor 利用一个专为 Moesif 设计的 C# API 库,这个库可以轻松地将 HTTP 事件数据推送到他们的服务中。 若要将 HTTP 数据发送到 Moesif 收集器 API,需要一个帐户和一个应用程序 ID。可以通过在 Moesif 的网站上创建帐户来获取 Moesif 应用程序 ID,然后转到右上角的菜单并选择“应用设置”。

完整示例

GitHub 上提供了本示例的源代码和测试。 需要准备好 API 管理服务连接的事件中心存储帐户才能自行运行本示例。

此示例只是一个简单的控制台应用程序,它侦听来自事件中心的事件,将其转换为 Moesif EventRequestModelEventResponseModel 对象,然后将其转发到 Moesif 收集器 API。

在以下动画图像中,可以看到在开发人员门户中向 API 发出请求,控制台应用程序显示正在接收、处理和转发的消息,然后在事件流中显示请求和响应。

将请求转发到 Runscope 的动画图像演示

概要

Azure API 管理服务是捕获 API 双向 HTTP 流量的理想平台。 Azure 事件中心是一个高度可缩放的、低成本的解决方案,可以捕获流量并将其馈送到辅助处理系统进行日志记录、监视和其他复杂分析。 只需编写几十行代码,就能轻松连接到 Moesif 等第三方流量监视系统。