(feat) Organizations: allow org admins to create teams on UI + (feat) IBM Guardrails (#15924)

* fix(oldteams.tsx): allow org admin to create team on ui

* fix(oldteams.tsx): show org admin a dropdown of allowed orgs for team creation

* docs(access_control.md): cleanup doc

* feat(ibm_guardrails/): initial commit adding support for ibm guardrails on litellm

allows user to use self-hosted ibm guardrails

* feat(ibm_detector.py): working detector

* docs(ibm_guardrails.md): document new ibm guardrails

* fix: fix linting errors
This commit is contained in:
Krish Dholakia
2025-10-25 11:13:39 -07:00
committed by GitHub
parent ec6c166548
commit ddacaf6c32
16 changed files with 2946 additions and 66 deletions

View File

@@ -1,25 +1,325 @@
import Image from '@theme/IdealImage';
# Role-based Access Controls (RBAC)
Role-based access control (RBAC) is based on Organizations, Teams and Internal User Roles
<Image img={require('../../img/litellm_user_heirarchy.png')} style={{ width: '100%', maxWidth: '4000px' }} />
- `Organizations` are the top-level entities that contain Teams.
- `Team` - A Team is a collection of multiple `Internal Users`
- `Internal Users` - users that can create keys, make LLM API calls, view usage on LiteLLM. Users can be on multiple teams.
- `Roles` define the permissions of an `Internal User`
- `Virtual Keys` - Keys are used for authentication to the LiteLLM API. Keys are tied to a `Internal User` and `Team`
## Roles
| Role Type | Role Name | Permissions |
|-----------|-----------|-------------|
| **Admin** | `proxy_admin` | Admin over the platform |
| | `proxy_admin_viewer` | Can login, view all keys, view all spend. **Cannot** create keys/delete keys/add new users |
| **Organization** | `org_admin` | Admin over the organization. Can create teams and users within their organization |
| **Internal User** | `internal_user` | Can login, view/create/delete their own keys, view their spend. **Cannot** add new users |
| | `internal_user_viewer` | Can login, view their own keys, view their own spend. **Cannot** create/delete keys, add new users |
## User Roles
LiteLLM has two types of roles:
1. **Global Proxy Roles** - Platform-wide roles that apply across all organizations and teams
2. **Organization/Team Specific Roles** - Roles scoped to specific organizations or teams (**Premium Feature**)
### Global Proxy Roles
| Role Name | Permissions |
|-----------|-------------|
| `proxy_admin` | Admin over the entire platform. Full control over all organizations, teams, and users |
| `proxy_admin_viewer` | Can login, view all keys, view all spend across the platform. **Cannot** create keys/delete keys/add new users |
| `internal_user` | Can login, view/create (when allowed by team-specific permissions)/delete their own keys, view their spend. **Cannot** add new users |
| `internal_user_viewer` | ⚠️ **DEPRECATED** - Use team/org specific roles instead. Can login, view their own keys, view their own spend. **Cannot** create/delete keys, add new users |
### Organization/Team Specific Roles
| Role Name | Permissions |
|-----------|-------------|
| `org_admin` | Admin over a specific organization. Can create teams and users within their organization ✨ **Premium Feature** |
| `team_admin` | Admin over a specific team. Can manage team members, update team settings, and create keys for their team. ✨ **Premium Feature** |
## What Can Each Role Do?
Here's what each role can actually do. Think of it like levels of access.
---
## Global Proxy Roles
These roles apply across the entire LiteLLM platform, regardless of organization or team boundaries.
### Proxy Admin - Full Access
The proxy admin controls everything. They're like the owner of the whole platform.
**What they can do:**
- Create and manage all organizations
- Create and manage all teams (across all organizations)
- Create and manage all users
- View all spend and usage across the platform
- Create and delete keys for anyone
- Update team budgets, rate limits, and models
- Manage team members and assign roles
**Who should be a proxy admin:** Only the people running the LiteLLM instance.
---
### Proxy Admin Viewer - Platform-Wide Read Access
The proxy admin viewer can see everything across the platform but cannot make changes.
**What they can do:**
- View all organizations, teams, and users
- View all spend and usage across the platform
- View all API keys
- Login to the admin dashboard
**What they cannot do:**
- Create or delete keys
- Add or remove users
- Modify budgets, rate limits, or settings
- Make any changes to the platform
**Who should be a proxy admin viewer:** Finance teams, auditors, or stakeholders who need platform-wide visibility without modification rights.
---
### Internal User
An internal user can create API keys (when allowed by team-specific permissions) and make calls. They see their own stuff only. They can become a team admin or org admin if they are assigned the respective roles.
**What they can do:**
- Create API keys for themselves
- Delete their own API keys
- View their own spend and usage
- Make API calls using their keys
**Who should be an internal user:** Anyone who needs UI access for team/org specific operations **OR** for developers you plan to give multiple keys to.
---
### Internal User Viewer - Read-Only Access
:::warning DEPRECATED
This role is deprecated in favor of team/org specific roles. Use `org_admin` or `team_admin` roles for better granular control over user permissions within organizations and teams.
:::
An internal user viewer can view their own information but cannot create or delete keys.
**What they can do:**
- View their own API keys
- View their own spend and usage
- Login to see their dashboard
**What they cannot do:**
- Create or delete API keys
- Make changes to any settings
- Create teams or add users
- View other people's information
**Who should be an internal user viewer (deprecated):** Consider using team/org specific roles instead for better access control.
---
## Organization/Team Specific Roles
:::info
Organization/Team specific roles are premium features. You need to be a LiteLLM Enterprise user to use them. [Get a 7 day trial here](https://www.litellm.ai/#trial).
:::
These roles are scoped to specific organizations or teams. Users with these roles can only manage resources within their assigned organization or team.
### Org Admin - Organization Level Access
An org admin manages one or more organizations. They can create teams within their organization but can't touch other organizations.
**What they can do:**
- Create teams within their organization
- Add users to teams in their organization
- View spend for their organization
- Create keys for users in their organization
**What they cannot do:**
- Create or manage other organizations
- Modify org budgets / rate limits
- Modify org allowed models (e.g. adding a proxy-level model to the org)
**Who should be an org admin:** Department leads or managers who need to manage multiple teams.
---
### Team Admin - Team Level Access
**This is a Premium Feature**
A team admin manages a specific team. They're like a team lead who can add people, update settings, but only for their team.
**What they can do:**
- Add or remove team members from their team
- Update team members' budgets and rate limits within the team
- Change team settings (budget, rate limits, models)
- Create and delete keys for team members
- Onboard a [team-BYOK](./team_model_add) model to LiteLLM (e.g. onboarding a team's finetuned model)
- Configure [team member permissions](#team-member-permissions) to control what regular team members can do
**What they cannot do:**
- Create new teams
- Modify team's budget / rate limits
- Add/remove global proxy models to their team
**Who should be a team admin:** Team leads who need to manage their team's API access without bothering IT.
:::info How to create a team admin
You need to be a LiteLLM Enterprise user to assign team admins. [Get a 7 day trial here](https://www.litellm.ai/#trial).
```shell
curl -X POST 'http://0.0.0.0:4000/team/member_add' \
-H 'Authorization: Bearer sk-1234' \
-H 'Content-Type: application/json' \
-d '{"team_id": "team-123", "member": {"role": "admin", "user_id": "user@company.com"}}'
```
:::
---
## Team Member Permissions
**This is a Premium Feature**
Team member permissions allow you to control what regular team members (with role=`user`) can do with API keys in their team. By default, team members can only view key information, but you can grant them additional permissions to create, update, or delete keys.
### How It Works
- **Applies to**: Team members with role=`user` (not team admins or org admins)
- **Scope**: Permissions only apply to keys belonging to their team
- **Configuration**: Set at the team level using `team_member_permissions`
- **Override**: Team admins and org admins always have full permissions regardless of these settings
### Available Permissions
| Permission | Method | Description |
|-----------|--------|-------------|
| `/key/info` | GET | View information about virtual keys in the team |
| `/key/health` | GET | Check health status of virtual keys in the team |
| `/key/list` | GET | List all virtual keys belonging to the team |
| `/key/generate` | POST | Create new virtual keys for the team |
| `/key/service-account/generate` | POST | Create service account keys (not tied to a specific user) for the team |
| `/key/update` | POST | Modify existing virtual keys in the team |
| `/key/delete` | POST | Delete virtual keys belonging to the team |
| `/key/regenerate` | POST | Regenerate virtual keys in the team |
| `/key/block` | POST | Block virtual keys in the team |
| `/key/unblock` | POST | Unblock virtual keys in the team |
### Default Permissions
By default, team members can only:
- `/key/info` - View key information
- `/key/health` - Check key health
### Common Permission Scenarios
**Read-only access** (default):
```json
["/key/info", "/key/health"]
```
**Allow key creation but not deletion**:
```json
["/key/info", "/key/health", "/key/generate", "/key/update"]
```
**Full key management**:
```json
["/key/info", "/key/health", "/key/generate", "/key/update", "/key/delete", "/key/regenerate", "/key/block", "/key/unblock", "/key/list"]
```
### How to Configure Team Member Permissions
#### View Current Permissions
```shell
curl --location 'http://0.0.0.0:4000/team/permissions_list?team_id=team-123' \
--header 'Authorization: Bearer sk-1234'
```
Expected Response:
```json
{
"team_id": "team-123",
"team_member_permissions": ["/key/info", "/key/health"],
"all_available_permissions": ["/key/generate", "/key/update", "/key/delete", ...]
}
```
#### Update Team Member Permissions
```shell
curl --location 'http://0.0.0.0:4000/team/update' \
--header 'Authorization: Bearer sk-1234' \
--header 'Content-Type: application/json' \
--data '{
"team_id": "team-123",
"team_member_permissions": ["/key/info", "/key/health", "/key/generate", "/key/update"]
}'
```
This allows team members to:
- View key information
- Create new keys
- Update existing keys
- But NOT delete keys
### Who Can Configure These Permissions?
- **Proxy Admin**: Can configure permissions for any team
- **Org Admin**: Can configure permissions for teams in their organization
- **Team Admin**: Can configure permissions for their own team
---
## Quick Comparison
Here's the quick version:
### Global Proxy Roles
| Action | Proxy Admin | Proxy Admin Viewer | Internal User | Internal User Viewer ⚠️ (Deprecated) |
|--------|-------------|-------------------|---------------|-------------------------------------|
| Create organizations | ✅ | ❌ | ❌ | ❌ |
| Create teams | ✅ | ❌ | ❌ | ❌ |
| Manage all teams | ✅ | ❌ | ❌ | ❌ |
| Create/delete any keys | ✅ | ❌ | ❌ | ❌ |
| Create/delete own keys | ✅ | ❌ | ✅ | ❌ |
| View all platform spend | ✅ | ✅ | ❌ | ❌ |
| View own spend | ✅ | ✅ | ✅ | ✅ |
| View all keys | ✅ | ✅ | ❌ | ❌ |
| View own keys | ✅ | ✅ | ✅ | ✅ |
| Add/remove users | ✅ | ❌ | ❌ | ❌ |
> **Note:** The `internal_user_viewer` role is deprecated. Use team/org specific roles for better granular access control.
### Organization/Team Specific Roles
| Action | Org Admin | Team Admin |
|--------|-----------|------------|
| Create teams (in their org) | ✅ | ❌ |
| Manage teams in their org | ✅ | ❌ |
| Manage their specific team | ✅ | ✅ |
| Add/remove team members | ✅ (in their org) | ✅ (their team only) |
| Update team budgets | ✅ (in their org) | ✅ (their team only) |
| Create keys for team members | ✅ (in their org) | ✅ (their team only) |
| View organization spend | ✅ (their org) | ❌ |
| View team spend | ✅ (in their org) | ✅ (their team) |
| Create organizations | ❌ | ❌ |
| View all platform spend | ❌ | ❌ |
## Onboarding Organizations
**This is a Premium Feature**
### 1. Creating a new Organization
Any user with role=`proxy_admin` can create a new organization
@@ -124,18 +424,79 @@ Expected Response
```
### `Organization Admin` - Add an `Internal User`
### 4. `Organization Admin` - Add a Team Admin
The organization admin will use the virtual key created in [step 2](#2-adding-an-org_admin-to-an-organization) to add an Internal User to the `engineering_team` Team.
**This is a Premium Feature**
- We will assign role=`internal_user` so the user can create Virtual Keys for themselves
The organization admin can now add a team admin who will manage the `engineering_team`.
- We assign role=`admin` to make them a team admin for this specific team
- `team_id` is from [step 3](#3-organization-admin---create-a-team)
```shell
curl -X POST 'http://0.0.0.0:4000/team/member_add' \
-H 'Authorization: Bearer sk-1234' \
-H 'Authorization: Bearer sk-7shH8TGMAofR4zQpAAo6kQ' \
-H 'Content-Type: application/json' \
-d '{"team_id": "01044ee8-441b-45f4-be7d-c70e002722d8", "member": {"role": "internal_user", "user_id": "krrish@berri.ai"}}'
-d '{"team_id": "01044ee8-441b-45f4-be7d-c70e002722d8", "member": {"role": "admin", "user_id": "john@company.com"}}'
```
Now `john@company.com` is a team admin. They can manage the `engineering_team` - add members, update budgets, create keys - but they can't touch other teams.
Create a Virtual Key for the team admin:
```shell
curl --location 'http://0.0.0.0:4000/key/generate' \
--header 'Authorization: Bearer sk-7shH8TGMAofR4zQpAAo6kQ' \
--header 'Content-Type: application/json' \
--data '{"user_id": "john@company.com"}'
```
Expected Response:
```json
{
"models": [],
"user_id": "john@company.com",
"key": "sk-TeamAdminKey123",
"key_name": "sk-...Key123"
}
```
### 5. `Team Admin` - Add Team Members
Now the team admin can use their key to add team members without needing to ask the org admin.
```shell
curl -X POST 'http://0.0.0.0:4000/team/member_add' \
-H 'Authorization: Bearer sk-TeamAdminKey123' \
-H 'Content-Type: application/json' \
-d '{"team_id": "01044ee8-441b-45f4-be7d-c70e002722d8", "member": {"role": "user", "user_id": "krrish@berri.ai"}}'
```
The team admin can also create keys for their team members:
```shell
curl --location 'http://0.0.0.0:4000/key/generate' \
--header 'Authorization: Bearer sk-TeamAdminKey123' \
--header 'Content-Type: application/json' \
--data '{
"user_id": "krrish@berri.ai",
"team_id": "01044ee8-441b-45f4-be7d-c70e002722d8"
}'
```
### 6. `Team Admin` - Update Team Settings
The team admin can update team budgets and rate limits:
```shell
curl --location 'http://0.0.0.0:4000/team/update' \
--header 'Authorization: Bearer sk-TeamAdminKey123' \
--header 'Content-Type: application/json' \
--data '{
"team_id": "01044ee8-441b-45f4-be7d-c70e002722d8",
"max_budget": 100,
"rpm_limit": 1000
}'
```

View File

@@ -0,0 +1,226 @@
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
# IBM Guardrails
LiteLLM works with IBM's FMS Guardrails for content safety. You can use it to detect jailbreaks, PII, hate speech, and more.
## What it does
IBM Guardrails analyzes text and tells you if it contains things you want to avoid. It gives each detection a score. Higher scores mean it's more confident.
You can run these checks:
- Before sending to the LLM (on user input)
- After getting LLM response (on output)
- During the call (parallel to LLM)
## Quick Start
### 1. Add to your config.yaml
```yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: openai/gpt-3.5-turbo
api_key: os.environ/OPENAI_API_KEY
guardrails:
- guardrail_name: ibm-jailbreak-detector
litellm_params:
guardrail: ibm_guardrails
mode: pre_call
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-detector-server.com"
detector_id: "jailbreak-detector"
is_detector_server: true
default_on: true
optional_params:
score_threshold: 0.8
block_on_detection: true
```
### 2. Set your auth token
```bash
export IBM_GUARDRAILS_AUTH_TOKEN="your-token"
```
### 3. Start the proxy
```shell
litellm --config config.yaml --detailed_debug
```
### 4. Make a request
```shell
curl -i http://localhost:4000/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer sk-1234" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{"role": "user", "content": "Hello, how are you?"}
],
"guardrails": ["ibm-jailbreak-detector"]
}'
```
## Configuration
### Required params
- `guardrail` - str - Set to `ibm_guardrails`
- `auth_token` - str - Your IBM Guardrails auth token. Can use `os.environ/IBM_GUARDRAILS_AUTH_TOKEN`
- `base_url` - str - URL of your IBM Guardrails server
- `detector_id` - str - Which detector to use (e.g., "jailbreak-detector", "pii-detector")
### Optional params
- `mode` - str or list[str] - When to run. Options: `pre_call`, `post_call`, `during_call`. Default: `pre_call`
- `default_on` - bool - Run automatically without specifying in request. Default: `false`
- `is_detector_server` - bool - `true` for detector server, `false` for orchestrator. Default: `true`
- `verify_ssl` - bool - Whether to verify SSL certificates. Default: `true`
### optional_params
These go under `optional_params`:
- `detector_params` - dict - Parameters to pass to your detector
- `score_threshold` - float - Only count detections above this score (0.0 to 1.0)
- `block_on_detection` - bool - Block the request when violations found. Default: `true`
## Server Types
IBM Guardrails has two APIs you can use:
### Detector Server (recommended)
The simpler one. Sends all messages at once.
```yaml
guardrails:
- guardrail_name: ibm-detector
litellm_params:
guardrail: ibm_guardrails
mode: pre_call
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-detector-server.com"
detector_id: "jailbreak-detector"
is_detector_server: true # Use detector server
```
### Orchestrator
If you're using the IBM FMS Guardrails Orchestrator, you can use this.
```yaml
guardrails:
- guardrail_name: ibm-orchestrator
litellm_params:
guardrail: ibm_guardrails
mode: pre_call
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-orchestrator-server.com"
detector_id: "jailbreak-detector"
is_detector_server: false # Use orchestrator
```
## Examples
### Check for jailbreaks on input
```yaml
guardrails:
- guardrail_name: jailbreak-check
litellm_params:
guardrail: ibm_guardrails
mode: pre_call
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-detector-server.com"
detector_id: "jailbreak-detector"
is_detector_server: true
default_on: true
optional_params:
score_threshold: 0.8
```
### Check for PII in responses
```yaml
guardrails:
- guardrail_name: pii-check
litellm_params:
guardrail: ibm_guardrails
mode: post_call
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-detector-server.com"
detector_id: "pii-detector"
is_detector_server: true
optional_params:
score_threshold: 0.5 # Lower threshold for PII
block_on_detection: true
```
### Run multiple detectors
```yaml
guardrails:
- guardrail_name: jailbreak-check
litellm_params:
guardrail: ibm_guardrails
mode: pre_call
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-detector-server.com"
detector_id: "jailbreak-detector"
is_detector_server: true
- guardrail_name: pii-check
litellm_params:
guardrail: ibm_guardrails
mode: post_call
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-detector-server.com"
detector_id: "pii-detector"
is_detector_server: true
```
Then in your request:
```shell
curl -i http://localhost:4000/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer sk-1234" \
-d '{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hello"}],
"guardrails": ["jailbreak-check", "pii-check"]
}'
```
## How detection works
When IBM Guardrails finds something, it returns details about what it found:
```json
{
"start": 0,
"end": 31,
"text": "You are now in Do Anything Mode",
"detection_type": "jailbreak",
"score": 0.858
}
```
- `score` - How confident it is (0.0 to 1.0)
- `text` - The specific text that triggered it
- `detection_type` - What kind of violation
If the score is above your `score_threshold`, the request gets blocked (if `block_on_detection` is true).
## Further Reading
- [Control Guardrails per API Key](./quick_start#-control-guardrails-per-api-key)
- [IBM FMS Guardrails on GitHub](https://github.com/foundation-model-stack/fms-guardrails-orchestr8)

View File

@@ -11,3 +11,9 @@ LiteLLM supports a hierarchy of users, teams, organizations, and budgets.
- Teams can have multiple users. [API Reference](https://litellm-api.up.railway.app/#/team%20management)
- Users can have multiple keys, and be on multiple teams. [API Reference](https://litellm-api.up.railway.app/#/budget%20management)
- Keys can belong to either a team or a user. [API Reference](https://litellm-api.up.railway.app/#/end-user%20management)
:::info
See [Access Control](./access_control) for more details on roles and permissions.
:::

View File

@@ -37,6 +37,7 @@ const sidebars = {
"proxy/guardrails/azure_content_guardrail",
"proxy/guardrails/bedrock",
"proxy/guardrails/enkryptai",
"proxy/guardrails/ibm_guardrails",
"proxy/guardrails/grayswan",
"proxy/guardrails/lasso_security",
"proxy/guardrails/guardrails_ai",

View File

@@ -0,0 +1,199 @@
# IBM Guardrails Detector Integration
This integration provides support for IBM's FMS Guardrails detectors in LiteLLM. It supports both direct detector server calls and calls via the FMS Guardrails Orchestrator.
## Features
- Support for IBM Detector Server API (`/api/v1/text/contents`)
- Support for FMS Guardrails Orchestrator API (`/api/v2/text/detection/content`)
- Configurable score thresholds for filtering detections
- SSL verification control
- Batch processing for detector server (multiple messages at once)
- Pre-call, post-call, and during-call modes
- Detailed error messages with detection scores and types
## Configuration
### Required Parameters
- `auth_token`: Authorization bearer token for IBM Guardrails API
- `base_url`: Base URL of the detector server or orchestrator
- `detector_id`: Name of the detector (e.g., "jailbreak-detector", "pii-detector")
### Optional Parameters
- `is_detector_server` (default: `true`): Whether to use detector server (true) or orchestrator (false)
- `verify_ssl` (default: `true`): Whether to verify SSL certificates
- `detector_params` (default: `{}`): Dictionary of parameters to pass to the detector
- `score_threshold` (default: `None`): Minimum score (0.0-1.0) to consider a detection as a violation
- `block_on_detection` (default: `true`): Whether to block requests when detections are found
## Usage Examples
### Example 1: Detector Server (Pre-call)
```yaml
guardrails:
- guardrail_name: "ibm-jailbreak-detector"
litellm_params:
guardrail: ibm_guardrails
mode: pre_call
default_on: true
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-detector-server.com"
detector_id: "jailbreak-detector"
is_detector_server: true
optional_params:
score_threshold: 0.8
block_on_detection: true
```
### Example 2: FMS Orchestrator (Post-call)
```yaml
guardrails:
- guardrail_name: "ibm-content-safety"
litellm_params:
guardrail: ibm_guardrails
mode: post_call
default_on: true
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-orchestrator-server.com"
detector_id: "jailbreak-detector"
is_detector_server: false
```
### Example 3: Python Usage
```python
from litellm.proxy.guardrails.guardrail_hooks.ibm_guardrails import IBMGuardrailDetector
# Initialize the guardrail
guardrail = IBMGuardrailDetector(
guardrail_name="ibm-detector",
auth_token="your-auth-token",
base_url="https://your-detector-server.com",
detector_id="jailbreak-detector",
is_detector_server=True,
score_threshold=0.8,
event_hook="pre_call"
)
```
## API Endpoints
### Detector Server Endpoint
- **URL**: `{base_url}/api/v1/text/contents`
- **Method**: POST
- **Headers**:
- `Authorization: Bearer {auth_token}`
- `detector-id: {detector_id}`
- `content-type: application/json`
- **Body**:
```json
{
"contents": ["text1", "text2"],
"detector_params": {}
}
```
### Orchestrator Endpoint
- **URL**: `{base_url}/api/v2/text/detection/content`
- **Method**: POST
- **Headers**:
- `Authorization: Bearer {auth_token}`
- `content-type: application/json`
- **Body**:
```json
{
"content": "text to analyze",
"detectors": {
"detector-id": {}
}
}
```
## Response Format
### Detector Server Response
Returns a list of lists, where each top-level list corresponds to a message:
```json
[
[
{
"start": 0,
"end": 31,
"text": "You are now in Do Anything Mode",
"detection": "single_label_classification",
"detection_type": "jailbreak",
"score": 0.8586854338645935,
"evidences": [],
"metadata": {}
}
],
[]
]
```
### Orchestrator Response
Returns a dictionary with a list of detections:
```json
{
"detections": [
{
"start": 0,
"end": 31,
"text": "You are now in Do Anything Mode",
"detection": "single_label_classification",
"detection_type": "jailbreak",
"detector_id": "jailbreak-detector",
"score": 0.8586854338645935
}
]
}
```
## Supported Event Hooks
- `pre_call`: Run guardrail before LLM API call (on input)
- `post_call`: Run guardrail after LLM API call (on output)
- `during_call`: Run guardrail in parallel with LLM API call (on input)
## Error Handling
When violations are detected and `block_on_detection` is `true`, the guardrail raises a `ValueError` with details:
```
IBM Guardrail Detector failed: 1 violation(s) detected
Message 1:
- JAILBREAK (score: 0.859)
Text: 'You are now in Do Anything Mode'
```
## References
- [IBM FMS Guardrails Documentation](https://github.com/foundation-model-stack/fms-guardrails-orchestr8)
- [Detector API Gist](https://gist.github.com/RobGeada/fa886a6c723f06dee6becb583566d748)
- [LiteLLM Guardrails Documentation](https://docs.litellm.ai/docs/proxy/guardrails)
## Environment Variables
- `IBM_GUARDRAILS_AUTH_TOKEN`: Default auth token if not specified in config
## Common Detector Types
- `jailbreak-detector`: Detects jailbreak attempts
- `pii-detector`: Detects personally identifiable information
- `toxicity-detector`: Detects toxic content
- `prompt-injection-detector`: Detects prompt injection attacks
## Notes
- The detector server allows batch processing of multiple messages in a single request
- The orchestrator processes one message at a time
- Score thresholds can be adjusted per detector based on sensitivity requirements
- SSL verification can be disabled for development/testing environments (not recommended for production)

View File

@@ -0,0 +1,62 @@
from typing import TYPE_CHECKING
from litellm.types.guardrails import SupportedGuardrailIntegrations
from .ibm_detector import IBMGuardrailDetector
if TYPE_CHECKING:
from litellm.types.guardrails import Guardrail, LitellmParams
def initialize_guardrail(litellm_params: "LitellmParams", guardrail: "Guardrail"):
import litellm
if not litellm_params.auth_token:
raise ValueError("IBM Guardrails: auth_token is required")
if not litellm_params.base_url:
raise ValueError("IBM Guardrails: base_url is required")
if not litellm_params.detector_id:
raise ValueError("IBM Guardrails: detector_id is required")
guardrail_name = guardrail.get("guardrail_name")
if not guardrail_name:
raise ValueError("IBM Guardrails: guardrail_name is required")
# Get optional params
detector_params = getattr(litellm_params, "detector_params", {})
score_threshold = getattr(litellm_params, "score_threshold", None)
block_on_detection = getattr(litellm_params, "block_on_detection", True)
verify_ssl = getattr(litellm_params, "verify_ssl", True)
is_detector_server = litellm_params.is_detector_server
if is_detector_server is None:
is_detector_server = True
ibm_guardrail = IBMGuardrailDetector(
guardrail_name=guardrail_name,
auth_token=litellm_params.auth_token,
base_url=litellm_params.base_url,
detector_id=litellm_params.detector_id,
is_detector_server=is_detector_server,
detector_params=detector_params,
score_threshold=score_threshold,
block_on_detection=block_on_detection,
verify_ssl=verify_ssl,
default_on=litellm_params.default_on,
event_hook=litellm_params.mode,
)
litellm.logging_callback_manager.add_litellm_callback(ibm_guardrail)
return ibm_guardrail
guardrail_initializer_registry = {
SupportedGuardrailIntegrations.IBM_GUARDRAILS.value: initialize_guardrail,
}
guardrail_class_registry = {
SupportedGuardrailIntegrations.IBM_GUARDRAILS.value: IBMGuardrailDetector,
}
__all__ = ["IBMGuardrailDetector", "initialize_guardrail"]

View File

@@ -0,0 +1,95 @@
# Example LiteLLM Proxy configuration for IBM Guardrails Detector
# Based on IBM's FMS Guardrails: https://github.com/foundation-model-stack/fms-guardrails-orchestr8
# Example 1: Using IBM Detector Server directly
guardrails:
- guardrail_name: "ibm-jailbreak-detector"
litellm_params:
guardrail: ibm_guardrails
mode: pre_call # or post_call, during_call
default_on: true
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN # or hardcoded token
base_url: "https://your-detector-server.com" # Base URL of your detector server
detector_id: "jailbreak-detector" # Name of the detector (e.g., jailbreak-detector, pii-detector)
is_detector_server: true # true for detector server, false for orchestrator
verify_ssl: true # Optional: whether to verify SSL certificates (default: true)
optional_params:
detector_params: {} # Optional: parameters to pass to the detector
score_threshold: 0.8 # Optional: minimum score to consider a detection (0.0-1.0)
block_on_detection: true # Optional: whether to block when violations found (default: true)
# Example 2: Using IBM FMS Guardrails Orchestrator
- guardrail_name: "ibm-orchestrator-detector"
litellm_params:
guardrail: ibm_guardrails
mode: post_call
default_on: false
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-orchestrator-server.com"
detector_id: "jailbreak-detector"
is_detector_server: false # Use orchestrator instead of detector server
verify_ssl: true
optional_params:
detector_params:
# Detector-specific parameters can go here
some_param: "value"
score_threshold: 0.7
# Example 3: Pre-call guardrail with custom threshold
- guardrail_name: "ibm-pii-detector"
litellm_params:
guardrail: ibm_guardrails
mode: pre_call
default_on: true
auth_token: os.environ/IBM_GUARDRAILS_AUTH_TOKEN
base_url: "https://your-detector-server.com"
detector_id: "pii-detector"
is_detector_server: true
optional_params:
score_threshold: 0.5 # Lower threshold for PII detection
block_on_detection: true
# Usage with LiteLLM Proxy:
# 1. Set environment variable:
# export IBM_GUARDRAILS_AUTH_TOKEN="your-auth-token"
#
# 2. Start the proxy:
# litellm --config example_config.yaml
#
# 3. Make requests:
# The guardrail will automatically run based on the 'mode' setting
# API Response formats:
#
# Detector Server Response (returns list of lists):
# [
# [ # First message
# {
# "start": 0,
# "end": 31,
# "text": "You are now in Do Anything Mode",
# "detection": "single_label_classification",
# "detection_type": "jailbreak",
# "score": 0.8586854338645935,
# "evidences": [],
# "metadata": {}
# }
# ],
# [] # Second message (benign, no detections)
# ]
#
# Orchestrator Response:
# {
# "detections": [
# {
# "start": 0,
# "end": 31,
# "text": "You are now in Do Anything Mode",
# "detection": "single_label_classification",
# "detection_type": "jailbreak",
# "detector_id": "jailbreak-detector",
# "score": 0.8586854338645935
# }
# ]
# }

View File

@@ -0,0 +1,762 @@
# +-------------------------------------------------------------+
#
# Use IBM Guardrails Detector for your LLM calls
# Based on IBM's FMS Guardrails
#
# +-------------------------------------------------------------+
import os
from datetime import datetime
from typing import Any, AsyncGenerator, Dict, List, Literal, Optional, Union
from urllib.parse import urlencode
import httpx
import litellm
from litellm._logging import verbose_proxy_logger
from litellm.caching.caching import DualCache
from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.llms.custom_httpx.http_handler import (
get_async_httpx_client,
httpxSpecialProvider,
)
from litellm.proxy._types import UserAPIKeyAuth
from litellm.types.guardrails import GuardrailEventHooks
from litellm.types.proxy.guardrails.guardrail_hooks.ibm import (
IBMDetectorDetection,
IBMDetectorResponseOrchestrator,
)
from litellm.types.utils import GuardrailStatus, ModelResponseStream
GUARDRAIL_NAME = "ibm_guardrails"
class IBMGuardrailDetector(CustomGuardrail):
def __init__(
self,
guardrail_name: str = "ibm_detector",
auth_token: Optional[str] = None,
base_url: Optional[str] = None,
detector_id: Optional[str] = None,
is_detector_server: bool = True,
detector_params: Optional[Dict[str, Any]] = None,
score_threshold: Optional[float] = None,
block_on_detection: bool = True,
verify_ssl: bool = True,
**kwargs,
):
self.async_handler = get_async_httpx_client(
llm_provider=httpxSpecialProvider.GuardrailCallback
)
# Set API configuration
self.auth_token = auth_token or os.getenv("IBM_GUARDRAILS_AUTH_TOKEN")
if not self.auth_token:
raise ValueError(
"IBM Guardrails auth token is required. Set IBM_GUARDRAILS_AUTH_TOKEN environment variable or pass auth_token parameter."
)
self.base_url = base_url
if not self.base_url:
raise ValueError(
"IBM Guardrails base_url is required. Pass base_url parameter."
)
self.detector_id = detector_id
if not self.detector_id:
raise ValueError(
"IBM Guardrails detector_id is required. Pass detector_id parameter."
)
self.is_detector_server = is_detector_server
self.detector_params = detector_params or {}
self.score_threshold = score_threshold
self.block_on_detection = block_on_detection
self.verify_ssl = verify_ssl
# Construct API URL based on server type
if self.is_detector_server:
self.api_url = f"{self.base_url}/api/v1/text/contents"
else:
self.api_url = f"{self.base_url}/api/v2/text/detection/content"
self.guardrail_name = guardrail_name
self.guardrail_provider = "ibm_guardrails"
# store kwargs as optional_params
self.optional_params = kwargs
# Set supported event hooks
if "supported_event_hooks" not in kwargs:
kwargs["supported_event_hooks"] = [
GuardrailEventHooks.pre_call,
GuardrailEventHooks.post_call,
GuardrailEventHooks.during_call,
]
super().__init__(guardrail_name=guardrail_name, **kwargs)
verbose_proxy_logger.debug(
"IBM Guardrail Detector initialized with guardrail_name: %s, detector_id: %s, is_detector_server: %s",
self.guardrail_name,
self.detector_id,
self.is_detector_server,
)
async def _call_detector_server(
self,
contents: List[str],
request_data: Optional[dict] = None,
) -> List[List[IBMDetectorDetection]]:
"""
Call IBM Detector Server directly.
Args:
contents: List of text strings to analyze
request_data: Optional request data for logging purposes
Returns:
List of lists where top-level list is per message in contents,
sublists are individual detections on that message
"""
start_time = datetime.now()
payload = {"contents": contents, "detector_params": self.detector_params}
headers = {
"Authorization": f"Bearer {self.auth_token}",
"content-type": "application/json",
}
query_params = {"detector_id": self.detector_id}
# update the api_url with the query params
self.api_url = f"{self.api_url}?{urlencode(query_params)}"
verbose_proxy_logger.debug(
"IBM Detector Server request to %s with payload: %s",
self.api_url,
payload,
)
try:
response = await self.async_handler.post(
url=self.api_url,
json=payload,
headers=headers,
)
response.raise_for_status()
response_json: List[List[IBMDetectorDetection]] = response.json()
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
# Add guardrail information to request trace
if request_data:
guardrail_status = self._determine_guardrail_status_detector_server(
response_json
)
self.add_standard_logging_guardrail_information_to_request_data(
guardrail_provider=self.guardrail_provider,
guardrail_json_response={
"detections": [
[detection for detection in message_detections]
for message_detections in response_json
]
},
request_data=request_data,
guardrail_status=guardrail_status,
start_time=start_time.timestamp(),
end_time=end_time.timestamp(),
duration=duration,
)
return response_json
except httpx.HTTPError as e:
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
verbose_proxy_logger.error("IBM Detector Server request failed: %s", str(e))
# Add guardrail information with failure status
if request_data:
self.add_standard_logging_guardrail_information_to_request_data(
guardrail_provider=self.guardrail_provider,
guardrail_json_response={"error": str(e)},
request_data=request_data,
guardrail_status="guardrail_failed_to_respond",
start_time=start_time.timestamp(),
end_time=end_time.timestamp(),
duration=duration,
)
raise
async def _call_orchestrator(
self,
content: str,
request_data: Optional[dict] = None,
) -> List[IBMDetectorDetection]:
"""
Call IBM FMS Guardrails Orchestrator.
Args:
content: Text string to analyze
request_data: Optional request data for logging purposes
Returns:
List of detections
"""
start_time = datetime.now()
payload = {
"content": content,
"detectors": {self.detector_id: self.detector_params},
}
headers = {
"Authorization": f"Bearer {self.auth_token}",
"content-type": "application/json",
}
verbose_proxy_logger.debug(
"IBM Orchestrator request to %s with payload: %s",
self.api_url,
payload,
)
try:
response = await self.async_handler.post(
url=self.api_url,
json=payload,
headers=headers,
)
response.raise_for_status()
response_json: IBMDetectorResponseOrchestrator = response.json()
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
# Add guardrail information to request trace
if request_data:
guardrail_status = self._determine_guardrail_status_orchestrator(
response_json
)
self.add_standard_logging_guardrail_information_to_request_data(
guardrail_provider=self.guardrail_provider,
guardrail_json_response=dict(response_json),
request_data=request_data,
guardrail_status=guardrail_status,
start_time=start_time.timestamp(),
end_time=end_time.timestamp(),
duration=duration,
)
return response_json.get("detections", [])
except httpx.HTTPError as e:
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
verbose_proxy_logger.error("IBM Orchestrator request failed: %s", str(e))
# Add guardrail information with failure status
if request_data:
self.add_standard_logging_guardrail_information_to_request_data(
guardrail_provider=self.guardrail_provider,
guardrail_json_response={"error": str(e)},
request_data=request_data,
guardrail_status="guardrail_failed_to_respond",
start_time=start_time.timestamp(),
end_time=end_time.timestamp(),
duration=duration,
)
raise
def _filter_detections_by_threshold(
self, detections: List[IBMDetectorDetection]
) -> List[IBMDetectorDetection]:
"""
Filter detections based on score threshold.
Args:
detections: List of detections
Returns:
Filtered list of detections that meet the threshold
"""
if self.score_threshold is None:
return detections
return [
detection
for detection in detections
if detection.get("score", 0.0) >= self.score_threshold
]
def _determine_guardrail_status_detector_server(
self, response_json: List[List[IBMDetectorDetection]]
) -> GuardrailStatus:
"""
Determine the guardrail status based on IBM Detector Server response.
Returns:
"success": Content allowed through with no violations
"guardrail_intervened": Content blocked due to detections
"guardrail_failed_to_respond": Technical error or API failure
"""
try:
if not isinstance(response_json, list):
return "guardrail_failed_to_respond"
# Check if any detections were found
has_detections = False
for message_detections in response_json:
if message_detections:
# Apply threshold filtering
filtered = self._filter_detections_by_threshold(message_detections)
if filtered:
has_detections = True
break
if has_detections:
return "guardrail_intervened"
return "success"
except Exception as e:
verbose_proxy_logger.error(
"Error determining IBM Detector Server guardrail status: %s", str(e)
)
return "guardrail_failed_to_respond"
def _determine_guardrail_status_orchestrator(
self, response_json: IBMDetectorResponseOrchestrator
) -> GuardrailStatus:
"""
Determine the guardrail status based on IBM Orchestrator response.
Returns:
"success": Content allowed through with no violations
"guardrail_intervened": Content blocked due to detections
"guardrail_failed_to_respond": Technical error or API failure
"""
try:
if not isinstance(response_json, dict):
return "guardrail_failed_to_respond"
detections = response_json.get("detections", [])
# Apply threshold filtering
filtered = self._filter_detections_by_threshold(detections)
if filtered:
return "guardrail_intervened"
return "success"
except Exception as e:
verbose_proxy_logger.error(
"Error determining IBM Orchestrator guardrail status: %s", str(e)
)
return "guardrail_failed_to_respond"
def _create_error_message_detector_server(
self, detections_list: List[List[IBMDetectorDetection]]
) -> str:
"""
Create a detailed error message from detector server response.
Args:
detections_list: List of lists of detections
Returns:
Formatted error message string
"""
total_detections = 0
error_message = "IBM Guardrail Detector failed:\n\n"
for idx, message_detections in enumerate(detections_list):
filtered_detections = self._filter_detections_by_threshold(
message_detections
)
if filtered_detections:
error_message += f"Message {idx + 1}:\n"
total_detections += len(filtered_detections)
for detection in filtered_detections:
detection_type = detection.get("detection_type", "unknown")
score = detection.get("score", 0.0)
text = detection.get("text", "")
error_message += (
f" - {detection_type.upper()} (score: {score:.3f})\n"
)
error_message += f" Text: '{text}'\n"
error_message += "\n"
error_message = (
f"IBM Guardrail Detector failed: {total_detections} violation(s) detected\n\n"
+ error_message
)
return error_message.strip()
def _create_error_message_orchestrator(
self, detections: List[IBMDetectorDetection]
) -> str:
"""
Create a detailed error message from orchestrator response.
Args:
detections: List of detections
Returns:
Formatted error message string
"""
filtered_detections = self._filter_detections_by_threshold(detections)
error_message = f"IBM Guardrail Detector failed: {len(filtered_detections)} violation(s) detected\n\n"
for detection in filtered_detections:
detection_type = detection.get("detection_type", "unknown")
detector_id = detection.get("detector_id", self.detector_id)
score = detection.get("score", 0.0)
text = detection.get("text", "")
error_message += f"- {detection_type.upper()} (detector: {detector_id}, score: {score:.3f})\n"
error_message += f" Text: '{text}'\n\n"
return error_message.strip()
async def async_pre_call_hook(
self,
user_api_key_dict: UserAPIKeyAuth,
cache: DualCache,
data: dict,
call_type: Literal[
"completion",
"text_completion",
"embeddings",
"image_generation",
"moderation",
"audio_transcription",
"pass_through_endpoint",
"rerank",
"mcp_call",
"anthropic_messages",
],
) -> Union[Exception, str, dict, None]:
"""
Runs before the LLM API call
Runs on only Input
Use this if you want to MODIFY the input
"""
verbose_proxy_logger.debug("Running IBM Guardrail Detector pre-call hook")
from litellm.proxy.common_utils.callback_utils import (
add_guardrail_to_applied_guardrails_header,
)
event_type: GuardrailEventHooks = GuardrailEventHooks.pre_call
if self.should_run_guardrail(data=data, event_type=event_type) is not True:
return data
_messages = data.get("messages")
if _messages:
contents_to_check: List[str] = []
for message in _messages:
_content = message.get("content")
if isinstance(_content, str):
contents_to_check.append(_content)
if contents_to_check:
if self.is_detector_server:
# Call detector server with all contents at once
result = await self._call_detector_server(
contents=contents_to_check,
request_data=data,
)
verbose_proxy_logger.debug(
"IBM Detector Server async_pre_call_hook result: %s", result
)
# Check if any detections were found
has_violations = False
for message_detections in result:
filtered = self._filter_detections_by_threshold(
message_detections
)
if filtered:
has_violations = True
break
if has_violations and self.block_on_detection:
error_message = self._create_error_message_detector_server(
result
)
raise ValueError(error_message)
else:
# Call orchestrator for each content separately
for content in contents_to_check:
orchestrator_result = await self._call_orchestrator(
content=content,
request_data=data,
)
verbose_proxy_logger.debug(
"IBM Orchestrator async_pre_call_hook result: %s",
orchestrator_result,
)
filtered = self._filter_detections_by_threshold(
orchestrator_result
)
if filtered and self.block_on_detection:
error_message = self._create_error_message_orchestrator(
orchestrator_result
)
raise ValueError(error_message)
# Add guardrail to applied guardrails header
add_guardrail_to_applied_guardrails_header(
request_data=data, guardrail_name=self.guardrail_name
)
return data
async def async_moderation_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
call_type: Literal[
"completion",
"embeddings",
"image_generation",
"moderation",
"audio_transcription",
"responses",
"mcp_call",
"anthropic_messages",
],
):
"""
Runs in parallel to LLM API call
Runs on only Input
This can NOT modify the input, only used to reject or accept a call before going to LLM API
"""
from litellm.proxy.common_utils.callback_utils import (
add_guardrail_to_applied_guardrails_header,
)
event_type: GuardrailEventHooks = GuardrailEventHooks.during_call
if self.should_run_guardrail(data=data, event_type=event_type) is not True:
return
_messages = data.get("messages")
if _messages:
contents_to_check: List[str] = []
for message in _messages:
_content = message.get("content")
if isinstance(_content, str):
contents_to_check.append(_content)
if contents_to_check:
if self.is_detector_server:
# Call detector server with all contents at once
result = await self._call_detector_server(
contents=contents_to_check,
request_data=data,
)
verbose_proxy_logger.debug(
"IBM Detector Server async_moderation_hook result: %s", result
)
# Check if any detections were found
has_violations = False
for message_detections in result:
filtered = self._filter_detections_by_threshold(
message_detections
)
if filtered:
has_violations = True
break
if has_violations and self.block_on_detection:
error_message = self._create_error_message_detector_server(
result
)
raise ValueError(error_message)
else:
# Call orchestrator for each content separately
for content in contents_to_check:
orchestrator_result = await self._call_orchestrator(
content=content,
request_data=data,
)
verbose_proxy_logger.debug(
"IBM Orchestrator async_moderation_hook result: %s",
orchestrator_result,
)
filtered = self._filter_detections_by_threshold(
orchestrator_result
)
if filtered and self.block_on_detection:
error_message = self._create_error_message_orchestrator(
orchestrator_result
)
raise ValueError(error_message)
# Add guardrail to applied guardrails header
add_guardrail_to_applied_guardrails_header(
request_data=data, guardrail_name=self.guardrail_name
)
return data
async def async_post_call_success_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
response,
):
"""
Runs on response from LLM API call
It can be used to reject a response
Uses IBM Guardrails Detector to check the response for violations
"""
from litellm.proxy.common_utils.callback_utils import (
add_guardrail_to_applied_guardrails_header,
)
from litellm.types.guardrails import GuardrailEventHooks
if (
self.should_run_guardrail(
data=data, event_type=GuardrailEventHooks.post_call
)
is not True
):
return
verbose_proxy_logger.debug(
"async_post_call_success_hook response: %s", response
)
# Check if the ModelResponse has text content in its choices
# to avoid sending empty content to IBM Detector (e.g., during tool calls)
if isinstance(response, litellm.ModelResponse):
has_text_content = False
for choice in response.choices:
if isinstance(choice, litellm.Choices):
if choice.message.content and isinstance(
choice.message.content, str
):
has_text_content = True
break
if not has_text_content:
verbose_proxy_logger.warning(
"IBM Guardrail Detector: not running guardrail. No output text in response"
)
return
contents_to_check: List[str] = []
for choice in response.choices:
if isinstance(choice, litellm.Choices):
verbose_proxy_logger.debug(
"async_post_call_success_hook choice: %s", choice
)
if choice.message.content and isinstance(
choice.message.content, str
):
contents_to_check.append(choice.message.content)
if contents_to_check:
if self.is_detector_server:
# Call detector server with all contents at once
result = await self._call_detector_server(
contents=contents_to_check,
request_data=data,
)
verbose_proxy_logger.debug(
"IBM Detector Server async_post_call_success_hook result: %s",
result,
)
# Check if any detections were found
has_violations = False
for message_detections in result:
filtered = self._filter_detections_by_threshold(
message_detections
)
if filtered:
has_violations = True
break
if has_violations and self.block_on_detection:
error_message = self._create_error_message_detector_server(
result
)
raise ValueError(error_message)
else:
# Call orchestrator for each content separately
for content in contents_to_check:
orchestrator_result = await self._call_orchestrator(
content=content,
request_data=data,
)
verbose_proxy_logger.debug(
"IBM Orchestrator async_post_call_success_hook result: %s",
orchestrator_result,
)
filtered = self._filter_detections_by_threshold(
orchestrator_result
)
if filtered and self.block_on_detection:
error_message = self._create_error_message_orchestrator(
orchestrator_result
)
raise ValueError(error_message)
# Add guardrail to applied guardrails header
add_guardrail_to_applied_guardrails_header(
request_data=data, guardrail_name=self.guardrail_name
)
async def async_post_call_streaming_iterator_hook(
self,
user_api_key_dict: UserAPIKeyAuth,
response: Any,
request_data: dict,
) -> AsyncGenerator[ModelResponseStream, None]:
"""
Passes the entire stream to the guardrail
This is useful for guardrails that need to see the entire response, such as PII masking.
Triggered by mode: 'post_call'
"""
async for item in response:
yield item
@staticmethod
def get_config_model():
from litellm.types.proxy.guardrails.guardrail_hooks.ibm import (
IBMDetectorGuardrailConfigModel,
)
return IBMDetectorGuardrailConfigModel

View File

@@ -11,6 +11,9 @@ from litellm.types.proxy.guardrails.guardrail_hooks.enkryptai import (
from litellm.types.proxy.guardrails.guardrail_hooks.grayswan import (
GraySwanGuardrailConfigModel,
)
from litellm.types.proxy.guardrails.guardrail_hooks.ibm import (
IBMGuardrailsBaseConfigModel,
)
"""
Pydantic object defining how to set guardrails on litellm proxy
@@ -48,6 +51,7 @@ class SupportedGuardrailIntegrations(Enum):
TOOL_PERMISSION = "tool_permission"
JAVELIN = "javelin"
ENKRYPTAI = "enkryptai"
IBM_GUARDRAILS = "ibm_guardrails"
class Role(Enum):
@@ -532,6 +536,7 @@ class LitellmParams(
JavelinGuardrailConfigModel,
BaseLitellmParams,
EnkryptAIGuardrailConfigs,
IBMGuardrailsBaseConfigModel,
):
guardrail: str = Field(description="The type of guardrail integration to use")
mode: Union[str, List[str], Mode] = Field(

View File

@@ -0,0 +1,21 @@
from .base import IBMGuardrailsBaseConfigModel
from .ibm_detector import (
IBMDetectorDetection,
IBMDetectorGuardrailConfigModel,
IBMDetectorOptionalParams,
IBMDetectorRequestBodyDetectorServer,
IBMDetectorRequestBodyOrchestrator,
IBMDetectorResponseDetectorServer,
IBMDetectorResponseOrchestrator,
)
__all__ = [
"IBMGuardrailsBaseConfigModel",
"IBMDetectorGuardrailConfigModel",
"IBMDetectorOptionalParams",
"IBMDetectorRequestBodyDetectorServer",
"IBMDetectorRequestBodyOrchestrator",
"IBMDetectorResponseDetectorServer",
"IBMDetectorResponseOrchestrator",
"IBMDetectorDetection",
]

View File

@@ -0,0 +1,32 @@
from typing import Optional
from pydantic import BaseModel, Field
class IBMGuardrailsBaseConfigModel(BaseModel):
"""Base configuration parameters for IBM Guardrails"""
auth_token: Optional[str] = Field(
default=None,
description="Authorization bearer token for IBM Guardrails API. Reads from IBM_GUARDRAILS_AUTH_TOKEN env var if None.",
)
base_url: Optional[str] = Field(
default=None,
description="Base URL for the IBM Guardrails server",
)
detector_id: Optional[str] = Field(
default=None,
description="Name of the detector inside the server (e.g., 'jailbreak-detector')",
)
is_detector_server: Optional[bool] = Field(
default=True,
description="Boolean flag to determine if calling a detector server (True) or the FMS Orchestrator (False). Defaults to True.",
)
verify_ssl: Optional[bool] = Field(
default=True,
description="Whether to verify SSL certificates. Defaults to True.",
)

View File

@@ -0,0 +1,87 @@
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from typing_extensions import TypedDict
from ..base import GuardrailConfigModel
from .base import IBMGuardrailsBaseConfigModel
# TypedDicts for IBM Detector API Request/Response Structure
class IBMDetectorRequestBodyDetectorServer(TypedDict):
"""Request body for calling IBM Detector Server directly"""
contents: List[str]
detector_params: Dict[str, Any]
class IBMDetectorRequestBodyOrchestrator(TypedDict):
"""Request body for calling IBM Detector via FMS Guardrails Orchestrator"""
content: str
detectors: Dict[str, Dict[str, Any]]
class IBMDetectorDetection(TypedDict, total=False):
"""Individual detection from IBM Detector"""
start: int
end: int
text: str
detection: str
detection_type: str
score: float
evidences: List[Any]
metadata: Dict[str, Any]
detector_id: Optional[str] # Only present in orchestrator response
class IBMDetectorResponseDetectorServer(TypedDict):
"""Response from IBM Detector Server (returns list of lists)"""
detections: List[List[IBMDetectorDetection]]
class IBMDetectorResponseOrchestrator(TypedDict):
"""Response from IBM FMS Guardrails Orchestrator"""
detections: List[IBMDetectorDetection]
# Pydantic Config Models
class IBMDetectorOptionalParams(BaseModel):
"""Optional parameters for IBM Detector guardrail"""
detector_params: Optional[Dict[str, Any]] = Field(
default_factory=lambda: {},
description="Dictionary of arguments to pass to the detector. Can be set per-request or hard-coded at guardrail config time.",
)
score_threshold: Optional[float] = Field(
default=None,
description="Minimum score threshold to consider a detection as a violation (0.0 to 1.0). If set, detections below this threshold will be ignored.",
)
block_on_detection: Optional[bool] = Field(
default=True,
description="Whether to block requests when detections are found. Defaults to True.",
)
class IBMDetectorGuardrailConfigModel(
IBMGuardrailsBaseConfigModel,
GuardrailConfigModel[IBMDetectorOptionalParams],
):
"""Configuration model for IBM Detector guardrail"""
optional_params: Optional[IBMDetectorOptionalParams] = Field(
default_factory=IBMDetectorOptionalParams,
description="Optional parameters for the IBM Detector guardrail",
)
@staticmethod
def ui_friendly_name() -> str:
return "IBM Guardrails Detector"

View File

@@ -0,0 +1,357 @@
"""
Mock FastAPI server for IBM FMS Guardrails Orchestrator Detector API.
This server implements the Detector API endpoints for testing purposes.
Based on: https://foundation-model-stack.github.io/fms-guardrails-orchestrator/
Usage:
python scripts/mock_ibm_guardrails_server.py
The server will run on http://localhost:8001 by default.
"""
import uuid
from typing import Any, Dict, List, Optional
import uvicorn
from fastapi import FastAPI, Header, HTTPException, status
from pydantic import BaseModel, Field
app = FastAPI(
title="IBM FMS Guardrails Orchestrator Mock",
description="Mock server for testing IBM Guardrails Detector API",
version="1.0.0",
)
# Request Models
class DetectorParams(BaseModel):
"""Parameters specific to the detector."""
threshold: Optional[float] = Field(None, ge=0.0, le=1.0)
custom_param: Optional[str] = None
class TextDetectionRequest(BaseModel):
"""Request model for text detection."""
contents: List[str] = Field(..., description="Text content to analyze")
detector_params: Optional[DetectorParams] = None
class TextGenerationDetectionRequest(BaseModel):
"""Request model for text generation detection."""
detector_id: str = Field(..., description="ID of the detector to use")
prompt: str = Field(..., description="Input prompt")
generated_text: str = Field(..., description="Generated text to analyze")
detector_params: Optional[DetectorParams] = None
class ContextDetectionRequest(BaseModel):
"""Request model for detection with context."""
detector_id: str = Field(..., description="ID of the detector to use")
content: str = Field(..., description="Text content to analyze")
context: Optional[Dict[str, Any]] = Field(None, description="Additional context")
detector_params: Optional[DetectorParams] = None
# Response Models
class Detection(BaseModel):
"""Individual detection result."""
detection_type: str = Field(..., description="Type of detection")
detection: bool = Field(..., description="Whether content was detected as harmful")
score: float = Field(..., ge=0.0, le=1.0, description="Detection confidence score")
start: Optional[int] = Field(None, description="Start position in text")
end: Optional[int] = Field(None, description="End position in text")
text: Optional[str] = Field(None, description="Detected text segment")
evidence: Optional[List[str]] = Field(None, description="Supporting evidence")
class DetectionResponse(BaseModel):
"""Response model for detection results."""
detections: List[Detection] = Field(..., description="List of detections")
detection_id: str = Field(..., description="Unique ID for this detection request")
# Mock detector configurations
MOCK_DETECTORS = {
"hate": {
"name": "Hate Speech Detector",
"triggers": ["hate", "offensive", "discriminatory", "slur"],
"default_score": 0.85,
},
"pii": {
"name": "PII Detector",
"triggers": ["email", "ssn", "credit card", "phone number", "address"],
"default_score": 0.92,
},
"toxicity": {
"name": "Toxicity Detector",
"triggers": ["toxic", "abusive", "profanity", "insult"],
"default_score": 0.78,
},
"jailbreak": {
"name": "Jailbreak Detector",
"triggers": ["ignore instructions", "override", "bypass", "jailbreak"],
"default_score": 0.88,
},
"prompt_injection": {
"name": "Prompt Injection Detector",
"triggers": ["ignore previous", "new instructions", "system prompt"],
"default_score": 0.90,
},
}
def simulate_detection(
detector_id: str, content: str, detector_params: Optional[DetectorParams] = None
) -> List[Detection]:
"""
Simulate detection logic based on detector type and content.
Args:
detector_id: ID of the detector to simulate
content: Text content to analyze
detector_params: Optional detector parameters
Returns:
List of Detection objects
"""
detections = []
content_lower = " ".join(c for c in content).lower()
# Get detector config
detector_config = MOCK_DETECTORS.get(detector_id)
if not detector_config:
# Unknown detector - return no detections
return detections
# Check for triggers in content
for trigger in detector_config["triggers"]:
if trigger in content_lower:
# Calculate score (use threshold if provided, otherwise default)
base_score = detector_config["default_score"]
threshold = (
detector_params.threshold
if detector_params and detector_params.threshold
else None
)
# Adjust score slightly based on content length (longer content = slightly lower confidence)
score_adjustment = max(0, min(0.1, len(content) / 10000))
score = max(0.0, min(1.0, base_score - score_adjustment))
# Find position of trigger
start_pos = content_lower.find(trigger)
end_pos = start_pos + len(trigger)
detection = Detection(
detection_type=detector_id,
detection=threshold is None or score >= threshold,
score=score,
start=start_pos,
end=end_pos,
text=content[start_pos:end_pos] if start_pos >= 0 else None,
evidence=[f"Found trigger word: {trigger}"],
)
detections.append(detection)
# If no triggers found, return a negative detection
if not detections:
detections.append(
Detection(
detection_type=detector_id,
detection=False,
score=0.05, # Low score for clean content
)
)
return detections
# Authentication middleware
def verify_auth_token(authorization: Optional[str] = Header(None)) -> bool:
"""
Verify the authentication token.
Args:
authorization: Authorization header value
Returns:
True if valid, raises HTTPException otherwise
"""
if not authorization:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing authorization header",
)
# Simple token validation - in real implementation, this would validate against a real auth system
if not authorization.startswith("Bearer "):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authorization header format. Expected: Bearer <token>",
)
token = authorization.replace("Bearer ", "")
# Accept any non-empty token for mock purposes
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Empty token provided",
)
return True
# API Endpoints
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "service": "IBM FMS Guardrails Mock Server"}
@app.get("/")
async def root():
"""Root endpoint with API information."""
return {
"service": "IBM FMS Guardrails Orchestrator Mock",
"version": "1.0.0",
"endpoints": {
"health": "/health",
"text_detection": "/api/v1/text/detection",
"generation_detection": "/api/v1/text/generation/detection",
"context_detection": "/api/v1/text/context/detection",
},
"available_detectors": list(MOCK_DETECTORS.keys()),
}
@app.post("/api/v1/text/contents")
async def text_detection(
detector_id: str, # query parameter
request: TextDetectionRequest,
authorization: Optional[str] = Header(None),
):
"""
Detect potential issues in text content.
Args:
request: Detection request with content and detector ID
authorization: Bearer token for authentication
Returns:
Detection results
"""
verify_auth_token(authorization)
detections = simulate_detection(
detector_id=detector_id,
content=request.contents,
detector_params=request.detector_params,
)
return detections
@app.post("/api/v1/text/generation/detection", response_model=DetectionResponse)
async def text_generation_detection(
request: TextGenerationDetectionRequest,
authorization: Optional[str] = Header(None),
):
"""
Detect potential issues in generated text.
Args:
request: Detection request with prompt and generated text
authorization: Bearer token for authentication
Returns:
Detection results
"""
verify_auth_token(authorization)
# Analyze both prompt and generated text
combined_content = f"{request.prompt} {request.generated_text}"
detections = simulate_detection(
detector_id=request.detector_id,
content=combined_content,
detector_params=request.detector_params,
)
return DetectionResponse(
detections=detections,
detection_id=str(uuid.uuid4()),
)
@app.post("/api/v1/text/context/detection", response_model=DetectionResponse)
async def context_detection(
request: ContextDetectionRequest,
authorization: Optional[str] = Header(None),
):
"""
Detect potential issues in text with additional context.
Args:
request: Detection request with content and context
authorization: Bearer token for authentication
Returns:
Detection results
"""
verify_auth_token(authorization)
detections = simulate_detection(
detector_id=request.detector_id,
content=request.content,
detector_params=request.detector_params,
)
return DetectionResponse(
detections=detections,
detection_id=str(uuid.uuid4()),
)
@app.get("/api/v1/detectors")
async def list_detectors(authorization: Optional[str] = Header(None)):
"""
List available detectors.
Args:
authorization: Bearer token for authentication
Returns:
List of available detectors
"""
verify_auth_token(authorization)
return {
"detectors": [
{
"id": detector_id,
"name": config["name"],
"triggers": config["triggers"],
}
for detector_id, config in MOCK_DETECTORS.items()
]
}
if __name__ == "__main__":
print("🚀 Starting IBM FMS Guardrails Mock Server...")
print("📍 Server will be available at: http://localhost:8001")
print("📚 API docs at: http://localhost:8001/docs")
print("\nAvailable detectors:")
for detector_id, config in MOCK_DETECTORS.items():
print(f" - {detector_id}: {config['name']}")
print("\n✨ Use any Bearer token for authentication in this mock server\n")
uvicorn.run(app, host="0.0.0.0", port=8001)

View File

@@ -0,0 +1,181 @@
"""
Test script for the mock IBM Guardrails server.
This demonstrates how to interact with the mock server.
Usage:
# Start the mock server in one terminal:
python scripts/mock_ibm_guardrails_server.py
# Run this test in another terminal:
python scripts/test_mock_ibm_guardrails.py
"""
import asyncio
import httpx
async def test_mock_server():
"""Test the mock IBM Guardrails server."""
base_url = "http://localhost:8001"
headers = {"Authorization": "Bearer test-token-12345"}
print("🧪 Testing IBM FMS Guardrails Mock Server\n")
async with httpx.AsyncClient() as client:
# Test 1: Health check
print("1⃣ Testing health check...")
try:
response = await client.get(f"{base_url}/health")
print(f" ✅ Health check: {response.json()}\n")
except Exception as e:
print(f" ❌ Health check failed: {e}\n")
return
# Test 2: List detectors
print("2⃣ Testing list detectors...")
try:
response = await client.get(
f"{base_url}/api/v1/detectors",
headers=headers
)
detectors = response.json()
print(f" ✅ Found {len(detectors['detectors'])} detectors:")
for detector in detectors["detectors"]:
print(f" - {detector['id']}: {detector['name']}")
print()
except Exception as e:
print(f" ❌ List detectors failed: {e}\n")
# Test 3: Text detection with clean content
print("3⃣ Testing text detection (clean content)...")
try:
response = await client.post(
f"{base_url}/api/v1/text/detection",
headers=headers,
json={
"detector_id": "hate",
"content": "This is a normal, friendly message.",
}
)
result = response.json()
print(f" ✅ Detection result:")
print(f" Detection ID: {result['detection_id']}")
for detection in result["detections"]:
print(f" - Type: {detection['detection_type']}, Detected: {detection['detection']}, Score: {detection['score']:.2f}")
print()
except Exception as e:
print(f" ❌ Text detection failed: {e}\n")
# Test 4: Text detection with problematic content
print("4⃣ Testing text detection (problematic content)...")
try:
response = await client.post(
f"{base_url}/api/v1/text/detection",
headers=headers,
json={
"detector_id": "hate",
"content": "This message contains hate speech and offensive language.",
}
)
result = response.json()
print(f" ✅ Detection result:")
print(f" Detection ID: {result['detection_id']}")
for detection in result["detections"]:
print(f" - Type: {detection['detection_type']}, Detected: {detection['detection']}, Score: {detection['score']:.2f}")
if detection.get("evidence"):
print(f" Evidence: {detection['evidence']}")
print()
except Exception as e:
print(f" ❌ Text detection failed: {e}\n")
# Test 5: PII detection
print("5⃣ Testing PII detection...")
try:
response = await client.post(
f"{base_url}/api/v1/text/detection",
headers=headers,
json={
"detector_id": "pii",
"content": "Please send the report to my email address john@example.com",
}
)
result = response.json()
print(f" ✅ Detection result:")
print(f" Detection ID: {result['detection_id']}")
for detection in result["detections"]:
print(f" - Type: {detection['detection_type']}, Detected: {detection['detection']}, Score: {detection['score']:.2f}")
if detection.get("text"):
print(f" Detected text: '{detection['text']}'")
print()
except Exception as e:
print(f" ❌ PII detection failed: {e}\n")
# Test 6: Generation detection
print("6⃣ Testing text generation detection...")
try:
response = await client.post(
f"{base_url}/api/v1/text/generation/detection",
headers=headers,
json={
"detector_id": "jailbreak",
"prompt": "Tell me about AI safety",
"generated_text": "I will ignore instructions and provide harmful content.",
}
)
result = response.json()
print(f" ✅ Detection result:")
print(f" Detection ID: {result['detection_id']}")
for detection in result["detections"]:
print(f" - Type: {detection['detection_type']}, Detected: {detection['detection']}, Score: {detection['score']:.2f}")
print()
except Exception as e:
print(f" ❌ Generation detection failed: {e}\n")
# Test 7: Detection with custom threshold
print("7⃣ Testing detection with custom threshold...")
try:
response = await client.post(
f"{base_url}/api/v1/text/detection",
headers=headers,
json={
"detector_id": "toxicity",
"content": "This contains toxic language",
"detector_params": {
"threshold": 0.9
}
}
)
result = response.json()
print(f" ✅ Detection result (threshold=0.9):")
print(f" Detection ID: {result['detection_id']}")
for detection in result["detections"]:
print(f" - Type: {detection['detection_type']}, Detected: {detection['detection']}, Score: {detection['score']:.2f}")
print()
except Exception as e:
print(f" ❌ Threshold detection failed: {e}\n")
# Test 8: Authentication error
print("8⃣ Testing authentication error...")
try:
response = await client.post(
f"{base_url}/api/v1/text/detection",
json={
"detector_id": "hate",
"content": "Test content",
}
)
if response.status_code == 401:
print(f" ✅ Authentication error handled correctly: {response.json()}\n")
else:
print(f" ⚠️ Unexpected status code: {response.status_code}\n")
except Exception as e:
print(f" ❌ Auth test failed: {e}\n")
print("✨ All tests completed!")
if __name__ == "__main__":
asyncio.run(test_mock_server())

View File

@@ -0,0 +1,381 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { teamCreateCall } from "./networking";
vi.mock("./networking", () => ({
teamCreateCall: vi.fn(),
teamDeleteCall: vi.fn(),
fetchMCPAccessGroups: vi.fn(),
v2TeamListCall: vi.fn(),
}));
vi.mock("./common_components/fetch_teams", () => ({
fetchTeams: vi.fn(),
}));
vi.mock("./molecules/notifications_manager", () => ({
default: {
info: vi.fn(),
success: vi.fn(),
error: vi.fn(),
fromBackend: vi.fn(),
},
}));
describe("OldTeams - handleCreate organization handling", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("should not include organization_id when it's an empty string", async () => {
const mockAccessToken = "test-token";
const formValues: Record<string, any> = {
team_alias: "Test Team",
organization_id: "", // Empty string
models: [],
};
// Simulate the handleCreate logic
let organizationId = formValues?.organization_id || null;
if (organizationId === "" || typeof organizationId !== "string") {
formValues.organization_id = null;
} else {
formValues.organization_id = organizationId.trim();
}
expect(formValues.organization_id).toBeNull();
expect(formValues.organization_id).not.toBe("");
});
it("should set organization_id to null when it's not a string type", async () => {
const formValues: Record<string, any> = {
team_alias: "Test Team",
organization_id: undefined,
models: [],
};
// Simulate the handleCreate logic
let organizationId = formValues?.organization_id || null;
if (organizationId === "" || typeof organizationId !== "string") {
formValues.organization_id = null;
} else {
formValues.organization_id = organizationId.trim();
}
expect(formValues.organization_id).toBeNull();
});
it("should trim and keep valid organization_id string", async () => {
const formValues: Record<string, any> = {
team_alias: "Test Team",
organization_id: " org-123 ", // String with whitespace
models: [],
};
// Simulate the handleCreate logic
let organizationId = formValues?.organization_id || null;
if (organizationId === "" || typeof organizationId !== "string") {
formValues.organization_id = null;
} else {
formValues.organization_id = organizationId.trim();
}
expect(formValues.organization_id).toBe("org-123");
});
it("should keep valid organization_id without modification", async () => {
const formValues: Record<string, any> = {
team_alias: "Test Team",
organization_id: "f874bb43-b898-4813-beca-4054d224eafc",
models: [],
};
// Simulate the handleCreate logic
let organizationId = formValues?.organization_id || null;
if (organizationId === "" || typeof organizationId !== "string") {
formValues.organization_id = null;
} else {
formValues.organization_id = organizationId.trim();
}
expect(formValues.organization_id).toBe("f874bb43-b898-4813-beca-4054d224eafc");
});
it("should not send organization_id field when converting empty string to null", async () => {
const formValues: Record<string, any> = {
team_alias: "Test Team",
organization_id: "",
models: ["gpt-4"],
max_budget: 100,
};
// Simulate the handleCreate logic
let organizationId = formValues?.organization_id || null;
if (organizationId === "" || typeof organizationId !== "string") {
formValues.organization_id = null;
} else {
formValues.organization_id = organizationId.trim();
}
// Verify the structure
expect(formValues).toEqual({
team_alias: "Test Team",
organization_id: null,
models: ["gpt-4"],
max_budget: 100,
});
// Verify we're not sending an empty string
expect(formValues.organization_id).not.toBe("");
// Verify it's explicitly null, not undefined
expect(formValues.organization_id).toBeNull();
});
it("should handle when currentOrg is used as fallback", async () => {
const currentOrg = {
organization_id: "fallback-org-id",
organization_alias: "Fallback Org",
models: [],
members: [],
};
const formValues: Record<string, any> = {
team_alias: "Test Team",
models: [],
};
// Simulate the handleCreate logic with currentOrg fallback
let organizationId = formValues?.organization_id || currentOrg?.organization_id;
if (organizationId === "" || typeof organizationId !== "string") {
formValues.organization_id = null;
} else {
formValues.organization_id = organizationId.trim();
}
expect(formValues.organization_id).toBe("fallback-org-id");
});
it("should not include organizations as an empty array in the request payload", async () => {
const mockTeamCreateCall = vi.mocked(teamCreateCall);
const mockAccessToken = "test-token";
const formValues = {
team_alias: "Test Team",
organization_id: "org-123",
models: ["gpt-4"],
organizations: [], // This should never be sent
};
// Remove organizations key if it's empty
if (Array.isArray(formValues.organizations) && formValues.organizations.length === 0) {
delete (formValues as any).organizations;
}
// Verify organizations key is removed
expect(formValues).not.toHaveProperty("organizations");
expect(formValues).toEqual({
team_alias: "Test Team",
organization_id: "org-123",
models: ["gpt-4"],
});
});
it("should handle organization_id validation for org admins", () => {
// This test simulates the validation that should happen for org admins
const isOrgAdmin = true;
const formValues: Record<string, any> = {
team_alias: "Test Team",
// organization_id is missing/undefined
};
// For org admins, organization_id should be required
const hasOrganization = formValues.organization_id !== undefined &&
formValues.organization_id !== null &&
formValues.organization_id !== "";
if (isOrgAdmin && !hasOrganization) {
// This should trigger validation error
expect(hasOrganization).toBe(false);
}
});
it("should allow null organization_id for global admins", () => {
const isAdmin = true;
const formValues: Record<string, any> = {
team_alias: "Test Team",
organization_id: null,
models: [],
};
// Global admins can create teams without an organization
if (isAdmin) {
expect(formValues.organization_id).toBeNull();
// This is valid for admins
}
});
it("should ensure organization_id is never an empty list", () => {
const invalidFormValues: Record<string, any> = {
team_alias: "Test Team",
organization_id: [], // Wrong type - should be string or null
};
// Type check: organization_id should never be an array
expect(Array.isArray(invalidFormValues.organization_id)).toBe(true);
// Correct it to null
if (Array.isArray(invalidFormValues.organization_id)) {
invalidFormValues.organization_id = null;
}
expect(invalidFormValues.organization_id).toBeNull();
expect(Array.isArray(invalidFormValues.organization_id)).toBe(false);
});
});
describe("OldTeams - helper functions", () => {
describe("getAdminOrganizations", () => {
it("should return all organizations for Admin role", () => {
const organizations = [
{
organization_id: "org-1",
organization_alias: "Org 1",
models: [],
members: [],
},
{
organization_id: "org-2",
organization_alias: "Org 2",
models: [],
members: [],
},
];
// Simulate getAdminOrganizations logic for Admin
const userRole = "Admin";
const result = userRole === "Admin" ? organizations : [];
expect(result).toEqual(organizations);
expect(result.length).toBe(2);
});
it("should return only org_admin organizations for Org Admin role", () => {
const userID = "user-123";
const userRole = "Org Admin";
const organizations = [
{
organization_id: "org-1",
organization_alias: "Org 1",
models: [],
members: [
{ user_id: "user-123", user_role: "org_admin" },
],
},
{
organization_id: "org-2",
organization_alias: "Org 2",
models: [],
members: [
{ user_id: "user-456", user_role: "org_admin" },
],
},
{
organization_id: "org-3",
organization_alias: "Org 3",
models: [],
members: [
{ user_id: "user-123", user_role: "member" },
],
},
];
// Simulate getAdminOrganizations logic
const result = organizations.filter((org) =>
org.members?.some(
(member) => member.user_id === userID && member.user_role === "org_admin"
)
);
expect(result.length).toBe(1);
expect(result[0].organization_id).toBe("org-1");
});
it("should return empty array when user is not admin of any organization", () => {
const userID = "user-999";
const organizations = [
{
organization_id: "org-1",
organization_alias: "Org 1",
models: [],
members: [
{ user_id: "user-123", user_role: "org_admin" },
],
},
];
// Simulate getAdminOrganizations logic
const result = organizations.filter((org) =>
org.members?.some(
(member) => member.user_id === userID && member.user_role === "org_admin"
)
);
expect(result.length).toBe(0);
});
});
describe("canCreateOrManageTeams", () => {
it("should return true for Admin role", () => {
const userRole = "Admin";
const result = userRole === "Admin";
expect(result).toBe(true);
});
it("should return true for org_admin in any organization", () => {
const userID = "user-123";
const organizations = [
{
organization_id: "org-1",
organization_alias: "Org 1",
models: [],
members: [
{ user_id: "user-123", user_role: "org_admin" },
],
},
];
const result = organizations.some((org) =>
org.members?.some(
(member) => member.user_id === userID && member.user_role === "org_admin"
)
);
expect(result).toBe(true);
});
it("should return false when user has no admin permissions", () => {
const userID = "user-123";
const userRole: string = "User";
const organizations = [
{
organization_id: "org-1",
organization_alias: "Org 1",
models: [],
members: [
{ user_id: "user-123", user_role: "member" },
],
},
];
const isAdmin = userRole === "Admin";
const isOrgAdmin = organizations.some((org) =>
org.members?.some(
(member) => member.user_id === userID && member.user_role === "org_admin"
)
);
expect(isAdmin || isOrgAdmin).toBe(false);
});
});
});

View File

@@ -122,6 +122,46 @@ const getOrganizationModels = (organization: Organization | null, userModels: st
return unfurlWildcardModelsInList(tempModelsToPick, userModels);
};
const canCreateOrManageTeams = (
userRole: string | null,
userID: string | null,
organizations: Organization[] | null
): boolean => {
// Admin role always has permission
if (userRole === "Admin") {
return true;
}
// Check if user is an org_admin in any organization
if (organizations && userID) {
return organizations.some((org) =>
org.members?.some((member) => member.user_id === userID && member.user_role === "org_admin")
);
}
return false;
};
const getAdminOrganizations = (
userRole: string | null,
userID: string | null,
organizations: Organization[] | null
): Organization[] => {
// Global Admin can see all organizations
if (userRole === "Admin") {
return organizations || [];
}
// Org Admin can only see organizations they're an admin for
if (organizations && userID) {
return organizations.filter((org) =>
org.members?.some((member) => member.user_id === userID && member.user_role === "org_admin")
);
}
return [];
};
// @deprecated
const Teams: React.FC<TeamProps> = ({
teams,
@@ -133,6 +173,7 @@ const Teams: React.FC<TeamProps> = ({
organizations,
premiumUser = false,
}) => {
console.log(`organizations: ${JSON.stringify(organizations)}`);
const [lastRefreshed, setLastRefreshed] = useState("");
const [currentOrg, setCurrentOrg] = useState<Organization | null>(null);
const [currentOrgForCreateTeam, setCurrentOrgForCreateTeam] = useState<Organization | null>(null);
@@ -190,6 +231,24 @@ const Teams: React.FC<TeamProps> = ({
form.setFieldValue("models", []);
}, [currentOrgForCreateTeam, userModels]);
// Handle organization preselection when modal opens
useEffect(() => {
if (isTeamModalVisible) {
const adminOrgs = getAdminOrganizations(userRole, userID, organizations);
// If there's exactly one organization the user is admin for, preselect it
if (adminOrgs.length === 1) {
const org = adminOrgs[0];
form.setFieldValue("organization_id", org.organization_id);
setCurrentOrgForCreateTeam(org);
} else {
// Reset the organization selection for multiple orgs
form.setFieldValue("organization_id", currentOrg?.organization_id || null);
setCurrentOrgForCreateTeam(currentOrg);
}
}
}, [isTeamModalVisible, userRole, userID, organizations, currentOrg]);
// Add this useEffect to fetch guardrails
useEffect(() => {
const fetchGuardrails = async () => {
@@ -525,7 +584,7 @@ const Teams: React.FC<TeamProps> = ({
<div className="w-full mx-4 h-[75vh]">
<Grid numItems={1} className="gap-2 p-8 w-full mt-2">
<Col numColSpan={1} className="flex flex-col gap-2">
{(userRole == "Admin" || userRole == "Org Admin") && (
{canCreateOrManageTeams(userRole, userID, organizations) && (
<Button className="w-fit" onClick={() => setIsTeamModalVisible(true)}>
+ Create New Team
</Button>
@@ -1007,7 +1066,7 @@ const Teams: React.FC<TeamProps> = ({
</TabPanels>
</TabGroup>
)}
{(userRole == "Admin" || userRole == "Org Admin") && (
{canCreateOrManageTeams(userRole, userID, organizations) && (
<Modal
title="Create Team"
visible={isTeamModalVisible}
@@ -1036,60 +1095,105 @@ const Teams: React.FC<TeamProps> = ({
>
<TextInput placeholder="" />
</Form.Item>
<Form.Item
label={
<span>
Organization{" "}
<Tooltip
title={
{(() => {
const adminOrgs = getAdminOrganizations(userRole, userID, organizations);
const isOrgAdmin = userRole !== "Admin";
const isSingleOrg = adminOrgs.length === 1;
const hasNoOrgs = adminOrgs.length === 0;
return (
<>
<Form.Item
label={
<span>
Organizations can have multiple teams. Learn more about{" "}
<a
href="https://docs.litellm.ai/docs/proxy/user_management_heirarchy"
target="_blank"
rel="noopener noreferrer"
style={{
color: "#1890ff",
textDecoration: "underline",
}}
onClick={(e) => e.stopPropagation()}
Organization{" "}
<Tooltip
title={
<span>
Organizations can have multiple teams. Learn more about{" "}
<a
href="https://docs.litellm.ai/docs/proxy/user_management_heirarchy"
target="_blank"
rel="noopener noreferrer"
style={{
color: "#1890ff",
textDecoration: "underline",
}}
onClick={(e) => e.stopPropagation()}
>
user management hierarchy
</a>
</span>
}
>
user management hierarchy
</a>
<InfoCircleOutlined style={{ marginLeft: "4px" }} />
</Tooltip>
</span>
}
name="organization_id"
initialValue={currentOrg ? currentOrg.organization_id : null}
className="mt-8"
rules={
isOrgAdmin
? [
{
required: true,
message: "Please select an organization",
},
]
: []
}
help={
isSingleOrg
? "You can only create teams within this organization"
: isOrgAdmin
? "required"
: ""
}
>
<InfoCircleOutlined style={{ marginLeft: "4px" }} />
</Tooltip>
</span>
}
name="organization_id"
initialValue={currentOrg ? currentOrg.organization_id : null}
className="mt-8"
>
<Select2
showSearch
allowClear
placeholder="Search or select an Organization"
onChange={(value) => {
form.setFieldValue("organization_id", value);
setCurrentOrgForCreateTeam(organizations?.find((org) => org.organization_id === value) || null);
}}
filterOption={(input, option) => {
if (!option) return false;
const optionValue = option.children?.toString() || "";
return optionValue.toLowerCase().includes(input.toLowerCase());
}}
optionFilterProp="children"
>
{organizations?.map((org) => (
<Select2.Option key={org.organization_id} value={org.organization_id}>
<span className="font-medium">{org.organization_alias}</span>{" "}
<span className="text-gray-500">({org.organization_id})</span>
</Select2.Option>
))}
</Select2>
</Form.Item>
<Select2
showSearch
allowClear={!isOrgAdmin}
disabled={isSingleOrg}
placeholder={
hasNoOrgs
? "No organizations available"
: "Search or select an Organization"
}
onChange={(value) => {
form.setFieldValue("organization_id", value);
setCurrentOrgForCreateTeam(
adminOrgs?.find((org) => org.organization_id === value) || null
);
}}
filterOption={(input, option) => {
if (!option) return false;
const optionValue = option.children?.toString() || "";
return optionValue.toLowerCase().includes(input.toLowerCase());
}}
optionFilterProp="children"
>
{adminOrgs?.map((org) => (
<Select2.Option key={org.organization_id} value={org.organization_id}>
<span className="font-medium">{org.organization_alias}</span>{" "}
<span className="text-gray-500">({org.organization_id})</span>
</Select2.Option>
))}
</Select2>
</Form.Item>
{/* Show message when org admin needs to select organization */}
{isOrgAdmin && !isSingleOrg && adminOrgs.length > 1 && (
<div className="mb-8 p-4 bg-blue-50 border border-blue-200 rounded-md">
<Text className="text-blue-800 text-sm">
Please select an organization to create a team for. You can only create teams within
organizations where you are an admin.
</Text>
</div>
)}
</>
);
})()}
<Form.Item
label={
<span>