feat: Basic Weave OTEL integration (#17439)

* test

* testfix
This commit is contained in:
Andrew Truong
2025-12-04 00:26:22 -05:00
committed by GitHub
parent 06d42fbd30
commit 1879623502
10 changed files with 551 additions and 1 deletions

View File

@@ -151,6 +151,7 @@ _custom_logger_compatible_callbacks_literal = Literal[
"mlflow",
"langfuse",
"langfuse_otel",
"weave_otel",
"pagerduty",
"humanloop",
"gcs_pubsub",

View File

@@ -1081,6 +1081,11 @@ class OpenTelemetry(CustomLogger):
span, kwargs, response_obj
)
return
elif self.callback_name == "weave_otel":
from litellm.integrations.weave.weave_otel import set_weave_otel_attributes
set_weave_otel_attributes(span, kwargs, response_obj)
return
from litellm.proxy._types import SpanAttributes
optional_params = kwargs.get("optional_params", {})

View File

@@ -0,0 +1,7 @@
"""
Weave (W&B) integration for LiteLLM via OpenTelemetry.
"""
from litellm.integrations.weave.weave_otel import WeaveOtelLogger
__all__ = ["WeaveOtelLogger"]

View File

@@ -0,0 +1,329 @@
from __future__ import annotations
import base64
import json
import os
from typing import TYPE_CHECKING, Any, Optional
from opentelemetry.trace import Status, StatusCode
from typing_extensions import override
from litellm._logging import verbose_logger
from litellm.integrations._types.open_inference import SpanAttributes as OpenInferenceSpanAttributes
from litellm.integrations.arize import _utils
from litellm.integrations.opentelemetry import OpenTelemetry, OpenTelemetryConfig
from litellm.integrations.opentelemetry_utils.base_otel_llm_obs_attributes import (
BaseLLMObsOTELAttributes,
safe_set_attribute,
)
from litellm.litellm_core_utils.safe_json_dumps import safe_dumps
from litellm.types.integrations.weave_otel import WeaveOtelConfig, WeaveSpanAttributes
from litellm.types.utils import StandardCallbackDynamicParams
if TYPE_CHECKING:
from opentelemetry.trace import Span
# Weave OTEL endpoint
# Multi-tenant cloud: https://trace.wandb.ai/otel/v1/traces
# Dedicated cloud: https://<your-subdomain>.wandb.io/traces/otel/v1/traces
WEAVE_BASE_URL = "https://trace.wandb.ai"
WEAVE_OTEL_ENDPOINT = "/otel/v1/traces"
class WeaveLLMObsOTELAttributes(BaseLLMObsOTELAttributes):
"""
Weave-specific LLM observability OTEL attributes.
Weave automatically maps attributes from multiple frameworks including
GenAI, OpenInference, Langfuse, and others.
"""
@staticmethod
@override
def set_messages(span: "Span", kwargs: dict[str, Any]):
"""Set input messages as span attributes using OpenInference conventions."""
messages = kwargs.get("messages") or []
optional_params = kwargs.get("optional_params") or {}
prompt = {"messages": messages}
functions = optional_params.get("functions")
tools = optional_params.get("tools")
if functions is not None:
prompt["functions"] = functions
if tools is not None:
prompt["tools"] = tools
safe_set_attribute(span, OpenInferenceSpanAttributes.INPUT_VALUE, json.dumps(prompt))
def _set_weave_specific_attributes(span: Span, kwargs: dict[str, Any], response_obj: Any):
"""
Sets Weave-specific metadata attributes onto the OTEL span.
Based on Weave's OTEL attribute mappings from:
https://github.com/wandb/weave/blob/master/weave/trace_server/opentelemetry/constants.py
"""
# Extract all needed data upfront
litellm_params = kwargs.get("litellm_params") or {}
# optional_params = kwargs.get("optional_params") or {}
metadata = kwargs.get("metadata") or {}
model = kwargs.get("model") or ""
custom_llm_provider = litellm_params.get("custom_llm_provider") or ""
# Weave supports a custom display name and will default to the model name if not provided.
display_name = metadata.get("display_name")
if not display_name and model:
if custom_llm_provider:
display_name = f"{custom_llm_provider}/{model}"
else:
display_name = model
if display_name:
display_name = display_name.replace("/", "__")
safe_set_attribute(span, WeaveSpanAttributes.DISPLAY_NAME.value, display_name)
# Weave threads are OpenInference sessions.
if (session_id := metadata.get("session_id")) is not None:
if isinstance(session_id, (list, dict)):
session_id = safe_dumps(session_id)
safe_set_attribute(span, WeaveSpanAttributes.THREAD_ID.value, session_id)
safe_set_attribute(span, WeaveSpanAttributes.IS_TURN.value, True)
# Response attributes are already set by _utils.set_attributes,
# but we override them here to better match Weave's expectations
if response_obj:
output_dict = None
if hasattr(response_obj, "model_dump"):
output_dict = response_obj.model_dump()
elif hasattr(response_obj, "get"):
output_dict = response_obj
if output_dict:
safe_set_attribute(span, OpenInferenceSpanAttributes.OUTPUT_VALUE, safe_dumps(output_dict))
def _get_weave_authorization_header(api_key: str) -> str:
"""
Get the authorization header for Weave OpenTelemetry.
Weave uses Basic auth with format: api:<WANDB_API_KEY>
"""
auth_string = f"api:{api_key}"
auth_header = base64.b64encode(auth_string.encode()).decode()
return f"Basic {auth_header}"
def get_weave_otel_config() -> WeaveOtelConfig:
"""
Retrieves the Weave OpenTelemetry configuration based on environment variables.
Environment Variables:
WANDB_API_KEY: Required. W&B API key for authentication.
WANDB_PROJECT_ID: Required. Project ID in format <entity>/<project_name>.
WANDB_HOST: Optional. Custom Weave host URL. Defaults to cloud endpoint.
Returns:
WeaveOtelConfig: A Pydantic model containing Weave OTEL configuration.
Raises:
ValueError: If required environment variables are missing.
"""
api_key = os.getenv("WANDB_API_KEY")
project_id = os.getenv("WANDB_PROJECT_ID")
host = os.getenv("WANDB_HOST")
if not api_key:
raise ValueError("WANDB_API_KEY must be set for Weave OpenTelemetry integration.")
if not project_id:
raise ValueError(
"WANDB_PROJECT_ID must be set for Weave OpenTelemetry integration. Format: <entity>/<project_name>"
)
if host:
if not host.startswith("http"):
host = "https://" + host
# Self-managed instances use a different path
endpoint = host.rstrip("/") + WEAVE_OTEL_ENDPOINT
verbose_logger.debug(f"Using Weave OTEL endpoint from host: {endpoint}")
else:
endpoint = WEAVE_BASE_URL + WEAVE_OTEL_ENDPOINT
verbose_logger.debug(f"Using Weave cloud endpoint: {endpoint}")
# Weave uses Basic auth with format: api:<WANDB_API_KEY>
auth_header = _get_weave_authorization_header(api_key=api_key)
otlp_auth_headers = f"Authorization={auth_header},project_id={project_id}"
# Set standard OTEL environment variables
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = endpoint
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = otlp_auth_headers
return WeaveOtelConfig(
otlp_auth_headers=otlp_auth_headers,
endpoint=endpoint,
project_id=project_id,
protocol="otlp_http",
)
def set_weave_otel_attributes(span: Span, kwargs: dict[str, Any], response_obj: Any):
"""
Sets OpenTelemetry span attributes for Weave observability.
Uses the same attribute setting logic as other OTEL integrations for consistency.
"""
_utils.set_attributes(span, kwargs, response_obj, WeaveLLMObsOTELAttributes)
_set_weave_specific_attributes(span=span, kwargs=kwargs, response_obj=response_obj)
class WeaveOtelLogger(OpenTelemetry):
"""
Weave (W&B) OpenTelemetry Logger for LiteLLM.
Sends LLM traces to Weave via the OpenTelemetry Protocol (OTLP).
Environment Variables:
WANDB_API_KEY: Required. Weights & Biases API key for authentication.
WANDB_PROJECT_ID: Required. Project ID in format <entity>/<project_name>.
WANDB_HOST: Optional. Custom Weave host URL. Defaults to cloud endpoint.
Usage:
litellm.callbacks = ["weave_otel"]
Or manually:
from litellm.integrations.weave.weave_otel import WeaveOtelLogger
weave_logger = WeaveOtelLogger(callback_name="weave_otel")
litellm.callbacks = [weave_logger]
Reference:
https://docs.wandb.ai/weave/guides/tracking/otel
"""
def __init__(
self,
config: Optional[OpenTelemetryConfig] = None,
callback_name: Optional[str] = "weave_otel",
**kwargs,
):
"""
Initialize WeaveOtelLogger.
If config is not provided, automatically configures from environment variables
(WANDB_API_KEY, WANDB_PROJECT_ID, WANDB_HOST) via get_weave_otel_config().
"""
if config is None:
# Auto-configure from Weave environment variables
weave_config = get_weave_otel_config()
config = OpenTelemetryConfig(
exporter=weave_config.protocol,
endpoint=weave_config.endpoint,
headers=weave_config.otlp_auth_headers,
)
super().__init__(config=config, callback_name=callback_name, **kwargs)
def _maybe_log_raw_request(self, kwargs, response_obj, start_time, end_time, parent_span):
"""
Override to skip creating the raw_gen_ai_request child span.
For Weave, we only want a single span per LLM call. The parent span
already contains all the necessary attributes, so the child span
is redundant.
"""
pass
def _start_primary_span(
self,
kwargs,
response_obj,
start_time,
end_time,
context,
parent_span=None,
):
"""
Override to always create a child span instead of reusing the parent span.
This ensures that wrapper spans (like "B", "C", "D", "E") remain separate
from the LiteLLM LLM call spans, creating proper nesting in Weave.
"""
otel_tracer = self.get_tracer_to_use_for_request(kwargs)
# Always create a new child span, even if parent_span is provided
# This ensures wrapper spans remain separate from LLM call spans
span = otel_tracer.start_span(
name=self._get_span_name(kwargs),
start_time=self._to_ns(start_time),
context=context,
)
span.set_status(Status(StatusCode.OK))
self.set_attributes(span, kwargs, response_obj)
span.end(end_time=self._to_ns(end_time))
return span
def _handle_success(self, kwargs, response_obj, start_time, end_time):
"""
Override to prevent ending externally created parent spans.
When wrapper spans (like "B", "C", "D", "E") are provided as parent spans,
they should be managed by the user code, not ended by LiteLLM.
"""
verbose_logger.debug(
"Weave OpenTelemetry Logger: Logging kwargs: %s, OTEL config settings=%s",
kwargs,
self.config,
)
ctx, parent_span = self._get_span_context(kwargs)
# Always create a child span (handled by _start_primary_span override)
primary_span_parent = None
# 1. Primary span
span = self._start_primary_span(kwargs, response_obj, start_time, end_time, ctx, primary_span_parent)
# 2. Raw-request sub-span (skipped for Weave via _maybe_log_raw_request override)
self._maybe_log_raw_request(kwargs, response_obj, start_time, end_time, span)
# 3. Guardrail span
self._create_guardrail_span(kwargs=kwargs, context=ctx)
# 4. Metrics & cost recording
self._record_metrics(kwargs, response_obj, start_time, end_time)
# 5. Semantic logs.
if self.config.enable_events:
self._emit_semantic_logs(kwargs, response_obj, span)
# 6. Don't end parent span - it's managed by user code
# Since we always create a child span (never reuse parent), the parent span
# lifecycle is owned by the user. This prevents double-ending of wrapper spans
# like "B", "C", "D", "E" that users create and manage themselves.
def construct_dynamic_otel_headers(
self, standard_callback_dynamic_params: StandardCallbackDynamicParams
) -> dict | None:
"""
Construct dynamic Weave headers from standard callback dynamic params.
This is used for team/key based logging.
Returns:
dict: A dictionary of dynamic Weave headers
"""
dynamic_headers = {}
dynamic_wandb_api_key = standard_callback_dynamic_params.get("wandb_api_key")
dynamic_weave_project_id = standard_callback_dynamic_params.get("weave_project_id")
if dynamic_wandb_api_key:
auth_header = _get_weave_authorization_header(
api_key=dynamic_wandb_api_key,
)
dynamic_headers["Authorization"] = auth_header
if dynamic_weave_project_id:
dynamic_headers["project_id"] = dynamic_weave_project_id
return dynamic_headers if dynamic_headers else None

View File

@@ -75,6 +75,7 @@ class CustomLoggerRegistry:
"langfuse_otel": OpenTelemetry,
"arize_phoenix": OpenTelemetry,
"langtrace": OpenTelemetry,
"weave_otel": OpenTelemetry,
"mlflow": MlflowLogger,
"langfuse": LangfusePromptManagement,
"otel": OpenTelemetry,

View File

@@ -3802,6 +3802,31 @@ def _init_custom_logger_compatible_class( # noqa: PLR0915
)
_in_memory_loggers.append(_otel_logger)
return _otel_logger # type: ignore
elif logging_integration == "weave_otel":
from litellm.integrations.opentelemetry import (
OpenTelemetryConfig,
)
from litellm.integrations.weave.weave_otel import WeaveOtelLogger, get_weave_otel_config
weave_otel_config = get_weave_otel_config()
otel_config = OpenTelemetryConfig(
exporter=weave_otel_config.protocol,
endpoint=weave_otel_config.endpoint,
headers=weave_otel_config.otlp_auth_headers,
)
for callback in _in_memory_loggers:
if (
isinstance(callback, WeaveOtelLogger)
and callback.callback_name == "weave_otel"
):
return callback # type: ignore
_otel_logger = WeaveOtelLogger(
config=otel_config, callback_name="weave_otel"
)
_in_memory_loggers.append(_otel_logger)
return _otel_logger # type: ignore
elif logging_integration == "pagerduty":
for callback in _in_memory_loggers:
if isinstance(callback, PagerDutyAlerting):

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
from enum import Enum
from typing import Literal
from pydantic import BaseModel
class WeaveOtelConfig(BaseModel):
"""Configuration for Weave OpenTelemetry integration."""
otlp_auth_headers: str | None = None
endpoint: str | None = None
project_id: str | None = None
protocol: Literal["otlp_grpc", "otlp_http"] = "otlp_http"
class WeaveSpanAttributes(str, Enum):
"""
Weave-specific span attributes for OpenTelemetry traces.
Based on Weave's OTEL attribute mappings from:
https://github.com/wandb/weave/blob/master/weave/trace_server/opentelemetry/constants.py
"""
DISPLAY_NAME = "wandb.display_name"
# Thread organization, similar to OpenInference session_id.
THREAD_ID = "wandb.thread_id"
IS_TURN = "wandb.is_turn"

View File

@@ -2676,6 +2676,10 @@ class StandardCallbackDynamicParams(TypedDict, total=False):
posthog_api_key: Optional[str]
posthog_api_url: Optional[str]
# Weave (W&B) dynamic params
wandb_api_key: Optional[str]
weave_project_id: Optional[str]
# Logging settings
turn_off_message_logging: Optional[bool] # when true will not log messages
litellm_disabled_callbacks: Optional[List[str]]

2
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
[[package]]
name = "aiofiles"

View File

@@ -0,0 +1,147 @@
import os
from unittest.mock import MagicMock, patch
import pytest
from litellm.integrations.weave.weave_otel import (
_set_weave_specific_attributes,
get_weave_otel_config,
)
from litellm.types.integrations.weave_otel import WeaveOtelConfig, WeaveSpanAttributes
def test_get_weave_otel_config():
"""Test config creation with required env vars and error cases for missing vars."""
# Test successful config creation with required environment variables
with patch.dict(
os.environ,
{
"WANDB_API_KEY": "test_api_key",
"WANDB_PROJECT_ID": "test-entity/test-project",
},
clear=True,
):
config = get_weave_otel_config()
assert isinstance(config, WeaveOtelConfig)
assert config.protocol == "otlp_http"
assert config.project_id == "test-entity/test-project"
assert config.otlp_auth_headers is not None
assert "Authorization=" in config.otlp_auth_headers
assert "project_id=test-entity/test-project" in config.otlp_auth_headers
assert config.endpoint == "https://trace.wandb.ai/otel/v1/traces"
# Verify environment variables were set
assert os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] == "https://trace.wandb.ai/otel/v1/traces"
assert os.environ["OTEL_EXPORTER_OTLP_HEADERS"] == config.otlp_auth_headers
# Test ValueError when WANDB_API_KEY is missing
with patch.dict(os.environ, {"WANDB_PROJECT_ID": "test-entity/test-project"}, clear=True):
with pytest.raises(ValueError, match="WANDB_API_KEY must be set"):
get_weave_otel_config()
# Test ValueError when WANDB_PROJECT_ID is missing
with patch.dict(os.environ, {"WANDB_API_KEY": "test_api_key"}, clear=True):
with pytest.raises(ValueError, match="WANDB_PROJECT_ID must be set"):
get_weave_otel_config()
def test_get_weave_otel_config_with_custom_host():
"""Test config creation with custom WANDB_HOST."""
# Test with host that already has https://
with patch.dict(
os.environ,
{
"WANDB_API_KEY": "test_api_key",
"WANDB_PROJECT_ID": "test-entity/test-project",
"WANDB_HOST": "https://custom.wandb.io",
},
clear=True,
):
config = get_weave_otel_config()
assert config.endpoint == "https://custom.wandb.io/otel/v1/traces"
assert os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] == "https://custom.wandb.io/otel/v1/traces"
# Test with host without http:// or https://
with patch.dict(
os.environ,
{
"WANDB_API_KEY": "test_api_key",
"WANDB_PROJECT_ID": "test-entity/test-project",
"WANDB_HOST": "custom.wandb.io",
},
clear=True,
):
config = get_weave_otel_config()
assert config.endpoint == "https://custom.wandb.io/otel/v1/traces"
# Test with host with trailing slash
with patch.dict(
os.environ,
{
"WANDB_API_KEY": "test_api_key",
"WANDB_PROJECT_ID": "test-entity/test-project",
"WANDB_HOST": "https://custom.wandb.io/",
},
clear=True,
):
config = get_weave_otel_config()
assert config.endpoint == "https://custom.wandb.io/otel/v1/traces"
def test_set_weave_specific_attributes_display_name_from_metadata():
"""Test _set_weave_specific_attributes sets display_name from metadata."""
mock_span = MagicMock()
kwargs = {
"metadata": {"display_name": "custom-display-name"},
"model": "gpt-4",
}
with patch("litellm.integrations.weave.weave_otel.safe_set_attribute") as mock_safe_set:
_set_weave_specific_attributes(mock_span, kwargs, None)
# Should set display_name from metadata
mock_safe_set.assert_any_call(
mock_span, WeaveSpanAttributes.DISPLAY_NAME.value, "custom-display-name"
)
def test_set_weave_specific_attributes_display_name_from_model():
"""Test _set_weave_specific_attributes sets display_name from model when not in metadata."""
mock_span = MagicMock()
kwargs = {
"model": "openai/gpt-4o-mini",
"metadata": {},
}
with patch("litellm.integrations.weave.weave_otel.safe_set_attribute") as mock_safe_set:
_set_weave_specific_attributes(mock_span, kwargs, None)
# Should set display_name from model
mock_safe_set.assert_any_call(
mock_span, WeaveSpanAttributes.DISPLAY_NAME.value, "openai__gpt-4o-mini"
)
def test_set_weave_specific_attributes_thread_id_and_is_turn():
"""Test _set_weave_specific_attributes sets thread_id and is_turn from session_id."""
mock_span = MagicMock()
kwargs = {
"metadata": {"session_id": "session-123"},
}
with patch("litellm.integrations.weave.weave_otel.safe_set_attribute") as mock_safe_set:
_set_weave_specific_attributes(mock_span, kwargs, None)
# Should set thread_id and is_turn
mock_safe_set.assert_any_call(
mock_span, WeaveSpanAttributes.THREAD_ID.value, "session-123"
)
mock_safe_set.assert_any_call(
mock_span, WeaveSpanAttributes.IS_TURN.value, True
)