Amanda Ruzza

Pre-Cloud Development Chatbot with Streamlit, Langchain, OpenAI and MongoDB Atlas Vector Search

Introduction

In this blog, I’ll discuss how I built a Retrieval-Augmented Generation (RAG) system capable of processing and retrieving information from multiple PDFs on my local machine, with the end goal of deploying it at a production level in AWS and GCP.

With cost, security, and performance in mind, I explored affordable alternatives for handling terabytes of data in a real-world scenario. It's crucial to recognize that not all PDFs are created equal. Developers must handle various PDF text extraction challenges, such as AES encryption, watermarks, or slow processing times, to ensure a smooth user experience.

While powerful and costly AWS and GCP services could handle PDF processing, they are not feasible for production due to cost concerns. Therefore, I developed a solution using two open-source tools: PyPDF and PyTesseract.

Additionally, I implemented what I call 'pre-cloud-development-observability' features, such as OpenAI token usage and API costs, application execution time, and MongoDB-specific operation metrics, all logged for analysis. After all, who doesn't enjoy delving into log files to optimize performance? 🙋🏻‍♀️

Note: This blog is an in-depth explanation of this application. For the Setup Guide and Python/Application Script, refer to the GitHub repository.

Application stack:

  • Streamlit - Front End
  • OpenAI - LLM/Foundation Model
  • Langchain - NLP Orchestration
  • MongoDB Atlas Vector Search - Cloud-based Vector Database
  • Dotenv - Local secret management
  • PyPDF - PDF text extraction
  • PyTesseract - OCR on AES Encrypted PDFs or PDFs with images in the background that would result in an empty text extraction

Key Features

  • Secure API/TOKEN keys connection hidden in the .env file
  • Processes multiple files - up to 200MB - in a single upload operation
  • Capability to answer questions based on pre-processed documents stored in the database - no need to reupload the same PDFs
  • Text extraction from AES-encrypted PDFs or those with background images
  • Parallel text extraction for PDFs > 5MB for improved performance
  • A 'Clear Chat History' button
  • Observability/logging features for future Cloud Development considerations:
    • Langchain callback function that calculates OpenAI token usage.
    • MongoDB operation-specific logs recorded through the pymongo driver
    • Script execution time measurement

Application Demo Video

System Architecture Overview

Architecture Diagram

The entire application runs from one Python file named chatbot-app.py. The UI, built with Streamlit, processes PDFs using either simple text extraction or OCR. Langchain serves as the application's 'master brain,' creating vector embeddings, sending them to the database, and communicating with the foundation model, OpenAI.

PDF upload and text extraction


Two Python packages are used for text extraction: PyPDF for direct extraction and PyTesseract for OCR.

Users upload multiple files (up to Streamlit's 200MB limit) in the UI's Sidebar and click 'Process'. Streamlit then invokes the get_pdf_text function, which calls process_pdf on each uploaded file. process_pdf attempts text extraction in the following order:

  • Simple extraction with PyPDF
  • If that extraction raises an AES encryption error or returns no text (e.g., because of a watermark or image in the background), ocr_on_pdf is invoked for OCR processing, using parallel processing for files > 5MB through a ThreadPoolExecutor.
  • Any other error is logged, and the file contributes an empty string to the result.
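# Imports assumed for this excerpt (the full script lives in the repository)
import logging
import tempfile
from os.path import getsize

from pypdf import PdfReader


def process_pdf(pdf):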
    try:
        with tempfile.NamedTemporaryFile(delete=False) as temp_pdf:
            temp_pdf.write(pdf.read())
            temp_pdf_path = temp_pdf.name

        file_size = getsize(temp_pdf_path) / (1024 * 1024)  # Size in MB
        logging.info(f"Processing PDF: {pdf.name}, Size: {file_size:.2f} MB")

        if file_size == 0:
            logging.warning(f"The PDF file '{pdf.name}' is empty.")
            return ""

        pdf_reader = PdfReader(temp_pdf_path)

        try:
            text_from_pdf = "".join(page.extract_text() or "" for page in pdf_reader.pages)
        except Exception as e:
            # Catch specific exception for AES encryption
            if "cryptography>=3.1 is required for AES algorithm" in str(e):
                logging.warning(f"PDF '{pdf.name}' is AES encrypted. Performing OCR.")
                return ocr_on_pdf(temp_pdf_path)
            else:
                raise e

        if not text_from_pdf:
            logging.warning(f"No text extracted from '{pdf.name}'. Performing OCR.")
            return ocr_on_pdf(temp_pdf_path)

        logging.info(f"Processed PDF: {pdf.name}")
        return text_from_pdf

    except Exception as e:
        logging.error(f"Error processing PDF: {pdf.name}. Error: {e}")
        return ""



def get_pdf_text(pdf_docs):
    return "".join(process_pdf(pdf) for pdf in pdf_docs)
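The fallback order described above can be sketched without any PDF library. In this minimal example, first_non_empty and with_temp_copy are hypothetical helper names, not functions from the application; the extractors are passed in as plain callables, so in the real script they would presumably wrap the PyPDF extraction and ocr_on_pdf. As a simplification, the sketch moves on to the next extractor after any exception, whereas process_pdf only falls back to OCR on the AES error or on empty text. It also removes the temporary file when processing ends, something a delete=False temporary file does not do on its own.

```python
import os
import tempfile
from typing import Callable, Iterable


def first_non_empty(pdf_path: str, extractors: Iterable[Callable[[str], str]]) -> str:
    """Try each extractor in order and return the first non-empty result."""
    for extract in extractors:
        try:
            text = extract(pdf_path)
        except Exception:
            # Simplification: a failing extractor hands over to the next one.
            continue
        if text:
            return text
    return ""


def with_temp_copy(data: bytes, handler: Callable[[str], str]) -> str:
    """Write data to a temporary .pdf file, run handler on its path, then delete it."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        tmp.write(data)
        tmp_path = tmp.name
    try:
        return handler(tmp_path)
    finally:
        # Clean up even if the handler raised.
        os.remove(tmp_path)
```

With these two helpers, an upload could be handled as with_temp_copy(pdf.read(), lambda p: first_non_empty(p, [pypdf_extract, ocr_on_pdf])), where pypdf_extract stands in for the PyPDF step.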



Below is the ocr_on_pdf function with pytesseract and the ThreadPoolExecutor:

def ocr_on_pdf(pdf_path):
    try:
        pytesseract.pytesseract.tesseract_cmd = getenv("TESSERACT_PATH")
        images = convert_from_path(pdf_path)
        file_size = path.getsize(pdf_path) / (1024 * 1024)  # Size in MB

        if file_size > 5:  # If file is larger than 5MB
            with ThreadPoolExecutor() as executor:
                extracted_texts = list(executor.map(ocr_single_page, images))
            extracted_text = "\n".join(extracted_texts)
            logging.info(f"Parallel OCR completed for large file: {pdf_path}")
        else:
            extracted_text = "\n".join(ocr_single_page(image) for image in images)
            logging.info(f"Sequential OCR completed for small file: {pdf_path}")

        return extracted_text
    except Exception as e:
        logging.error(f"Error during OCR on PDF: {e}")
        return ""
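The ocr_single_page helper is not shown in this post, so here is a hedged sketch of the size-based dispatch on its own. ocr_pages and fake_ocr are hypothetical names; in the application the per-page function would presumably call pytesseract.image_to_string on each image returned by convert_from_path.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Sequence, TypeVar

Page = TypeVar("Page")


def ocr_pages(
    pages: Sequence[Page],
    ocr_page: Callable[[Page], str],
    size_mb: float,
    threshold_mb: float = 5.0,
) -> str:
    """Run ocr_page over every page, in parallel only above the size threshold."""
    if size_mb > threshold_mb:
        with ThreadPoolExecutor() as executor:
            # executor.map yields results in input order, so pages stay in sequence.
            texts = list(executor.map(ocr_page, pages))
    else:
        texts = [ocr_page(page) for page in pages]
    return "\n".join(texts)


def fake_ocr(page: str) -> str:
    """Stand-in for a real OCR call; it only upper-cases its input."""
    return page.upper()
```

Threads are a reasonable fit here because pytesseract hands each page to the external tesseract binary, so the Python threads spend most of their time waiting on a subprocess rather than competing for the interpreter.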

Text conversion into vectors, storage and retrieval

Once the text is extracted, Langchain begins its 'orchestration magic' by splitting it into chunks of up to 1000 characters, with a 200-character overlap between neighbors, through the CharacterTextSplitter class:

def get_text_chunks(text):
    text_splitter = CharacterTextSplitter(
        separator="\n", 
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len
    )
    chunks = text_splitter.split_text(text)
    return chunks
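To make the chunk_size and chunk_overlap parameters concrete, here is a simplified sliding-window sketch. It is not how CharacterTextSplitter works internally (that class splits on the separator first and then merges pieces); sliding_chunks is a hypothetical function that only illustrates how consecutive chunks share their edges.

```python
from typing import List


def sliding_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Cut text into fixed-size windows where each window repeats the tail of the previous one."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if not text:
        return []
    step = chunk_size - chunk_overlap
    # Stop once the remaining text is already covered by the previous window.
    last_start = max(len(text) - chunk_overlap, 1)
    return [text[i:i + chunk_size] for i in range(0, last_start, step)]
```

The overlap means a sentence that straddles a chunk boundary still appears whole in at least one chunk, at the cost of embedding some characters twice.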



The text chunks are vectorized using Langchain's OpenAIEmbeddings class and stored in the Vector Database:

def get_vectorstore(text_chunks: List[str], metadatas: List[Dict[str, Any]] = None) -> MongoDBAtlasVectorSearch:
    embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")

    mongo_client = MongoClient(ATLAS_URI)
    db = mongo_client[MONGODB_DB]
    collection = db[MONGODB_COLLECTION]

    vector_search = MongoDBAtlasVectorSearch(
        collection=collection,
        embedding=embeddings,
        index_name="vector_index",
        text_key="text",
        embedding_key="embedding",
        relevance_score_fn="cosine"
    )

    # Embed and insert all chunks through one batched call instead of one call per chunk
    ids = vector_search.add_texts(text_chunks, metadatas)
    logging.info(f"Added {len(ids)} embeddings to the vector store")
    return vector_search




This is how vectorized texts appear in MongoDB's GUI:


mdb-embbedings-1


MongoDB Atlas Vector Search stores each text chunk and its vector as a document identified by an ObjectId, adhering to the Document Database Model, which simplifies integration with larger applications already using this model:
mdb-embbedings-2


The get_conversation_chain function retrieves relevant text from MongoDB and sends it to OpenAI for question answering:


def get_conversation_chain(vectorstore):
    llm = ChatOpenAI(model_name="gpt-4")
    memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
    conversation_chain = ConversationalRetrievalChain.from_llm(
        llm=llm,
        retriever=vectorstore.as_retriever(),
        memory=memory
    )
    return conversation_chain



The geeky 🤓 Cloud Developer in me was thrilled to see how MongoDB's use of the K-nearest neighbors (KNN) ML algorithm provided accurate answers. On a side note, as this algorithm requires a lot of compute power from a database, it would be interesting to explore its performance in a production environment with terabytes of data, but that should be a discussion for another blog. 📖 👩🏻‍💻


mdb-embbedings-3
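For readers new to nearest-neighbor retrieval, the idea can be shown with a brute-force sketch in plain Python. This is only an illustration of ranking stored vectors by cosine similarity to a query vector; it is not MongoDB's implementation, which relies on its own vector index. The names cosine_similarity and top_k, and the default k, are assumptions made for the example.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(
    query: Sequence[float],
    documents: Sequence[Tuple[str, Sequence[float]]],
    k: int = 4,
) -> List[Tuple[str, float]]:
    """Score every (text, vector) pair against the query and keep the k best."""
    scored = [(text, cosine_similarity(query, vector)) for text, vector in documents]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]
```

Scoring every stored vector like this grows linearly with the collection size, which is why the compute question raised above matters at terabyte scale.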

Streamlit Setup and 'Gotchas'

Throughout the application flow, st.session_state manages conversation states, vector retrieval, OpenAI token usage, and chat history clearing. Both session state initialization and page configuration must be done at the beginning of the script to avoid potential errors:



st.set_page_config(page_title="Chat with PDF Manuals", page_icon=":telephone_receiver:")
if 'chat_history' not in st.session_state:
    st.session_state.chat_history = []

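The functions in this post also read the conversation and vectorstore keys from the session state, so it helps to give every key a default up front. Below is a minimal sketch of that idea; init_session_state and DEFAULT_STATE are hypothetical names, and the function works on any dict-like object so it can be tried without Streamlit.

```python
from typing import Any, Dict, MutableMapping

# Keys used by the snippets in this post, with assumed default values.
DEFAULT_STATE: Dict[str, Any] = {
    "chat_history": [],
    "conversation": None,
    "vectorstore": None,
}


def init_session_state(state: MutableMapping, defaults: Dict[str, Any] = DEFAULT_STATE) -> None:
    """Set each default only if the key is missing, leaving existing values untouched."""
    for key, value in defaults.items():
        if key not in state:
            # Copy list defaults so separate sessions never share one list object.
            state[key] = list(value) if isinstance(value, list) else value
```

In the application this would be called as init_session_state(st.session_state) right after st.set_page_config, so that a check such as st.session_state.vectorstore is None works on the very first run.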



In the handle_userinput function, session_state manages interactions, tracks OpenAI token usage, and appends to the chat history, allowing the user to ask follow-up questions:


def handle_userinput(user_question):
    if st.session_state.vectorstore is None:
        st.warning("Please upload PDFs first or wait until the database is initialized.")
    else:
        with get_openai_callback() as cb:
            response = st.session_state.conversation.invoke({"question": user_question})
            st.session_state.chat_history.append({"type": "user", "content": user_question})
            st.session_state.chat_history.append({"type": "bot", "content": response["answer"]})
            logging.info(f"\n\tOpenAI Token Usage:\n\t{cb}")



The clear_chat_history function, triggered by a button in the main function, resets the conversation state:


def clear_chat_history():
    logging.info("Clearing chat history")
    st.session_state.chat_history = []
    st.session_state.conversation = None
    st.rerun()



Streamlit's default sidebar in the main function facilitates multiple PDF uploads:


    with st.sidebar:
        st.subheader("Your PDF Manuals")
        uploaded_files = st.file_uploader("Upload your PDFs here and click on 'Process'", accept_multiple_files=True, type=["pdf"])
        if st.button("Process") and uploaded_files:
            with st.spinner("Processing..."):
                raw_text = get_pdf_text(uploaded_files)
                text_chunks = get_text_chunks(raw_text)
                vectorstore = get_vectorstore(text_chunks)
                st.session_state.vectorstore = vectorstore
                st.session_state.conversation = get_conversation_chain(vectorstore)
            st.success("Processing complete.")



While building the UI, I experimented with real-time text extraction display and a progress bar, but these features cluttered the UI. I opted for simplicity, relying on the default st.spinner for processing feedback.



Observability

Understanding application behavior is crucial before deploying to the Cloud. I set up two loggers, both written to a .log file:

  • A standard Python logger to observe the application activity
  • A MongoDB-specific performance logger built on the pymongo monitoring module.

Application Observability

While processing large PDFs, I monitored the script execution time by measuring the duration from the start to the end of the main function. Two key observations were made:

  • OCR on PDFs larger than 5MB took considerable time on my M1 MacBook Pro, prompting the addition of 'Parallel Processing' through a ThreadPoolExecutor as a way to avoid performance issues in the Cloud.
  • Cloud functions like AWS Lambda or GCP Cloud Functions may not suit this application, since long OCR jobs can run into their execution time limits. Since I don't plan on maintaining a constantly running VM, this observation indicated that an architecture using Serverless Containers, such as AWS ECS with Fargate or GCP Cloud Run, would be the better deployment approach. These containers would only run when the application is invoked, offering cost-efficiency with the option to autoscale. More on this in future blogs 📝.
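The execution time measurement mentioned above can be sketched as a small context manager. This is an assumed approach, not the code from the repository; log_duration is a hypothetical name.

```python
import logging
import time
from contextlib import contextmanager
from typing import Dict, Iterator, Optional


@contextmanager
def log_duration(label: str, logger: Optional[logging.Logger] = None) -> Iterator[Dict[str, float]]:
    """Time the enclosed block and log how long it took, even if it raises."""
    log = logger or logging.getLogger(__name__)
    timing = {"seconds": 0.0}
    start = time.perf_counter()
    try:
        yield timing
    finally:
        timing["seconds"] = time.perf_counter() - start
        log.info("%s took %.2f seconds", label, timing["seconds"])
```

Wrapping the body of main in a with log_duration("main") block would record one duration per script run, and the same helper could wrap only the OCR step to compare sequential and parallel runs.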

To gauge the cost implications of using OpenAI's foundation model, I tracked API usage using Langchain's get_openai_callback functionality. This made it easier to understand the actual costs associated with each application usage:

openai-token-usage-mdb-logs-screenshot
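The callback object already reports a total cost, but the arithmetic behind it is simple enough to sketch for what-if estimates. In the hedged example below, estimate_cost is a hypothetical function, and the prices are parameters rather than real OpenAI rates, which change over time and differ by model.

```python
def estimate_cost(
    prompt_tokens: int,
    completion_tokens: int,
    prompt_price_per_1k: float,
    completion_price_per_1k: float,
) -> float:
    """Estimate the cost of one call from token counts and per-1000-token prices."""
    if prompt_tokens < 0 or completion_tokens < 0:
        raise ValueError("token counts must be non-negative")
    prompt_cost = (prompt_tokens / 1000) * prompt_price_per_1k
    completion_cost = (completion_tokens / 1000) * completion_price_per_1k
    return prompt_cost + completion_cost
```

Inside the get_openai_callback block, the inputs could come from cb.prompt_tokens and cb.completion_tokens, which makes it easy to compare the logged cb.total_cost against a different model's pricing.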

MongoDB Logs

Coming from a DevOps world 👩🏻‍🏭, and having a passion for understanding databases under the hood, I leveraged this chatbot application to implement pymongo event_loggers. I created a class that counts successful operations per database, tracks their average duration, and logs any failed operation each time the program ran:

# MongoDB Event Listeners
class AggregatedCommandLogger(monitoring.CommandListener):
    def __init__(self):
        self.operation_counts = defaultdict(int)
        self.total_duration = 0
        self.total_operations = 0

    def started(self, event):
        pass  

    def succeeded(self, event):
        self.operation_counts[event.database_name] += 1
        self.total_duration += event.duration_micros
        self.total_operations += 1

    def failed(self, event):
        # getattr is safe even when the event object exposes no __dict__
        database_name = getattr(event, 'database_name', 'unknown')
        logging.info(f"Command failed: operation_id={event.operation_id}, duration_micros={event.duration_micros}, database_name={database_name}")

    def summarize_and_reset(self):
        if self.total_operations > 0:
            avg_duration = self.total_duration / self.total_operations
            summary = f"MongoDB operations summary: {self.total_operations} total operations, "
            summary += f"average duration: {avg_duration:.2f} microseconds. "
            summary += "Operations per database: " + ", ".join(f"{db}: {count}" for db, count in self.operation_counts.items())
            logging.info(summary)

        # Reset counters
        self.operation_counts.clear()
        self.total_duration = 0
        self.total_operations = 0


aggregated_logger = AggregatedCommandLogger()

monitoring.register(aggregated_logger)

def log_mongodb_summary():
    aggregated_logger.summarize_and_reset()



The results aligned with expectations — no errors occurred, and operations between my local machine and MongoDB Atlas were swift and reliable. By building these pymongo.monitoring event_loggers, I preemptively simplified potential troubleshooting in a Cloud infrastructure, while also gaining insights into the appropriate MongoDB database size for real-world use.

Security

All of the environment variables, such as the OpenAI API key, Tesseract CLI location, MongoDB connection string, and database and collection names, were securely stored in the .env file. I added a sample .env in the GitHub repository.
For Cloud deployment, these variables will be managed via a Secrets Manager — either AWS or GCP — ensuring consistent security practices across environments.
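One way to keep local and Cloud environments consistent is to validate the required settings at startup, regardless of where they come from. The sketch below is an assumption about how that could look: load_settings is a hypothetical function, and apart from TESSERACT_PATH, which appears in the OCR code above, the variable names are guesses based on the constants used in this post.

```python
from typing import Dict, Mapping, Sequence

# Assumed variable names; adjust them to match the sample .env in the repository.
REQUIRED_SETTINGS = (
    "OPENAI_API_KEY",
    "ATLAS_URI",
    "MONGODB_DB",
    "MONGODB_COLLECTION",
    "TESSERACT_PATH",
)


def load_settings(env: Mapping[str, str], required: Sequence[str] = REQUIRED_SETTINGS) -> Dict[str, str]:
    """Return the required settings, or fail fast with the names of any that are missing."""
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
    return {name: env[name] for name in required}
```

Locally this could be called as load_settings(os.environ) after load_dotenv(), and in the Cloud the same function could receive a mapping populated from the Secrets Manager.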

Conclusion

This application showcases a blend of open-source tools, observability practices, and database management, offering a blueprint for scaling in AWS or GCP. Building it from scratch with a cloud-centric vision helped identify and address potential issues early on. The main challenge was handling different types of PDFs, balancing cost-efficiency, speed, and security.

Future improvements for the 🤖 include:

  • Adding a 'Web URL Input' for users to upload a file or provide a PDF URL
  • Implementing PDF metadata extraction and storing it in a separate MongoDB Atlas Database, allowing users to track previously vectorized PDFs and ask questions about them
  • Introducing a dropdown box in the UI to view available PDF file names
