共用方式為

使用上下文管理器进行范围跟踪

上下文 mlflow.start_span() 管理器允许为任意代码块创建范围。 虽然 函数修饰器 可以以函数为粒度进行跟踪,但 start_span() 能够在代码中捕捉更细致和复杂的交互。

使用上下文管理器进行跨度跟踪可让你对跟踪的代码进行精细控制:

  • 任意代码块:跟踪任何代码块,而不仅仅是整个函数
  • 自动上下文管理:MLflow 处理父子关系和清理
  • 适用于函数装饰器:使用 @mlflow.trace 实现混合式方法
  • 异常处理:自动错误捕获,类似于函数修饰器

先决条件

本教程需要以下包:

  • mlflow[databricks] 3.1 及更高版本:具有 GenAI 功能和 Databricks 连接的核心 MLflow 功能。
  • openai 1.0.0 及更高版本:以下示例应用使用 OpenAI 客户端。 在你自己的代码中,根据需要将其替换为其他 SDK。

安装基本组件:

%pip install --upgrade "mlflow[databricks]>=3.1" "openai>=1.0.0"

dbutils.library.restartPython()

MLflow 2 的先决条件

Databricks 强烈建议安装 MLflow 3.1 或更高版本(如果使用 mlflow[databricks])。

对于 MLflow 2,跨度跟踪与上下文管理器需要以下包:

  • mlflow[databricks] 2.15.0 及更高版本:具有 Databricks 连接的核心 MLflow 功能。
  • openai 1.0.0 及更高版本:(可选)如果自定义代码使用 OpenAI 客户端,请安装。
%pip install --upgrade "mlflow[databricks]>=2.15.0,<3.0.0"
pip install --upgrade openai>=1.0.0 # Install if needed
dbutils.library.restartPython()

跨度跟踪 API

与函数修饰器类似,上下文管理器会自动捕获父子关系、异常和执行时间。 它还与 自动跟踪 兼容。

与函数修饰器不同,需要手动指定区段的名称、输入和输出。 您可以使用从上下文管理器返回的 LiveSpan 对象来配置它们。

下面的代码片段演示了基本的跨度跟踪:

import mlflow

with mlflow.start_span(name="my_span") as span:
    x = 1
    y = 2
    span.set_inputs({"x":x, "y": y})
    z = x + y
    span.set_outputs(z)

跨事件

SpanEvent 对象 记录跨度生存期内的特定事件。 下面的代码片段显示了:

  • 使用当前时间戳创建事件
  • 创建一个具有特定时间戳(纳秒)的事件
  • Exception 创建事件
import mlflow
from mlflow.entities import SpanEvent, SpanType
import time

with mlflow.start_span(name="manual_span", span_type=SpanType.CHAIN) as span:
    # Create an event with current timestamp
    event = SpanEvent(
        name="validation_completed",
        attributes={
            "records_validated": 1000,
            "errors_found": 3,
            "validation_type": "schema"
        }
    )
    span.add_event(event)

    # Create an event with specific timestamp (nanoseconds)
    specific_time_event = SpanEvent(
        name="data_checkpoint",
        timestamp=int(time.time() * 1e9),
        attributes={"checkpoint_id": "ckpt_123"}
    )
    span.add_event(specific_time_event)

    # Create an event from an exception
    try:
        raise ValueError("Invalid input format")
    except Exception as e:
        error_event = SpanEvent.from_exception(e)
        # This creates an event with name="exception" and attributes containing:
        # - exception.message
        # - exception.type
        # - exception.stacktrace

        # Add to current span
        span = mlflow.get_current_active_span()
        span.add_event(error_event)

时间跨度状态

SpanStatus 定义范围的状态。 请注意,mlflow.start_span() 上下文管理器在退出时会重置状态。 下面的代码片段显示了设置跨度状态的各种方法:

import mlflow
from mlflow.entities import SpanStatus, SpanStatusCode, SpanType

