Add vector store support for ragflow

This commit is contained in:
Sameer Kankute
2025-12-03 15:29:47 +05:30
parent 52090c3f3e
commit 8eaabb4ad7
9 changed files with 1010 additions and 5 deletions

View File

@@ -21,6 +21,7 @@ LiteLLM integrates with vector stores, allowing your models to access your organ
- [Azure Vector Stores](https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/file-search?tabs=python#vector-stores) (Cannot be directly queried. Only available for calling in Assistants messages. We will be adding Azure AI Search Vector Store API support soon.)
- [Vertex AI RAG API](https://cloud.google.com/vertex-ai/generative-ai/docs/rag-overview)
- [Gemini File Search](https://ai.google.dev/gemini-api/docs/file-search)
- [RAGFlow Datasets](/docs/providers/ragflow_vector_store.md) (Dataset management only, search not supported)
## Quick Start

View File

@@ -0,0 +1,349 @@
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
import Image from '@theme/IdealImage';
# RAGFlow Vector Stores
Litellm support creation and management of datasets for document processing and knowledge base management in Ragflow.
| Property | Details |
|----------|---------|
| Description | RAGFlow datasets enable document processing, chunking, and knowledge base management for RAG applications. |
| Provider Route on LiteLLM | `ragflow` in the litellm vector_store_registry |
| Provider Doc | [RAGFlow API Documentation ↗](https://ragflow.io/docs) |
| Supported Operations | Dataset Management (Create, List, Update, Delete) |
| Search/Retrieval | ❌ Not supported (management only) |
## Quick Start
### LiteLLM Python SDK
```python showLineNumbers title="Example using LiteLLM Python SDK"
import os
import litellm
# Set RAGFlow credentials
os.environ["RAGFLOW_API_KEY"] = "your-ragflow-api-key"
os.environ["RAGFLOW_API_BASE"] = "http://localhost:9380" # Optional, defaults to localhost:9380
# Create a RAGFlow dataset
response = litellm.vector_stores.create(
name="my-dataset",
custom_llm_provider="ragflow",
metadata={
"description": "My knowledge base dataset",
"embedding_model": "BAAI/bge-large-zh-v1.5@BAAI",
"chunk_method": "naive"
}
)
print(f"Created dataset ID: {response.id}")
print(f"Dataset name: {response.name}")
```
### LiteLLM Proxy
#### 1. Configure your vector_store_registry
<Tabs>
<TabItem value="config-yaml" label="config.yaml">
```yaml
model_list:
- model_name: gpt-4o-mini
litellm_params:
model: gpt-4o-mini
api_key: os.environ/OPENAI_API_KEY
vector_store_registry:
- vector_store_name: "ragflow-knowledge-base"
litellm_params:
vector_store_id: "your-dataset-id"
custom_llm_provider: "ragflow"
api_key: os.environ/RAGFLOW_API_KEY
api_base: os.environ/RAGFLOW_API_BASE # Optional
vector_store_description: "RAGFlow dataset for knowledge base"
vector_store_metadata:
source: "Company documentation"
```
</TabItem>
<TabItem value="litellm-ui" label="LiteLLM UI">
On the LiteLLM UI, Navigate to Experimental > Vector Stores > Create Vector Store. On this page you can create a vector store with a name, vector store id and credentials.
<Image
img={require('../../img/kb_2.png')}
style={{width: '50%'}}
/>
</TabItem>
</Tabs>
#### 2. Create a dataset via Proxy
<Tabs>
<TabItem value="curl" label="Curl">
```bash
curl http://localhost:4000/v1/vector_stores \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $LITELLM_API_KEY" \
-d '{
"name": "my-ragflow-dataset",
"custom_llm_provider": "ragflow",
"metadata": {
"description": "Test dataset",
"chunk_method": "naive"
}
}'
```
</TabItem>
<TabItem value="openai-sdk" label="OpenAI Python SDK">
```python
from openai import OpenAI
# Initialize client with your LiteLLM proxy URL
client = OpenAI(
base_url="http://localhost:4000",
api_key="your-litellm-api-key"
)
# Create a RAGFlow dataset
response = client.vector_stores.create(
name="my-ragflow-dataset",
custom_llm_provider="ragflow",
metadata={
"description": "Test dataset",
"chunk_method": "naive"
}
)
print(f"Created dataset: {response.id}")
```
</TabItem>
</Tabs>
## Configuration
### Environment Variables
RAGFlow vector stores support configuration via environment variables:
- `RAGFLOW_API_KEY` - Your RAGFlow API key (required)
- `RAGFLOW_API_BASE` - RAGFlow API base URL (optional, defaults to `http://localhost:9380`)
### Parameters
You can also pass these via `litellm_params`:
- `api_key` - RAGFlow API key (overrides `RAGFLOW_API_KEY` env var)
- `api_base` - RAGFlow API base URL (overrides `RAGFLOW_API_BASE` env var)
## Dataset Creation Options
### Basic Dataset Creation
```python
response = litellm.vector_stores.create(
name="basic-dataset",
custom_llm_provider="ragflow"
)
```
### Dataset with Chunk Method
RAGFlow supports various chunk methods for different document types:
<Tabs>
<TabItem value="naive" label="Naive (General)">
```python
response = litellm.vector_stores.create(
name="general-dataset",
custom_llm_provider="ragflow",
metadata={
"chunk_method": "naive",
"parser_config": {
"chunk_token_num": 512,
"delimiter": "\n",
"html4excel": False,
"layout_recognize": "DeepDOC"
}
}
)
```
</TabItem>
<TabItem value="book" label="Book">
```python
response = litellm.vector_stores.create(
name="book-dataset",
custom_llm_provider="ragflow",
metadata={
"chunk_method": "book",
"parser_config": {
"raptor": {
"use_raptor": False
}
}
}
)
```
</TabItem>
<TabItem value="qa" label="Q&A">
```python
response = litellm.vector_stores.create(
name="qa-dataset",
custom_llm_provider="ragflow",
metadata={
"chunk_method": "qa",
"parser_config": {
"raptor": {
"use_raptor": False
}
}
}
)
```
</TabItem>
<TabItem value="paper" label="Paper">
```python
response = litellm.vector_stores.create(
name="paper-dataset",
custom_llm_provider="ragflow",
metadata={
"chunk_method": "paper",
"parser_config": {
"raptor": {
"use_raptor": False
}
}
}
)
```
</TabItem>
</Tabs>
### Dataset with Ingestion Pipeline
Instead of using a chunk method, you can use an ingestion pipeline:
```python
response = litellm.vector_stores.create(
name="pipeline-dataset",
custom_llm_provider="ragflow",
metadata={
"parse_type": 2, # Number of parsers in your pipeline
"pipeline_id": "d0bebe30ae2211f0970942010a8e0005" # 32-character hex ID
}
)
```
**Note**: `chunk_method` and `pipeline_id` are mutually exclusive. Use one or the other.
### Advanced Parser Configuration
```python
response = litellm.vector_stores.create(
name="advanced-dataset",
custom_llm_provider="ragflow",
metadata={
"chunk_method": "naive",
"description": "Advanced dataset with custom parser config",
"embedding_model": "BAAI/bge-large-zh-v1.5@BAAI",
"permission": "me", # or "team"
"parser_config": {
"chunk_token_num": 1024,
"delimiter": "\n!?;。;!?",
"html4excel": True,
"layout_recognize": "DeepDOC",
"auto_keywords": 5,
"auto_questions": 3,
"task_page_size": 12,
"raptor": {
"use_raptor": True
},
"graphrag": {
"use_graphrag": False
}
}
}
)
```
## Supported Chunk Methods
RAGFlow supports the following chunk methods:
- `naive` - General purpose (default)
- `book` - For book documents
- `email` - For email documents
- `laws` - For legal documents
- `manual` - Manual chunking
- `one` - Single chunk
- `paper` - For academic papers
- `picture` - For image documents
- `presentation` - For presentation documents
- `qa` - Q&A format
- `table` - For table documents
- `tag` - Tag-based chunking
## RAGFlow-Specific Parameters
All RAGFlow-specific parameters should be passed via the `metadata` field:
| Parameter | Type | Description |
|-----------|------|-------------|
| `avatar` | string | Base64 encoding of the avatar (max 65535 chars) |
| `description` | string | Brief description of the dataset (max 65535 chars) |
| `embedding_model` | string | Embedding model name (e.g., "BAAI/bge-large-zh-v1.5@BAAI") |
| `permission` | string | Access permission: "me" (default) or "team" |
| `chunk_method` | string | Chunking method (see supported methods above) |
| `parser_config` | object | Parser configuration (varies by chunk_method) |
| `parse_type` | int | Number of parsers in pipeline (required with pipeline_id) |
| `pipeline_id` | string | 32-character hex pipeline ID (required with parse_type) |
## Error Handling
RAGFlow returns error responses in the following format:
```json
{
"code": 101,
"message": "Dataset name 'my-dataset' already exists"
}
```
LiteLLM automatically maps these to appropriate exceptions:
- `code != 0` → Raises exception with the error message
- Missing required fields → Raises `ValueError`
- Mutually exclusive parameters → Raises `ValueError`
## Limitations
- **Search/Retrieval**: RAGFlow vector stores support dataset management only. Search operations are not supported and will raise `NotImplementedError`.
- **List/Update/Delete**: These operations are not yet implemented through the standard vector store API. Use RAGFlow's native API endpoints directly.
## Further Reading
Vector Stores:
- [Vector Store Creation](../vector_stores/create.md)
- [Using Vector Stores with Completions](../completion/knowledgebase.md)
- [Vector Store Registry](../completion/knowledgebase.md#vectorstoreregistry)

View File

@@ -14,6 +14,7 @@ Create a vector store which can be used to store and search document chunks for
| End-user Tracking | ✅ | |
| Support LLM Providers (OpenAI `/vector_stores` API) | **OpenAI** | Full vector stores API support across providers |
| Support LLM Providers (Passthrough API) | [**Azure AI**](/docs/providers/azure_ai/azure_ai_vector_stores_passthrough) | Full vector stores API support across providers |
| Support LLM Providers (Dataset Management) | [**RAGFlow**](/docs/providers/ragflow_vector_store.md) | Dataset creation and management (search not supported) |
## Usage

View File

@@ -0,0 +1,2 @@
# RAGFlow vector stores module

View File

@@ -0,0 +1,249 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
import httpx
from litellm.llms.base_llm.vector_store.transformation import BaseVectorStoreConfig
from litellm.secret_managers.main import get_secret_str
from litellm.types.router import GenericLiteLLMParams
from litellm.types.vector_stores import (
BaseVectorStoreAuthCredentials,
VectorStoreCreateOptionalRequestParams,
VectorStoreCreateResponse,
VectorStoreFileCounts,
VectorStoreIndexEndpoints,
VectorStoreSearchOptionalRequestParams,
VectorStoreSearchResponse,
)
if TYPE_CHECKING:
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
else:
LiteLLMLoggingObj = Any
class RAGFlowVectorStoreConfig(BaseVectorStoreConfig):
"""Vector store configuration for RAGFlow datasets."""
def get_auth_credentials(
self, litellm_params: dict
) -> BaseVectorStoreAuthCredentials:
api_key = litellm_params.get("api_key")
if api_key is None:
# Try to get from environment variable
api_key = get_secret_str("RAGFLOW_API_KEY")
if api_key is None:
raise ValueError("api_key is required (set RAGFLOW_API_KEY env var or pass in litellm_params)")
return {
"headers": {
"Authorization": f"Bearer {api_key}",
},
}
def get_vector_store_endpoints_by_type(self) -> VectorStoreIndexEndpoints:
"""RAGFlow vector stores are management-only, no search support."""
return {
"read": [],
"write": [],
}
def validate_environment(
self, headers: dict, litellm_params: Optional[GenericLiteLLMParams]
) -> dict:
"""Validate environment and set headers for RAGFlow API."""
litellm_params = litellm_params or GenericLiteLLMParams()
api_key = (
litellm_params.api_key
or get_secret_str("RAGFLOW_API_KEY")
)
if api_key is None:
raise ValueError("RAGFLOW_API_KEY is required (set env var or pass in litellm_params)")
headers.update(
{
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
)
return headers
def get_complete_url(
self,
api_base: Optional[str],
litellm_params: dict,
) -> str:
"""
Get the complete URL for RAGFlow datasets API.
Supports:
- RAGFLOW_API_BASE env var
- api_base in litellm_params
- Default: http://localhost:9380
"""
api_base = (
api_base
or litellm_params.get("api_base")
or get_secret_str("RAGFLOW_API_BASE")
or "http://localhost:9380"
)
# Remove trailing slashes
api_base = api_base.rstrip("/")
# RAGFlow datasets API endpoint
return f"{api_base}/api/v1/datasets"
def transform_search_vector_store_request(
self,
vector_store_id: str,
query: Union[str, List[str]],
vector_store_search_optional_params: VectorStoreSearchOptionalRequestParams,
api_base: str,
litellm_logging_obj: LiteLLMLoggingObj,
litellm_params: dict,
) -> Tuple[str, Dict]:
"""RAGFlow vector stores are management-only, search is not supported."""
raise NotImplementedError(
"RAGFlow vector stores support dataset management only, not search/retrieval"
)
def transform_search_vector_store_response(
self, response: httpx.Response, litellm_logging_obj: LiteLLMLoggingObj
) -> VectorStoreSearchResponse:
"""RAGFlow vector stores are management-only, search is not supported."""
raise NotImplementedError(
"RAGFlow vector stores support dataset management only, not search/retrieval"
)
def transform_create_vector_store_request(
self,
vector_store_create_optional_params: VectorStoreCreateOptionalRequestParams,
api_base: str,
) -> Tuple[str, Dict]:
"""
Transform create request to RAGFlow POST /api/v1/datasets format.
Maps LiteLLM params to RAGFlow dataset creation parameters.
RAGFlow-specific fields can be passed via metadata.
"""
url = api_base # Already includes /api/v1/datasets from get_complete_url
# Extract name (required by RAGFlow)
name = vector_store_create_optional_params.get("name")
if not name:
raise ValueError("name is required for RAGFlow dataset creation")
# Build request body
request_body: Dict[str, Any] = {
"name": name,
}
# Extract RAGFlow-specific fields from metadata
metadata = vector_store_create_optional_params.get("metadata")
if metadata:
# RAGFlow-specific fields that can be in metadata
ragflow_fields = [
"avatar",
"description",
"embedding_model",
"permission",
"chunk_method",
"parser_config",
"parse_type",
"pipeline_id",
]
for field in ragflow_fields:
if field in metadata:
request_body[field] = metadata[field]
# Validate: chunk_method and pipeline_id are mutually exclusive
if "chunk_method" in request_body and "pipeline_id" in request_body:
raise ValueError(
"chunk_method and pipeline_id are mutually exclusive. "
"Specify either chunk_method or pipeline_id, not both."
)
# If neither chunk_method nor pipeline_id is specified, default to naive
if "chunk_method" not in request_body and "pipeline_id" not in request_body:
request_body["chunk_method"] = "naive"
return url, request_body
def transform_create_vector_store_response(
self, response: httpx.Response
) -> VectorStoreCreateResponse:
"""
Transform RAGFlow response to VectorStoreCreateResponse format.
RAGFlow response format:
{
"code": 0,
"data": {
"id": "...",
"name": "...",
"create_time": 1745836841611, # milliseconds
...
}
}
"""
try:
response_json = response.json()
# Check for RAGFlow error response
if response_json.get("code") != 0:
error_message = response_json.get("message", "Unknown error")
raise self.get_error_class(
error_message=error_message,
status_code=response.status_code,
headers=response.headers,
)
data = response_json.get("data", {})
# Extract dataset ID
dataset_id = data.get("id")
if not dataset_id:
raise ValueError("RAGFlow response missing dataset id")
# Extract name
name = data.get("name")
# Convert create_time from milliseconds to seconds (Unix timestamp)
create_time_ms = data.get("create_time", 0)
created_at = int(create_time_ms / 1000) if create_time_ms else None
# Build VectorStoreCreateResponse
return VectorStoreCreateResponse(
id=dataset_id,
object="vector_store",
created_at=created_at or 0,
name=name,
bytes=0, # RAGFlow doesn't provide bytes in response
file_counts=VectorStoreFileCounts(
in_progress=0,
completed=0,
failed=0,
cancelled=0,
total=0,
),
status="completed",
expires_after=None,
expires_at=None,
last_active_at=None,
metadata=None,
)
except Exception as e:
# If it's already a ValueError we raised, re-raise it
if isinstance(e, ValueError) and "RAGFlow response" in str(e):
raise
# If it's already our error class (has status_code), re-raise
if hasattr(e, "status_code"):
raise
# Otherwise, wrap in our error class
raise self.get_error_class(
error_message=str(e),
status_code=response.status_code,
headers=response.headers,
)

View File

@@ -3,17 +3,15 @@ from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from annotated_types import Ge
from pydantic import BaseModel
from typing_extensions import TypedDict
from litellm.types.router import CredentialLiteLLMParams, GenericLiteLLMParams
class SupportedVectorStoreIntegrations(str, Enum):
"""Supported vector store integrations."""
BEDROCK = "bedrock"
RAGFLOW = "ragflow"
class LiteLLM_VectorStoreConfig(TypedDict, total=False):

View File

@@ -7633,6 +7633,12 @@ class ProviderConfigManager:
)
return GeminiVectorStoreConfig()
elif litellm.LlmProviders.RAGFLOW == provider:
from litellm.llms.ragflow.vector_stores.transformation import (
RAGFlowVectorStoreConfig,
)
return RAGFlowVectorStoreConfig()
return None
@staticmethod

View File

@@ -9,9 +9,12 @@ sys.path.insert(
) # Adds the parent directory to the system path
import litellm
from litellm.utils import ProviderConfigManager
from litellm.llms.openai.vector_stores.transformation import OpenAIVectorStoreConfig
from litellm.llms.vertex_ai.vector_stores.rag_api.transformation import VertexVectorStoreConfig
from litellm.llms.ragflow.vector_stores.transformation import RAGFlowVectorStoreConfig
from litellm.llms.vertex_ai.vector_stores.rag_api.transformation import (
VertexVectorStoreConfig,
)
from litellm.utils import ProviderConfigManager
def test_vector_store_create_with_simple_provider_name():
@@ -100,3 +103,40 @@ def test_vector_store_create_with_provider_api_type():
print("✅ Test passed: Provider with api_type 'vertex_ai/rag_api' handled correctly")
def test_vector_store_create_with_ragflow_provider():
"""
Test that vector store create correctly handles RAGFlow provider.
This should:
- Return correct RAGFlowVectorStoreConfig
- Support dataset management operations
"""
custom_llm_provider = "ragflow"
# Simulate the logic from vector_stores/main.py create function
if "/" in custom_llm_provider:
pytest.fail("Should not enter this branch for RAGFlow provider")
else:
api_type = None
custom_llm_provider = custom_llm_provider # Keep as-is
# Verify api_type is None
assert api_type is None, "api_type should be None for RAGFlow provider"
# Verify custom_llm_provider is unchanged
assert custom_llm_provider == "ragflow", "custom_llm_provider should remain 'ragflow'"
# Verify ProviderConfigManager returns correct config
vector_store_provider_config = ProviderConfigManager.get_provider_vector_stores_config(
provider=litellm.LlmProviders(custom_llm_provider),
api_type=api_type,
)
assert vector_store_provider_config is not None, "Should return a config for RAGFlow"
assert isinstance(
vector_store_provider_config, RAGFlowVectorStoreConfig
), "Should return RAGFlowVectorStoreConfig for RAGFlow provider"
print("✅ Test passed: RAGFlow provider handled correctly")

View File

@@ -0,0 +1,359 @@
"""
Test RAGFlow Vector Store helper functions and transformation.
"""
import os
import sys
import json
import pytest
from unittest.mock import Mock, patch, MagicMock
import httpx
sys.path.insert(0, os.path.abspath("../.."))
import litellm
from tests.vector_store_tests.base_vector_store_test import BaseVectorStoreTest
from litellm.llms.ragflow.vector_stores.transformation import RAGFlowVectorStoreConfig
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.types.vector_stores import VectorStoreCreateOptionalRequestParams
class TestRAGFlowVectorStore(BaseVectorStoreTest):
"""
Test the RAGFlow vector store transformation functionality.
"""
def get_base_create_vector_store_args(self) -> dict:
"""Must return the base create vector store args"""
return {
"custom_llm_provider": "ragflow",
"api_key": os.getenv("RAGFLOW_API_KEY", "test-api-key"),
"api_base": os.getenv("RAGFLOW_API_BASE", "http://localhost:9380")
}
def get_base_request_args(self):
# RAGFlow doesn't support search, so we'll skip search tests
return {
"vector_store_id": "test-dataset-id",
"custom_llm_provider": "ragflow",
"query": "test query"
}
def test_get_auth_credentials(self):
"""Test that auth credentials are correctly extracted."""
config = RAGFlowVectorStoreConfig()
# Test with api_key in params
litellm_params = {"api_key": "test-api-key-123"}
credentials = config.get_auth_credentials(litellm_params)
assert "headers" in credentials
assert credentials["headers"]["Authorization"] == "Bearer test-api-key-123"
# Test with missing api_key (should raise ValueError)
with pytest.raises(ValueError, match="api_key is required"):
config.get_auth_credentials({})
def test_get_complete_url(self):
"""Test that complete URL is correctly constructed."""
config = RAGFlowVectorStoreConfig()
# Test with api_base in params
litellm_params = {"api_base": "http://custom-host:9999"}
url = config.get_complete_url(api_base=None, litellm_params=litellm_params)
assert url == "http://custom-host:9999/api/v1/datasets"
# Test with api_base parameter
url = config.get_complete_url(api_base="http://test-host:8888", litellm_params={})
assert url == "http://test-host:8888/api/v1/datasets"
# Test with default (no api_base provided)
with patch.dict(os.environ, {}, clear=True):
url = config.get_complete_url(api_base=None, litellm_params={})
assert url == "http://localhost:9380/api/v1/datasets"
# Test with trailing slash removal
url = config.get_complete_url(api_base="http://test-host:8888/", litellm_params={})
assert url == "http://test-host:8888/api/v1/datasets"
def test_validate_environment(self):
"""Test environment validation and header setting."""
config = RAGFlowVectorStoreConfig()
from litellm.types.router import GenericLiteLLMParams
# Test with api_key in litellm_params
litellm_params = GenericLiteLLMParams(api_key="test-key")
headers = config.validate_environment({}, litellm_params)
assert headers["Authorization"] == "Bearer test-key"
assert headers["Content-Type"] == "application/json"
# Test with missing api_key
with pytest.raises(ValueError, match="RAGFLOW_API_KEY"):
config.validate_environment({}, GenericLiteLLMParams())
def test_get_vector_store_endpoints_by_type(self):
"""Test that endpoints are correctly configured (empty for management only)."""
config = RAGFlowVectorStoreConfig()
endpoints = config.get_vector_store_endpoints_by_type()
assert endpoints["read"] == []
assert endpoints["write"] == []
def test_transform_create_vector_store_request_basic(self):
"""Test basic dataset creation request transformation."""
config = RAGFlowVectorStoreConfig()
params: VectorStoreCreateOptionalRequestParams = {
"name": "test-dataset"
}
url, body = config.transform_create_vector_store_request(
params, "http://localhost:9380/api/v1/datasets"
)
assert url == "http://localhost:9380/api/v1/datasets"
assert body["name"] == "test-dataset"
assert body["chunk_method"] == "naive" # Default chunk method
def test_transform_create_vector_store_request_with_metadata(self):
"""Test dataset creation with RAGFlow-specific metadata."""
config = RAGFlowVectorStoreConfig()
params: VectorStoreCreateOptionalRequestParams = {
"name": "test-dataset-advanced",
"metadata": {
"description": "Test dataset",
"embedding_model": "BAAI/bge-large-zh-v1.5@BAAI",
"permission": "me",
"chunk_method": "naive",
"parser_config": {
"chunk_token_num": 512,
"delimiter": "\n"
}
}
}
url, body = config.transform_create_vector_store_request(
params, "http://localhost:9380/api/v1/datasets"
)
assert body["name"] == "test-dataset-advanced"
assert body["description"] == "Test dataset"
assert body["embedding_model"] == "BAAI/bge-large-zh-v1.5@BAAI"
assert body["permission"] == "me"
assert body["chunk_method"] == "naive"
assert "parser_config" in body
assert body["parser_config"]["chunk_token_num"] == 512
def test_transform_create_vector_store_request_missing_name(self):
"""Test that missing name raises ValueError."""
config = RAGFlowVectorStoreConfig()
params: VectorStoreCreateOptionalRequestParams = {}
with pytest.raises(ValueError, match="name is required"):
config.transform_create_vector_store_request(
params, "http://localhost:9380/api/v1/datasets"
)
def test_transform_create_vector_store_request_mutually_exclusive(self):
"""Test that chunk_method and pipeline_id are mutually exclusive."""
config = RAGFlowVectorStoreConfig()
params: VectorStoreCreateOptionalRequestParams = {
"name": "test-dataset",
"metadata": {
"chunk_method": "naive",
"pipeline_id": "d0bebe30ae2211f0970942010a8e0005"
}
}
with pytest.raises(ValueError, match="mutually exclusive"):
config.transform_create_vector_store_request(
params, "http://localhost:9380/api/v1/datasets"
)
def test_transform_create_vector_store_request_with_pipeline(self):
"""Test dataset creation with ingestion pipeline."""
config = RAGFlowVectorStoreConfig()
params: VectorStoreCreateOptionalRequestParams = {
"name": "test-pipeline-dataset",
"metadata": {
"parse_type": 2,
"pipeline_id": "d0bebe30ae2211f0970942010a8e0005"
}
}
url, body = config.transform_create_vector_store_request(
params, "http://localhost:9380/api/v1/datasets"
)
assert body["name"] == "test-pipeline-dataset"
assert body["parse_type"] == 2
assert body["pipeline_id"] == "d0bebe30ae2211f0970942010a8e0005"
assert "chunk_method" not in body
def test_transform_create_vector_store_response_success(self):
"""Test successful response transformation."""
config = RAGFlowVectorStoreConfig()
# Mock RAGFlow response
mock_response = Mock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {}
mock_response.json.return_value = {
"code": 0,
"data": {
"id": "3b4de7d4241d11f0a6a79f24fc270c7f",
"name": "test-dataset",
"create_time": 1745836841611,
"chunk_method": "naive",
"embedding_model": "BAAI/bge-large-zh-v1.5@BAAI"
}
}
response = config.transform_create_vector_store_response(mock_response)
assert response["id"] == "3b4de7d4241d11f0a6a79f24fc270c7f"
assert response["name"] == "test-dataset"
assert response["object"] == "vector_store"
assert response["status"] == "completed"
assert response["created_at"] == 1745836841 # Converted from milliseconds
assert response["bytes"] == 0
assert "file_counts" in response
def test_transform_create_vector_store_response_error(self):
"""Test error response transformation."""
config = RAGFlowVectorStoreConfig()
# Mock RAGFlow error response
mock_response = Mock(spec=httpx.Response)
mock_response.status_code = 400
mock_response.headers = {}
mock_response.json.return_value = {
"code": 101,
"message": "Dataset name 'test-dataset' already exists"
}
with pytest.raises(Exception): # Should raise BaseLLMException
config.transform_create_vector_store_response(mock_response)
def test_transform_create_vector_store_response_missing_id(self):
"""Test response with missing dataset ID."""
config = RAGFlowVectorStoreConfig()
mock_response = Mock(spec=httpx.Response)
mock_response.status_code = 200
mock_response.headers = {}
mock_response.json.return_value = {
"code": 0,
"data": {
"name": "test-dataset"
# Missing "id"
}
}
with pytest.raises(ValueError, match="missing dataset id"):
config.transform_create_vector_store_response(mock_response)
def test_transform_search_vector_store_request_not_implemented(self):
"""Test that search operations raise NotImplementedError."""
config = RAGFlowVectorStoreConfig()
logging_obj = MagicMock(spec=LiteLLMLoggingObj)
with pytest.raises(NotImplementedError, match="management only"):
config.transform_search_vector_store_request(
vector_store_id="test-id",
query="test query",
vector_store_search_optional_params={},
api_base="http://localhost:9380",
litellm_logging_obj=logging_obj,
litellm_params={}
)
def test_transform_search_vector_store_response_not_implemented(self):
"""Test that search response transformation raises NotImplementedError."""
config = RAGFlowVectorStoreConfig()
logging_obj = MagicMock(spec=LiteLLMLoggingObj)
mock_response = Mock(spec=httpx.Response)
with pytest.raises(NotImplementedError, match="management only"):
config.transform_search_vector_store_response(mock_response, logging_obj)
def _validate_vector_store_create_response(self, response):
"""Override to handle RAGFlow-specific response format."""
# RAGFlow IDs are hex strings (not OpenAI-style vs_* format)
# So we override the base validation to not check for vs_ prefix
assert isinstance(response, dict), f"Response should be a dict, got {type(response)}"
assert "id" in response, "Missing required field 'id' in create response"
assert "object" in response, "Missing required field 'object' in create response"
assert "created_at" in response, "Missing required field 'created_at' in create response"
assert response["object"] == "vector_store", \
f"Expected object to be 'vector_store', got '{response['object']}'"
assert isinstance(response["id"], str), \
f"id should be a string, got {type(response['id'])}"
assert len(response["id"]) > 0, "id should not be empty"
# RAGFlow IDs are hex strings, not OpenAI-style vs_* format
assert isinstance(response["created_at"], int), \
f"created_at should be an integer, got {type(response['created_at'])}"
assert response["created_at"] > 0, "created_at should be a positive timestamp"
print(f"✅ RAGFlow create response validation passed: Dataset '{response['id']}' created successfully")
@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_basic_create_vector_store(self, sync_mode):
"""Override to handle RAGFlow-specific connection errors."""
litellm._turn_on_debug()
litellm.set_verbose = True
base_request_args = self.get_base_create_vector_store_args()
# Skip if no API key is set
if not os.getenv("RAGFLOW_API_KEY") and not base_request_args.get("api_key"):
pytest.skip("RAGFLOW_API_KEY not set, skipping integration test")
# Extract custom_llm_provider from base args if present
create_args = base_request_args
try:
if sync_mode:
response = litellm.vector_stores.create(
name=f"test-ragflow-{int(__import__('time').time())}",
**create_args
)
else:
response = await litellm.vector_stores.acreate(
name=f"test-ragflow-{int(__import__('time').time())}",
**create_args
)
except litellm.InternalServerError:
pytest.skip("Skipping test due to litellm.InternalServerError")
except Exception as e:
error_str = str(e).lower()
error_type = type(e).__name__
# Check if it's a connection error
if (isinstance(e, (ConnectionError, OSError)) or
"connection" in error_str or
"connect" in error_str or
"APIConnectionError" in error_type):
pytest.skip(f"Skipping test due to connection error (RAGFlow instance may not be running): {e}")
# If this is an authentication or permission error, skip the test
if "authentication" in error_str or "permission" in error_str or "unauthorized" in error_str:
pytest.skip(f"Skipping test due to authentication/permission error: {e}")
# Re-raise if it's not a handled error
raise
print("litellm create response=", json.dumps(response, indent=4, default=str))
# Validate response structure
self._validate_vector_store_create_response(response)
@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_basic_search_vector_store(self, sync_mode):
"""Override search test - RAGFlow doesn't support search."""
pytest.skip("RAGFlow vector stores support dataset management only, not search")