PydanticAI 是一个 Python 框架,用于生成具有强键入和人体工学 API 的生产级 AI 应用。 它以 Pydantic 模型为中心,在整个代理工作流中强制实施结构和验证。
MLflow 跟踪 与 PydanticAI 集成,以记录代理步骤、工具调用和模型调用及其类型化输入和输出。 使用 mlflow.pydantic_ai.autolog 启用它。
此集成提供的内容:
- 使用提示、kwargs 和输出响应的代理调用
- LLM 请求记录模型名称、提示、参数和响应
- 工具运行时捕获工具名称、参数以及使用指标
- MCP 服务器调用和工具调用跟踪列表
- 跨元数据:延迟、错误和运行 ID 链接
先决条件
若要将 MLflow 跟踪与 PydanticAI 配合使用,需要安装 MLflow 和相关 PydanticAI 包。
开发
对于开发环境,请使用 Databricks Extras 和 PydanticAI 安装完整的 MLflow 包:
pip install --upgrade "mlflow[databricks]>=3.1" pydantic-ai openai
完整 mlflow[databricks] 包包括用于 Databricks 的本地开发和试验的所有功能。
生产
对于生产部署,请安装 mlflow-tracing 和 PydanticAI。
pip install --upgrade mlflow-tracing pydantic-ai openai
包 mlflow-tracing 已针对生产用途进行优化。
注释
建议使用 MLflow 3 来获得最佳跟踪体验。
在运行示例之前,需要配置环境:
对于不使用 Databricks 笔记本的用户:设置 Databricks 环境变量:
export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
export DATABRICKS_TOKEN="your-personal-access-token"
对于 Databricks 笔记本中的用户:这些凭据会自动为您设置。
示例用法
import os
from dataclasses import dataclass
from typing import Any
from httpx import AsyncClient
from pydantic_ai import Agent, ModelRetry, RunContext
@dataclass
class Deps:
client: AsyncClient
weather_api_key: str | None
geo_api_key: str | None
weather_agent = Agent(
# Switch to your favorite LLM
"google-gla:gemini-2.0-flash",
# 'Be concise, reply with one sentence.' is enough for some models (like openai) to use
# the below tools appropriately, but others like anthropic and gemini require a bit more direction.
system_prompt=(
"Be concise, reply with one sentence."
"Use the `get_lat_lng` tool to get the latitude and longitude of the locations, "
"then use the `get_weather` tool to get the weather."
),
deps_type=Deps,
retries=2,
instrument=True,
)
@weather_agent.tool
async def get_lat_lng(
ctx: RunContext[Deps], location_description: str
) -> dict[str, float]:
"""Get the latitude and longitude of a location.
Args:
ctx: The context.
location_description: A description of a location.
"""
if ctx.deps.geo_api_key is None:
return {"lat": 51.1, "lng": -0.1}
params = {
"q": location_description,
"api_key": ctx.deps.geo_api_key,
}
r = await ctx.deps.client.get("https://geocode.maps.co/search", params=params)
r.raise_for_status()
data = r.json()
if data:
return {"lat": data[0]["lat"], "lng": data[0]["lon"]}
else:
raise ModelRetry("Could not find the location")
@weather_agent.tool
async def get_weather(ctx: RunContext[Deps], lat: float, lng: float) -> dict[str, Any]:
"""Get the weather at a location.
Args:
ctx: The context.
lat: Latitude of the location.
lng: Longitude of the location.
"""
if ctx.deps.weather_api_key is None:
return {"temperature": "21 °C", "description": "Sunny"}
params = {
"apikey": ctx.deps.weather_api_key,
"location": f"{lat},{lng}",
"units": "metric",
}
r = await ctx.deps.client.get(
"https://api.tomorrow.io/v4/weather/realtime", params=params
)
r.raise_for_status()
data = r.json()
values = data["data"]["values"]
# https://docs.tomorrow.io/reference/data-layers-weather-codes
code_lookup = {
1000: "Clear, Sunny",
1100: "Mostly Clear",
1101: "Partly Cloudy",
1102: "Mostly Cloudy",
1001: "Cloudy",
2000: "Fog",
2100: "Light Fog",
4000: "Drizzle",
4001: "Rain",
4200: "Light Rain",
4201: "Heavy Rain",
5000: "Snow",
5001: "Flurries",
5100: "Light Snow",
5101: "Heavy Snow",
6000: "Freezing Drizzle",
6001: "Freezing Rain",
6200: "Light Freezing Rain",
6201: "Heavy Freezing Rain",
7000: "Ice Pellets",
7101: "Heavy Ice Pellets",
7102: "Light Ice Pellets",
8000: "Thunderstorm",
}
return {
"temperature": f'{values["temperatureApparent"]:0.0f}°C',
"description": code_lookup.get(values["weatherCode"], "Unknown"),
}
async def main():
async with AsyncClient() as client:
weather_api_key = os.getenv("WEATHER_API_KEY")
geo_api_key = os.getenv("GEO_API_KEY")
deps = Deps(
client=client, weather_api_key=weather_api_key, geo_api_key=geo_api_key
)
result = await weather_agent.run(
"What is the weather like in London and in Wiltshire?", deps=deps
)
print("Response:", result.output)
# If you are running this on a notebook
await main()
# Uncomment this is you are using an IDE or Python script.
# asyncio.run(main())
像往常一样使用 PydanticAI(代理、工具和业务流程)。 跟踪记录将显示在关联的实验中。
连接到 MCP 服务器
下面的示例演示如何使用已启用 MLflow 跟踪的 PydanticAI 运行 MCP 服务器。 所有工具调用和列操作都会作为 UI 中的跟踪跨度自动捕获。
import mlflow
import asyncio
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("MCP Server")
mlflow.pydantic_ai.autolog()
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio
server = MCPServerStdio(
"deno",
args=[
"run",
"-N",
"-R=node_modules",
"-W=node_modules",
"--node-modules-dir=auto",
"jsr:@pydantic/mcp-run-python",
"stdio",
],
)
agent = Agent("openai:gpt-4o", mcp_servers=[server], instrument=True)
async def main():
async with agent.run_mcp_servers():
result = await agent.run("How many days between 2000-01-01 and 2025-03-18?")
print(result.output)
# > There are 9,208 days between January 1, 2000, and March 18, 2025.
# If you are running this on a notebook
await main()
# Uncomment this is you are using an IDE or Python script.
# asyncio.run(main())
令牌使用情况跟踪
MLflow 3.2.0+ 在跟踪信息中记录令牌使用总计,并在跨度属性中记录每次调用的令牌使用量。
import mlflow
last_trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(trace_id=last_trace_id)
print(trace.info.token_usage)
for span in trace.data.spans:
usage = span.get_attribute("mlflow.chat.tokenUsage")
if usage:
print(span.name, usage)
禁用自动跟踪
使用 mlflow.pydantic_ai.autolog(disable=True) 禁用,或使用 mlflow.autolog(disable=True) 全局禁用。