with mlflow.start_span(name="manual_span", span_type=SpanType.CHAIN) as span:

    # Create status objects
    success_status = SpanStatus(SpanStatusCode.OK)
    error_status = SpanStatus(
        SpanStatusCode.ERROR,
        description="Failed to connect to database"
    )

    # Set status on a live span
    span.set_status(success_status)

    # Or use string shortcuts
    span.set_status("OK")
    span.set_status("ERROR")

    # When the context manager exits successfully, the status is overwritten with status "OK"

已完成范围的查询状态:

last_trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(last_trace_id)

for span in trace.data.spans:
    print(span.status.status_code)

RETRIEVER 范围

从数据存储检索文档时使用 RETRIEVER 范围RETRIEVER spans 必须输出如下例所示的列表 Documents

import mlflow
from mlflow.entities import Document, SpanType

@mlflow.trace(span_type=SpanType.RETRIEVER)
def retrieve_documents(query: str):
    span = mlflow.get_current_active_span()

    # Create Document objects (required for RETRIEVER spans)
    documents = [
        Document(
            page_content="The content of the document...",
            metadata={
                "doc_uri": "path/to/document.md",
                "chunk_id": "chunk_001",
                "relevance_score": 0.95,
                "source": "knowledge_base"
            },
            id="doc_123"  # Optional document ID
        ),
        Document(
            page_content="Another relevant section...",
            metadata={
                "doc_uri": "path/to/other.md",
                "chunk_id": "chunk_042",
                "relevance_score": 0.87
            }
        )
    ]

    # Set outputs as Document objects for proper UI rendering
    span.set_outputs(documents)

    # Return in your preferred format
    return [doc.to_dict() for doc in documents]

retrieve_documents(query="What is ML?")

访问检索器输出:

last_trace_id = mlflow.get_last_active_trace_id()
trace = mlflow.get_trace(last_trace_id)

retriever_span = trace.search_spans(span_type=SpanType.RETRIEVER)[0]

if retriever_span.outputs:
    for doc in retriever_span.outputs:
        if isinstance(doc, dict):
            content = doc.get('page_content', '')
            uri = doc.get('metadata', {}).get('doc_uri', '')
            score = doc.get('metadata', {}).get('relevance_score', 0)
            print(f"Document from {uri} (score: {score})")

高级示例

下面是一个更复杂的示例,它结合了:

from databricks.sdk import WorkspaceClient
import mlflow
from mlflow.entities import SpanEvent, SpanType
import openai
import time

# Enable auto-tracing for OpenAI
mlflow.openai.autolog()

# Create an OpenAI client that is connected to Databricks-hosted LLMs.
workspace = WorkspaceClient()
client = workspace.serving_endpoints.get_open_ai_client()

@mlflow.trace(span_type=SpanType.CHAIN)
def chat_iteration(messages, user_input):
    with mlflow.start_span(name="User", span_type=SpanType.CHAIN) as span:
        span.set_inputs({
            "messages": messages,
            "timestamp": time.time(),
        })

        # Set individual attribute
        span.set_attribute("messages_length", len(messages))

        # Set multiple attributes at once
        span.set_attributes({
            "environment": "production",
            "custom_metadata": {"key": "value"}
        })

        # Add events during execution
        span.add_event(SpanEvent(
            name="processing_started",
            attributes={
                "stage": "initialization",
                "memory_usage_mb": 256,
            }
        ))

        span.set_outputs(user_input)

    messages.append({"role": "user", "content": user_input})

    response = client.chat.completions.create(
        model="databricks-claude-sonnet-4-5",
        max_tokens=100,
        messages=messages,
    )
    answer = response.choices[0].message.content
    print(f"Assistant: {answer}")

    messages.append({"role": "assistant", "content": answer})

chat_iteration(
    messages = [{"role": "system", "content": "You are a friendly chat bot"}],
    user_input="What is your favorite color?",
)

要查看较长对话的嵌套跟踪示例,请在下面的示例中移除注释符号:

# @mlflow.trace(span_type=SpanType.CHAIN)
# def start_session():
# messages = [{"role": "system", "content": "You are a friendly chat bot"}]
# while True:
# user_input = input(">> ")
# chat_iteration(messages, user_input)

# if user_input == "BYE":
# break

# start_session()

后续步骤

示例笔记本

使用上下文管理器进行范围跟踪

获取笔记本