Building an Enterprise-Grade RAG System with Open Source Tools
Organizations today face a significant challenge: extracting reliable insights from vast document repositories while ensuring the accuracy and relevance of AI-generated responses. As the volume of enterprise data grows, traditional keyword search struggles to surface precise information when it is needed. This is where Retrieval-Augmented Generation (RAG) systems become invaluable.
In April 2025, IBM made a significant contribution to the open-source community by donating three powerful tools to the Linux Foundation: Docling, Data Prep Kit, and BeeAI. Together, these tools provide a comprehensive framework for building enterprise-grade RAG systems that can transform how organizations interact with their document collections.
This article will guide you through building a complete end-to-end RAG system using these newly open-sourced tools. We’ll cover everything from document processing and data preparation to creating intelligent agents that can answer complex queries with accurate, contextually relevant information.
Understanding the Components
Before diving into implementation, let’s understand the three key components that make up our RAG system.
Docling: Document Processing Engine
Docling is a powerful open-source document processing library originally developed by IBM Research Zurich. Its primary purpose is to simplify document handling for generative AI applications through a unified approach to parsing diverse document formats.
Key features include:
- Advanced PDF understanding and processing
- Unified parsing for various document formats (PDF, DOCX, XLSX, HTML, images)
- Extraction of document structure, including headings, sections, and tables
- OCR support for scanned documents
- Integration capabilities with LangChain and other AI frameworks
Docling transforms unstructured documents into structured formats that can be easily processed by language models. This is particularly valuable for RAG systems, which need to extract and understand content from diverse document sources.
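To make this concrete, here is a minimal sketch of parsing a PDF and exporting its structured content. It follows the simplified `Document` interface used throughout this article (the sample path is hypothetical), so check it against the API of your installed Docling version.
from docling import Document
# Parse a PDF into Docling's structured representation
doc = Document.from_file("reports/annual_report.pdf")
# Inspect the extracted text and export the full structure as JSON
print(doc.get_text()[:500])
doc.export_to_json("reports/annual_report.json")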
Data Prep Kit: Unstructured Data Preparation
Data Prep Kit is designed to accelerate unstructured data preparation for large language model (LLM) applications. Developed by IBM Research, it helps developers cleanse, transform, and enrich use-case-specific unstructured data.
The toolkit provides over 30 transformation modules for various aspects of data preparation:
- Ingestion modules for converting HTML, code, and web content to processable formats
- Cleaning modules for text normalization, PII redaction, and deduplication
- Filtering modules for quality assessment and content filtering
- Language processing modules for language identification and tokenization
- Analysis modules for code quality assessment and content profiling
In the RAG pipeline, Data Prep Kit complements Docling by taking the structured output and performing essential transformations to optimize it for retrieval and generation tasks. This includes crucial steps like chunking, which divides documents into manageable segments that can be efficiently indexed and retrieved.
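As a quick preview, the sketch below chains text cleaning with semantic chunking using the `pipeline.Pipeline`, `TextCleaner`, and `TextChunker` interfaces that appear later in this article; the parameter names follow those later examples and should be verified against your installed Data Prep Kit version.
from data_prep_toolkit import pipeline
from data_prep_toolkit.transforms import TextCleaner, TextChunker
# Clean the raw text, then split it into overlapping chunks for retrieval
prep = pipeline.Pipeline([
    TextCleaner(fix_unicode=True, normalize_whitespace=True),
    TextChunker(chunk_size=1000, chunk_overlap=200, chunking_method="semantic")
])
chunks = prep.process_text("...text previously extracted by Docling...")
print(f"Produced {len(chunks)} chunks")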
BeeAI: Multi-Agent Framework
BeeAI is an open-source platform for discovering, running, and composing AI agents across different frameworks and programming languages. As a project hosted under the Linux Foundation's LF AI & Data umbrella, it aims to create a unified ecosystem for AI agent development and deployment.
Key capabilities include:
- Framework-agnostic agent integration
- Complex multi-agent workflow creation
- Agent composition and orchestration
- First-class ecosystem support for Python and TypeScript
- Open standards for interoperability
In our RAG system, BeeAI provides the framework for creating intelligent agents that can query the processed document collection, synthesize information, and generate coherent responses. The agent-based approach allows for more sophisticated reasoning and response generation compared to simple retrieval systems.
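The sketch below shows the basic shape of such an agent: a subclass with an asynchronous `process` method that receives and returns messages. It uses the `LLMAgent` and `Message` interfaces relied on later in this article, which may differ slightly from the current BeeAI API.
from beeai.agents import LLMAgent
from beeai.messaging import Message
class EchoAgent(LLMAgent):
    """Toy agent illustrating the message-in, message-out pattern."""
    async def process(self, message):
        # A real agent would call self.llm.generate() here; we simply echo the query
        return Message(sender=self.name, content=f"You asked: {message.content}")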
Prerequisites and Setup
Before we begin implementing our RAG system, let’s ensure we have the necessary prerequisites in place.
System Requirements
For a smooth development experience, your system should meet these minimum requirements:
- CPU: 4+ cores recommended for performance
- RAM: 8GB minimum, 16GB recommended for larger document collections
- Storage: At least 5GB of free space for installation and document processing
- Operating System: Linux, macOS, or Windows
- Python: Version 3.10-3.12
Required Dependencies
Our implementation relies on several Python libraries:
pip install langchain chromadb openai pydantic tiktoken tqdm pillow matplotlib pandas numpy pyyaml psutil rouge
Setting Up the Python Environment
It’s best practice to create a dedicated virtual environment for our project:
# Create a virtual environment
python -m venv enterprise-rag-env
# Activate the environment (Linux/macOS)
source enterprise-rag-env/bin/activate
# Activate the environment (Windows)
enterprise-rag-env\Scripts\activate
# Create a project directory
mkdir enterprise-rag
cd enterprise-rag
Installing the Tools
Now, let’s install the three core components of our RAG system.
Installing Docling
Docling can be installed directly via pip:
pip install docling
For the latest development version, you can install directly from the GitHub repository:
pip install git+https://github.com/docling-project/docling.git
After installation, verify that Docling is working correctly:
from importlib.metadata import version
from docling import Document
# Check the installed package version
print(f"Docling version: {version('docling')}")
# Simple test with a sample document
try:
doc = Document.from_text("This is a test document.")
print("Docling installation successful!")
except Exception as e:
print(f"Docling installation issue: {e}")
Installing Data Prep Kit
For Data Prep Kit, we’ll install the full package with all optional dependencies:
pip install 'data-prep-toolkit-transforms[all]'
If you prefer a minimal installation without extra features:
pip install data-prep-toolkit-transforms
Verify the installation:
from importlib.metadata import version
from data_prep_toolkit import pipeline
# Check the installed package version
print(f"Data Prep Kit version: {version('data-prep-toolkit-transforms')}")
# Create a simple pipeline to verify installation
try:
test_pipeline = pipeline.Pipeline([])
print("Data Prep Kit installation successful!")
except Exception as e:
print(f"Data Prep Kit installation issue: {e}")
Installing BeeAI
BeeAI can be installed via several methods. For our purposes, we’ll use the Python package:
pip install beeai
Alternatively, for macOS/Linux users, Homebrew provides a convenient installation option:
brew install beeai
To verify the installation:
beeai --version
You should also initialize BeeAI for your project:
beeai init
Stage 1: Document Processing with Docling
With our tools installed, let’s begin building our RAG system, starting with document processing using Docling.
Setting Up the Document Corpus
First, we need to organize our document collection. Let’s create a structured directory for our example documents:
mkdir -p data/raw/pdfs
mkdir -p data/raw/docx
mkdir -p data/raw/html
mkdir -p data/processed
For demonstration purposes, you can populate these directories with sample documents:
import os
from docling import Document
# Create a sample document for testing
sample_text = """
# Enterprise Document Management
## Introduction
This document outlines best practices for enterprise document management.
## Key Principles
1. Centralized storage
2. Consistent metadata
3. Access control
4. Version tracking
## Implementation Guidelines
Organizations should establish clear policies for document creation, storage, and retention.
"""
# Save as various formats for testing
os.makedirs("data/raw/pdfs", exist_ok=True)
os.makedirs("data/raw/docx", exist_ok=True)
os.makedirs("data/raw/html", exist_ok=True)
# Create a sample PDF
doc = Document.from_text(sample_text)
doc.export_to_pdf("data/raw/pdfs/sample_document.pdf")
# Create a sample DOCX
doc.export_to_docx("data/raw/docx/sample_document.docx")
# Create a sample HTML
doc.export_to_html("data/raw/html/sample_document.html")
Processing Documents with Docling
Now, let’s process these documents using Docling. We’ll create a utility function to handle different document types:
import os
from glob import glob
from docling import Document
import logging
# Set up logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("docling-processor")
def process_documents(input_dir, output_dir, use_ocr=False):
"""
Process all documents in the input directory and save the results.
Args:
input_dir: Directory containing documents to process
output_dir: Directory to save processed documents
use_ocr: Whether to use OCR for scanned documents
Returns:
List of processed document objects
"""
os.makedirs(output_dir, exist_ok=True)
# Get all document files recursively
pdf_files = glob(os.path.join(input_dir, "**", "*.pdf"), recursive=True)
docx_files = glob(os.path.join(input_dir, "**", "*.docx"), recursive=True)
html_files = glob(os.path.join(input_dir, "**", "*.html"), recursive=True)
all_files = pdf_files + docx_files + html_files
logger.info(f"Found {len(all_files)} documents to process")
processed_docs = []
for file_path in all_files:
try:
filename = os.path.basename(file_path)
logger.info(f"Processing {filename}")
# Load document with appropriate settings
doc = Document.from_file(file_path, use_ocr=use_ocr)
# Save structured representation
output_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}.json")
doc.export_to_json(output_path)
# Extract text
text_output_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}.txt")
with open(text_output_path, "w", encoding="utf-8") as f:
f.write(doc.get_text())
processed_docs.append(doc)
logger.info(f"Successfully processed {filename}")
except Exception as e:
logger.error(f"Error processing {file_path}: {e}")
return processed_docs
# Process all our documents
docs = process_documents("data/raw", "data/processed")
print(f"Successfully processed {len(docs)} documents")
Handling Document Structure
One of Docling’s strengths is its ability to maintain document structure during processing. This is crucial for RAG systems, as understanding the document’s hierarchy helps in providing more contextually relevant information.
Let’s create a function to extract structural elements:
def extract_document_structure(documents):
"""
Extract and print structural information from processed documents.
Args:
documents: List of Document objects from Docling
"""
for i, doc in enumerate(documents):
print(f"\nDocument {i+1}: {doc.metadata.get('title', 'Untitled')}")
# Extract document structure
structure = doc.extract_structure()
# Print hierarchy of sections
print("Document Structure:")
for section in structure.sections:
indent = " " * section.level
print(f"{indent}- {section.title}")
# Extract tables if present
tables = doc.extract_tables()
if tables:
print(f"\nTables found: {len(tables)}")
for j, table in enumerate(tables):
print(f" Table {j+1}: {len(table.rows)} rows x {len(table.columns)} columns")
# Extract images if present
images = doc.extract_images()
if images:
print(f"\nImages found: {len(images)}")
# Extract structure from our processed documents
extract_document_structure(docs)
When dealing with complex layouts, Docling offers configuration options to improve processing:
# For complex layouts, use the advanced processing mode
doc = Document.from_file(
"complex_document.pdf",
processing_mode="advanced",
maintain_tables=True,
extract_images=True,
detect_orientation=True
)
# For documents with tables
tables = doc.extract_tables(
detection_method="grid",
confidence_threshold=0.7
)
# For documents with images
images = doc.extract_images(
min_size=100, # Minimum size in pixels
include_charts=True
)
Stage 2: Data Preparation with Data Prep Kit
After processing our documents with Docling, we need to prepare the extracted data for our RAG system using Data Prep Kit.
Data Cleaning Techniques
Let’s start by cleaning the extracted document content:
import os
import pandas as pd
from data_prep_toolkit.transforms import (
TextCleaner,
PiiRedactor,
Deduplicator,
HtmlCleaner
)
from data_prep_toolkit import pipeline
def clean_document_content(input_dir, output_dir):
"""
Clean the document content using Data Prep Kit.
Args:
input_dir: Directory containing processed document text files
output_dir: Directory to save cleaned documents
"""
os.makedirs(output_dir, exist_ok=True)
# Get all text files
text_files = [f for f in os.listdir(input_dir) if f.endswith(".txt")]
# Create a cleaning pipeline
cleaning_pipeline = pipeline.Pipeline([
TextCleaner(
remove_urls=False, # Keep URLs as they might be references
remove_emails=False, # Keep emails as they might be contacts
fix_unicode=True, # Fix unicode issues
normalize_whitespace=True, # Standardize whitespace
remove_accents=False, # Keep accents for names and terms
lowercase=False # Maintain case for proper nouns and acronyms
),
PiiRedactor(
redact_names=False, # Don't redact names as they might be important
redact_locations=False, # Don't redact locations
redact_phone_numbers=True, # Redact phone numbers
redact_emails=True, # Redact emails for privacy
redact_financial=True # Redact financial information
),
Deduplicator(
method="semantic", # Use semantic deduplication for similar content
threshold=0.85 # Set similarity threshold
)
])
# Process each file
for text_file in text_files:
input_path = os.path.join(input_dir, text_file)
output_path = os.path.join(output_dir, text_file)
with open(input_path, "r", encoding="utf-8") as f:
content = f.read()
# Clean the content
cleaned_content = cleaning_pipeline.process_text(content)
# Save cleaned content
with open(output_path, "w", encoding="utf-8") as f:
f.write(cleaned_content)
print(f"Cleaned {len(text_files)} document files")
# Create directory for cleaned content
os.makedirs("data/cleaned", exist_ok=True)
# Clean our document content
clean_document_content("data/processed", "data/cleaned")
Data Transformation Steps
Next, we need to transform our cleaned documents into chunks suitable for embedding and retrieval:
from data_prep_toolkit.transforms import (
TextChunker,
MetadataExtractor,
QualityFilter
)
import json
def transform_documents_for_rag(input_dir, output_dir):
"""
Transform cleaned documents into chunks suitable for RAG.
Args:
input_dir: Directory containing cleaned document text files
output_dir: Directory to save transformed chunks
"""
os.makedirs(output_dir, exist_ok=True)
# Get all text files
text_files = [f for f in os.listdir(input_dir) if f.endswith(".txt")]
# Create transformation pipeline
transform_pipeline = pipeline.Pipeline([
# Filter for quality content
QualityFilter(
min_length=50, # Minimum chunk length
max_length=8000, # Maximum chunk length
min_info_density=0.3 # Minimum information density
),
# Extract metadata
MetadataExtractor(
extract_entities=True, # Extract named entities
extract_keywords=True, # Extract keywords
extract_sentiment=True # Extract sentiment
),
# Chunk the text
TextChunker(
chunk_size=1000, # Target chunk size
chunk_overlap=200, # Overlap between chunks
chunking_method="semantic" # Use semantic chunking
)
])
all_chunks = []
# Process each file
for text_file in text_files:
input_path = os.path.join(input_dir, text_file)
document_id = os.path.splitext(text_file)[0]
with open(input_path, "r", encoding="utf-8") as f:
content = f.read()
# Transform the content
chunks_with_metadata = transform_pipeline.process_text_with_metadata(
content,
{"document_id": document_id, "source": input_path}
)
all_chunks.extend(chunks_with_metadata)
# Save all chunks to a single file
chunks_df = pd.DataFrame(all_chunks)
chunks_df.to_parquet(os.path.join(output_dir, "document_chunks.parquet"))
# Also save as JSON for easy inspection
with open(os.path.join(output_dir, "document_chunks.json"), "w") as f:
json.dump(all_chunks, f, indent=2)
print(f"Created {len(all_chunks)} chunks from {len(text_files)} documents")
return chunks_df
# Create directory for transformed content
os.makedirs("data/transformed", exist_ok=True)
# Transform our documents
chunks_df = transform_documents_for_rag("data/cleaned", "data/transformed")
Creating a Processed Dataset
Once we have our transformed chunks, we need to prepare them for embedding and indexing:
def create_vector_dataset(chunks_df, output_dir):
"""
Create a dataset suitable for vector database indexing.
Args:
chunks_df: DataFrame containing document chunks
output_dir: Directory to save the dataset
"""
os.makedirs(output_dir, exist_ok=True)
# Prepare data for embedding
vector_data = []
for i, row in chunks_df.iterrows():
# Combine text with relevant metadata for improved retrieval
if 'keywords' in row and row['keywords']:
enriched_text = f"Keywords: {', '.join(row['keywords'])}\n\n{row['text']}"
else:
enriched_text = row['text']
# Create metadata dictionary
metadata = {
"document_id": row['document_id'],
"source": row['source'],
"chunk_id": i,
}
# Add optional metadata if available
for field in ['entities', 'sentiment', 'section_title']:
if field in row and row[field]:
metadata[field] = row[field]
vector_data.append({
"id": f"chunk_{i}",
"text": enriched_text,
"metadata": metadata
})
# Save the dataset
with open(os.path.join(output_dir, "vector_dataset.json"), "w") as f:
json.dump(vector_data, f, indent=2)
print(f"Created vector dataset with {len(vector_data)} entries")
return vector_data
# Create directory for vector data
os.makedirs("data/vector", exist_ok=True)
# Create vector dataset
vector_data = create_vector_dataset(chunks_df, "data/vector")
Stage 3: Building an Agent Interface with BeeAI
Now that we have our processed document data, let’s use BeeAI to create an agent interface for our RAG system.
Designing the Agent Architecture
First, let’s design our agent architecture in a YAML configuration file:
# Save agent configuration to a file
agent_config = """
agents:
- name: retriever
type: vector_search
description: "Searches the document repository for relevant information"
config:
vector_db_type: "chroma"
vector_db_path: "./data/vector_db"
embedding_model: "openai"
similarity_top_k: 5
- name: reader
type: llm
description: "Reads and comprehends the retrieved document chunks"
config:
model: "gpt-4"
temperature: 0.2
max_tokens: 500
- name: synthesizer
type: llm
description: "Synthesizes information and generates a coherent response"
config:
model: "claude-3-sonnet"
temperature: 0.3
max_tokens: 1000
system_prompt: "You are a helpful assistant that provides accurate information based on the given context. Always cite your sources."
workflow:
- from: retriever
to: reader
condition: "always"
data_mapping:
documents: retriever_results
- from: reader
to: synthesizer
condition: "always"
data_mapping:
context: comprehended_documents
query: original_query
"""
# Write the configuration to a file
with open("rag_agents.yaml", "w") as f:
f.write(agent_config)
Implementing Query Agents
Next, let’s create a script to set up our vector database and implement the query agent:
import os
import json
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
def setup_vector_db(vector_data, db_dir):
"""
Set up a ChromaDB vector database with our document chunks.
Args:
vector_data: List of document chunks with metadata
db_dir: Directory to store the vector database
"""
os.makedirs(db_dir, exist_ok=True)
# Prepare documents for Chroma
texts = [item["text"] for item in vector_data]
metadatas = [item["metadata"] for item in vector_data]
ids = [item["id"] for item in vector_data]
# Initialize embeddings
embedding_function = OpenAIEmbeddings()
# Create and persist the vector database
vectordb = Chroma.from_texts(
texts=texts,
metadatas=metadatas,
ids=ids,
embedding=embedding_function,
persist_directory=db_dir
)
vectordb.persist()
print(f"Vector database created at {db_dir}")
return vectordb
# Set up our vector database
db_dir = "data/vector_db"
vectordb = setup_vector_db(vector_data, db_dir)
Now, let’s implement the retriever agent using BeeAI:
from beeai.agents import VectorSearchAgent, LLMAgent
from beeai.messaging import Message
import yaml
class RAGRetrieverAgent(VectorSearchAgent):
"""Custom retriever agent for RAG system."""
def __init__(self, vector_db, **kwargs):
super().__init__(**kwargs)
self.vector_db = vector_db
async def process(self, message):
"""Process incoming query and retrieve relevant documents."""
query = message.content
# Log the query
print(f"Processing query: {query}")
# Retrieve relevant documents
docs = self.vector_db.similarity_search(query, k=5)
# Format results
results = []
for i, doc in enumerate(docs):
results.append({
"content": doc.page_content,
"metadata": doc.metadata,
"relevance_score": 1.0 - (i * 0.1) # Simple scoring for illustration
})
# Return results
return Message(
sender=self.name,
content=results,
metadata={"original_query": query}
)
# Initialize retriever agent
retriever_agent = RAGRetrieverAgent(
name="retriever",
vector_db=vectordb,
description="Searches the document repository for relevant information"
)
Implementing Response Synthesis Agents
Now, let’s implement the reader and synthesizer agents:
class RAGReaderAgent(LLMAgent):
"""Agent for comprehending retrieved documents."""
async def process(self, message):
"""Process retrieved documents to comprehend their meaning."""
documents = message.content
query = message.metadata.get("original_query", "")
# Prepare context from documents
context = ""
for i, doc in enumerate(documents):
context += f"\nDocument {i+1}:\n"
context += f"Source: {doc['metadata'].get('source', 'Unknown')}\n"
context += f"Content: {doc['content']}\n"
context += "-" * 40 + "\n"
# Create prompt for the LLM
prompt = f"""
Query: {query}
You have been provided with the following document extracts:
{context}
Please comprehend these documents carefully, focusing on information relevant to the query.
Extract key facts, concepts, and information that address the query.
"""
# Use the LLM to comprehend the documents
response = await self.llm.generate(prompt)
# Return comprehended information
return Message(
sender=self.name,
content=response.content,
metadata={
"original_query": query,
"document_sources": [doc["metadata"].get("source") for doc in documents]
}
)
class RAGSynthesizerAgent(LLMAgent):
"""Agent for synthesizing information and generating responses."""
async def process(self, message):
"""Synthesize comprehended information into a coherent response."""
comprehended_info = message.content
query = message.metadata.get("original_query", "")
document_sources = message.metadata.get("document_sources", [])
# Create prompt for the LLM
prompt = f"""
Query: {query}
Based on the analysis of relevant documents, synthesize a comprehensive response that addresses the query.
Information extracted from documents:
{comprehended_info}
Your response should:
1. Be clear, concise, and directly answer the query
2. Integrate information from multiple documents when relevant
3. Include citations to the original documents where appropriate
4. Acknowledge any limitations or uncertainties in the available information
Sources: {', '.join(document_sources)}
"""
# Use the LLM to generate the final response
response = await self.llm.generate(prompt)
# Return synthesized response
return Message(
sender=self.name,
content=response.content,
metadata={
"original_query": query,
"document_sources": document_sources
}
)
# Initialize reader and synthesizer agents
reader_agent = RAGReaderAgent(
name="reader",
description="Reads and comprehends the retrieved document chunks",
model="gpt-4",
temperature=0.2,
max_tokens=500
)
synthesizer_agent = RAGSynthesizerAgent(
name="synthesizer",
description="Synthesizes information and generates a coherent response",
model="claude-3-sonnet",
temperature=0.3,
max_tokens=1000,
system_prompt="You are a helpful assistant that provides accurate information based on the given context. Always cite your sources."
)
Integrating the Components
Now, let’s integrate all the components into a complete RAG system.
Building the Complete Pipeline
Here’s how our components fit together in a unified pipeline:
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│ │ │ │ │ │
│ Document Corpus │────▶│ Docling │────▶│ Processed Docs │
│ │ │ │ │ │
└───────────────────┘ └───────────────────┘ └─────────┬─────────┘
│
▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│ │ │ │ │ │
│ Vector Store │◀────│ Data Prep Kit │◀────│ Cleaned Docs │
│ │ │ │ │ │
└─────────┬─────────┘ └───────────────────┘ └───────────────────┘
│
▼
┌───────────────────┐ ┌───────────────────┐ ┌───────────────────┐
│ │ │ │ │ │
│ Retriever Agent │────▶│ Reader Agent │────▶│ Synthesizer Agent │
│ │ │ │ │ │
└───────────────────┘ └───────────────────┘ └───────────────────┘
▲ │
│ │
│ ▼
┌───────────────────┐ ┌───────────────────┐
│ │ │ │
│ User Query │ │ Response │
│ │ │ │
└───────────────────┘ └───────────────────┘
Integration Code
Let’s create a unified script that ties everything together:
import os
import json
import asyncio
import yaml
from beeai.workflow import Workflow
async def rag_pipeline(query, agent_config_path="rag_agents.yaml"):
"""
Run the complete RAG pipeline to answer a query.
Args:
query: The user's question
agent_config_path: Path to the agent configuration file
Returns:
The synthesized response to the query
"""
# Load agent configuration
with open(agent_config_path, "r") as f:
config = yaml.safe_load(f)
# Create workflow
workflow = Workflow.from_config(config)
# Add our agents to the workflow
workflow.add_agent(retriever_agent)
workflow.add_agent(reader_agent)
workflow.add_agent(synthesizer_agent)
# Process the query
print(f"Processing query: {query}")
result = await workflow.run(query)
return result.content
# Example usage
async def main():
# Process a sample query
query = "What are the key principles of enterprise document management?"
response = await rag_pipeline(query)
print("\nQuery:", query)
print("\nResponse:", response)
if __name__ == "__main__":
asyncio.run(main())
For production environments, we’d want to add more robust error handling, logging, and configuration options:
import asyncio
import logging
import argparse
import yaml
from beeai.config import Config
def setup_production_rag(config_path):
"""
Set up a production-ready RAG system.
Args:
config_path: Path to configuration file
Returns:
Configured RAG pipeline function
"""
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("rag_system.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("enterprise-rag")
# Load configuration
with open(config_path, "r") as f:
config = yaml.safe_load(f)
# Extract configuration parameters
doc_processing_config = config.get("document_processing", {})
data_prep_config = config.get("data_preparation", {})
vector_db_config = config.get("vector_database", {})
agent_config = config.get("agents", {})
# Set up document processing
logger.info("Setting up document processing...")
# (Setup code here)
# Set up data preparation
logger.info("Setting up data preparation...")
# (Setup code here)
# Set up vector database
logger.info("Setting up vector database...")
# (Setup code here)
# Set up agents
logger.info("Setting up agents...")
# (Setup code here)
# Return query function
async def query_rag(query_text):
try:
logger.info(f"Processing query: {query_text}")
# Process query
# (Query processing code here)
response = "..."  # placeholder: replace with the actual workflow call, e.g. await workflow.run(query_text)
return response
except Exception as e:
logger.error(f"Error processing query: {e}")
return f"An error occurred while processing your query: {str(e)}"
return query_rag
# Command-line interface for production deployment
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Enterprise RAG System")
parser.add_argument("--config", default="config.yaml", help="Path to configuration file")
parser.add_argument("--query", help="Query to process")
args = parser.parse_args()
rag_function = setup_production_rag(args.config)
if args.query:
response = asyncio.run(rag_function(args.query))
print(response)
else:
# Start interactive mode or API server
print("Starting interactive mode...")
# (Interactive mode code here)
Testing and Evaluation
After building our RAG system, it’s essential to test and evaluate its performance.
Sample Queries and Results
Let’s test our system with several sample queries:
import asyncio
async def test_rag_system():
"""Test the RAG system with sample queries."""
test_queries = [
"What are the key principles of enterprise document management?",
"How should organizations implement document retention policies?",
"What are the best practices for access control in document management?",
"How does centralized storage benefit enterprise document management?",
"What are the challenges in implementing version tracking for documents?"
]
for query in test_queries:
print("\n" + "="*80)
print(f"Query: {query}")
print("-"*80)
response = await rag_pipeline(query)
print(f"Response: {response}")
# Simple evaluation - request user feedback
print("-"*80)
print("Evaluation (1-5, where 5 is excellent):")
print("1. Relevance: Did the answer address the question?")
print("2. Accuracy: Was the information correct?")
print("3. Completeness: Did the answer cover all aspects?")
print("4. Citation: Were sources properly cited?")
print("5. Clarity: Was the answer clear and understandable?")
if __name__ == "__main__":
asyncio.run(test_rag_system())
Performance Metrics
For a more systematic evaluation, we can implement automated metrics:
def evaluate_rag_performance(queries, ground_truth, responses):
"""
Evaluate RAG system performance using multiple metrics.
Args:
queries: List of test queries
ground_truth: List of expected answers
responses: List of system responses
Returns:
Dictionary of performance metrics
"""
from rouge import Rouge
import numpy as np
rouge = Rouge()
metrics = {
"relevance_scores": [],
"citation_rates": [],
"response_times": [],
"rouge_scores": []
}
for query, truth, response in zip(queries, ground_truth, responses):
# Calculate ROUGE scores (lexical overlap)
try:
rouge_scores = rouge.get_scores(response, truth)
rouge_l_f = rouge_scores[0]["rouge-l"]["f"]
metrics["rouge_scores"].append(rouge_l_f)
except Exception as e:
print(f"Error calculating ROUGE: {e}")
metrics["rouge_scores"].append(0)
# Count citations
citation_count = response.lower().count("source") + response.lower().count("document")
metrics["citation_rates"].append(citation_count)
# Simple heuristic for relevance (keyword matching)
query_keywords = set(query.lower().split())
response_words = set(response.lower().split())
keyword_overlap = len(query_keywords.intersection(response_words)) / len(query_keywords)
metrics["relevance_scores"].append(keyword_overlap)
# Calculate aggregate metrics
results = {
"avg_rouge_l_f": np.mean(metrics["rouge_scores"]),
"avg_citation_rate": np.mean(metrics["citation_rates"]),
"avg_relevance": np.mean(metrics["relevance_scores"]),
}
return results
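Here is a hedged usage sketch; the queries, reference answers, and responses are invented solely to show the expected input shapes.
# Hypothetical evaluation data; in practice, collect real queries, references, and system responses
queries = ["What are the key principles of enterprise document management?"]
ground_truth = ["Centralized storage, consistent metadata, access control, and version tracking."]
responses = ["The key principles are centralized storage, consistent metadata, access control, and version tracking (source: sample_document)."]
results = evaluate_rag_performance(queries, ground_truth, responses)
print(results)  # e.g. {'avg_rouge_l_f': ..., 'avg_citation_rate': ..., 'avg_relevance': ...}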
Troubleshooting Common Issues
When building an enterprise-grade RAG system, you might encounter various challenges. Here are solutions for common issues:
Document Processing Failures with Docling
def troubleshoot_docling(doc_path):
"""
Troubleshoot issues with Docling document processing.
Args:
doc_path: Path to the problematic document
"""
try:
# Attempt basic loading
doc = Document.from_file(doc_path)
print("Basic loading successful")
except Exception as e:
print(f"Basic loading failed: {e}")
# Try with different options
try:
print("Attempting with OCR...")
doc = Document.from_file(doc_path, use_ocr=True)
print("OCR loading successful")
except Exception as e:
print(f"OCR loading failed: {e}")
try:
print("Attempting with minimal processing...")
doc = Document.from_file(
doc_path,
processing_mode="minimal",
maintain_tables=False,
extract_images=False
)
print("Minimal processing successful")
except Exception as e:
print(f"Minimal processing failed: {e}")
Data Transformation Challenges with Data Prep Kit
def troubleshoot_data_prep(text, chunk_size=1000):
"""
Troubleshoot issues with Data Prep Kit.
Args:
text: Text that's causing issues
chunk_size: Current chunk size setting
"""
from data_prep_toolkit.transforms import TextChunker
print(f"Text length: {len(text)} characters")
# Try different chunking strategies
strategies = ["fixed", "sentence", "paragraph", "semantic"]
for strategy in strategies:
print(f"\nTesting {strategy} chunking:")
try:
chunker = TextChunker(
chunk_size=chunk_size,
chunking_method=strategy
)
chunks = chunker.process_text(text)
print(f"Success! Created {len(chunks)} chunks")
print(f"Average chunk size: {sum(len(c) for c in chunks) / len(chunks)}")
# Show sample
if chunks:
print(f"Sample chunk: {chunks[0][:100]}...")
except Exception as e:
print(f"Failed: {e}")
Agent Communication Problems in BeeAI
def troubleshoot_agent_communication(workflow):
"""
Troubleshoot issues with agent communication in BeeAI.
Args:
workflow: The BeeAI workflow object
"""
print("Validating agent configuration...")
# Check agent definitions
for agent in workflow.agents:
print(f"\nAgent: {agent.name}")
print(f" Type: {agent.__class__.__name__}")
print(f" Required inputs: {agent.required_inputs if hasattr(agent, 'required_inputs') else 'Not specified'}")
print(f" Output format: {agent.output_format if hasattr(agent, 'output_format') else 'Not specified'}")
# Check connections
print("\nValidating connections:")
for connection in workflow.connections:
print(f" {connection.source} -> {connection.target}")
print(f" Condition: {connection.condition}")
# Validate data mapping
if hasattr(connection, "data_mapping") and connection.data_mapping:
print(f" Data mapping: {connection.data_mapping}")
# Check source agent outputs against target agent inputs
source_agent = workflow.get_agent(connection.source)
target_agent = workflow.get_agent(connection.target)
if hasattr(source_agent, "output_format") and hasattr(target_agent, "required_inputs"):
for target_input, source_output in connection.data_mapping.items():
if source_output not in source_agent.output_format:
print(f" WARNING: {source_output} not in {connection.source}'s outputs")
if target_input not in target_agent.required_inputs:
print(f" WARNING: {target_input} not in {connection.target}'s required inputs")
Memory Management for Large Document Collections
def optimize_memory_usage(doc_dir, chunk_size, batch_size=10):
"""
Optimize memory usage for large document collections.
Args:
doc_dir: Directory containing documents
chunk_size: Size of chunks to process
batch_size: Number of documents to process in a batch
"""
import os
import gc
import psutil
process = psutil.Process(os.getpid())
# List all documents
doc_files = [f for f in os.listdir(doc_dir) if os.path.isfile(os.path.join(doc_dir, f))]
total_docs = len(doc_files)
print(f"Total documents: {total_docs}")
print(f"Processing in batches of {batch_size}")
# Process in batches
for i in range(0, total_docs, batch_size):
batch = doc_files[i:i+batch_size]
print(f"\nBatch {i//batch_size + 1}/{(total_docs-1)//batch_size + 1}")
print(f"Memory usage before processing: {process.memory_info().rss / 1024 / 1024:.2f} MB")
# Process batch
# (Your processing code here)
# Force garbage collection
gc.collect()
print(f"Memory usage after processing: {process.memory_info().rss / 1024 / 1024:.2f} MB")
Handling Unsupported Document Formats
def handle_unsupported_format(file_path):
"""
Handle unsupported document formats.
Args:
file_path: Path to the unsupported document
"""
import os
import subprocess
import mimetypes
# Determine file type
mime_type, _ = mimetypes.guess_type(file_path)
extension = os.path.splitext(file_path)[1].lower()
print(f"File: {file_path}")
print(f"MIME type: {mime_type}")
print(f"Extension: {extension}")
if mime_type and mime_type.startswith('image/'):
# Handle image files
print("Detected image file, using OCR...")
# Use Docling with OCR
from docling import Document
doc = Document.from_file(file_path, use_ocr=True)
return doc.get_text()
elif extension in ['.epub', '.mobi', '.azw']:
# Handle e-book formats
print("Detected e-book format, converting...")
# Use calibre's ebook-convert if available
output_path = os.path.splitext(file_path)[0] + ".txt"
try:
subprocess.run(['ebook-convert', file_path, output_path], check=True)
with open(output_path, 'r', encoding='utf-8') as f:
return f.read()
except (subprocess.SubprocessError, FileNotFoundError) as e:
print(f"Conversion failed: {e}")
return None
Advanced Customization and Extensions
As your RAG system matures, you may want to extend and customize it for specific needs.
Custom Transformation Modules
You can create custom transformation modules for Data Prep Kit:
from data_prep_toolkit.transforms import BaseTransform
import re
class DomainSpecificTransform(BaseTransform):
"""Custom transformation for a specific domain."""
def __init__(self, domain_terms_file=None, **kwargs):
super().__init__(**kwargs)
self.domain_terms = {}
if domain_terms_file:
self.load_domain_terms(domain_terms_file)
def load_domain_terms(self, file_path):
"""Load domain-specific terms and their explanations."""
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
parts = line.strip().split('|')
if len(parts) >= 2:
term, explanation = parts[0].strip(), parts[1].strip()
self.domain_terms[term] = explanation
def process_text(self, text):
"""Enhance text with domain-specific term explanations."""
enhanced_text = text
for term, explanation in self.domain_terms.items():
# Add explanations for domain terms
pattern = r'\b' + re.escape(term) + r'\b'
if re.search(pattern, text, re.IGNORECASE):
footnote = f"\n[{term}]: {explanation}"
if footnote not in enhanced_text:
enhanced_text += footnote
return enhanced_text
# Usage
domain_transform = DomainSpecificTransform(domain_terms_file="financial_terms.txt")
enhanced_text = domain_transform.process_text("The company reported significant EBITDA growth.")
Specialized Agents for Domain-Specific Tasks
For specific domains, you can create specialized agents:
class FinancialAnalysisAgent(LLMAgent):
"""Specialized agent for financial document analysis."""
async def process(self, message):
"""Extract and analyze financial metrics from documents."""
documents = message.content
query = message.metadata.get("original_query", "")
# Extract financial metrics
metrics = self._extract_financial_metrics(documents)
# Format for the LLM
prompt = f"""
Query: {query}
I've extracted the following financial metrics from the documents:
{metrics}
Analyze these metrics to answer the query. Consider trends, comparisons,
and implications for the business. Be precise with numbers and
calculations.
"""
# Generate analysis
response = await self.llm.generate(prompt)
return Message(
sender=self.name,
content=response.content,
metadata={
"original_query": query,
"financial_metrics": metrics
}
)
def _extract_financial_metrics(self, documents):
"""Extract financial metrics from documents."""
import re
# Patterns for common financial metrics
patterns = {
"revenue": r'\$?(\d{1,3}(?:,\d{3})*(?:\.\d+)?)(?:\s*million|\s*billion)?\s*(?:in\s*revenue|revenue)',
"profit": r'\$?(\d{1,3}(?:,\d{3})*(?:\.\d+)?)(?:\s*million|\s*billion)?\s*(?:in\s*profit|profit)',
"growth": r'(\d+(?:\.\d+)?)%\s*(?:growth|increase|decrease)'
}
metrics = {}
for doc in documents:
content = doc["content"]
for metric_name, pattern in patterns.items():
matches = re.findall(pattern, content, re.IGNORECASE)
if matches and metric_name not in metrics:
metrics[metric_name] = matches
# Format extracted metrics
formatted_metrics = ""
for metric, values in metrics.items():
formatted_metrics += f"{metric.capitalize()}: {', '.join(values)}\n"
return formatted_metrics
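To put the specialized agent to work, instantiate it the same way as the reader and synthesizer agents and register it with the workflow; this sketch assumes the constructor parameters and `add_agent` method used in the integration section.
# Instantiate the specialized agent following the earlier reader/synthesizer pattern
financial_agent = FinancialAnalysisAgent(
    name="financial_analyst",
    description="Extracts and analyzes financial metrics from retrieved documents",
    model="gpt-4",
    temperature=0.1,
    max_tokens=800
)
# Register it with the workflow, e.g. in place of the generic reader agent
# workflow.add_agent(financial_agent)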
Integration with Other Open Source Tools
Our RAG system can be extended to work with various vector databases and LLM backends:
def setup_vector_database(vector_data, db_type="chroma", connection_params=None):
"""
Set up different vector database backends.
Args:
vector_data: Processed document chunks
db_type: Type of vector database to use
connection_params: Connection parameters for the database
Returns:
Configured vector database instance
"""
from langchain.vectorstores import Chroma, FAISS, Milvus, Qdrant, PGVector
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
# Normalize connection parameters and set up embeddings
connection_params = connection_params or {}
embeddings = OpenAIEmbeddings()
# Extract data
texts = [item["text"] for item in vector_data]
metadatas = [item["metadata"] for item in vector_data]
# Configure database based on type
if db_type == "chroma":
db = Chroma.from_texts(
texts=texts,
metadatas=metadatas,
embedding=embeddings,
persist_directory=connection_params.get("persist_directory", "./chroma_db")
)
db.persist()
elif db_type == "faiss":
db = FAISS.from_texts(
texts=texts,
metadatas=metadatas,
embedding=embeddings
)
# Optionally save
if "save_path" in connection_params:
db.save_local(connection_params["save_path"])
elif db_type == "milvus":
db = Milvus.from_texts(
texts=texts,
metadatas=metadatas,
embedding=embeddings,
connection_args=connection_params
)
elif db_type == "qdrant":
db = Qdrant.from_texts(
texts=texts,
metadatas=metadatas,
embedding=embeddings,
url=connection_params.get("url"),
collection_name=connection_params.get("collection_name", "documents")
)
elif db_type == "pgvector":
db = PGVector.from_texts(
texts=texts,
metadatas=metadatas,
embedding=embeddings,
connection_string=connection_params.get("connection_string")
)
else:
raise ValueError(f"Unsupported vector database type: {db_type}")
return db
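For instance, the same chunks can be indexed with FAISS instead of Chroma without changing the rest of the pipeline (the save path is arbitrary):
# Build a FAISS index from the prepared chunks and persist it locally
faiss_db = setup_vector_database(
    vector_data,
    db_type="faiss",
    connection_params={"save_path": "data/faiss_index"}
)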
For LLM backends:
def setup_llm_backend(backend_type="openai", model_params=None):
"""
Set up different LLM backends.
Args:
backend_type: Type of LLM backend
model_params: Parameters for the model
Returns:
Configured LLM instance
"""
if backend_type == "openai":
from langchain.llms import OpenAI
return OpenAI(
model_name=model_params.get("model_name", "gpt-4"),
temperature=model_params.get("temperature", 0.2),
max_tokens=model_params.get("max_tokens", 500)
)
elif backend_type == "anthropic":
from langchain.llms import Anthropic
return Anthropic(
model=model_params.get("model", "claude-3-sonnet"),
temperature=model_params.get("temperature", 0.3),
max_tokens_to_sample=model_params.get("max_tokens", 1000)
)
elif backend_type == "huggingface":
from langchain.llms import HuggingFacePipeline
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
model_id = model_params.get("model_id", "gpt2")
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(model_id)
pipe = pipeline(
"text-generation",
model=model,
tokenizer=tokenizer,
max_new_tokens=model_params.get("max_new_tokens", 512),
temperature=model_params.get("temperature", 0.7)
)
return HuggingFacePipeline(pipeline=pipe)
elif backend_type == "ollama":
from langchain.llms import Ollama
return Ollama(
model=model_params.get("model", "llama3"),
temperature=model_params.get("temperature", 0.5)
)
else:
raise ValueError(f"Unsupported LLM backend: {backend_type}")
Conclusion
Summary of Approach and Benefits
In this article, we’ve built a comprehensive enterprise-grade RAG system using three powerful open-source tools from IBM, now hosted by the Linux Foundation:
- Docling handles document processing, extracting structured information from diverse document formats.
- Data Prep Kit transforms and optimizes the extracted content for retrieval and generation tasks.
- BeeAI provides the agent framework for creating intelligent interfaces that can query the document collection and generate coherent responses.
This approach offers several key benefits for enterprises:
- Scalability: The system can handle large document collections through efficient processing and memory management.
- Flexibility: Components can be swapped out or extended to meet specific requirements.
- Accuracy: By retrieving relevant information and grounding responses in source documents, the system delivers more accurate and trustworthy answers.
- Control: Organizations maintain full control over their data and the deployment environment.
Potential Use Cases and Applications
This enterprise-grade RAG system can be applied to various use cases:
- Knowledge Management: Enabling employees to quickly access information from corporate documentation, policies, and procedures.
- Customer Support: Providing agents with accurate information from product manuals, troubleshooting guides, and support documentation.
- Legal and Compliance: Assisting legal teams in retrieving relevant information from contracts, regulations, and case law.
- Research and Development: Helping researchers find relevant information across research papers, patents, and technical documentation.
- Financial Analysis: Supporting analysts in extracting insights from financial reports, market analyses, and industry research.
Future Directions for This Technology Stack
As these tools continue to evolve, several promising directions emerge:
- Multi-modal RAG: Extending the system to handle images, diagrams, and other non-textual content.
- Collaborative Agents: Creating more sophisticated agent architectures that collaborate to solve complex queries.
- Domain-Specific Optimization: Fine-tuning the pipeline for specific industries and use cases.
- Self-Improving Systems: Implementing feedback mechanisms that allow the system to learn from user interactions.
- Real-time Updates: Developing capabilities to continuously update the knowledge base as new documents become available.
The contribution of Docling, Data Prep Kit, and BeeAI to the Linux Foundation marks a significant step toward democratizing advanced AI capabilities in the enterprise space. By combining these tools, organizations can build powerful, customizable RAG systems that unlock the value hidden in their document repositories.
As the open source AI ecosystem continues to mature, we can expect even greater integration between these tools and other Linux Foundation projects, creating a robust foundation for the next generation of enterprise AI applications.