上下文 mlflow.start_span() 管理器允许为任意代码块创建范围。 虽然 函数修饰器 可以以函数为粒度进行跟踪,但 start_span() 能够在代码中捕捉更细致和复杂的交互。
使用上下文管理器进行跨度跟踪可让你对跟踪的代码进行精细控制:
- 任意代码块:跟踪任何代码块,而不仅仅是整个函数
- 自动上下文管理:MLflow 处理父子关系和清理
- 适用于函数装饰器:使用
@mlflow.trace实现混合式方法 - 异常处理:自动错误捕获,类似于函数修饰器
先决条件
本教程需要以下包:
-
mlflow[databricks]3.1 及更高版本:具有 GenAI 功能和 Databricks 连接的核心 MLflow 功能。 -
openai1.0.0 及更高版本:以下示例应用使用 OpenAI 客户端。 在你自己的代码中,根据需要将其替换为其他 SDK。
安装基本组件:
%pip install --upgrade "mlflow[databricks]>=3.1" "openai>=1.0.0"
dbutils.library.restartPython()
MLflow 2 的先决条件
Databricks 强烈建议安装 MLflow 3.1 或更高版本(如果使用 mlflow[databricks])。
对于 MLflow 2,跨度跟踪与上下文管理器需要以下包:
-
mlflow[databricks]2.15.0 及更高版本:具有 Databricks 连接的核心 MLflow 功能。 -
openai1.0.0 及更高版本:(可选)如果自定义代码使用 OpenAI 客户端,请安装。
%pip install --upgrade "mlflow[databricks]>=2.15.0,<3.0.0"
pip install --upgrade openai>=1.0.0 # Install if needed
dbutils.library.restartPython()
跨度跟踪 API
与函数修饰器类似,上下文管理器会自动捕获父子关系、异常和执行时间。 它还与 自动跟踪 兼容。
与函数修饰器不同,需要手动指定区段的名称、输入和输出。 您可以使用从上下文管理器返回的 LiveSpan 对象来配置它们。
下面的代码片段演示了基本的跨度跟踪:
import mlflow
with mlflow.start_span(name="my_span") as span:
x = 1
y = 2
span.set_inputs({"x":x, "y": y})
z = x + y
span.set_outputs(z)
跨事件
SpanEvent 对象 记录跨度生存期内的特定事件。 下面的代码片段显示了:
- 使用当前时间戳创建事件
- 创建一个具有特定时间戳(纳秒)的事件
- 从
Exception创建事件
import mlflow
from mlflow.entities import SpanEvent, SpanType
import time
with mlflow.start_span(name="manual_span", span_type=SpanType.CHAIN) as span:
# Create an event with current timestamp
event = SpanEvent(
name="validation_completed",
attributes={
"records_validated": 1000,
"errors_found": 3,
"validation_type": "schema"
}
)
span.add_event(event)
# Create an event with specific timestamp (nanoseconds)
specific_time_event = SpanEvent(
name="data_checkpoint",
timestamp=int(time.time() * 1e9),
attributes={"checkpoint_id": "ckpt_123"}
)
span.add_event(specific_time_event)
# Create an event from an exception
try:
raise ValueError("Invalid input format")
except Exception as e:
error_event = SpanEvent.from_exception(e)
# This creates an event with name="exception" and attributes containing:
# - exception.message
# - exception.type
# - exception.stacktrace
# Add to current span
span = mlflow.get_current_active_span()
span.add_event(error_event)
时间跨度状态
SpanStatus 定义范围的状态。 请注意,mlflow.start_span() 上下文管理器在退出时会重置状态。 下面的代码片段显示了设置跨度状态的各种方法:
import mlflow
from mlflow.entities import SpanStatus, SpanStatusCode, SpanType
with mlflow.start_span(name="manual_span", span_type=SpanType.CHAIN) as span:
# Create status objects
success_status = SpanStatus(SpanStatusCode.OK)
error_status = SpanStatus(
SpanStatusCode.ERROR,
description="Failed to connect to database"
)
# Set status on a live span
span.set_status(success_status)
# Or use string shortcuts
span.set_status("OK")
span.set_status("ERROR")
# When the context manager exits successfully, the status is overwritten with status "OK"
已完成范围的查询状态:
last_trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(last_trace_id)
for span in trace.data.spans:
print(span.status.status_code)
RETRIEVER 范围
从数据存储检索文档时使用 RETRIEVER 范围 。
RETRIEVER spans 必须输出如下例所示的列表 Documents :
import mlflow
from mlflow.entities import Document, SpanType
@mlflow.trace(span_type=SpanType.RETRIEVER)
def retrieve_documents(query: str):
span = mlflow.get_current_active_span()
# Create Document objects (required for RETRIEVER spans)
documents = [
Document(
page_content="The content of the document...",
metadata={
"doc_uri": "path/to/document.md",
"chunk_id": "chunk_001",
"relevance_score": 0.95,
"source": "knowledge_base"
},
id="doc_123" # Optional document ID
),
Document(
page_content="Another relevant section...",
metadata={
"doc_uri": "path/to/other.md",
"chunk_id": "chunk_042",
"relevance_score": 0.87
}
)
]
# Set outputs as Document objects for proper UI rendering
span.set_outputs(documents)
# Return in your preferred format
return [doc.to_dict() for doc in documents]
retrieve_documents(query="What is ML?")
访问检索器输出:
last_trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(last_trace_id)
retriever_span = trace.search_spans(span_type=SpanType.RETRIEVER)[0]
if retriever_span.outputs:
for doc in retriever_span.outputs:
if isinstance(doc, dict):
content = doc.get('page_content', '')
uri = doc.get('metadata', {}).get('doc_uri', '')
score = doc.get('metadata', {}).get('relevance_score', 0)
print(f"Document from {uri} (score: {score})")
高级示例
下面是一个更复杂的示例,它结合了:
-
mlflow.start_span()上下文管理器 -
@mlflow.trace函数修饰器 - OpenAI 的自动追踪
from databricks.sdk import WorkspaceClient
import mlflow
from mlflow.entities import SpanEvent, SpanType
import openai
import time
# Enable auto-tracing for OpenAI
mlflow.openai.autolog()
# Create an OpenAI client that is connected to Databricks-hosted LLMs.
workspace = WorkspaceClient()
client = workspace.serving_endpoints.get_open_ai_client()
@mlflow.trace(span_type=SpanType.CHAIN)
def chat_iteration(messages, user_input):
with mlflow.start_span(name="User", span_type=SpanType.CHAIN) as span:
span.set_inputs({
"messages": messages,
"timestamp": time.time(),
})
# Set individual attribute
span.set_attribute("messages_length", len(messages))
# Set multiple attributes at once
span.set_attributes({
"environment": "production",
"custom_metadata": {"key": "value"}
})
# Add events during execution
span.add_event(SpanEvent(
name="processing_started",
attributes={
"stage": "initialization",
"memory_usage_mb": 256,
}
))
span.set_outputs(user_input)
messages.append({"role": "user", "content": user_input})
response = client.chat.completions.create(
model="databricks-claude-sonnet-4-5",
max_tokens=100,
messages=messages,
)
answer = response.choices[0].message.content
print(f"Assistant: {answer}")
messages.append({"role": "assistant", "content": answer})
chat_iteration(
messages = [{"role": "system", "content": "You are a friendly chat bot"}],
user_input="What is your favorite color?",
)
要查看较长对话的嵌套跟踪示例,请在下面的示例中移除注释符号:
# @mlflow.trace(span_type=SpanType.CHAIN)
# def start_session():
# messages = [{"role": "system", "content": "You are a friendly chat bot"}]
# while True:
# user_input = input(">> ")
# chat_iteration(messages, user_input)
# if user_input == "BYE":
# break
# start_session()
后续步骤
- 函数修饰器 - 跟踪整个函数的更简单方法
- 低级别客户端 API - 了解需要完全控制的高级方案
- 调试和分析应用程序 - 查询和分析记录的追踪