跟踪工具 PydanticAI

PydanticAI 是一个 Python 框架,用于生成具有强键入和人体工学 API 的生产级 AI 应用。 它以 Pydantic 模型为中心,在整个代理工作流中强制实施结构和验证。

MLflow 跟踪 与 PydanticAI 集成,以记录代理步骤、工具调用和模型调用及其类型化输入和输出。 使用 mlflow.pydantic_ai.autolog 启用它。

此集成提供的内容:

  • 使用提示、kwargs 和输出响应的代理调用
  • LLM 请求记录模型名称、提示、参数和响应
  • 工具运行时捕获工具名称、参数以及使用指标
  • MCP 服务器调用和工具调用跟踪列表
  • 跨元数据:延迟、错误和运行 ID 链接

先决条件

若要将 MLflow 跟踪与 PydanticAI 配合使用,需要安装 MLflow 和相关 PydanticAI 包。

开发

对于开发环境,请使用 Databricks Extras 和 PydanticAI 安装完整的 MLflow 包:

pip install --upgrade "mlflow[databricks]>=3.1" pydantic-ai openai

完整 mlflow[databricks] 包包括用于 Databricks 的本地开发和试验的所有功能。

生产

对于生产部署,请安装 mlflow-tracing 和 PydanticAI。

pip install --upgrade mlflow-tracing pydantic-ai openai

mlflow-tracing 已针对生产用途进行优化。

注释

建议使用 MLflow 3 来获得最佳跟踪体验。

在运行示例之前,需要配置环境:

对于不使用 Databricks 笔记本的用户:设置 Databricks 环境变量:

export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
export DATABRICKS_TOKEN="your-personal-access-token"

对于 Databricks 笔记本中的用户:这些凭据会自动为您设置。

示例用法

import os
from dataclasses import dataclass
from typing import Any

from httpx import AsyncClient

from pydantic_ai import Agent, ModelRetry, RunContext

@dataclass
class Deps:
    client: AsyncClient
    weather_api_key: str | None
    geo_api_key: str | None

weather_agent = Agent(
    # Switch to your favorite LLM
    "google-gla:gemini-2.0-flash",
    # 'Be concise, reply with one sentence.' is enough for some models (like openai) to use
    # the below tools appropriately, but others like anthropic and gemini require a bit more direction.
    system_prompt=(
        "Be concise, reply with one sentence."
        "Use the `get_lat_lng` tool to get the latitude and longitude of the locations, "
        "then use the `get_weather` tool to get the weather."
    ),
    deps_type=Deps,
    retries=2,
    instrument=True,
)

@weather_agent.tool
async def get_lat_lng(
    ctx: RunContext[Deps], location_description: str
) -> dict[str, float]:
    """Get the latitude and longitude of a location.

    Args:
        ctx: The context.
        location_description: A description of a location.
    """
    if ctx.deps.geo_api_key is None:
        return {"lat": 51.1, "lng": -0.1}

    params = {
        "q": location_description,
        "api_key": ctx.deps.geo_api_key,
    }
    r = await ctx.deps.client.get("https://geocode.maps.co/search", params=params)
    r.raise_for_status()
    data = r.json()

    if data:
        return {"lat": data[0]["lat"], "lng": data[0]["lon"]}
    else:
        raise ModelRetry("Could not find the location")

@weather_agent.tool
async def get_weather(ctx: RunContext[Deps], lat: float, lng: float) -> dict[str, Any]:
    """Get the weather at a location.

    Args:
        ctx: The context.
        lat: Latitude of the location.
        lng: Longitude of the location.
    """

    if ctx.deps.weather_api_key is None:
        return {"temperature": "21 °C", "description": "Sunny"}

    params = {
        "apikey": ctx.deps.weather_api_key,
        "location": f"{lat},{lng}",
        "units": "metric",
    }
    r = await ctx.deps.client.get(
        "https://api.tomorrow.io/v4/weather/realtime", params=params
    )
    r.raise_for_status()
    data = r.json()

    values = data["data"]["values"]
    # https://docs.tomorrow.io/reference/data-layers-weather-codes
    code_lookup = {
        1000: "Clear, Sunny",
        1100: "Mostly Clear",
        1101: "Partly Cloudy",
        1102: "Mostly Cloudy",
        1001: "Cloudy",
        2000: "Fog",
        2100: "Light Fog",
        4000: "Drizzle",
        4001: "Rain",
        4200: "Light Rain",
        4201: "Heavy Rain",
        5000: "Snow",
        5001: "Flurries",
        5100: "Light Snow",
        5101: "Heavy Snow",
        6000: "Freezing Drizzle",
        6001: "Freezing Rain",
        6200: "Light Freezing Rain",
        6201: "Heavy Freezing Rain",
        7000: "Ice Pellets",
        7101: "Heavy Ice Pellets",
        7102: "Light Ice Pellets",
        8000: "Thunderstorm",
    }
    return {
        "temperature": f'{values["temperatureApparent"]:0.0f}°C',
        "description": code_lookup.get(values["weatherCode"], "Unknown"),
    }

async def main():
    async with AsyncClient() as client:
        weather_api_key = os.getenv("WEATHER_API_KEY")
        geo_api_key = os.getenv("GEO_API_KEY")
        deps = Deps(
            client=client, weather_api_key=weather_api_key, geo_api_key=geo_api_key
        )
        result = await weather_agent.run(
            "What is the weather like in London and in Wiltshire?", deps=deps
        )
        print("Response:", result.output)

# If you are running this on a notebook
await main()

# Uncomment this is you are using an IDE or Python script.
# asyncio.run(main())

像往常一样使用 PydanticAI(代理、工具和业务流程)。 跟踪记录将显示在关联的实验中。

连接到 MCP 服务器

下面的示例演示如何使用已启用 MLflow 跟踪的 PydanticAI 运行 MCP 服务器。 所有工具调用和列操作都会作为 UI 中的跟踪跨度自动捕获。

import mlflow
import asyncio

mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("MCP Server")
mlflow.pydantic_ai.autolog()

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio

server = MCPServerStdio(
    "deno",
    args=[
        "run",
        "-N",
        "-R=node_modules",
        "-W=node_modules",
        "--node-modules-dir=auto",
        "jsr:@pydantic/mcp-run-python",
        "stdio",
    ],
)

agent = Agent("openai:gpt-4o", mcp_servers=[server], instrument=True)

async def main():
    async with agent.run_mcp_servers():
        result = await agent.run("How many days between 2000-01-01 and 2025-03-18?")
    print(result.output)
    # > There are 9,208 days between January 1, 2000, and March 18, 2025.

# If you are running this on a notebook
await main()

# Uncomment this is you are using an IDE or Python script.
# asyncio.run(main())

令牌使用情况跟踪

MLflow 3.2.0+ 在跟踪信息中记录令牌使用总计,并在跨度属性中记录每次调用的令牌使用量。

import mlflow

last_trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(trace_id=last_trace_id)

print(trace.info.token_usage)
for span in trace.data.spans:
    usage = span.get_attribute("mlflow.chat.tokenUsage")
    if usage:
        print(span.name, usage)

禁用自动跟踪

使用 mlflow.pydantic_ai.autolog(disable=True) 禁用,或使用 mlflow.autolog(disable=True) 全局禁用。