Sync Backend Setup for Batch Processing
This is the batch-processing part: it automates syncing the Kubernetes documentation by cloning (or pulling) the repository, copying the relevant files, and preparing them for text embeddings.
Make sure to install the required libraries; the following is the requirements.txt file.
langchain-openai
langchain_community
langchain_core
fastapi
uvicorn
python-dotenv
scikit-learn
requests
Go to the project directory and install the libraries:
(venv)C:/<project-directory>/rag_chatbot>pip install -r requirements.txt
Import the following libraries:
import subprocess
import shutil
from pathlib import Path
import os
import requests
import glob
import time
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import AzureOpenAIEmbeddings
from dotenv import load_dotenv
import json
import hashlib
Declare constants:
# Load environment variables from the .env file before reading them
load_dotenv()
HASH_DB_PATH = Path("hash_files.json")
REPO_URL = os.environ["K8_URL"]
VECTOR_DB_URL = os.environ["VECTOR_DB_URL"]
# Directories
TEMP_DIR = Path(os.path.abspath("./temp-docs"))
TARGET_DIR = Path(os.path.abspath("./k8_docs/en"))
# Batch configuration
EMBEDDING_BATCH_SIZE = 100  # Reduced batch size for embeddings
STORE_BATCH_SIZE = 100      # Batch size for vector store uploads
BATCH_DELAY = 2             # Delay between batches in seconds
RATE_LIMIT_DELAY = 60       # Delay when hitting rate limits in seconds
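The functions later in this section reference an embedding_model (the Azure OpenAI embeddings client) and a session (a reusable HTTP session) that the snippets below assume are created once at module level. Here is a minimal sketch of that setup; the deployment name, API version, and environment-variable names (AZURE_EMBEDDING_DEPLOYMENT, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY, AZURE_OPENAI_API_VERSION) are my assumptions, so adjust them to your Azure resource. Your .env should also define K8_URL and VECTOR_DB_URL used above.
# Assumed module-level setup; referenced by process_and_store_batch() and
# store_embeddings_batch() further below.
embedding_model = AzureOpenAIEmbeddings(
    azure_deployment=os.environ.get("AZURE_EMBEDDING_DEPLOYMENT", "text-embedding-3-small"),
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    openai_api_version=os.environ.get("AZURE_OPENAI_API_VERSION", "2024-02-01"),
)
# Reusable HTTP session for posting payloads to the vector store
session = requests.Session()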
Start with the Git operations.
clone_or_pull_repo(): Clones the Kubernetes docs repo if it is not present; otherwise, it pulls the latest changes.
def clone_or_pull_repo():
    if not TEMP_DIR.exists():
        print("✅ Cloning Kubernetes docs repo...")
        subprocess.run(["git", "clone", REPO_URL, str(TEMP_DIR)], check=True)
    else:
        print("✅ Pulling latest changes...")
        subprocess.run(["git", "-C", str(TEMP_DIR), "pull"], check=True)
copy_docs(): Copies selected subdirectories from the cloned repo (temp-docs/content/en/docs) and then copies their .md files, preserving the directory structure, into the target folder k8_docs/en.
def copy_docs():
    base_dir = TEMP_DIR / "content" / "en" / "docs"
    selected_subdirs = ["concepts"]
    for subdir in selected_subdirs:
        source_subdir = base_dir / subdir
        if not source_subdir.exists():
            print(f"⚠️ Source directory does not exist: {source_subdir}")
            continue
        # Create the target subdirectory
        target_subdir = TARGET_DIR / subdir
        target_subdir.mkdir(parents=True, exist_ok=True)
        # Copy all markdown files with their directory structure
        for file in source_subdir.glob("**/*.md"):
            relative_path = file.relative_to(source_subdir)
            dest_file = target_subdir / relative_path
            dest_file.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(file, dest_file)
            print(f"📄 Copied: {file} -> {dest_file}")
Load the markdown files. load_md_files() reads the copied .md files and stores them in a list.
def load_md_files():
    md_files = []
    try:
        search_path = os.path.join(TARGET_DIR, "concepts", "**", "*.md")
        for filepath in glob.glob(search_path, recursive=True):
            with open(filepath, 'r', encoding='utf-8') as f:
                text = f.read()
            md_files.append({
                'filename': os.path.basename(filepath),
                'content': text,
            })
    except Exception as e:
        print(f"Error reading files: {str(e)}")
    return md_files
Text splitting (chunking) the documents.
call_text_splitter(): Uses LangChain's RecursiveCharacterTextSplitter to chunk large text files into smaller pieces and wraps each chunk in a Document object with metadata.
def call_text_splitter(md_docs):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=50)
    documents = []
    try:
        for doc in md_docs:
            split_texts = text_splitter.split_text(doc['content'])
            for i, chunk in enumerate(split_texts):
                document = Document(
                    page_content=chunk,
                    metadata={
                        'source': f"{doc['filename']}-{i}"
                    }
                )
                documents.append(document)
    except Exception as e:
        print(f"Error in text splitting: {e}")
        raise
    return documents
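For example, chaining the two functions produces the list of chunked Document objects (a minimal usage sketch):
md_docs = load_md_files()
documents = call_text_splitter(md_docs)
print(f"Loaded {len(md_docs)} files, produced {len(documents)} chunks")
# Each chunk keeps a traceable source such as "pods.md-0", "pods.md-1", ...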
Hashing is used to avoid re-processing unchanged files: when the Kubernetes repo is synced again, only files whose content hash has changed need to go through text embedding.
def get_file_hash(content):
    return hashlib.sha256(content.encode('utf-8')).hexdigest()

def load_existing_hashes():
    if HASH_DB_PATH.exists():
        with open(HASH_DB_PATH, "r") as f:
            return json.load(f)
    return {}

def save_hashes(hashes):
    with open(HASH_DB_PATH, "w") as f:
        json.dump(hashes, f, indent=2)
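The hash database can then be used to skip unchanged files before embedding. Here is a minimal sketch of that filtering step; the helper name filter_changed_docs is my own, not from the original code:
def filter_changed_docs(md_files):
    """Return only the files whose content hash differs from the stored one."""
    existing_hashes = load_existing_hashes()
    changed, new_hashes = [], dict(existing_hashes)
    for doc in md_files:
        file_hash = get_file_hash(doc['content'])
        if existing_hashes.get(doc['filename']) != file_hash:
            changed.append(doc)
        new_hashes[doc['filename']] = file_hash
    return changed, new_hashes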
Embedding in batches.
process_and_store_batch(): Converts each document chunk into embeddings using the Azure OpenAI embedding model (AzureOpenAIEmbeddings) and builds the payload for the vector store.
def process_and_store_batch(batch_documents):
    contents = []
    for doc in batch_documents:
        try:
            contents.append(doc.page_content)
        except Exception as e:
            print(f"Error accessing document content: {e}")
            continue
    if not contents:
        return []
    try:
        embeddings = embedding_model.embed_documents(contents)
        payload = []
        for doc, embedding in zip(batch_documents, embeddings):
            try:
                payload.append({
                    "embedding": embedding,
                    "metadata": doc.metadata,
                    "content": doc.page_content
                })
            except Exception as e:
                print(f"Error creating payload item: {e}")
                continue
        return payload
    except Exception as e:
        print(f"Error generating embeddings: {e}")
        if "429" in str(e):
            print(f"Rate limit hit, waiting {RATE_LIMIT_DELAY} seconds...")
            time.sleep(RATE_LIMIT_DELAY)
        return []
store_embeddings_batch(payload_batch): Sends the payload to the vector store using a POST request and returns False on failure so the caller can retry.
def store_embeddings_batch(payload_batch):
    try:
        response = session.post(f"{VECTOR_DB_URL}/store", json=payload_batch)
        response.raise_for_status()
        return True
    except Exception as e:
        if "429" in str(e):
            print(f"Rate limit hit, waiting {RATE_LIMIT_DELAY} seconds...")
            time.sleep(RATE_LIMIT_DELAY)
        return False
You can also add handling for failed batches, using a retry strategy with delays for rate limits. I have included this logic in the full project; see the GitHub link for more code details.
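As one possible approach (not the exact implementation from the repository), a simple retry wrapper around store_embeddings_batch() could look like this; the helper name store_with_retry is my own:
def store_with_retry(payload_batch, max_retries=3):
    """Retry a failed upload a few times, backing off between attempts."""
    for attempt in range(1, max_retries + 1):
        if store_embeddings_batch(payload_batch):
            return True
        print(f"Retrying batch upload ({attempt}/{max_retries})...")
        time.sleep(BATCH_DELAY * attempt)  # simple linear backoff
    return False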
rerun_embeddings(): This is the main embedding routine, which ties the functions above together.
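The full function is in the repository; a minimal sketch of what it does, reusing the hypothetical filter_changed_docs() and store_with_retry() helpers sketched earlier, might look like this:
def rerun_embeddings():
    md_files = load_md_files()
    changed_docs, new_hashes = filter_changed_docs(md_files)
    if not changed_docs:
        print("No changed files; nothing to embed.")
        return
    documents = call_text_splitter(changed_docs)
    # Embed and upload in batches to stay under rate limits
    for start in range(0, len(documents), EMBEDDING_BATCH_SIZE):
        batch = documents[start:start + EMBEDDING_BATCH_SIZE]
        payload = process_and_store_batch(batch)
        for p_start in range(0, len(payload), STORE_BATCH_SIZE):
            store_with_retry(payload[p_start:p_start + STORE_BATCH_SIZE])
        time.sleep(BATCH_DELAY)
    # Persist the updated hashes so unchanged files are skipped next run
    save_hashes(new_hashes)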
if __name__ == "__main__":
    try:
        clone_or_pull_repo()
        copy_docs()
        rerun_embeddings()
        print("✅ Successfully stored embeddings...")
    except Exception as e:
        print(f"❌ Error: {e}")
To run the code:
C:/rag_chatbot_k8>source venv/Scripts/activate
(venv)C:/rag_chatbot_k8>cd sync_backend
(venv)C:/rag_chatbot_k8/sync_backend>python index.py