Building an Enterprise-Grade RAG System with Open Source Tools

Organizations today face a significant challenge: extracting valuable insights from vast document repositories while ensuring the accuracy and relevance of AI-generated responses. As the volume of enterprise data grows exponentially, traditional search systems struggle to deliver precise information when it is needed. This is where Retrieval-Augmented Generation (RAG) systems become invaluable.

In April 2025, IBM made a significant contribution to the open-source community by donating three powerful tools to the Linux Foundation: Docling, Data Prep Kit, and BeeAI. Together, these tools provide a comprehensive framework for building enterprise-grade RAG systems that can transform how organizations interact with their document collections.

This article will guide you through building a complete end-to-end RAG system using these newly open-sourced tools. We’ll cover everything from document processing and data preparation to creating intelligent agents that can answer complex queries with accurate, contextually relevant information.

Understanding the Components

Before diving into implementation, let’s understand the three key components that make up our RAG system.

Docling: Document Processing Engine

Docling is a powerful open-source document processing library originally developed by IBM Research Zurich. Its primary purpose is to simplify document handling for generative AI applications through a unified approach to parsing diverse document formats.

Key features include:

  • Advanced PDF understanding and processing
  • Unified parsing for various document formats (PDF, DOCX, XLSX, HTML, images)
  • Extraction of document structure, including headings, sections, and tables
  • OCR support for scanned documents
  • Integration capabilities with LangChain and other AI frameworks

Docling transforms unstructured documents into structured formats that can be easily processed by language models. This is particularly valuable for RAG systems, which need to extract and understand content from diverse document sources.
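
For a first feel of how this works in practice, the short sketch below converts a single document with Docling's DocumentConverter and exports it as Markdown. Treat it as a minimal illustration: the file path is a placeholder, and the exact interface may vary between releases.

from docling.document_converter import DocumentConverter

# Convert a source document (local path or URL) into Docling's structured representation
converter = DocumentConverter()
result = converter.convert("path/to/your_document.pdf")  # placeholder path

# Export the parsed document as Markdown for downstream LLM processing
print(result.document.export_to_markdown())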

Data Prep Kit: Unstructured Data Preparation

Data Prep Kit is designed to accelerate unstructured data preparation for Large Language Model applications. Developed by IBM Research, it helps developers cleanse, transform, and enrich use case-specific unstructured data.

The toolkit provides over 30 transformation modules for various aspects of data preparation:

  • Ingestion modules for converting HTML, code, and web content to processable formats
  • Cleaning modules for text normalization, PII redaction, and deduplication
  • Filtering modules for quality assessment and content filtering
  • Language processing modules for language identification and tokenization
  • Analysis modules for code quality assessment and content profiling

In the RAG pipeline, Data Prep Kit complements Docling by taking the structured output and performing essential transformations to optimize it for retrieval and generation tasks. This includes crucial steps like chunking, which divides documents into manageable segments that can be efficiently indexed and retrieved.
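
To make the chunking step concrete, here is a minimal, dependency-free sketch of fixed-size chunking with overlap. Data Prep Kit's own chunking transforms, used later in this article, offer more sophisticated, semantics-aware strategies; this only shows the underlying idea.

def chunk_text(text, chunk_size=1000, overlap=200):
    """Split text into fixed-size chunks that overlap so context is not lost at boundaries."""
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunks.append(text[start:end])
        if end == len(text):
            break
        start = end - overlap  # step back by the overlap so adjacent chunks share context
    return chunks

# Example: a 2,500-character string becomes three overlapping chunks
print([len(c) for c in chunk_text("x" * 2500)])  # [1000, 1000, 900]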

BeeAI: Multi-Agent Framework

BeeAI is an open-source platform for discovering, running, and composing AI agents across different frameworks and programming languages. As part of the Linux Foundation AI & Data program, it aims to create a unified ecosystem for AI agent development and deployment.

Key capabilities include:

  • Framework-agnostic agent integration
  • Complex multi-agent workflow creation
  • Agent composition and orchestration
  • First-class ecosystem support for Python and TypeScript
  • Open standards for interoperability

In our RAG system, BeeAI provides the framework for creating intelligent agents that can query the processed document collection, synthesize information, and generate coherent responses. The agent-based approach allows for more sophisticated reasoning and response generation compared to simple retrieval systems.

Prerequisites and Setup

Before we begin implementing our RAG system, let’s ensure we have the necessary prerequisites in place.

System Requirements

For an optimal development experience, your system should meet these minimum requirements:

  • CPU: 4+ cores recommended for performance
  • RAM: 8GB minimum, 16GB recommended for larger document collections
  • Storage: At least 5GB of free space for installation and document processing
  • Operating System: Linux, macOS, or Windows
  • Python: Version 3.10-3.12 (a quick version check is shown below)
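
To confirm that the interpreter you plan to use falls within the supported range, a quick check from Python itself is enough:

import sys

# The tools in this article target Python 3.10-3.12 (see the requirements above)
major, minor = sys.version_info[:2]
if (3, 10) <= (major, minor) <= (3, 12):
    print(f"Python {major}.{minor} detected: OK")
else:
    print(f"Warning: Python {major}.{minor} detected; 3.10-3.12 is recommended for this stack.")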

Required Dependencies

Our implementation relies on several Python libraries:

pip install langchain chromadb openai pydantic tiktoken tqdm pillow matplotlib

Setting Up the Python Environment

It’s best practice to create a dedicated virtual environment for our project:

# Create a virtual environment
python -m venv enterprise-rag-env

# Activate the environment (Linux/macOS)
source enterprise-rag-env/bin/activate

# Activate the environment (Windows)
enterprise-rag-env\Scripts\activate

# Create a project directory
mkdir enterprise-rag
cd enterprise-rag

Installing the Tools

Now, let’s install the three core components of our RAG system.

Installing Docling

Docling can be installed directly via pip:

pip install docling

For the latest development version, you can install directly from the GitHub repository:

pip install git+https://github.com/docling-project/docling.git

After installation, verify that Docling is working correctly:

import importlib.metadata
from docling import Document

# Check the installed package version
print(f"Docling version: {importlib.metadata.version('docling')}")

# Simple test with a sample document
try:
    doc = Document.from_text("This is a test document.")
    print("Docling installation successful!")
except Exception as e:
    print(f"Docling installation issue: {e}")

Installing Data Prep Kit

For Data Prep Kit, we’ll install the full package with all optional dependencies:

pip install 'data-prep-toolkit-transforms[all]'

If you prefer a minimal installation without extra features:

pip install data-prep-toolkit-transforms

Verify the installation:

import importlib.metadata
from data_prep_toolkit import pipeline

# Check the installed package version
print(f"Data Prep Kit version: {importlib.metadata.version('data-prep-toolkit-transforms')}")

# Create a simple pipeline to verify installation
try:
    test_pipeline = pipeline.Pipeline([])
    print("Data Prep Kit installation successful!")
except Exception as e:
    print(f"Data Prep Kit installation issue: {e}")

Installing BeeAI

BeeAI can be installed via several methods. For our purposes, we’ll use the Python package:

pip install beeai

Alternatively, for macOS/Linux users, Homebrew provides a convenient installation option:

brew install beeai

To verify the installation:

beeai --version

You should also initialize BeeAI for your project:

beeai init

Stage 1: Document Processing with Docling

With our tools installed, let’s begin building our RAG system, starting with document processing using Docling.

Setting Up the Document Corpus

First, we need to organize our document collection. Let’s create a structured directory for our example documents:

mkdir -p data/raw/pdfs
mkdir -p data/raw/docx
mkdir -p data/raw/html
mkdir -p data/processed

For demonstration purposes, you can populate these directories with sample documents:

import os
from docling import Document

# Create a sample document for testing
sample_text = """
# Enterprise Document Management

## Introduction
This document outlines best practices for enterprise document management.

## Key Principles
1. Centralized storage
2. Consistent metadata
3. Access control
4. Version tracking

## Implementation Guidelines
Organizations should establish clear policies for document creation, storage, and retention.
"""

# Save as various formats for testing
os.makedirs("data/raw/pdfs", exist_ok=True)
os.makedirs("data/raw/docx", exist_ok=True)
os.makedirs("data/raw/html", exist_ok=True)

# Create a sample PDF
doc = Document.from_text(sample_text)
doc.export_to_pdf("data/raw/pdfs/sample_document.pdf")

# Create a sample DOCX
doc.export_to_docx("data/raw/docx/sample_document.docx")

# Create a sample HTML
doc.export_to_html("data/raw/html/sample_document.html")

Processing Documents with Docling

Now, let’s process these documents using Docling. We’ll create a utility function to handle different document types:

import os
from glob import glob
from docling import Document
import logging

# Set up logging
logging.basicConfig(level=logging.INFO,
                   format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("docling-processor")

def process_documents(input_dir, output_dir, use_ocr=False):
    """
    Process all documents in the input directory and save the results.

    Args:
        input_dir: Directory containing documents to process
        output_dir: Directory to save processed documents
        use_ocr: Whether to use OCR for scanned documents

    Returns:
        List of processed document objects
    """
    os.makedirs(output_dir, exist_ok=True)

    # Get all document files recursively
    pdf_files = glob(os.path.join(input_dir, "**", "*.pdf"), recursive=True)
    docx_files = glob(os.path.join(input_dir, "**", "*.docx"), recursive=True)
    html_files = glob(os.path.join(input_dir, "**", "*.html"), recursive=True)

    all_files = pdf_files + docx_files + html_files
    logger.info(f"Found {len(all_files)} documents to process")

    processed_docs = []

    for file_path in all_files:
        try:
            filename = os.path.basename(file_path)
            logger.info(f"Processing {filename}")

            # Load document with appropriate settings
            doc = Document.from_file(file_path, use_ocr=use_ocr)

            # Save structured representation
            output_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}.json")
            doc.export_to_json(output_path)

            # Extract text
            text_output_path = os.path.join(output_dir, f"{os.path.splitext(filename)[0]}.txt")
            with open(text_output_path, "w", encoding="utf-8") as f:
                f.write(doc.get_text())

            processed_docs.append(doc)
            logger.info(f"Successfully processed {filename}")

        except Exception as e:
            logger.error(f"Error processing {file_path}: {e}")

    return processed_docs

# Process all our documents
docs = process_documents("data/raw", "data/processed")
print(f"Successfully processed {len(docs)} documents")

Handling Document Structure

One of Docling’s strengths is its ability to maintain document structure during processing. This is crucial for RAG systems, as understanding the document’s hierarchy helps in providing more contextually relevant information.

Let’s create a function to extract structural elements:

def extract_document_structure(documents):
    """
    Extract and print structural information from processed documents.

    Args:
        documents: List of Document objects from Docling
    """
    for i, doc in enumerate(documents):
        print(f"\nDocument {i+1}: {doc.metadata.get('title', 'Untitled')}")

        # Extract document structure
        structure = doc.extract_structure()

        # Print hierarchy of sections
        print("Document Structure:")
        for section in structure.sections:
            indent = "  " * section.level
            print(f"{indent}- {section.title}")

        # Extract tables if present
        tables = doc.extract_tables()
        if tables:
            print(f"\nTables found: {len(tables)}")
            for j, table in enumerate(tables):
                print(f"  Table {j+1}: {len(table.rows)} rows x {len(table.columns)} columns")

        # Extract images if present
        images = doc.extract_images()
        if images:
            print(f"\nImages found: {len(images)}")

# Extract structure from our processed documents
extract_document_structure(docs)

When dealing with complex layouts, Docling offers configuration options to improve processing:

# For complex layouts, use the advanced processing mode
doc = Document.from_file(
    "complex_document.pdf",
    processing_mode="advanced",
    maintain_tables=True,
    extract_images=True,
    detect_orientation=True
)

# For documents with tables
tables = doc.extract_tables(
    detection_method="grid",
    confidence_threshold=0.7
)

# For documents with images
images = doc.extract_images(
    min_size=100,  # Minimum size in pixels
    include_charts=True
)

Stage 2: Data Preparation with Data Prep Kit

After processing our documents with Docling, we need to prepare the extracted data for our RAG system using Data Prep Kit.

Data Cleaning Techniques

Let’s start by cleaning the extracted document content:

import os
import pandas as pd
from data_prep_toolkit.transforms import (
    TextCleaner,
    PiiRedactor,
    Deduplicator,
    HtmlCleaner
)
from data_prep_toolkit import pipeline

def clean_document_content(input_dir, output_dir):
    """
    Clean the document content using Data Prep Kit.

    Args:
        input_dir: Directory containing processed document text files
        output_dir: Directory to save cleaned documents
    """
    os.makedirs(output_dir, exist_ok=True)

    # Get all text files
    text_files = [f for f in os.listdir(input_dir) if f.endswith(".txt")]

    # Create a cleaning pipeline
    cleaning_pipeline = pipeline.Pipeline([
        TextCleaner(
            remove_urls=False,  # Keep URLs as they might be references
            remove_emails=False,  # Keep emails as they might be contacts
            fix_unicode=True,    # Fix unicode issues
            normalize_whitespace=True,  # Standardize whitespace
            remove_accents=False,  # Keep accents for names and terms
            lowercase=False       # Maintain case for proper nouns and acronyms
        ),
        PiiRedactor(
            redact_names=False,  # Don't redact names as they might be important
            redact_locations=False,  # Don't redact locations
            redact_phone_numbers=True,  # Redact phone numbers
            redact_emails=True,   # Redact emails for privacy
            redact_financial=True  # Redact financial information
        ),
        Deduplicator(
            method="semantic",  # Use semantic deduplication for similar content
            threshold=0.85      # Set similarity threshold
        )
    ])

    # Process each file
    for text_file in text_files:
        input_path = os.path.join(input_dir, text_file)
        output_path = os.path.join(output_dir, text_file)

        with open(input_path, "r", encoding="utf-8") as f:
            content = f.read()

        # Clean the content
        cleaned_content = cleaning_pipeline.process_text(content)

        # Save cleaned content
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(cleaned_content)

    print(f"Cleaned {len(text_files)} document files")

# Create directory for cleaned content
os.makedirs("data/cleaned", exist_ok=True)

# Clean our document content
clean_document_content("data/processed", "data/cleaned")

Data Transformation Steps

Next, we need to transform our cleaned documents into chunks suitable for embedding and retrieval:

from data_prep_toolkit.transforms import (
    TextChunker,
    MetadataExtractor,
    QualityFilter
)
import json

def transform_documents_for_rag(input_dir, output_dir):
    """
    Transform cleaned documents into chunks suitable for RAG.

    Args:
        input_dir: Directory containing cleaned document text files
        output_dir: Directory to save transformed chunks
    """
    os.makedirs(output_dir, exist_ok=True)

    # Get all text files
    text_files = [f for f in os.listdir(input_dir) if f.endswith(".txt")]

    # Create transformation pipeline
    transform_pipeline = pipeline.Pipeline([
        # Filter for quality content
        QualityFilter(
            min_length=50,      # Minimum chunk length
            max_length=8000,    # Maximum chunk length
            min_info_density=0.3  # Minimum information density
        ),
        # Extract metadata
        MetadataExtractor(
            extract_entities=True,   # Extract named entities
            extract_keywords=True,   # Extract keywords
            extract_sentiment=True   # Extract sentiment
        ),
        # Chunk the text
        TextChunker(
            chunk_size=1000,    # Target chunk size
            chunk_overlap=200,  # Overlap between chunks
            chunking_method="semantic"  # Use semantic chunking
        )
    ])

    all_chunks = []

    # Process each file
    for text_file in text_files:
        input_path = os.path.join(input_dir, text_file)
        document_id = os.path.splitext(text_file)[0]

        with open(input_path, "r", encoding="utf-8") as f:
            content = f.read()

        # Transform the content
        chunks_with_metadata = transform_pipeline.process_text_with_metadata(
            content,
            {"document_id": document_id, "source": input_path}
        )

        all_chunks.extend(chunks_with_metadata)

    # Save all chunks to a single file
    chunks_df = pd.DataFrame(all_chunks)
    chunks_df.to_parquet(os.path.join(output_dir, "document_chunks.parquet"))

    # Also save as JSON for easy inspection
    with open(os.path.join(output_dir, "document_chunks.json"), "w") as f:
        json.dump(all_chunks, f, indent=2)

    print(f"Created {len(all_chunks)} chunks from {len(text_files)} documents")
    return chunks_df

# Create directory for transformed content
os.makedirs("data/transformed", exist_ok=True)

# Transform our documents
chunks_df = transform_documents_for_rag("data/cleaned", "data/transformed")

Creating a Processed Dataset

Once we have our transformed chunks, we need to prepare them for embedding and indexing:

def create_vector_dataset(chunks_df, output_dir):
    """
    Create a dataset suitable for vector database indexing.

    Args:
        chunks_df: DataFrame containing document chunks
        output_dir: Directory to save the dataset
    """
    os.makedirs(output_dir, exist_ok=True)

    # Prepare data for embedding
    vector_data = []

    for i, row in chunks_df.iterrows():
        # Combine text with relevant metadata for improved retrieval
        if 'keywords' in row and row['keywords']:
            enriched_text = f"Keywords: {', '.join(row['keywords'])}\n\n{row['text']}"
        else:
            enriched_text = row['text']

        # Create metadata dictionary
        metadata = {
            "document_id": row['document_id'],
            "source": row['source'],
            "chunk_id": i,
        }

        # Add optional metadata if available
        for field in ['entities', 'sentiment', 'section_title']:
            if field in row and row[field]:
                metadata[field] = row[field]

        vector_data.append({
            "id": f"chunk_{i}",
            "text": enriched_text,
            "metadata": metadata
        })

    # Save the dataset
    with open(os.path.join(output_dir, "vector_dataset.json"), "w") as f:
        json.dump(vector_data, f, indent=2)

    print(f"Created vector dataset with {len(vector_data)} entries")
    return vector_data

# Create directory for vector data
os.makedirs("data/vector", exist_ok=True)

# Create vector dataset
vector_data = create_vector_dataset(chunks_df, "data/vector")
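
Before moving on to indexing, it is worth spot-checking the dataset we just wrote; a few lines are enough to confirm the number of entries and the shape of each record:

import json

# Load the dataset produced above and inspect the first entry
with open("data/vector/vector_dataset.json", "r", encoding="utf-8") as f:
    dataset = json.load(f)

print(f"Entries: {len(dataset)}")
print(json.dumps(dataset[0], indent=2)[:500])  # first 500 characters of the first record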

Stage 3: Building an Agent Interface with BeeAI

Now that we have our processed document data, let’s use BeeAI to create an agent interface for our RAG system.

Designing the Agent Architecture

First, let’s design our agent architecture in a YAML configuration file:

# Save agent configuration to a file
agent_config = """
agents:
  - name: retriever
    type: vector_search
    description: "Searches the document repository for relevant information"
    config:
      vector_db_type: "chroma"
      vector_db_path: "./data/vector_db"
      embedding_model: "openai"
      similarity_top_k: 5

  - name: reader
    type: llm
    description: "Reads and comprehends the retrieved document chunks"
    config:
      model: "gpt-4"
      temperature: 0.2
      max_tokens: 500

  - name: synthesizer
    type: llm
    description: "Synthesizes information and generates a coherent response"
    config:
      model: "claude-3-sonnet"
      temperature: 0.3
      max_tokens: 1000
      system_prompt: "You are a helpful assistant that provides accurate information based on the given context. Always cite your sources."

workflow:
  - from: retriever
    to: reader
    condition: "always"
    data_mapping:
      documents: retriever_results

  - from: reader
    to: synthesizer
    condition: "always"
    data_mapping:
      context: comprehended_documents
      query: original_query
"""

# Write the configuration to a file
with open("rag_agents.yaml", "w") as f:
    f.write(agent_config)
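
Because a single indentation mistake in YAML can silently break the workflow definition, it is worth confirming that the file parses and declares the agents we expect:

import yaml

# Parse the configuration we just wrote and list the declared agents
with open("rag_agents.yaml", "r") as f:
    config = yaml.safe_load(f)

print([agent["name"] for agent in config["agents"]])  # expected: ['retriever', 'reader', 'synthesizer']
print(f"Workflow steps: {len(config['workflow'])}")   # expected: 2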

Implementing Query Agents

Next, let’s create a script to set up our vector database and implement the query agent:

import os
import json
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings

def setup_vector_db(vector_data, db_dir):
    """
    Set up a ChromaDB vector database with our document chunks.

    Args:
        vector_data: List of document chunks with metadata
        db_dir: Directory to store the vector database
    """
    os.makedirs(db_dir, exist_ok=True)

    # Prepare documents for Chroma
    texts = [item["text"] for item in vector_data]
    metadatas = [item["metadata"] for item in vector_data]
    ids = [item["id"] for item in vector_data]

    # Initialize embeddings
    embedding_function = OpenAIEmbeddings()

    # Create and persist the vector database
    vectordb = Chroma.from_texts(
        texts=texts,
        metadatas=metadatas,
        ids=ids,
        embedding=embedding_function,
        persist_directory=db_dir
    )

    vectordb.persist()
    print(f"Vector database created at {db_dir}")
    return vectordb

# Set up our vector database
db_dir = "data/vector_db"
vectordb = setup_vector_db(vector_data, db_dir)

Now, let’s implement the retriever agent using BeeAI:

from beeai.agents import VectorSearchAgent, LLMAgent
from beeai.messaging import Message
import yaml

class RAGRetrieverAgent(VectorSearchAgent):
    """Custom retriever agent for RAG system."""

    def __init__(self, vector_db, **kwargs):
        super().__init__(**kwargs)
        self.vector_db = vector_db

    async def process(self, message):
        """Process incoming query and retrieve relevant documents."""
        query = message.content

        # Log the query
        print(f"Processing query: {query}")

        # Retrieve relevant documents
        docs = self.vector_db.similarity_search(query, k=5)

        # Format results
        results = []
        for i, doc in enumerate(docs):
            results.append({
                "content": doc.page_content,
                "metadata": doc.metadata,
                "relevance_score": 1.0 - (i * 0.1)  # Simple scoring for illustration
            })

        # Return results
        return Message(
            sender=self.name,
            content=results,
            metadata={"original_query": query}
        )

# Initialize retriever agent
retriever_agent = RAGRetrieverAgent(
    name="retriever",
    vector_db=vectordb,
    description="Searches the document repository for relevant information"
)

Implementing Response Synthesis Agents

Now, let’s implement the reader and synthesizer agents:

class RAGReaderAgent(LLMAgent):
    """Agent for comprehending retrieved documents."""

    async def process(self, message):
        """Process retrieved documents to comprehend their meaning."""
        documents = message.content
        query = message.metadata.get("original_query", "")

        # Prepare context from documents
        context = ""
        for i, doc in enumerate(documents):
            context += f"\nDocument {i+1}:\n"
            context += f"Source: {doc['metadata'].get('source', 'Unknown')}\n"
            context += f"Content: {doc['content']}\n"
            context += "-" * 40 + "\n"

        # Create prompt for the LLM
        prompt = f"""
        Query: {query}

        You have been provided with the following document extracts:

        {context}

        Please comprehend these documents carefully, focusing on information relevant to the query.
        Extract key facts, concepts, and information that address the query.
        """

        # Use the LLM to comprehend the documents
        response = await self.llm.generate(prompt)

        # Return comprehended information
        return Message(
            sender=self.name,
            content=response.content,
            metadata={
                "original_query": query,
                "document_sources": [doc["metadata"].get("source") for doc in documents]
            }
        )

class RAGSynthesizerAgent(LLMAgent):
    """Agent for synthesizing information and generating responses."""

    async def process(self, message):
        """Synthesize comprehended information into a coherent response."""
        comprehended_info = message.content
        query = message.metadata.get("original_query", "")
        document_sources = message.metadata.get("document_sources", [])

        # Create prompt for the LLM
        prompt = f"""
        Query: {query}

        Based on the analysis of relevant documents, synthesize a comprehensive response that addresses the query.

        Information extracted from documents:
        {comprehended_info}

        Your response should:
        1. Be clear, concise, and directly answer the query
        2. Integrate information from multiple documents when relevant
        3. Include citations to the original documents where appropriate
        4. Acknowledge any limitations or uncertainties in the available information

        Sources: {', '.join(document_sources)}
        """

        # Use the LLM to generate the final response
        response = await self.llm.generate(prompt)

        # Return synthesized response
        return Message(
            sender=self.name,
            content=response.content,
            metadata={
                "original_query": query,
                "document_sources": document_sources
            }
        )

# Initialize reader and synthesizer agents
reader_agent = RAGReaderAgent(
    name="reader",
    description="Reads and comprehends the retrieved document chunks",
    model="gpt-4",
    temperature=0.2,
    max_tokens=500
)

synthesizer_agent = RAGSynthesizerAgent(
    name="synthesizer",
    description="Synthesizes information and generates a coherent response",
    model="claude-3-sonnet",
    temperature=0.3,
    max_tokens=1000,
    system_prompt="You are a helpful assistant that provides accurate information based on the given context. Always cite your sources."
)

Integrating the Components

Now, let’s integrate all the components into a complete RAG system.

Building the Complete Pipeline

Here’s how our components fit together in a unified pipeline:

┌───────────────────┐     ┌───────────────────┐     ┌───────────────────┐
│                   │     │                   │     │                   │
│  Document Corpus  │────▶│     Docling       │────▶│  Processed Docs   │
│                   │     │                   │     │                   │
└───────────────────┘     └───────────────────┘     └─────────┬─────────┘
                                                              │
                                                              ▼
┌───────────────────┐     ┌───────────────────┐     ┌───────────────────┐
│                   │     │                   │     │                   │
│   Vector Store    │◀────│   Data Prep Kit   │◀────│   Cleaned Docs    │
│                   │     │                   │     │                   │
└─────────┬─────────┘     └───────────────────┘     └───────────────────┘
          │
          ▼
┌───────────────────┐     ┌───────────────────┐     ┌───────────────────┐
│                   │     │                   │     │                   │
│ Retriever Agent   │────▶│   Reader Agent    │────▶│ Synthesizer Agent │
│                   │     │                   │     │                   │
└───────────────────┘     └───────────────────┘     └───────────────────┘
          ▲                                                   │
          │                                                   │
          │                                                   ▼
┌───────────────────┐                              ┌───────────────────┐
│                   │                              │                   │
│   User Query      │                              │     Response      │
│                   │                              │                   │
└───────────────────┘                              └───────────────────┘

Integration Code

Let’s create a unified script that ties everything together:

import os
import json
import asyncio
import yaml
from beeai.workflow import Workflow

async def rag_pipeline(query, agent_config_path="rag_agents.yaml"):
    """
    Run the complete RAG pipeline to answer a query.

    Args:
        query: The user's question
        agent_config_path: Path to the agent configuration file

    Returns:
        The synthesized response to the query
    """
    # Load agent configuration
    with open(agent_config_path, "r") as f:
        config = yaml.safe_load(f)

    # Create workflow
    workflow = Workflow.from_config(config)

    # Add our agents to the workflow
    workflow.add_agent(retriever_agent)
    workflow.add_agent(reader_agent)
    workflow.add_agent(synthesizer_agent)

    # Process the query
    print(f"Processing query: {query}")
    result = await workflow.run(query)

    return result.content

# Example usage
async def main():
    # Process a sample query
    query = "What are the key principles of enterprise document management?"
    response = await rag_pipeline(query)

    print("\nQuery:", query)
    print("\nResponse:", response)

if __name__ == "__main__":
    asyncio.run(main())

For production environments, we’d want to add more robust error handling, logging, and configuration options:

import asyncio
import logging
import argparse
import yaml
from beeai.config import Config

def setup_production_rag(config_path):
    """
    Set up a production-ready RAG system.

    Args:
        config_path: Path to configuration file

    Returns:
        Configured RAG pipeline function
    """
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("rag_system.log"),
            logging.StreamHandler()
        ]
    )
    logger = logging.getLogger("enterprise-rag")

    # Load configuration
    with open(config_path, "r") as f:
        config = yaml.safe_load(f)

    # Extract configuration parameters
    doc_processing_config = config.get("document_processing", {})
    data_prep_config = config.get("data_preparation", {})
    vector_db_config = config.get("vector_database", {})
    agent_config = config.get("agents", {})

    # Set up document processing
    logger.info("Setting up document processing...")
    # (Setup code here)

    # Set up data preparation
    logger.info("Setting up data preparation...")
    # (Setup code here)

    # Set up vector database
    logger.info("Setting up vector database...")
    # (Setup code here)

    # Set up agents
    logger.info("Setting up agents...")
    # (Setup code here)

    # Return query function
    async def query_rag(query_text):
        try:
            logger.info(f"Processing query: {query_text}")
            # Process query
            # (Query processing code here)
            response = None  # Placeholder until the query-processing code above is implemented
            return response
        except Exception as e:
            logger.error(f"Error processing query: {e}")
            return f"An error occurred while processing your query: {str(e)}"

    return query_rag

# Command-line interface for production deployment
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Enterprise RAG System")
    parser.add_argument("--config", default="config.yaml", help="Path to configuration file")
    parser.add_argument("--query", help="Query to process")
    args = parser.parse_args()

    rag_function = setup_production_rag(args.config)

    if args.query:
        response = asyncio.run(rag_function(args.query))
        print(response)
    else:
        # Start interactive mode or API server
        print("Starting interactive mode...")
        # (Interactive mode code here)

Testing and Evaluation

After building our RAG system, it’s essential to test and evaluate its performance.

Sample Queries and Results

Let’s test our system with several sample queries:

import asyncio

async def test_rag_system():
    """Test the RAG system with sample queries."""
    test_queries = [
        "What are the key principles of enterprise document management?",
        "How should organizations implement document retention policies?",
        "What are the best practices for access control in document management?",
        "How does centralized storage benefit enterprise document management?",
        "What are the challenges in implementing version tracking for documents?"
    ]

    for query in test_queries:
        print("\n" + "="*80)
        print(f"Query: {query}")
        print("-"*80)

        response = await rag_pipeline(query)
        print(f"Response: {response}")

        # Simple evaluation - request user feedback
        print("-"*80)
        print("Evaluation (1-5, where 5 is excellent):")
        print("1. Relevance: Did the answer address the question?")
        print("2. Accuracy: Was the information correct?")
        print("3. Completeness: Did the answer cover all aspects?")
        print("4. Citation: Were sources properly cited?")
        print("5. Clarity: Was the answer clear and understandable?")

if __name__ == "__main__":
    asyncio.run(test_rag_system())

Performance Metrics

For a more systematic evaluation, we can implement automated metrics:

def evaluate_rag_performance(queries, ground_truth, responses):
    """
    Evaluate RAG system performance using multiple metrics.

    Args:
        queries: List of test queries
        ground_truth: List of expected answers
        responses: List of system responses

    Returns:
        Dictionary of performance metrics
    """
    from rouge import Rouge
    import numpy as np

    rouge = Rouge()

    metrics = {
        "relevance_scores": [],
        "citation_rates": [],
        "response_times": [],
        "rouge_scores": []
    }

    for query, truth, response in zip(queries, ground_truth, responses):
        # Calculate ROUGE scores (lexical overlap)
        try:
            rouge_scores = rouge.get_scores(response, truth)
            rouge_l_f = rouge_scores[0]["rouge-l"]["f"]
            metrics["rouge_scores"].append(rouge_l_f)
        except Exception as e:
            print(f"Error calculating ROUGE: {e}")
            metrics["rouge_scores"].append(0)

        # Count citations
        citation_count = response.lower().count("source") + response.lower().count("document")
        metrics["citation_rates"].append(citation_count)

        # Simple heuristic for relevance (keyword matching)
        query_keywords = set(query.lower().split())
        response_words = set(response.lower().split())
        keyword_overlap = len(query_keywords.intersection(response_words)) / len(query_keywords)
        metrics["relevance_scores"].append(keyword_overlap)

    # Calculate aggregate metrics
    results = {
        "avg_rouge_l_f": np.mean(metrics["rouge_scores"]),
        "avg_citation_rate": np.mean(metrics["citation_rates"]),
        "avg_relevance": np.mean(metrics["relevance_scores"]),
    }

    return results
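
A small driver shows how this function slots into the test loop. The query and reference answer below are placeholders drawn from our sample document; in practice you would use a curated evaluation set. (The ROUGE metric requires pip install rouge.)

import asyncio

# Hypothetical evaluation run with a placeholder ground-truth answer
queries = ["What are the key principles of enterprise document management?"]
ground_truth = ["Centralized storage, consistent metadata, access control, and version tracking."]
responses = [asyncio.run(rag_pipeline(q)) for q in queries]

results = evaluate_rag_performance(queries, ground_truth, responses)
for metric, value in results.items():
    print(f"{metric}: {value:.3f}")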

Troubleshooting Common Issues

When building an enterprise-grade RAG system, you might encounter various challenges. Here are solutions for common issues:

Document Processing Failures with Docling

from docling import Document

def troubleshoot_docling(doc_path):
    """
    Troubleshoot issues with Docling document processing.

    Args:
        doc_path: Path to the problematic document
    """
    try:
        # Attempt basic loading
        doc = Document.from_file(doc_path)
        print("Basic loading successful")
    except Exception as e:
        print(f"Basic loading failed: {e}")

        # Try with different options
        try:
            print("Attempting with OCR...")
            doc = Document.from_file(doc_path, use_ocr=True)
            print("OCR loading successful")
        except Exception as e:
            print(f"OCR loading failed: {e}")

        try:
            print("Attempting with minimal processing...")
            doc = Document.from_file(
                doc_path,
                processing_mode="minimal",
                maintain_tables=False,
                extract_images=False
            )
            print("Minimal processing successful")
        except Exception as e:
            print(f"Minimal processing failed: {e}")

Data Transformation Challenges with Data Prep Kit

def troubleshoot_data_prep(text, chunk_size=1000):
    """
    Troubleshoot issues with Data Prep Kit.

    Args:
        text: Text that's causing issues
        chunk_size: Current chunk size setting
    """
    from data_prep_toolkit.transforms import TextChunker

    print(f"Text length: {len(text)} characters")

    # Try different chunking strategies
    strategies = ["fixed", "sentence", "paragraph", "semantic"]

    for strategy in strategies:
        print(f"\nTesting {strategy} chunking:")
        try:
            chunker = TextChunker(
                chunk_size=chunk_size,
                chunking_method=strategy
            )
            chunks = chunker.process_text(text)
            print(f"Success! Created {len(chunks)} chunks")
            print(f"Average chunk size: {sum(len(c) for c in chunks) / len(chunks)}")

            # Show sample
            if chunks:
                print(f"Sample chunk: {chunks[0][:100]}...")
        except Exception as e:
            print(f"Failed: {e}")

Agent Communication Problems in BeeAI

def troubleshoot_agent_communication(workflow):
    """
    Troubleshoot issues with agent communication in BeeAI.

    Args:
        workflow: The BeeAI workflow object
    """
    print("Validating agent configuration...")

    # Check agent definitions
    for agent in workflow.agents:
        print(f"\nAgent: {agent.name}")
        print(f"  Type: {agent.__class__.__name__}")
        print(f"  Required inputs: {agent.required_inputs if hasattr(agent, 'required_inputs') else 'Not specified'}")
        print(f"  Output format: {agent.output_format if hasattr(agent, 'output_format') else 'Not specified'}")

    # Check connections
    print("\nValidating connections:")
    for connection in workflow.connections:
        print(f"  {connection.source} -> {connection.target}")
        print(f"  Condition: {connection.condition}")

        # Validate data mapping
        if hasattr(connection, "data_mapping") and connection.data_mapping:
            print(f"  Data mapping: {connection.data_mapping}")

            # Check source agent outputs against target agent inputs
            source_agent = workflow.get_agent(connection.source)
            target_agent = workflow.get_agent(connection.target)

            if hasattr(source_agent, "output_format") and hasattr(target_agent, "required_inputs"):
                for target_input, source_output in connection.data_mapping.items():
                    if source_output not in source_agent.output_format:
                        print(f"  WARNING: {source_output} not in {connection.source}'s outputs")
                    if target_input not in target_agent.required_inputs:
                        print(f"  WARNING: {target_input} not in {connection.target}'s required inputs")

Memory Management for Large Document Collections

def optimize_memory_usage(doc_dir, chunk_size, batch_size=10):
    """
    Optimize memory usage for large document collections.

    Args:
        doc_dir: Directory containing documents
        chunk_size: Size of chunks to process
        batch_size: Number of documents to process in a batch
    """
    import os
    import gc
    import psutil

    process = psutil.Process(os.getpid())

    # List all documents
    doc_files = [f for f in os.listdir(doc_dir) if os.path.isfile(os.path.join(doc_dir, f))]
    total_docs = len(doc_files)

    print(f"Total documents: {total_docs}")
    print(f"Processing in batches of {batch_size}")

    # Process in batches
    for i in range(0, total_docs, batch_size):
        batch = doc_files[i:i+batch_size]

        print(f"\nBatch {i//batch_size + 1}/{(total_docs-1)//batch_size + 1}")
        print(f"Memory usage before processing: {process.memory_info().rss / 1024 / 1024:.2f} MB")

        # Process batch
        # (Your processing code here)

        # Force garbage collection
        gc.collect()

        print(f"Memory usage after processing: {process.memory_info().rss / 1024 / 1024:.2f} MB")

Handling Unsupported Document Formats

def handle_unsupported_format(file_path):
    """
    Handle unsupported document formats.

    Args:
        file_path: Path to the unsupported document
    """
    import os
    import subprocess
    import mimetypes

    # Determine file type
    mime_type, _ = mimetypes.guess_type(file_path)
    extension = os.path.splitext(file_path)[1].lower()

    print(f"File: {file_path}")
    print(f"MIME type: {mime_type}")
    print(f"Extension: {extension}")

    if mime_type and mime_type.startswith('image/'):
        # Handle image files
        print("Detected image file, using OCR...")
        # Use Docling with OCR
        from docling import Document
        doc = Document.from_file(file_path, use_ocr=True)
        return doc.get_text()

    elif extension in ['.epub', '.mobi', '.azw']:
        # Handle e-book formats
        print("Detected e-book format, converting...")
        # Use calibre's ebook-convert if available
        output_path = os.path.splitext(file_path)[0] + ".txt"
        try:
            subprocess.run(['ebook-convert', file_path, output_path], check=True)
            with open(output_path, 'r', encoding='utf-8') as f:
                return f.read()
        except (subprocess.SubprocessError, FileNotFoundError) as e:
            print(f"Conversion failed: {e}")
            return None

    else:
        print(f"No conversion strategy for '{extension}'; consider converting the file to PDF or plain text manually.")
        return None

Advanced Customization and Extensions

As your RAG system matures, you may want to extend and customize it for specific needs.

Custom Transformation Modules

You can create custom transformation modules for Data Prep Kit:

from data_prep_toolkit.transforms import BaseTransform
import re

class DomainSpecificTransform(BaseTransform):
    """Custom transformation for a specific domain."""

    def __init__(self, domain_terms_file=None, **kwargs):
        super().__init__(**kwargs)
        self.domain_terms = {}

        if domain_terms_file:
            self.load_domain_terms(domain_terms_file)

    def load_domain_terms(self, file_path):
        """Load domain-specific terms and their explanations."""
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.strip().split('|')
                if len(parts) >= 2:
                    term, explanation = parts[0].strip(), parts[1].strip()
                    self.domain_terms[term] = explanation

    def process_text(self, text):
        """Enhance text with domain-specific term explanations."""
        enhanced_text = text

        for term, explanation in self.domain_terms.items():
            # Add explanations for domain terms
            pattern = r'\b' + re.escape(term) + r'\b'
            if re.search(pattern, text, re.IGNORECASE):
                footnote = f"\n[{term}]: {explanation}"
                if footnote not in enhanced_text:
                    enhanced_text += footnote

        return enhanced_text

# Usage
domain_transform = DomainSpecificTransform(domain_terms_file="financial_terms.txt")
enhanced_text = domain_transform.process_text("The company reported significant EBITDA growth.")

Specialized Agents for Domain-Specific Tasks

For specific domains, you can create specialized agents:

class FinancialAnalysisAgent(LLMAgent):
    """Specialized agent for financial document analysis."""

    async def process(self, message):
        """Extract and analyze financial metrics from documents."""
        documents = message.content
        query = message.metadata.get("original_query", "")

        # Extract financial metrics
        metrics = self._extract_financial_metrics(documents)

        # Format for the LLM
        prompt = f"""
        Query: {query}

        I've extracted the following financial metrics from the documents:

        {metrics}

        Analyze these metrics to answer the query. Consider trends, comparisons,
        and implications for the business. Be precise with numbers and
        calculations.
        """

        # Generate analysis
        response = await self.llm.generate(prompt)

        return Message(
            sender=self.name,
            content=response.content,
            metadata={
                "original_query": query,
                "financial_metrics": metrics
            }
        )

    def _extract_financial_metrics(self, documents):
        """Extract financial metrics from documents."""
        import re

        # Patterns for common financial metrics
        patterns = {
            "revenue": r'\$?(\d{1,3}(?:,\d{3})*(?:\.\d+)?)(?:\s*million|\s*billion)?\s*(?:in\s*revenue|revenue)',
            "profit": r'\$?(\d{1,3}(?:,\d{3})*(?:\.\d+)?)(?:\s*million|\s*billion)?\s*(?:in\s*profit|profit)',
            "growth": r'(\d+(?:\.\d+)?)%\s*(?:growth|increase|decrease)'
        }

        metrics = {}

        for doc in documents:
            content = doc["content"]
            for metric_name, pattern in patterns.items():
                matches = re.findall(pattern, content, re.IGNORECASE)
                if matches and metric_name not in metrics:
                    metrics[metric_name] = matches

        # Format extracted metrics
        formatted_metrics = ""
        for metric, values in metrics.items():
            formatted_metrics += f"{metric.capitalize()}: {', '.join(values)}\n"

        return formatted_metrics
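
A quick sanity check of the extraction helper on a synthetic snippet (the figures are invented for illustration). Because the helper never touches instance state, we can call it directly without constructing a fully configured agent:

# Hypothetical document list in the same shape the retriever agent produces
sample_docs = [{"content": "The company reported $4.2 million in revenue and 12% growth year over year."}]

# The helper does not use self, so we can invoke it directly for testing purposes
print(FinancialAnalysisAgent._extract_financial_metrics(None, sample_docs))
# Expected output (roughly): "Revenue: 4.2" and "Growth: 12" on separate lines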

Integration with Other Open Source Tools

Our RAG system can be extended to work with various vector databases and LLM backends:

def setup_vector_database(vector_data, db_type="chroma", connection_params=None):
    """
    Set up different vector database backends.

    Args:
        vector_data: Processed document chunks
        db_type: Type of vector database to use
        connection_params: Connection parameters for the database

    Returns:
        Configured vector database instance
    """
    from langchain.vectorstores import Chroma, FAISS, Milvus, Qdrant, PGVector
    from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings

    # Default to an empty dict so the .get() calls below are safe when no params are passed
    connection_params = connection_params or {}

    # Set up embeddings
    embeddings = OpenAIEmbeddings()

    # Extract data
    texts = [item["text"] for item in vector_data]
    metadatas = [item["metadata"] for item in vector_data]

    # Configure database based on type
    if db_type == "chroma":
        db = Chroma.from_texts(
            texts=texts,
            metadatas=metadatas,
            embedding=embeddings,
            persist_directory=connection_params.get("persist_directory", "./chroma_db")
        )
        db.persist()

    elif db_type == "faiss":
        db = FAISS.from_texts(
            texts=texts,
            metadatas=metadatas,
            embedding=embeddings
        )
        # Optionally save
        if "save_path" in connection_params:
            db.save_local(connection_params["save_path"])

    elif db_type == "milvus":
        db = Milvus.from_texts(
            texts=texts,
            metadatas=metadatas,
            embedding=embeddings,
            connection_args=connection_params
        )

    elif db_type == "qdrant":
        db = Qdrant.from_texts(
            texts=texts,
            metadatas=metadatas,
            embedding=embeddings,
            url=connection_params.get("url"),
            collection_name=connection_params.get("collection_name", "documents")
        )

    elif db_type == "pgvector":
        db = PGVector.from_texts(
            texts=texts,
            metadatas=metadatas,
            embedding=embeddings,
            connection_string=connection_params.get("connection_string")
        )

    else:
        raise ValueError(f"Unsupported vector database type: {db_type}")

    return db
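
Switching backends then becomes a configuration change. The connection parameters below are illustrative placeholders:

# Local Chroma index persisted to disk
chroma_db = setup_vector_database(
    vector_data,
    db_type="chroma",
    connection_params={"persist_directory": "./data/vector_db"}
)

# Remote Qdrant collection (URL and collection name are placeholders)
qdrant_db = setup_vector_database(
    vector_data,
    db_type="qdrant",
    connection_params={"url": "http://localhost:6333", "collection_name": "enterprise_docs"}
)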

For LLM backends:

def setup_llm_backend(backend_type="openai", model_params=None):
    """
    Set up different LLM backends.

    Args:
        backend_type: Type of LLM backend
        model_params: Parameters for the model

    Returns:
        Configured LLM instance
    """
    if backend_type == "openai":
        from langchain.llms import OpenAI
        return OpenAI(
            model_name=model_params.get("model_name", "gpt-4"),
            temperature=model_params.get("temperature", 0.2),
            max_tokens=model_params.get("max_tokens", 500)
        )

    elif backend_type == "anthropic":
        from langchain.llms import Anthropic
        return Anthropic(
            model=model_params.get("model", "claude-3-sonnet"),
            temperature=model_params.get("temperature", 0.3),
            max_tokens_to_sample=model_params.get("max_tokens", 1000)
        )

    elif backend_type == "huggingface":
        from langchain.llms import HuggingFacePipeline
        import torch
        from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

        model_id = model_params.get("model_id", "gpt2")
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        model = AutoModelForCausalLM.from_pretrained(model_id)

        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=model_params.get("max_new_tokens", 512),
            temperature=model_params.get("temperature", 0.7)
        )

        return HuggingFacePipeline(pipeline=pipe)

    elif backend_type == "ollama":
        from langchain.llms import Ollama
        return Ollama(
            model=model_params.get("model", "llama3"),
            temperature=model_params.get("temperature", 0.5)
        )

    else:
        raise ValueError(f"Unsupported LLM backend: {backend_type}")

Conclusion

Summary of Approach and Benefits

In this article, we’ve built a comprehensive enterprise-grade RAG system using three powerful open-source tools from IBM, now hosted by the Linux Foundation:

  1. Docling handles document processing, extracting structured information from diverse document formats.
  2. Data Prep Kit transforms and optimizes the extracted content for retrieval and generation tasks.
  3. BeeAI provides the agent framework for creating intelligent interfaces that can query the document collection and generate coherent responses.

This approach offers several key benefits for enterprises:

  • Scalability: The system can handle large document collections through efficient processing and memory management.
  • Flexibility: Components can be swapped out or extended to meet specific requirements.
  • Accuracy: By retrieving relevant information and grounding responses in source documents, the system delivers more accurate and trustworthy answers.
  • Control: Organizations maintain full control over their data and the deployment environment.

Potential Use Cases and Applications

This enterprise-grade RAG system can be applied to various use cases:

  • Knowledge Management: Enabling employees to quickly access information from corporate documentation, policies, and procedures.
  • Customer Support: Providing agents with accurate information from product manuals, troubleshooting guides, and support documentation.
  • Legal and Compliance: Assisting legal teams in retrieving relevant information from contracts, regulations, and case law.
  • Research and Development: Helping researchers find relevant information across research papers, patents, and technical documentation.
  • Financial Analysis: Supporting analysts in extracting insights from financial reports, market analyses, and industry research.

Future Directions for This Technology Stack

As these tools continue to evolve, several promising directions emerge:

  • Multi-modal RAG: Extending the system to handle images, diagrams, and other non-textual content.
  • Collaborative Agents: Creating more sophisticated agent architectures that collaborate to solve complex queries.
  • Domain-Specific Optimization: Fine-tuning the pipeline for specific industries and use cases.
  • Self-Improving Systems: Implementing feedback mechanisms that allow the system to learn from user interactions.
  • Real-time Updates: Developing capabilities to continuously update the knowledge base as new documents become available.

The contribution of Docling, Data Prep Kit, and BeeAI to the Linux Foundation marks a significant step toward democratizing advanced AI capabilities in the enterprise space. By combining these tools, organizations can build powerful, customizable RAG systems that unlock the value hidden in their document repositories.

As the open source AI ecosystem continues to mature, we can expect even greater integration between these tools and other Linux Foundation projects, creating a robust foundation for the next generation of enterprise AI applications.