Use LiteLLM key alias as fallback Noma applicationId in NomaGuardrail (#16832)

* Use auth key name if there are no app id in in headers or in extra_data

* use key alias instead of key name

* Fix

* last priority key alias

* Fix

* Add tests
This commit is contained in:
idola9
2025-11-20 03:44:23 +02:00
committed by GitHub
parent d389b9dd66
commit e1005cb9d3
2 changed files with 152 additions and 3 deletions

View File

@@ -116,8 +116,9 @@ class NomaGuardrail(CustomGuardrail):
"NOMA_API_BASE", NomaGuardrail._DEFAULT_API_BASE
)
self.application_id = application_id or os.environ.get(
"NOMA_APPLICATION_ID", "litellm"
"NOMA_APPLICATION_ID"
)
self.default_application_id = "litellm"
if monitor_mode is None:
self.monitor_mode = (
@@ -799,7 +800,9 @@ class NomaGuardrail(CustomGuardrail):
or request_data.get("metadata", {})
.get("headers", {})
.get("x-noma-application-id")
or self.application_id,
or self.application_id
or user_auth.key_alias
or self.default_application_id,
"ipAddress": request_data.get("metadata", {}).get(
"requester_ip_address", None
),

View File

@@ -1,5 +1,6 @@
import copy
import os
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
@@ -164,6 +165,151 @@ class TestNomaGuardrailConfiguration:
assert result.block_failures is False
mock_add.assert_called_once_with(result)
class TestNomaApplicationIdResolution:
"""Tests for determining which applicationId is sent to Noma."""
@staticmethod
def _clone_user_auth(user_auth: UserAPIKeyAuth) -> UserAPIKeyAuth:
return UserAPIKeyAuth(**user_auth.model_dump())
@staticmethod
def _clone_request_data(request_data: dict) -> dict:
return copy.deepcopy(request_data)
@staticmethod
async def _get_application_id(
guardrail: NomaGuardrail,
user_auth: UserAPIKeyAuth,
request_data: dict,
extra_data: dict,
) -> str:
mock_response = MagicMock()
mock_response.json.return_value = {"aggregatedScanResult": False, "scanResult": []}
mock_response.raise_for_status = MagicMock()
mock_post = AsyncMock(return_value=mock_response)
with patch.object(guardrail.async_handler, "post", mock_post):
await guardrail._call_noma_api(
payload={"input": []},
llm_request_id=None,
request_data=request_data,
user_auth=user_auth,
extra_data=extra_data,
)
sent_payload = mock_post.call_args.kwargs["json"]
return sent_payload["x-noma-context"]["applicationId"]
@pytest.mark.asyncio
async def test_application_id_prefers_extra_body(
self, noma_guardrail, mock_user_api_key_dict, mock_request_data
):
request_data = self._clone_request_data(mock_request_data)
request_data.setdefault("metadata", {}).setdefault(
"headers", {}
)["x-noma-application-id"] = "header-app"
user_auth = self._clone_user_auth(mock_user_api_key_dict)
user_auth.key_alias = "alias-app"
application_id = await self._get_application_id(
guardrail=noma_guardrail,
user_auth=user_auth,
request_data=request_data,
extra_data={"application_id": "dynamic-app"},
)
assert application_id == "dynamic-app"
@pytest.mark.asyncio
async def test_application_id_prefers_header_over_alias(
self, noma_guardrail, mock_user_api_key_dict, mock_request_data
):
request_data = self._clone_request_data(mock_request_data)
request_data.setdefault("metadata", {}).setdefault(
"headers", {}
)["x-noma-application-id"] = "header-app"
user_auth = self._clone_user_auth(mock_user_api_key_dict)
user_auth.key_alias = "alias-app"
original_app_id = noma_guardrail.application_id
noma_guardrail.application_id = None
try:
application_id = await self._get_application_id(
guardrail=noma_guardrail,
user_auth=user_auth,
request_data=request_data,
extra_data={},
)
finally:
noma_guardrail.application_id = original_app_id
assert application_id == "header-app"
@pytest.mark.asyncio
async def test_application_id_uses_guardrail_config_before_alias(
self, noma_guardrail, mock_user_api_key_dict, mock_request_data
):
request_data = self._clone_request_data(mock_request_data)
user_auth = self._clone_user_auth(mock_user_api_key_dict)
user_auth.key_alias = "alias-app"
original_app_id = noma_guardrail.application_id
noma_guardrail.application_id = "config-app"
try:
application_id = await self._get_application_id(
guardrail=noma_guardrail,
user_auth=user_auth,
request_data=request_data,
extra_data={},
)
finally:
noma_guardrail.application_id = original_app_id
assert application_id == "config-app"
@pytest.mark.asyncio
async def test_application_id_falls_back_to_alias(
self, noma_guardrail, mock_user_api_key_dict, mock_request_data
):
request_data = self._clone_request_data(mock_request_data)
user_auth = self._clone_user_auth(mock_user_api_key_dict)
user_auth.key_alias = "alias-app"
original_app_id = noma_guardrail.application_id
noma_guardrail.application_id = None
try:
application_id = await self._get_application_id(
guardrail=noma_guardrail,
user_auth=user_auth,
request_data=request_data,
extra_data={},
)
finally:
noma_guardrail.application_id = original_app_id
assert application_id == "alias-app"
@pytest.mark.asyncio
async def test_application_id_defaults_to_litellm(
self, noma_guardrail, mock_user_api_key_dict, mock_request_data
):
request_data = self._clone_request_data(mock_request_data)
user_auth = self._clone_user_auth(mock_user_api_key_dict)
user_auth.key_alias = None
original_app_id = noma_guardrail.application_id
noma_guardrail.application_id = None
try:
application_id = await self._get_application_id(
guardrail=noma_guardrail,
user_auth=user_auth,
request_data=request_data,
extra_data={},
)
finally:
noma_guardrail.application_id = original_app_id
assert application_id == "litellm"
class TestNomaBlockedMessage:
"""Test the NomaBlockedMessage exception class"""