Tutorial: Track and analyze users and environments

This tutorial shows how to add context to traces so that you can track and analyze users, sessions, and deployments.

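  • In a simple chat app, you use mlflow.update_current_trace() to add custom metadata and tags to traces.
  • To analyze the traces, you use mlflow.search_traces() to retrieve traces by user, session, environment, and app version, and then compute statistics.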

Environment setup

Install the required packages:

  • mlflow[databricks]: Use the latest version of MLflow to get more features and improvements.
  • openai: This app uses the OpenAI API client to call Databricks-hosted models.

%pip install -qq --upgrade "mlflow[databricks]>=3.1.0" openai
dbutils.library.restartPython()

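Create an MLflow experiment. If you are using a Databricks notebook, you can skip this step and use the default notebook experiment. Otherwise, follow the environment setup quickstart to create an experiment and connect to the MLflow tracking server.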

Define and trace the application

The simple chat application below calls a Databricks-hosted foundation model to answer user queries.

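The app is traced in several ways:

  • mlflow.openai.autolog() automatically logs OpenAI client calls.
  • @mlflow.trace traces the application logic in my_app().
  • mlflow.update_current_trace() adds context to the trace from within my_app():
    • User and session context: query-specific information, such as the user ID, can be passed to the application logic as arguments.
    • Deployment context: deployment-specific information, such as the environment or app version, is passed to the app through environment variables, which simplifies configuration changes across deployments.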

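MLflow populates some trace metadata automatically, but you can override the defaults. You can also define custom metadata. The example below demonstrates both.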


import mlflow
import os
from databricks.sdk import WorkspaceClient

mlflow.openai.autolog()

@mlflow.trace
def my_app(user_id: str, session_id: str, message: str) -> str:
    """Process a chat message with extra context logging for traces."""

    # Add user and session context to the current trace.
    # The @mlflow.trace decorator ensures there is an active trace.
    mlflow.update_current_trace(
        metadata={
            "mlflow.trace.user": user_id,
            "mlflow.trace.session": session_id,
        },
        tags={
          "query_category": "chat",  # Example of a custom tag
        },
    )

    app_environment = os.getenv("APP_ENVIRONMENT", "development")
    mlflow.update_current_trace(
        metadata={
            # Override automatically populated metadata
            "mlflow.source.type": app_environment,  # Override default LOCAL/NOTEBOOK
            # Add custom metadata
            "app_version": os.getenv("APP_VERSION", "1.0.0"),
            "deployment_id": os.getenv("DEPLOYMENT_ID", "unknown"),
        }
    )

    # The trace will capture the execution time, inputs, outputs, and any errors
    # Your chat logic here
    response = chat_completion(message)
    return response

# Basic chat logic
def chat_completion(message: str) -> str:
    # Create an OpenAI client that is connected to Databricks-hosted LLMs
    w = WorkspaceClient()
    client = w.serving_endpoints.get_open_ai_client()

    response = client.chat.completions.create(
        model="databricks-claude-sonnet-4",
        messages=[
            {
                "role": "system",
                "content": "You are a helpful assistant. Give brief, 1-2 sentence responses.",
            },
            {
                "role": "user",
                "content": message,
            },
        ]
    )
    return response.choices[0].message.content

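The application logic above takes the user, session, and other metadata as function arguments. In a production application, the implementation might instead extract this metadata from the headers of a request object.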
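
As a minimal sketch of the header-based approach, the helper below maps request headers to the metadata keys that my_app() logs. The header names x-user-id and x-session-id and the function extract_trace_context() are hypothetical; substitute whatever your gateway or web framework actually provides.

```python
from typing import Mapping

# Hypothetical header names, chosen only for illustration.
USER_HEADER = "x-user-id"
SESSION_HEADER = "x-session-id"


def extract_trace_context(headers: Mapping[str, str]) -> dict:
    """Map request headers to the trace metadata keys used in my_app()."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    normalized = {name.lower(): value for name, value in headers.items()}

    context = {}
    user_id = normalized.get(USER_HEADER)
    if user_id:
        context["mlflow.trace.user"] = user_id
    session_id = normalized.get(SESSION_HEADER)
    if session_id:
        context["mlflow.trace.session"] = session_id
    return context


context = extract_trace_context(
    {"X-User-ID": "user-123", "X-Session-ID": "session-abc-0"}
)
print(context)
```

Inside a traced request handler, the resulting dictionary could then be passed as the metadata argument of mlflow.update_current_trace(), in the same way my_app() passes its function arguments.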

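Next, simulate several users and multiple sessions, each with one or more chat interactions. Use environment variables to set the deployment information.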

# Set environment variables to log deployment-specific metadata with traces.
os.environ["APP_ENVIRONMENT"] = "staging"
os.environ["APP_VERSION"] = "1.0.0"
os.environ["DEPLOYMENT_ID"] = "deployment-123"

# Run the chat completion with user and session context to generate example traces:
for session in range(2):
  # 2 chat interactions per session for this user
  result = my_app(
      user_id="user-123",
      session_id=f"session-abc-{session}",
      message="What is MLflow and how does it help with GenAI?"
  )
  result = my_app(
      user_id="user-123",
      session_id=f"session-abc-{session}",
      message="What is ML vs. AI?"
  )

os.environ["APP_VERSION"] = "1.1.0"
os.environ["DEPLOYMENT_ID"] = "deployment-456"

for session in range(2):
  # 1 chat interaction per session for this user
  result = my_app(
      user_id="user-456",
      session_id=f"session-def-{session}",
      message="What is MLflow and how does it help with machine learning?"
  )
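
The deployment context that my_app() reads from environment variables can also be collected in one place, which makes the defaults easier to inspect and test. The sketch below assumes the same variable names and defaults as the app above; the helper name deployment_metadata() is hypothetical.

```python
import os
from typing import Mapping


def deployment_metadata(env: Mapping[str, str] = os.environ) -> dict:
    """Collect deployment context with the same variables and defaults as my_app()."""
    return {
        # Overrides the automatically populated source type.
        "mlflow.source.type": env.get("APP_ENVIRONMENT", "development"),
        # Custom metadata keys.
        "app_version": env.get("APP_VERSION", "1.0.0"),
        "deployment_id": env.get("DEPLOYMENT_ID", "unknown"),
    }


# Passing an explicit mapping makes the behavior easy to check without
# touching the real process environment.
print(deployment_metadata({"APP_ENVIRONMENT": "staging"}))
```

Called with no arguments, the helper reads the live process environment, so its result reflects whatever values are set at the time of each call.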

Search traces

All of the analysis below relies on mlflow.search_traces() to collect the relevant traces:

import mlflow
traces = mlflow.search_traces()
traces

Each trace is annotated with the additional context logged by the app, such as the user ID:

first_trace = traces.iloc[0]
first_trace.trace_metadata['mlflow.trace.user']
'user-456'

Analyze user behavior

First, analyze the behavior of a specific user.

import pandas as pd
import time

def analyze_user_behavior(user_id: str, days: int = 7):
    """Analyze activity patterns for a specific user."""

    cutoff_ms = int((time.time() - days * 86400) * 1000)

    traces = mlflow.search_traces(
        filter_string=f"metadata.`mlflow.trace.user` = '{user_id}' AND "
                      f"trace.timestamp_ms > {cutoff_ms}",
        order_by=["trace.timestamp_ms DESC"],
    )

    if len(traces) == 0:
        print(f"No activity found for user {user_id}")
        return

    # Calculate key metrics
    total_interactions = len(traces)
    unique_sessions = set(row.trace_metadata.get("mlflow.trace.session", "") for index, row in traces.iterrows())
    unique_sessions.discard("")

    print(f"User {user_id} Activity Report ({days} days)")
    print("=" * 50)
    print(f"Total interactions: {total_interactions}")
    print(f"Unique sessions: {len(unique_sessions)}")

    # Daily activity
    traces['date'] = pd.to_datetime(traces['request_time'], unit='ms').dt.date
    daily_activity = traces.groupby('date').size()
    print(f"\nDaily activity:")
    print(daily_activity.to_string())

    # Query categories
    query_categories = traces['tags'].apply(lambda tags: tags.get('query_category'))
    category_counts = query_categories.value_counts()
    print("\nQuery categories:")
    print(category_counts.to_string())

    # Performance stats
    print(f"\nPerformance:")
    print(f"Average response time: {traces['execution_duration'].mean():.1f}ms")
    print(f"Error rate: {(traces['state'] == 'ERROR').mean() * 100:.1f}%")

    return traces

analyze_user_behavior(user_id="user-123")

User user-123 Activity Report (7 days)
==================================================
Total interactions: 4
Unique sessions: 2

Daily activity:
date
2025-12-12    4

Query categories:
tags
chat    4

Performance:
Average response time: 2177.5ms
Error rate: 0.0%
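
The filter string in analyze_user_behavior() interpolates the user ID directly, so a value containing a quote character would produce a malformed filter. The sketch below builds the same filter string after a basic check. The helper name build_user_filter(), the now_s parameter, and the assumption that user IDs contain only letters, digits, underscores, dots, and hyphens are all illustrative rather than part of MLflow.

```python
import re
import time
from typing import Optional

# Assumption: user IDs contain only letters, digits, '_', '.', and '-'.
_SAFE_ID = re.compile(r"[A-Za-z0-9_.-]+")


def build_user_filter(user_id: str, days: int = 7, now_s: Optional[float] = None) -> str:
    """Build the filter string used above for one user's recent traces."""
    if not _SAFE_ID.fullmatch(user_id):
        raise ValueError(f"Unsupported characters in user ID: {user_id!r}")

    # Allow the current time to be injected so the result is reproducible.
    now_s = time.time() if now_s is None else now_s
    cutoff_ms = int((now_s - days * 86400) * 1000)

    return (
        f"metadata.`mlflow.trace.user` = '{user_id}' AND "
        f"trace.timestamp_ms > {cutoff_ms}"
    )


print(build_user_filter("user-123", days=7, now_s=1_000_000.0))
```

The returned string can be passed as the filter_string argument of mlflow.search_traces(), exactly as in the function above.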

Analyze session flow

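Users may interact with your application over multi-turn conversations. Analyzing sessions turn by turn helps illustrate the user experience. Below, trace timestamps are used to order the conversation turns.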

def analyze_session_flow(session_id: str):
    """Analyze conversation flow within a session."""

    # Get all traces from a session, ordered chronologically
    session_traces = mlflow.search_traces(
        filter_string=f"metadata.`mlflow.trace.session` = '{session_id}'",
        order_by=["timestamp ASC"]
    )

    # Build a timeline of the conversation.
    # enumerate() numbers the turns independently of the DataFrame index.
    conversation_turns = []
    for turn, (_, row) in enumerate(session_traces.iterrows(), start=1):
        conversation_turns.append({
            "turn": turn,
            "timestamp": int(row.request_time),
            "duration_ms": int(row.execution_duration),
            "status": str(row.state),
            "response": row.response,
        })

    return conversation_turns

analyze_session_flow(session_id="session-abc-0")

[{'turn': 1,
  'timestamp': 1765560306051,
  'duration_ms': 2570,
  'status': 'OK',
  'response': 'MLflow is an open-source platform for managing the machine learning lifecycle, including experiment tracking, model packaging, and deployment. For GenAI, it helps by providing tools to track experiments with large language models, manage model versions, log prompts and responses, and deploy AI models at scale while maintaining reproducibility and governance.'},
 {'turn': 2,
  'timestamp': 1765560308943,
  'duration_ms': 2644,
  'status': 'OK',
  'response': 'AI (Artificial Intelligence) is the broader field focused on creating machines that can perform tasks requiring human-like intelligence, while ML (Machine Learning) is a subset of AI that specifically uses algorithms to learn patterns from data without being explicitly programmed for each task. Think of AI as the goal and ML as one of the main methods to achieve it.'}]
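
Once the turns are ordered, the timeline can be extended with the idle time between the end of one turn and the start of the next, which is one way to see how quickly a user follows up. The sketch below is a plain-Python helper with the hypothetical name add_turn_gaps(); the timestamps and durations are copied from the sample output above, and the responses are omitted for brevity.

```python
def add_turn_gaps(turns: list) -> list:
    """Return copies of the turns with the idle time since the previous turn ended."""
    ordered = sorted(turns, key=lambda turn: turn["timestamp"])

    result = []
    previous_end = None
    for turn in ordered:
        enriched = dict(turn)
        # The first turn has no predecessor, so its idle time is None.
        enriched["idle_before_ms"] = (
            None if previous_end is None else turn["timestamp"] - previous_end
        )
        previous_end = turn["timestamp"] + turn["duration_ms"]
        result.append(enriched)
    return result


turns = [
    {"turn": 1, "timestamp": 1765560306051, "duration_ms": 2570},
    {"turn": 2, "timestamp": 1765560308943, "duration_ms": 2644},
]

for turn in add_turn_gaps(turns):
    print(turn["turn"], turn["idle_before_ms"])
```

In this simulated session the second call starts almost immediately after the first returns, so the idle time is small. Real user sessions would typically show longer gaps.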

Analyze environments and versions

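You can analyze deployment metadata, such as the environment or app version, in the same way as users and sessions. Analyzing deployments helps you track improvements or regressions in quality, latency, or other important metrics as you iterate on the application.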

traces = mlflow.search_traces()

traces['app_version'] = traces['trace_metadata'].apply(lambda meta: meta.get('app_version'))
traces['user_id'] = traces['trace_metadata'].apply(lambda meta: meta.get('mlflow.trace.user'))
traces['app_environment'] = traces['trace_metadata'].apply(lambda meta: meta.get('mlflow.source.type'))

interactions_per_version = traces.groupby('app_version').size()
print("Interactions per app version:")
print(interactions_per_version.to_string())

users_per_version = traces.groupby('app_version')['user_id'].nunique()
print("\nDistinct users per app version:")
print(users_per_version.to_string())

interactions_per_environment = traces.groupby('app_environment').size()
print("\nInteractions per app environment:")
print(interactions_per_environment.to_string())

Interactions per app version:
app_version
1.0.0    4
1.1.0    4

Distinct users per app version:
app_version
1.0.0    1
1.1.0    1

Interactions per app environment:
app_environment
staging    8
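
To compare latency across versions, the same grouping idea can be applied to execution durations. The sketch below uses only the standard library and a handful of invented sample records, so the numbers are illustrative and are not results from this tutorial. The helper name latency_by_version() is hypothetical.

```python
from collections import defaultdict
from statistics import mean


def latency_by_version(records: list) -> dict:
    """Summarize execution duration (ms) per app version."""
    grouped = defaultdict(list)
    for record in records:
        grouped[record["app_version"]].append(record["execution_duration"])

    return {
        version: {
            "count": len(durations),
            "mean_ms": mean(durations),
            "max_ms": max(durations),
        }
        for version, durations in sorted(grouped.items())
    }


# Invented sample rows for illustration only.
sample_records = [
    {"app_version": "1.0.0", "execution_duration": 2000},
    {"app_version": "1.0.0", "execution_duration": 3000},
    {"app_version": "1.1.0", "execution_duration": 1500},
]

for version, stats in latency_by_version(sample_records).items():
    print(version, stats)
```

With the traces DataFrame above, records in this shape could be produced with traces.to_dict("records") once the app_version column has been added.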
