In this blog, we show a simple way to turn long financial PDFs into instant answers and clean summaries, so your team makes faster, safer decisions with less manual work.
Reading long reports is slow and risky. Numbers get missed, copy-paste errors slip in, and decisions lag. This system reads any PDF (10 or 1,000+ pages), extracts tables, text, and images, and gives your team direct answers and short summaries with page and table context, so finance and ops move faster and avoid mistakes.
We focus on four outcomes that move the top line and protect the bottom line: protecting revenue, shortening time-to-answer, reducing manual drudgery, and strengthening trust. We return to each of these after the technical walkthrough.
You don’t need heavy infrastructure. With Python and PostgreSQL installed, setup is straightforward.
1) Create a virtual environment and install dependencies
python -m venv venv
# windows
venv\Scripts\activate
# linux / mac
source venv/bin/activate
pip install -r requirements.txt
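The full dependency list lives in the repo’s requirements.txt. If you are assembling your own environment, the imports used across the scripts point to roughly these packages (names inferred from the imports, not checked against the repo):

mistralai
openai
python-dotenv
psycopg2-binary
sqlalchemy
pgvector
numpy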
2) Create a .env in the project root
Include the keys the scripts read via os.getenv:
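MISTRAL_API_KEY drives the OCR client, OPENAI_API_KEY powers the embeddings and answer generation, and PG_PASSWORD is used for the Postgres connection. A minimal .env (placeholder values):

MISTRAL_API_KEY=your_mistral_api_key
OPENAI_API_KEY=your_openai_api_key
PG_PASSWORD=your_postgres_password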
3) Run extraction
Place a PDF in pdf_holder/, then run apps/pdf_extract.py. Outputs land in output/: complete_output.md, per-page markdown in pages/, extracted images in images/, and two JSON files (extracted_data.json and db_ready_data.json).
4) Initialize database and ingest
Run db.py to set up PostgreSQL + pgvector and create the tables. Then run insert_to_db.py to load chunks, tables, and images (with embeddings).
5) Ask questions
Run rag.py and type questions. You’ll get short, sourced answers.
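A quick recap of the full pipeline, using the script names from the steps above (adjust working directories to match the relative paths inside the scripts, e.g. ../pdf_holder/ and output/):

python apps/pdf_extract.py    # step 3: OCR the PDF, export markdown, images, and JSON
python db.py                  # step 4: create the pgvector-backed tables
python insert_to_db.py        # step 4: embed and load chunks, tables, and images
python rag.py                 # step 5: interactive Q&A

The extraction script is shown in full below.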
import os
from dotenv import load_dotenv
from mistralai import Mistral
import base64
import time
import re
import json
from pathlib import Path
from extract_data_to_json import process_pdf_to_json # Import the new JSON processor
load_dotenv()
client = Mistral(api_key=os.getenv("MISTRAL_API_KEY"))
def clean_ocr_text(text: str) -> str:
"""
Remove LaTeX-style escapes from OCR text.
"""
text = text.replace(r'\%', '%')
text = text.replace(r'\$', '$')
text = re.sub(r'\$(.*?)\$', r'\1', text)
return text
class EnhancedOCRProcessor:
def __init__(self, client):
self.client = client
def process_with_retry(self, file_path, max_retries=3, delay=2):
"""Process PDF with retry mechanism"""
for attempt in range(max_retries):
try:
uploaded_file = self.client.files.upload(
file={
"file_name": f"document_attempt_{attempt}.pdf",
"content": open(file_path, "rb")
},
purpose="ocr"
)
file_url = self.client.files.get_signed_url(file_id=uploaded_file.id)
# Try different processing parameters
response = self.client.ocr.process(
model="mistral-ocr-latest",
document={
"type": "document_url",
"document_url": file_url.url
},
include_image_base64=True,
)
return response
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(delay)
else:
raise
def process_page_by_page(self, file_path):
"""Process individual pages if full document processing fails"""
# You'd need to split PDF into individual pages first
pass
def validate_extraction(self, response):
"""Validate and analyze extraction completeness"""
total_chars = 0
pages_with_content = 0
pages_without_content = 0
for i, page in enumerate(response.pages):
char_count = len(page.markdown.strip())
total_chars += char_count
if char_count > 10: # Threshold for meaningful content
pages_with_content += 1
else:
pages_without_content += 1
print(f"Warning: Page {i+1} has minimal content ({char_count} chars)")
print(f"Total pages: {len(response.pages)}")
print(f"Pages with content: {pages_with_content}")
print(f"Pages with minimal content: {pages_without_content}")
print(f"Total characters extracted: {total_chars}")
return {
'total_pages': len(response.pages),
'content_pages': pages_with_content,
'empty_pages': pages_without_content,
'total_chars': total_chars
}
def enhanced_export(self, response, output_dir="output"):
"""Enhanced export with better organization and validation"""
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
# Export markdown with page separators
with open(output_path / 'complete_output.md', 'w', encoding='utf-8') as f:
for i, page in enumerate(response.pages):
f.write(f"\n\n--- PAGE {i+1} ---\n\n")
f.write(clean_ocr_text(page.markdown))
# f.write(page.markdown)
if len(page.markdown.strip()) < 10:
f.write(f"\n[WARNING: This page has minimal content]\n")
# Export individual pages
pages_dir = output_path / "pages"
pages_dir.mkdir(exist_ok=True)
for i, page in enumerate(response.pages):
with open(pages_dir / f'page_{i+1:03d}.md', 'w', encoding='utf-8') as f:
f.write(page.markdown)
# Export images
images_dir = output_path / "images"
images_dir.mkdir(exist_ok=True)
image_count = 0
for i, page in enumerate(response.pages):
for j, image in enumerate(page.images):
try:
parsed_image = self.data_uri_to_bytes(image.image_base64)
image_filename = f"page_{i+1:03d}_image_{j+1:03d}.png"
with open(images_dir / image_filename, 'wb') as f:
f.write(parsed_image)
image_count += 1
except Exception as e:
print(f"Failed to export image {j+1} from page {i+1}: {e}")
print(f"Exported {image_count} images to {images_dir}")
print("/n Converting to JSON format...")
document_data = process_pdf_to_json(response, output_dir)
return output_path, document_data
def data_uri_to_bytes(self, data_uri):
"""Convert data URI to bytes"""
_, encoded = data_uri.split(',', 1)
return base64.b64decode(encoded)
# Alternative approach using multiple processing strategies
def multi_strategy_processing(file_path):
"""Try multiple processing strategies to maximize extraction"""
processor = EnhancedOCRProcessor(client)
strategies = [
# Strategy 1: Standard processing with retry
lambda: processor.process_with_retry(file_path),
# Strategy 2: Process with different file names (sometimes helps)
lambda: processor.process_with_retry(file_path, max_retries=2),
]
best_result = None
best_char_count = 0
for i, strategy in enumerate(strategies):
try:
print(f"Trying strategy {i+1}...")
result = strategy()
# Count total characters to determine best result
total_chars = sum(len(page.markdown) for page in result.pages)
if total_chars > best_char_count:
best_result = result
best_char_count = total_chars
print(f"Strategy {i+1} extracted {total_chars} characters")
except Exception as e:
print(f"Strategy {i+1} failed: {e}")
return best_result
# Usage example
def main():
file_path = "../pdf_holder/test3.pdf"
# Try enhanced processing
processor = EnhancedOCRProcessor(client)
try:
print("=== Enhanced Single Processing ===")
response = processor.process_with_retry(file_path)
stats = processor.validate_extraction(response)
# Export with validation AND JSON conversion
output_dir, document_data = processor.enhanced_export(response)
print(f"Results exported to: {output_dir}")
if stats['empty_pages'] > stats['content_pages'] * 0.2: # If >20% pages are empty
print("\n=== Trying Multi-Strategy Processing ===")
better_response = multi_strategy_processing(file_path)
if better_response:
better_stats = processor.validate_extraction(better_response)
if better_stats['total_chars'] > stats['total_chars']:
print("Multi-strategy processing found more content!")
output_dir_enhanced, document_data_enhanced = processor.enhanced_export(better_response, "output_enhanced")
if document_data:
print(f"Document has {len(document_data['pages'])} pages")
if document_data['pages']:
first_page = document_data['pages'][0]
print(f"Page 1 structure:")
print(f" - Paragraphs: {len(first_page['paragraphs'])}")
print(f" - Tables: {len(first_page['tables'])}")
print(f" - Images: {len(first_page['images'])}")
if first_page['paragraphs']:
snippet = first_page['paragraphs'][0][:100] + "..." if len(first_page['paragraphs'][0]) > 100 else first_page['paragraphs'][0]
print(f" - First paragraph: '{snippet}'")
except Exception as e:
print(f"Processing failed: {e}")
if __name__ == "__main__":
main()
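# extract_data_to_json.py (imported above as process_pdf_to_json): turns the OCR response into structured JSON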
import json
import re
import base64
from pathlib import Path
from typing import Dict, List, Any
import hashlib
class PDFDataExtractor:
def __init__(self):
self.table_patterns = [
r'\|.*?\|', # Markdown table pattern
r'\+[-=]+\+', # ASCII table borders
r'(?:(?:\S+\s*){2,}(?:\n|$)){3,}', # Columnar data pattern
]
def extract_tables_from_text(self, text: str) -> List[Dict]:
"""Extract tables from markdown text"""
tables = []
lines = text.split('\n')
# Look for markdown tables
table_start = None
current_table_lines = []
for i, line in enumerate(lines):
# Check if line contains table markers
if '|' in line and line.strip().startswith('|') and line.strip().endswith('|'):
if table_start is None:
table_start = i
current_table_lines = [line]
else:
current_table_lines.append(line)
else:
# End of table or no table
if table_start is not None and len(current_table_lines) >= 2:
# Process the table
table_data = self.parse_markdown_table(current_table_lines)
if table_data:
tables.append({
'table_id': f'table_{len(tables) + 1}',
'type': 'markdown_table',
'headers': table_data.get('headers', []),
'rows': table_data.get('rows', []),
'raw_text': '\n'.join(current_table_lines)
})
# Reset table tracking
table_start = None
current_table_lines = []
# Handle last table if exists
if table_start is not None and len(current_table_lines) >= 2:
table_data = self.parse_markdown_table(current_table_lines)
if table_data:
tables.append({
'table_id': f'table_{len(tables) + 1}',
'type': 'markdown_table',
'headers': table_data.get('headers', []),
'rows': table_data.get('rows', []),
'raw_text': '\n'.join(current_table_lines)
})
return tables
def parse_markdown_table(self, table_lines: List[str]) -> Dict:
"""Parse markdown table into structured format"""
if len(table_lines) < 2:
return {}
# Remove leading/trailing pipes and split
def clean_row(line):
return [cell.strip() for cell in line.strip('|').split('|')]
# Parse headers (first line)
headers = clean_row(table_lines[0])
# Skip separator line (second line with dashes)
data_rows = []
for line in table_lines[2:]: # Skip header and separator
if line.strip() and '|' in line:
row = clean_row(line)
# Ensure row has same number of columns as headers
while len(row) < len(headers):
row.append('')
data_rows.append(row[:len(headers)]) # Truncate if too many columns
return {
'headers': headers,
'rows': data_rows
}
def extract_paragraphs(self, text: str, tables: List[Dict]) -> List[str]:
"""Extract paragraphs, excluding table content"""
# Remove table content from text
clean_text = text
for table in tables:
clean_text = clean_text.replace(table['raw_text'], '')
# Split into paragraphs
paragraphs = []
for para in clean_text.split('\n\n'):
para = para.strip()
if para and len(para) > 10: # Filter out very short content
# Clean up the paragraph
para = re.sub(r'\n+', ' ', para) # Replace multiple newlines with space
para = re.sub(r'\s+', ' ', para) # Replace multiple spaces with single space
paragraphs.append(para)
return paragraphs
def process_images(self, images: List, page_num: int) -> List[Dict]:
"""Process images from a page"""
processed_images = []
for i, image in enumerate(images):
try:
# Generate image hash for unique identification
image_data = base64.b64decode(image.image_base64.split(',')[1])
image_hash = hashlib.md5(image_data).hexdigest()
image_info = {
'image_id': f'page_{page_num}_image_{i + 1}',
'image_hash': image_hash,
'filename': f'page_{page_num:03d}_image_{i + 1:03d}.png',
'size_bytes': len(image_data),
'base64_data': image.image_base64, # Keep full base64 for storage
'position': {
'page': page_num,
'image_index': i
}
}
# Add bounding box if available
if hasattr(image, 'bbox') and image.bbox:
image_info['bbox'] = image.bbox
processed_images.append(image_info)
except Exception as e:
print(f"Error processing image {i + 1} on page {page_num}: {e}")
return processed_images
def extract_page_data(self, page, page_num: int) -> Dict:
"""Extract structured data from a single page"""
# Get raw text
raw_text = page.markdown
# Extract tables
tables = self.extract_tables_from_text(raw_text)
# Extract paragraphs (excluding table content)
paragraphs = self.extract_paragraphs(raw_text, tables)
# Process images
images = self.process_images(page.images, page_num)
# Create page data structure
page_data = {
'page_number': page_num,
'paragraphs': paragraphs,
'tables': tables,
'images': images,
'metadata': {
'paragraph_count': len(paragraphs),
'table_count': len(tables),
'image_count': len(images),
'total_characters': len(raw_text),
'has_content': len(paragraphs) > 0 or len(tables) > 0 or len(images) > 0
},
'raw_markdown': raw_text # Keep original for reference
}
return page_data
def extract_full_document(self, response) -> Dict:
"""Extract data from entire document"""
document_data = {
'document_metadata': {
'total_pages': len(response.pages),
'extraction_timestamp': None,
'total_paragraphs': 0,
'total_tables': 0,
'total_images': 0
},
'pages': []
}
# Process each page
for i, page in enumerate(response.pages):
page_num = i + 1
page_data = self.extract_page_data(page, page_num)
document_data['pages'].append(page_data)
# Update document metadata
document_data['document_metadata']['total_paragraphs'] += page_data['metadata']['paragraph_count']
document_data['document_metadata']['total_tables'] += page_data['metadata']['table_count']
document_data['document_metadata']['total_images'] += page_data['metadata']['image_count']
# Add timestamp
from datetime import datetime
document_data['document_metadata']['extraction_timestamp'] = datetime.now().isoformat()
return document_data
def save_to_json(self, document_data: Dict, output_path: str = "extracted_data.json"):
"""Save extracted data to JSON file"""
try:
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(document_data, f, indent=2, ensure_ascii=False)
print(f" Data saved to {output_path}")
return True
except Exception as e:
print(f"Error saving JSON: {e}")
return False
def create_database_ready_json(self, document_data: Dict, output_path: str = "db_ready_data.json"):
"""Create a database-ready version with optimized structure"""
db_ready_data = []
for page in document_data['pages']:
page_record = {
'page_number': page['page_number'],
'paragraphs': page['paragraphs'],
'tables': page['tables'],
'images': [
{
'image_id': img['image_id'],
'filename': img['filename'],
'size_bytes': img['size_bytes'],
'image_hash': img['image_hash']
# Note: base64_data removed for DB efficiency - store separately if needed
} for img in page['images']
],
'metadata': page['metadata']
}
db_ready_data.append(page_record)
# Save database-ready version
try:
with open(output_path, 'w', encoding='utf-8') as f:
json.dump({
'document_metadata': document_data['document_metadata'],
'pages': db_ready_data
}, f, indent=2, ensure_ascii=False)
print(f" Database-ready data saved to {output_path}")
return True
except Exception as e:
print(f" Error saving database-ready JSON: {e}")
return False
def process_pdf_to_json(response, output_dir="output"):
"""Main function to process PDF response and create JSON files"""
extractor = PDFDataExtractor()
# Create output directory
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
print(" Starting JSON extraction...")
# Extract all data
document_data = extractor.extract_full_document(response)
# Save complete JSON
complete_json_path = output_path / "extracted_data.json"
extractor.save_to_json(document_data, str(complete_json_path))
# Save database-ready JSON
db_json_path = output_path / "db_ready_data.json"
extractor.create_database_ready_json(document_data, str(db_json_path))
# Print summary
print("\n Extraction Summary:")
print(f"Total Pages: {document_data['document_metadata']['total_pages']}")
print(f"Total Paragraphs: {document_data['document_metadata']['total_paragraphs']}")
print(f"Total Tables: {document_data['document_metadata']['total_tables']}")
print(f"Total Images: {document_data['document_metadata']['total_images']}")
return document_data
# Example usage function
def main():
# This would be called from your main OCR script
# Example: process_pdf_to_json(ocr_response)
pass
if __name__ == "__main__":
main()
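# Database schema (SQLAlchemy + pgvector): documents, document_chunks, extracted_tables, and extracted_images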
from sqlalchemy import (
Column,
Integer,
String,
Text,
ForeignKey,
DateTime,
)
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import func
from pgvector.sqlalchemy import Vector
Base = declarative_base()
class Document(Base):
__tablename__ = 'documents'
doc_id = Column(Integer, primary_key=True)
company_name = Column(String(255))
report_year = Column(Integer)
file_path = Column(String(255), nullable=False)
processed_at = Column(DateTime, server_default=func.now())
chunks = relationship("DocumentChunk", back_populates="document", cascade="all, delete-orphan")
tables = relationship("ExtractedTable", back_populates="document", cascade="all, delete-orphan")
images = relationship("ExtractedImage", back_populates="document", cascade="all, delete-orphan")
def __repr__(self):
return f"<Document(id={self.doc_id}, name='{self.company_name} {self.report_year}')>"
class DocumentChunk(Base):
__tablename__ = 'document_chunks'
id = Column(Integer, primary_key=True)
doc_id = Column(Integer, ForeignKey('documents.doc_id'), nullable=False)
page_number = Column(Integer)
chunk_text = Column(Text)
embedding = Column(Vector(1536))
document = relationship("Document", back_populates="chunks")
class ExtractedTable(Base):
__tablename__ = 'extracted_tables'
table_id = Column(Integer, primary_key=True)
doc_id = Column(Integer, ForeignKey('documents.doc_id'), nullable=False)
page_number = Column(Integer)
table_data_json = Column(JSONB)
table_as_text = Column(Text)
embedding = Column(Vector(1536))
document = relationship("Document", back_populates="tables")
class ExtractedImage(Base):
__tablename__ = 'extracted_images'
image_id = Column(Integer, primary_key=True)
doc_id = Column(Integer, ForeignKey('documents.doc_id'), nullable=False)
page_number = Column(Integer)
image_filename = Column(String(255))
image_path = Column(String(500))
document = relationship("Document", back_populates="images")
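The classes above define the schema; to work as the db.py from step 4, the file also needs to enable pgvector and create the tables. The repo’s exact code isn’t reproduced here, so what follows is a minimal sketch, assuming a local PostgreSQL, the report_agent_11 database used by the other scripts, and the PG_PASSWORD value from .env:

import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv()

if __name__ == "__main__":
    # Assumes the report_agent_11 database already exists (e.g. createdb report_agent_11)
    engine = create_engine(
        f"postgresql+psycopg2://postgres:{os.getenv('PG_PASSWORD')}@localhost:5432/report_agent_11"
    )
    # pgvector must be enabled before columns of type Vector can be created
    with engine.begin() as conn:
        conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
    # Creates documents, document_chunks, extracted_tables, and extracted_images
    Base.metadata.create_all(engine)
    print("Database initialized.")

Next comes the ingestion script from step 4, which embeds and loads the content from both JSON files into these tables.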
import json
import psycopg2
from psycopg2.extras import execute_values
import openai
import os
from dotenv import load_dotenv
import hashlib
from pathlib import Path
import base64
from typing import List, Dict, Any
load_dotenv()
class DocumentInserter:
def __init__(self):
# Database configuration
self.db_config = {
'host': 'localhost',
'database': 'report_agent_11',
'user': 'postgres',
'password': os.getenv('PG_PASSWORD', 'your_password'),
'port': 5432
}
# OpenAI configuration
self.openai_client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self.embedding_model = "text-embedding-3-small"
def get_embedding(self, text: str) -> List[float]:
"""Get embedding vector for text"""
try:
# Clean the text
text = text.replace("\n", " ").strip()
if not text:
return None
response = self.openai_client.embeddings.create(
model=self.embedding_model,
input=text
)
return response.data[0].embedding
except Exception as e:
print(f" Error getting embedding for text: {str(e)[:100]}...")
return None
def analyze_image_with_vision(self, base64_image: str, surrounding_text: str = "") -> Dict:
"""
Analyze image with OpenAI Vision API for comprehensive understanding
"""
try:
response = self.openai_client.chat.completions.create(
model="gpt-4.1-mini",
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": f"""Analyze this image in detail and provide:
1. Detailed description of what you see
2. Any text/numbers you can read (OCR)
3. Key data points, trends, or insights
4. Type of visual (chart, table, diagram, etc.)
5. Context: This image appears near: {surrounding_text[:300]}
Be very detailed and specific. Extract ALL visible information.
Format as JSON:
{{
"detailed_description": "comprehensive description",
"ocr_text": "all text found in image",
"key_insights": "important findings and data",
"visual_type": "chart/table/diagram/photo/etc",
"data_extracted": "specific numbers, percentages, values"
}}"""
},
{
"type": "image_url",
"image_url": {"url": base64_image}
}
]
}
],
max_tokens=800
)
# Try to parse JSON response
content = response.choices[0].message.content
try:
result = json.loads(content)
except:
# If JSON parsing fails, create structured response
result = {
"detailed_description": content[:500],
"ocr_text": "",
"key_insights": content[:300],
"visual_type": "image",
"data_extracted": ""
}
return result
except Exception as e:
print(f" Error analyzing image: {e}")
return {
"detailed_description": f"Image from document (analysis failed: {str(e)})",
"ocr_text": "",
"key_insights": "",
"visual_type": "unknown",
"data_extracted": ""
}
def insert_document(self, filename: str, company_name: str = None, report_year: int = None) -> int:
"""Insert document record and return doc_id"""
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO documents (company_name, report_year, file_path)
VALUES (%s, %s, %s) RETURNING doc_id
""", (company_name, report_year, filename))
doc_id = cur.fetchone()[0]
conn.commit()
print(f" Document inserted with doc_id: {doc_id}")
return doc_id
def process_and_insert_chunks(self, doc_id: int, db_ready_data: Dict):
"""
Process text chunks from db_ready_data.json with advanced chunking strategy
"""
pages = db_ready_data.get('pages', [])
total_chunks = 0
print(f" Processing {len(pages)} pages for text chunks...")
for page_num, page_data in enumerate(pages, 1):
paragraphs = page_data.get('paragraphs', [])
if not paragraphs:
continue
# Strategy 1: Individual paragraphs (for specific content)
paragraph_chunks = []
for i, paragraph in enumerate(paragraphs):
if len(paragraph.strip()) > 20: # Skip very short paragraphs
embedding = self.get_embedding(paragraph)
if embedding:
paragraph_chunks.append((
doc_id, page_num, paragraph, embedding
))
# Strategy 2: Combined context chunks (for broader understanding)
if len(paragraphs) > 1:
# Combine 2-3 paragraphs for context
for i in range(0, len(paragraphs), 2):
combined_text = " ".join(paragraphs[i:i+3]) # Take 2-3 paragraphs
if len(combined_text.strip()) > 50:
embedding = self.get_embedding(combined_text)
if embedding:
paragraph_chunks.append((
doc_id, page_num, combined_text, embedding
))
# Strategy 3: Full page context (for page-level queries)
full_page_text = " ".join(paragraphs)
if len(full_page_text.strip()) > 100:
# Split into chunks if too long (max ~400 words)
words = full_page_text.split()
if len(words) > 400:
# Split into overlapping chunks
chunk_size = 300
overlap = 50
for i in range(0, len(words), chunk_size - overlap):
chunk_words = words[i:i + chunk_size]
chunk_text = " ".join(chunk_words)
if len(chunk_text.strip()) > 100:
embedding = self.get_embedding(chunk_text)
if embedding:
paragraph_chunks.append((
doc_id, page_num, chunk_text, embedding
))
else:
embedding = self.get_embedding(full_page_text)
if embedding:
paragraph_chunks.append((
doc_id, page_num, full_page_text, embedding
))
# Bulk insert chunks for this page
if paragraph_chunks:
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
execute_values(cur, """
INSERT INTO document_chunks (doc_id, page_number, chunk_text, embedding)
VALUES %s
""", paragraph_chunks)
conn.commit()
total_chunks += len(paragraph_chunks)
print(f" Page {page_num}: {len(paragraph_chunks)} chunks inserted")
print(f"🎉 Total text chunks inserted: {total_chunks}")
def process_and_insert_tables(self, doc_id: int, db_ready_data: Dict):
"""
Process tables with multiple representation strategies
"""
pages = db_ready_data.get('pages', [])
total_tables = 0
print(f"Processing tables from {len(pages)} pages...")
for page_num, page_data in enumerate(pages, 1):
tables = page_data.get('tables', [])
for table_idx, table in enumerate(tables):
headers = table.get('headers', [])
rows = table.get('rows', [])
if not headers and not rows:
continue
# Strategy 1: Natural language description
table_description = f"Table from page {page_num} with columns: {', '.join(headers)}. "
# Strategy 2: Row-by-row natural language
table_text_parts = []
for row_idx, row in enumerate(rows):
row_text = []
for col_idx, cell in enumerate(row):
if col_idx < len(headers) and cell:
row_text.append(f"{headers[col_idx]}: {cell}")
if row_text:
table_text_parts.append(f"Row {row_idx + 1} - {', '.join(row_text)}")
# Strategy 3: Key-value format
key_value_text = []
if headers and rows:
for row in rows:
for col_idx, cell in enumerate(row):
if col_idx < len(headers) and cell:
key_value_text.append(f"{headers[col_idx]} is {cell}")
# Combine all strategies
comprehensive_text = table_description
if table_text_parts:
comprehensive_text += " " + ". ".join(table_text_parts)
if key_value_text:
comprehensive_text += " Additional details: " + ". ".join(key_value_text[:10]) # Limit for length
# Get embedding
embedding = self.get_embedding(comprehensive_text)
if embedding:
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO extracted_tables
(doc_id, page_number, table_data_json, table_as_text, embedding)
VALUES (%s, %s, %s, %s, %s)
""", (
doc_id,
page_num,
json.dumps(table),
comprehensive_text,
embedding
))
conn.commit()
total_tables += 1
print(f" Page {page_num}, Table {table_idx + 1}: Inserted with comprehensive text")
print(f"🎉 Total tables inserted: {total_tables}")
def process_and_insert_images(self, doc_id: int, extracted_data: Dict):
"""
Process images with AI analysis for comprehensive search capability
"""
pages = extracted_data.get('pages', [])
total_images = 0
print(f" Processing images with AI analysis...")
for page_num, page_data in enumerate(pages, 1):
images = page_data.get('images', [])
paragraphs = page_data.get('paragraphs', [])
# Get surrounding text context
surrounding_text = " ".join(paragraphs[:3]) if paragraphs else ""
for image in images:
base64_data = image.get('base64_data', '')
if not base64_data:
continue
print(f" Analyzing image {image.get('image_id', 'unknown')} from page {page_num}...")
# Analyze image with Vision API
ai_analysis = self.analyze_image_with_vision(base64_data, surrounding_text)
# Create comprehensive searchable text
searchable_content = []
# Add all analysis components
if ai_analysis.get('detailed_description'):
searchable_content.append(f"Image description: {ai_analysis['detailed_description']}")
if ai_analysis.get('ocr_text'):
searchable_content.append(f"Text in image: {ai_analysis['ocr_text']}")
if ai_analysis.get('key_insights'):
searchable_content.append(f"Key insights: {ai_analysis['key_insights']}")
if ai_analysis.get('data_extracted'):
searchable_content.append(f"Data found: {ai_analysis['data_extracted']}")
# Add context
searchable_content.append(f"Page {page_num} context: {surrounding_text[:200]}")
# Combine all searchable content
full_searchable_text = ". ".join(searchable_content)
# Save image file
image_filename = image.get('filename', f'page_{page_num:03d}_image_{total_images + 1:03d}.png')
image_path = f"images/{image_filename}"
# Create images directory if it doesn't exist
os.makedirs('images', exist_ok=True)
# Save base64 as file
try:
image_bytes = base64.b64decode(base64_data.split(',')[1])
with open(image_path, 'wb') as f:
f.write(image_bytes)
except Exception as e:
print(f" Could not save image file: {e}")
image_path = ""
# Insert into database
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO extracted_images
(doc_id, page_number, image_filename, image_path)
VALUES (%s, %s, %s, %s)
""", (doc_id, page_num, image_filename, image_path))
conn.commit()
# ALSO insert image analysis as text chunks for searchability
embedding = self.get_embedding(full_searchable_text)
if embedding:
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO document_chunks (doc_id, page_number, chunk_text, embedding)
VALUES (%s, %s, %s, %s)
""", (
doc_id,
page_num,
f"[IMAGE CONTENT] {full_searchable_text}",
embedding
))
conn.commit()
total_images += 1
print(f" Image processed and made searchable: {image_filename}")
print(f" Total images processed: {total_images}")
def insert_complete_document(self,
filename: str,
db_ready_path: str,
extracted_data_path: str,
company_name: str = None,
report_year: int = None):
"""
Complete document insertion with optimal search capability
"""
print(f" Starting complete document insertion for: {filename}")
# Load both JSON files
with open(db_ready_path, 'r', encoding='utf-8') as f:
db_ready_data = json.load(f)
with open(extracted_data_path, 'r', encoding='utf-8') as f:
extracted_data = json.load(f)
# Insert document record
doc_id = self.insert_document(filename, company_name, report_year)
# Process all content types
print("\n Processing text chunks...")
self.process_and_insert_chunks(doc_id, db_ready_data)
print("\n Processing tables...")
self.process_and_insert_tables(doc_id, db_ready_data)
print("\n Processing images...")
self.process_and_insert_images(doc_id, extracted_data)
print(f"\n Complete document insertion finished for doc_id: {doc_id}")
# Print summary
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM document_chunks WHERE doc_id = %s", (doc_id,))
chunk_count = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM extracted_tables WHERE doc_id = %s", (doc_id,))
table_count = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM extracted_images WHERE doc_id = %s", (doc_id,))
image_count = cur.fetchone()[0]
print(f"""
INSERTION SUMMARY:
Document ID: {doc_id}
Text Chunks: {chunk_count}
Tables: {table_count}
Images: {image_count}
Ready for 100% query coverage!
""")
return doc_id
def main():
"""
Main execution function
"""
inserter = DocumentInserter()
# Configuration
pdf_filename = "../pdf_holder/test3.pdf" # Change this
db_ready_json = "output/db_ready_data.json" # Path to your db_ready JSON
extracted_json = "output/extracted_data.json" # Path to your extracted JSON
company_name = "Example Corp" # Optional
report_year = 2023 # Optional
try:
# Insert complete document
doc_id = inserter.insert_complete_document(
filename=pdf_filename,
db_ready_path=db_ready_json,
extracted_data_path=extracted_json,
company_name=company_name,
report_year=report_year
)
print(f" SUCCESS! Document inserted with ID: {doc_id}")
print("🔍 Your database is now ready for ANY query type!")
except Exception as e:
print(f" ERROR: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
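# rag.py: hybrid semantic + keyword retrieval over the ingested chunks and tables, with concise answer generation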
import psycopg2
import openai
import os
from dotenv import load_dotenv
import json
import re
from typing import List, Dict, Any, Tuple
import numpy as np
load_dotenv()
class EnhancedRAG:
def __init__(self):
self.db_config = {
'host': 'localhost',
'database': 'report_agent_11',
'user': 'postgres',
'password': os.getenv('PG_PASSWORD'),
'port': 5432
}
self.openai_client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def get_embedding(self, text: str) -> List[float]:
"""Get embedding for text with error handling"""
try:
if not text or not text.strip():
return None
# Clean text
text = text.replace('\n', ' ').strip()
response = self.openai_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
except Exception as e:
print(f"Embedding error: {e}")
return None
def preprocess_query(self, query: str) -> Dict[str, Any]:
"""Analyze and preprocess the query to determine search strategy"""
query = query.lower().strip()
# Detect query patterns
patterns = {
'table_query': any(word in query for word in [
'table', 'performance', 'measure', 'indicator', 'result', 'target',
'actual', 'percentage', '%', 'score', 'rate', 'level', 'coverage'
]),
'numerical_query': bool(re.search(r'\d+|number|count|amount|total|sum', query)),
'comparison_query': any(word in query for word in [
'compare', 'vs', 'versus', 'difference', 'change', 'increase', 'decrease',
'better', 'worse', 'higher', 'lower'
]),
'temporal_query': any(word in query for word in [
'2020', '2021', '2022', '2023', '2024', 'year', 'fy', 'fiscal'
]),
'specific_metric': any(word in query for word in [
'service', 'accuracy', 'timeliness', 'satisfaction', 'inventory',
'collection', 'compliance', 'resolution'
])
}
return {
'original': query,
'patterns': patterns,
'is_complex': sum(patterns.values()) >= 2
}
def expand_query(self, query: str) -> List[str]:
"""Generate query variations for better recall"""
variations = [query]
# Add synonyms and related terms
replacements = {
'performance': ['result', 'outcome', 'achievement', 'metric'],
'target': ['goal', 'objective', 'aim'],
'actual': ['result', 'achieved', 'real'],
'measure': ['metric', 'indicator', 'kpi'],
'service': ['assistance', 'support', 'help'],
'accuracy': ['correctness', 'precision'],
'customer': ['taxpayer', 'caller', 'client']
}
for original, synonyms in replacements.items():
if original in query.lower():
for synonym in synonyms:
variations.append(query.lower().replace(original, synonym))
return list(set(variations))[:5] # Limit to 5 variations
def hybrid_search(self, query: str, limit: int = 10) -> List[Dict]:
"""Enhanced hybrid search combining semantic and keyword matching"""
query_analysis = self.preprocess_query(query)
query_variations = self.expand_query(query)
all_results = []
try:
with psycopg2.connect(**self.db_config) as conn:
with conn.cursor() as cur:
# 1. Semantic search with multiple query variations
for q_variant in query_variations:
query_embedding = self.get_embedding(q_variant)
if not query_embedding:
continue
embedding_str = '[' + ','.join(map(str, query_embedding)) + ']'
# Search text chunks
cur.execute("""
SELECT chunk_text, page_number, doc_id,
(embedding <-> %s::vector) as distance,
'text' as content_type
FROM document_chunks
WHERE doc_id IS NOT NULL
ORDER BY distance
LIMIT %s
""", (embedding_str, limit))
results = cur.fetchall()
for row in results:
all_results.append({
'content': row[0],
'page': row[1],
'doc_id': row[2],
'distance': row[3],
'type': 'text',
'score': 1 / (1 + row[3]), # Convert distance to similarity score
'query_variant': q_variant
})
# 2. Table-focused search (especially important for your data)
for q_variant in query_variations:
query_embedding = self.get_embedding(q_variant)
if not query_embedding:
continue
embedding_str = '[' + ','.join(map(str, query_embedding)) + ']'
cur.execute("""
SELECT table_as_text, page_number, doc_id, table_data_json,
(embedding <-> %s::vector) as distance,
'table' as content_type
FROM extracted_tables
WHERE doc_id IS NOT NULL
ORDER BY distance
LIMIT %s
""", (embedding_str, limit))
table_results = cur.fetchall()
for row in table_results:
# Parse table JSON for structured data
table_json = {}
try:
table_json = json.loads(row[3]) if row[3] else {}
except:
pass
all_results.append({
'content': row[0],
'page': row[1],
'doc_id': row[2],
'distance': row[4],
'type': 'table',
'score': 1 / (1 + row[4]),
'table_data': table_json,
'query_variant': q_variant
})
# 3. Keyword-based fallback search
keywords = query.lower().split()
for keyword in keywords:
if len(keyword) > 3: # Skip short words
cur.execute("""
SELECT chunk_text, page_number, doc_id,
'keyword_text' as content_type
FROM document_chunks
WHERE LOWER(chunk_text) LIKE %s
LIMIT 5
""", (f'%{keyword}%',))
keyword_results = cur.fetchall()
for row in keyword_results:
all_results.append({
'content': row[0],
'page': row[1],
'doc_id': row[2],
'distance': 0.5, # Fixed distance for keyword matches
'type': 'keyword_text',
'score': 0.7,
'matched_keyword': keyword
})
except Exception as e:
print(f"Search error: {e}")
return []
# Remove duplicates and rank results
unique_results = self.deduplicate_and_rank(all_results, query_analysis)
return unique_results[:limit]
def deduplicate_and_rank(self, results: List[Dict], query_analysis: Dict) -> List[Dict]:
"""Remove duplicates and rank results by relevance"""
seen = set()
unique_results = []
for result in results:
# Create a hash based on content and page
content_hash = hash(result['content'][:100] + str(result['page']))
if content_hash not in seen:
seen.add(content_hash)
# Boost score based on query patterns
boost_factor = 1.0
if query_analysis['patterns']['table_query'] and result['type'] == 'table':
boost_factor = 1.5
elif query_analysis['patterns']['numerical_query'] and any(c.isdigit() or c == '%' for c in result['content']):
boost_factor = 1.3
result['final_score'] = result['score'] * boost_factor
unique_results.append(result)
# Sort by final score (descending)
return sorted(unique_results, key=lambda x: x['final_score'], reverse=True)
def format_context_for_llm(self, results: List[Dict], query: str) -> str:
"""Format search results into optimal context for LLM"""
if not results:
return "No relevant information found."
context_parts = []
table_count = 0
text_count = 0
for i, result in enumerate(results):
if result['type'] == 'table':
table_count += 1
# Format table data specially
table_info = f"\n--- TABLE {table_count} (Page {result['page']}) ---\n"
table_info += result['content']
# If we have structured table data, add it
if 'table_data' in result and result['table_data']:
table_data = result['table_data']
if 'headers' in table_data and 'rows' in table_data:
table_info += "\n\nStructured Data:\n"
headers = table_data['headers']
for row_idx, row in enumerate(table_data['rows']):
row_info = []
for col_idx, cell in enumerate(row):
if col_idx < len(headers) and cell:
row_info.append(f"{headers[col_idx]}: {cell}")
if row_info:
table_info += f"• {' | '.join(row_info)}\n"
context_parts.append(table_info)
else:
text_count += 1
context_parts.append(f"\n--- TEXT {text_count} (Page {result['page']}) ---\n{result['content']}")
return "\n".join(context_parts)
def generate_enhanced_answer(self, query: str, results: List[Dict]) -> str:
"""Generate comprehensive answer with enhanced prompting"""
if not results:
return "I couldn't find any relevant information in the documents to answer your question. Please try rephrasing your query or asking about different topics."
context = self.format_context_for_llm(results, query)
query_analysis = self.preprocess_query(query)
# Determine response style based on query type
if query_analysis['patterns']['table_query']:
response_instruction = """Focus on extracting specific data points, numbers, percentages, and performance metrics.
Present the information in a clear, structured way. If comparing values, highlight the differences clearly."""
elif query_analysis['patterns']['comparison_query']:
response_instruction = """Compare the relevant data points clearly. Show changes over time,
highlight improvements or declines, and provide context for the changes."""
else:
response_instruction = """Provide a comprehensive answer that directly addresses the question.
Include specific details and explain their significance."""
prompt = f"""You are a document analyst. Answer the question directly and concisely.
QUESTION: {query}
RELEVANT INFORMATION:
{context}
INSTRUCTIONS:
- Give a direct, concise answer (1-3 sentences maximum)
- Include specific numbers/data if relevant
- No extra explanations unless specifically asked
- No formatting like bullet points or headers
ANSWER:"""
try:
response = self.openai_client.chat.completions.create(
model="gpt-4o-mini", # Using more capable model for better accuracy
messages=[{"role": "user", "content": prompt}],
max_tokens=800,
temperature=0.1 # Low temperature for factual accuracy
)
return response.choices[0].message.content
except Exception as e:
return f"I found relevant information but encountered an error generating the response: {e}"
def ask(self, question: str) -> str:
"""Main function to ask a question and get a direct answer"""
# Search for relevant documents
results = self.hybrid_search(question, limit=8)
# Generate answer
answer = self.generate_enhanced_answer(question, results)
print(f"\n**Question:** {question}")
print(f"**Answer:** {answer}")
return answer
def main():
"""Simple Q&A loop"""
rag = EnhancedRAG()
print("RAG System Ready. Ask your questions:")
while True:
question = input("\nQuestion: ").strip()
if not question:
continue
try:
rag.ask(question)
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
Let’s summarize the potential business impact of this workflow.
Protecting Revenue
When a report lands, you don’t have hours to hunt for the right line item. This system pulls the exact numbers you ask for and shows where they came from—page and table—so budgets, forecasts, and board decks are built on verified data. Fewer copy-paste mistakes, fewer reworks, and less risk of decisions based on the wrong figure.
Shortening Time-to-Answer
Leaders ask direct questions (“what was net cash from operations in FY2024?”). Instead of a long memo, you get a clear 1–3 sentence reply with the value and its source. That means faster monthly closes, quicker board prep, and faster responses to auditors and regulators, without spinning up a special analysis every time.
Reducing Manual Drudgery
Analysts shouldn’t spend days skimming 300-page PDFs. The AI does the reading and indexing, so your team can focus on judgment: why a metric moved, what to do next, and how to communicate it. The result is more analysis per person, less overtime, and a happier finance and ops team.
Strengthening Trust
Every answer is traceable. One click takes you back to the exact page or table in the original document. That level of auditability makes internal reviews smoother, speeds up external audits, and builds confidence with leadership that numbers are accurate and defensible.
The use cases span industries. Whether you are in healthcare or legal, this applies to you.
Finance
Pulling key figures from annual reports, balance sheets, and invoices in seconds. Building budgets and forecasts on verified numbers with page-level citations, and answering auditor questions with evidence, not email chains.
Legal
Summarizing long contracts and agreements into clear briefs with links back to clauses. Speeding up review cycles, surfacing risky terms early, and cutting down on manual clause comparisons.
Research
Pinpointing and comparing specific sections in research papers or technical reports. Extracting tables, metrics, and conclusions with sources attached—ideal for competitive intelligence and due diligence.
Healthcare
Summarizing patient records, medical reports, and clinical trial documents while preserving traceability to the original text. Reducing review time and supporting compliance workflows that require precise citations.
Corporate Documentation
Centralizing internal reports, policies, and SOPs. Making it easy for teams to find the latest version, understand what changed, and reference the exact paragraph when creating decks or audits.
GitHub: https://github.com/piyushghughu-superteams-dotcom/pdf_agent
Superteams.ai builds AI workflows for data-heavy teams. We turn long PDFs and messy processes into reliable, explainable systems—fast. If you want this running on your data, let’s talk.