
Financial Report AI Analyst: A Guide for Finance & Operations Leaders

In this blog, we show a simple way to turn long financial PDFs into instant answers and clean summaries—so your team makes faster, safer decisions with less manual work.


What This Solves

Reading long reports is slow and risky. Numbers get missed, copy-paste errors slip in, and decisions lag. This system reads any PDF (10 or 1,000+ pages), extracts tables, text, and images, and gives your team direct answers and short summaries—with page and table context—so finance and ops move faster and avoid mistakes.




What It Does

We focus on four outcomes that move the top line and protect the bottom line:

  1. Instant Search Across Long Reports
    Instead of paging through a 200-page annual report, you ask a plain-English question—“What’s total OPEX in FY2024?”—and get the number plus the exact page and table it came from. No hunting, no guesswork, no stale spreadsheets (a short usage sketch follows this list).
  2. Reliable Summaries
    Dense sections (MD&A, notes, footnotes) are turned into short, executive-ready briefs. You see the big picture—what changed, why it changed, and what to watch—without losing citations back to the source pages for anyone who wants to double-check.
  3. Structured Data You Can Trust
    Tables and figures are captured cleanly, turned into searchable data, and stored for reuse. That means the same metric doesn’t get re-keyed five different ways across Finance, FP&A, and Strategy—one source, consistent everywhere, with an audit trail.
  4. Scales to Your Document Load
    Whether you process one quarterly filing or hundreds of vendor reports, the workflow stays the same: drop in a PDF, ask questions, get answers with sources. Adding more documents increases coverage, not complexity.



How It Works (No Jargon)

  • OCR + Parsing
    The system “reads” your PDFs—text, tables, and images—and cleans them up so numbers and headings are usable. Think of it as turning a static report into a searchable reference library.
  • Postgres + pgvector
    Everything is stored in a secure database so search is fast and consistent. pgvector adds the ability to find “concepts,” not just exact keywords—useful when different reports label the same metric in different ways.
  • Embeddings + RAG
    When you ask a question, the system first finds the most relevant pages and tables (retrieval), then assembles a short answer (generation) in plain English. Every answer includes links back to the original source so your team can verify in one click. A minimal retrieval sketch follows this list.
  • API/CLI
    Analysts can use a simple command-line tool; your apps can use an API. That makes it easy to plug into close workflows, board-pack builders, or internal dashboards without changing how your teams already work.
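To make the retrieval step concrete, here is a minimal sketch of a semantic lookup against the chunks table. It assumes the schema, embedding model, pgvector distance operator, and connection values used in the full code later in this post; rag.py layers query expansion, table search, and keyword fallback on top of this.

import os
import openai
import psycopg2
from dotenv import load_dotenv

load_dotenv()
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def top_chunks(question: str, k: int = 5):
    # Embed the question with the same model used at ingestion time
    emb = client.embeddings.create(model="text-embedding-3-small", input=question).data[0].embedding
    vec = "[" + ",".join(map(str, emb)) + "]"
    with psycopg2.connect(host="localhost", dbname="report_agent_11",
                          user="postgres", password=os.getenv("PG_PASSWORD")) as conn:
        with conn.cursor() as cur:
            # pgvector's <-> operator returns distance: smaller means more relevant
            cur.execute("""
                SELECT page_number, chunk_text, embedding <-> %s::vector AS distance
                FROM document_chunks
                ORDER BY distance
                LIMIT %s
            """, (vec, k))
            return cur.fetchall()

for page, chunk, dist in top_chunks("What's total OPEX in FY2024?"):
    print(f"page {page} (distance {dist:.3f}): {chunk[:80]}...")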



Quickstart: Notes for the Tech Team

You don’t need heavy infrastructure. With Python and Postgres installed, setup is straightforward.

1) Create a virtual environment and install dependencies

python -m venv venv
# windows
venv\Scripts\activate
# linux / mac
source venv/bin/activate

pip install -r requirements.txt

2) Create a .env in the project root
Include the following (an example with placeholder values follows the list):

  • Postgres: PG_HOST, PG_DB, PG_USER, PG_PASSWORD, PORT (if needed)
  • OPENAI_API_KEY (for embeddings/Q&A)
  • MISTRAL_API_KEY (for OCR; this is the variable pdf_extract.py reads)
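An illustrative .env, assuming the placeholder values below are replaced with your own credentials (the database name matches the one used by the ingestion and RAG scripts later in this post):

PG_HOST=localhost
PG_DB=report_agent_11
PG_USER=postgres
PG_PASSWORD=your_password
PORT=5432
OPENAI_API_KEY=sk-your-openai-key
MISTRAL_API_KEY=your-mistral-key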

3) Run extraction
Place a PDF in pdf_holder/, then run apps/pdf_extract.py. Outputs land in output/ (a quick way to inspect them follows the list):

  • extracted_data.json (everything)
  • db_ready_data.json (optimized for db)
  • per-page markdown + extracted images
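A quick way to confirm the extraction worked, assuming the db_ready_data.json structure produced by extract_data_to_json.py shown below:

import json

with open("output/db_ready_data.json", encoding="utf-8") as f:
    data = json.load(f)

meta = data["document_metadata"]
print(f"pages: {meta['total_pages']}, tables: {meta['total_tables']}, images: {meta['total_images']}")

# Peek at the first few pages to see what was captured on each
for page in data["pages"][:3]:
    m = page["metadata"]
    print(f"page {page['page_number']}: {m['paragraph_count']} paragraphs, {m['table_count']} tables, {m['image_count']} images")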

4) Initialize database and ingest
Run db.py to set up Postgres + pgvector and create the tables, then run insert_to_db.py to load chunks, tables, and images (with embeddings).
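db.py isn’t reproduced in the code section below, so here is a minimal sketch of what it might look like, assuming the SQLAlchemy models from Model.py (shown later) and the connection values from your .env:

import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from Model import Base  # the models shown later in this post

load_dotenv()
url = (
    f"postgresql+psycopg2://{os.getenv('PG_USER', 'postgres')}:{os.getenv('PG_PASSWORD')}"
    f"@{os.getenv('PG_HOST', 'localhost')}:{os.getenv('PORT', '5432')}/{os.getenv('PG_DB', 'report_agent_11')}"
)
engine = create_engine(url)

# Enable the pgvector extension, then create all tables defined in Model.py
with engine.begin() as conn:
    conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))

Base.metadata.create_all(engine)
print("Database and tables ready.")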

5) Ask questions
Run rag.py and type your questions. You’ll get short, sourced answers.
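A session might look like this (the prompts come from rag.py’s interactive loop; the answer itself depends on your documents):

python rag.py
RAG System Ready. Ask your questions:

Question: What's total OPEX in FY2024?

**Question:** What's total OPEX in FY2024?
**Answer:** <a 1-3 sentence answer with the figure and the page/table it came from>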




Code 

pdf_extract.py

import os
from dotenv import load_dotenv
from mistralai import Mistral
import base64
import time
import re
import json
from pathlib import Path
from extract_data_to_json import process_pdf_to_json  # Import the new JSON processor


load_dotenv()
client = Mistral(api_key=os.getenv("MISTRAL_API_KEY"))




def clean_ocr_text(text: str) -> str:
   """
   Remove LaTeX-style escapes from OCR text.
   """
   text = text.replace(r'\%', '%')
   text = text.replace(r'\$', '$')
   text = re.sub(r'\$(.*?)\$', r'\1', text)
   return text




class EnhancedOCRProcessor:
   def __init__(self, client):
       self.client = client
   def process_with_retry(self, file_path, max_retries=3, delay=2):
       """Process PDF with retry mechanism"""
       for attempt in range(max_retries):
           try:
               uploaded_file = self.client.files.upload(
                   file={
                       "file_name": f"document_attempt_{attempt}.pdf",
                       "content": open(file_path, "rb")
                   },
                   purpose="ocr"
               )
              
               file_url = self.client.files.get_signed_url(file_id=uploaded_file.id)
              
               # Try different processing parameters
               response = self.client.ocr.process(
                   model="mistral-ocr-latest",
                   document={
                       "type": "document_url",
                       "document_url": file_url.url
                   },
                   include_image_base64=True,
               )
              
               return response
              
           except Exception as e:
               print(f"Attempt {attempt + 1} failed: {e}")
               if attempt < max_retries - 1:
                   time.sleep(delay)
               else:
                   raise


   def process_page_by_page(self, file_path):
       """Process individual pages if full document processing fails"""
       # You'd need to split PDF into individual pages first
     
       pass
  
   def validate_extraction(self, response):
       """Validate and analyze extraction completeness"""
       total_chars = 0
       pages_with_content = 0
       pages_without_content = 0
      
       for i, page in enumerate(response.pages):
           char_count = len(page.markdown.strip())
           total_chars += char_count
          
           if char_count > 10:  # Threshold for meaningful content
               pages_with_content += 1
           else:
               pages_without_content += 1
               print(f"Warning: Page {i+1} has minimal content ({char_count} chars)")
      
       print(f"Total pages: {len(response.pages)}")
       print(f"Pages with content: {pages_with_content}")
       print(f"Pages with minimal content: {pages_without_content}")
       print(f"Total characters extracted: {total_chars}")
      
       return {
           'total_pages': len(response.pages),
           'content_pages': pages_with_content,
           'empty_pages': pages_without_content,
           'total_chars': total_chars
       }






   def enhanced_export(self, response, output_dir="output"):
       """Enhanced export with better organization and validation"""
       output_path = Path(output_dir)
       output_path.mkdir(exist_ok=True)
      
       # Export markdown with page separators
       with open(output_path / 'complete_output.md', 'w', encoding='utf-8') as f:
           for i, page in enumerate(response.pages):
               f.write(f"\n\n--- PAGE {i+1} ---\n\n")
               f.write(clean_ocr_text(page.markdown))
               # f.write(page.markdown)


               if len(page.markdown.strip()) < 10:
                   f.write(f"\n[WARNING: This page has minimal content]\n")
      
       # Export individual pages
       pages_dir = output_path / "pages"
       pages_dir.mkdir(exist_ok=True)
      
       for i, page in enumerate(response.pages):
           with open(pages_dir / f'page_{i+1:03d}.md', 'w', encoding='utf-8') as f:
               f.write(page.markdown)
      
       # Export images
       images_dir = output_path / "images"
       images_dir.mkdir(exist_ok=True)
      
       image_count = 0
       for i, page in enumerate(response.pages):
           for j, image in enumerate(page.images):
               try:
                   parsed_image = self.data_uri_to_bytes(image.image_base64)
                   image_filename = f"page_{i+1:03d}_image_{j+1:03d}.png"
                   with open(images_dir / image_filename, 'wb') as f:
                       f.write(parsed_image)
                   image_count += 1
               except Exception as e:
                   print(f"Failed to export image {j+1} from page {i+1}: {e}")
      
       print(f"Exported {image_count} images to {images_dir}")
      
       print("/n Converting to JSON format...")
       document_data = process_pdf_to_json(response, output_dir)
      
       return output_path, document_data
  
   def data_uri_to_bytes(self, data_uri):
       """Convert data URI to bytes"""
       _, encoded = data_uri.split(',', 1)
       return base64.b64decode(encoded)


# Alternative approach using multiple processing strategies
def multi_strategy_processing(file_path):
   """Try multiple processing strategies to maximize extraction"""
   processor = EnhancedOCRProcessor(client)
  
   strategies = [
       # Strategy 1: Standard processing with retry
       lambda: processor.process_with_retry(file_path),
      
       # Strategy 2: Process with different file names (sometimes helps)
       lambda: processor.process_with_retry(file_path, max_retries=2),
   ]
  
   best_result = None
   best_char_count = 0
  
   for i, strategy in enumerate(strategies):
       try:
           print(f"Trying strategy {i+1}...")
           result = strategy()
          
           # Count total characters to determine best result
           total_chars = sum(len(page.markdown) for page in result.pages)
          
           if total_chars > best_char_count:
               best_result = result
               best_char_count = total_chars
               print(f"Strategy {i+1} extracted {total_chars} characters")
          
       except Exception as e:
           print(f"Strategy {i+1} failed: {e}")
  
   return best_result


# Usage example
def main():
   file_path = "../pdf_holder/test3.pdf"
  
   # Try enhanced processing
   processor = EnhancedOCRProcessor(client)
  
   try:


       print("=== Enhanced Single Processing ===")
       response = processor.process_with_retry(file_path)
       stats = processor.validate_extraction(response)
      
       # Export with validation AND JSON conversion
       output_dir, document_data = processor.enhanced_export(response)
       print(f"Results exported to: {output_dir}")
      


        if stats['empty_pages'] > stats['content_pages'] * 0.2:  # If empty pages exceed 20% of content pages
           print("\n=== Trying Multi-Strategy Processing ===")
           better_response = multi_strategy_processing(file_path)
           if better_response:
               better_stats = processor.validate_extraction(better_response)
               if better_stats['total_chars'] > stats['total_chars']:
                   print("Multi-strategy processing found more content!")
                   output_dir_enhanced, document_data_enhanced = processor.enhanced_export(better_response, "output_enhanced")
      
       if document_data:
           print(f"Document has {len(document_data['pages'])} pages")
          
     
           if document_data['pages']:
               first_page = document_data['pages'][0]
               print(f"Page 1 structure:")
               print(f"  - Paragraphs: {len(first_page['paragraphs'])}")
               print(f"  - Tables: {len(first_page['tables'])}")
               print(f"  - Images: {len(first_page['images'])}")
              
       
               if first_page['paragraphs']:
                   snippet = first_page['paragraphs'][0][:100] + "..." if len(first_page['paragraphs'][0]) > 100 else first_page['paragraphs'][0]
                   print(f"  - First paragraph: '{snippet}'")
      
   except Exception as e:
       print(f"Processing failed: {e}")


if __name__ == "__main__":
   main()



extract_data_to_json.py

import json
import re
import base64
from pathlib import Path
from typing import Dict, List, Any
import hashlib


class PDFDataExtractor:
   def __init__(self):
       self.table_patterns = [
           r'\|.*?\|',  # Markdown table pattern
           r'\+[-=]+\+',  # ASCII table borders
           r'(?:(?:\S+\s*){2,}(?:\n|$)){3,}',  # Columnar data pattern
       ]
      
   def extract_tables_from_text(self, text: str) -> List[Dict]:
       """Extract tables from markdown text"""
       tables = []
       lines = text.split('\n')
      
       # Look for markdown tables
       table_start = None
       current_table_lines = []
      
       for i, line in enumerate(lines):
           # Check if line contains table markers
           if '|' in line and line.strip().startswith('|') and line.strip().endswith('|'):
               if table_start is None:
                   table_start = i
                   current_table_lines = [line]
               else:
                   current_table_lines.append(line)
           else:
               # End of table or no table
               if table_start is not None and len(current_table_lines) >= 2:
                   # Process the table
                   table_data = self.parse_markdown_table(current_table_lines)
                   if table_data:
                       tables.append({
                           'table_id': f'table_{len(tables) + 1}',
                           'type': 'markdown_table',
                           'headers': table_data.get('headers', []),
                           'rows': table_data.get('rows', []),
                           'raw_text': '\n'.join(current_table_lines)
                       })
              
               # Reset table tracking
               table_start = None
               current_table_lines = []
      
       # Handle last table if exists
       if table_start is not None and len(current_table_lines) >= 2:
           table_data = self.parse_markdown_table(current_table_lines)
           if table_data:
               tables.append({
                   'table_id': f'table_{len(tables) + 1}',
                   'type': 'markdown_table',
                   'headers': table_data.get('headers', []),
                   'rows': table_data.get('rows', []),
                   'raw_text': '\n'.join(current_table_lines)
               })
      
       return tables
  
   def parse_markdown_table(self, table_lines: List[str]) -> Dict:
       """Parse markdown table into structured format"""
       if len(table_lines) < 2:
           return {}
      
       # Remove leading/trailing pipes and split
       def clean_row(line):
           return [cell.strip() for cell in line.strip('|').split('|')]
      
       # Parse headers (first line)
       headers = clean_row(table_lines[0])
      
       # Skip separator line (second line with dashes)
       data_rows = []
       for line in table_lines[2:]:  # Skip header and separator
           if line.strip() and '|' in line:
               row = clean_row(line)
               # Ensure row has same number of columns as headers
               while len(row) < len(headers):
                   row.append('')
               data_rows.append(row[:len(headers)])  # Truncate if too many columns
      
       return {
           'headers': headers,
           'rows': data_rows
       }
  
   def extract_paragraphs(self, text: str, tables: List[Dict]) -> List[str]:
       """Extract paragraphs, excluding table content"""
       # Remove table content from text
       clean_text = text
       for table in tables:
           clean_text = clean_text.replace(table['raw_text'], '')
      
       # Split into paragraphs
       paragraphs = []
       for para in clean_text.split('\n\n'):
           para = para.strip()
           if para and len(para) > 10:  # Filter out very short content
               # Clean up the paragraph
               para = re.sub(r'\n+', ' ', para)  # Replace multiple newlines with space
               para = re.sub(r'\s+', ' ', para)  # Replace multiple spaces with single space
               paragraphs.append(para)
      
       return paragraphs
  
   def process_images(self, images: List, page_num: int) -> List[Dict]:
       """Process images from a page"""
       processed_images = []
      
       for i, image in enumerate(images):
           try:
               # Generate image hash for unique identification
               image_data = base64.b64decode(image.image_base64.split(',')[1])
               image_hash = hashlib.md5(image_data).hexdigest()
              
               image_info = {
                   'image_id': f'page_{page_num}_image_{i + 1}',
                   'image_hash': image_hash,
                   'filename': f'page_{page_num:03d}_image_{i + 1:03d}.png',
                   'size_bytes': len(image_data),
                   'base64_data': image.image_base64,  # Keep full base64 for storage
                   'position': {
                       'page': page_num,
                       'image_index': i
                   }
               }
              
               # Add bounding box if available
               if hasattr(image, 'bbox') and image.bbox:
                   image_info['bbox'] = image.bbox
              
               processed_images.append(image_info)
              
           except Exception as e:
               print(f"Error processing image {i + 1} on page {page_num}: {e}")
      
       return processed_images
  
   def extract_page_data(self, page, page_num: int) -> Dict:
       """Extract structured data from a single page"""
       # Get raw text
       raw_text = page.markdown
      
       # Extract tables
       tables = self.extract_tables_from_text(raw_text)
      
       # Extract paragraphs (excluding table content)
       paragraphs = self.extract_paragraphs(raw_text, tables)
      
       # Process images
       images = self.process_images(page.images, page_num)
      
       # Create page data structure
       page_data = {
           'page_number': page_num,
           'paragraphs': paragraphs,
           'tables': tables,
           'images': images,
           'metadata': {
               'paragraph_count': len(paragraphs),
               'table_count': len(tables),
               'image_count': len(images),
               'total_characters': len(raw_text),
               'has_content': len(paragraphs) > 0 or len(tables) > 0 or len(images) > 0
           },
           'raw_markdown': raw_text  # Keep original for reference
       }
      
       return page_data
  
   def extract_full_document(self, response) -> Dict:
       """Extract data from entire document"""
       document_data = {
           'document_metadata': {
               'total_pages': len(response.pages),
               'extraction_timestamp': None,
               'total_paragraphs': 0,
               'total_tables': 0,
               'total_images': 0
           },
           'pages': []
       }
      
       # Process each page
       for i, page in enumerate(response.pages):
           page_num = i + 1
           page_data = self.extract_page_data(page, page_num)
           document_data['pages'].append(page_data)
          
           # Update document metadata
           document_data['document_metadata']['total_paragraphs'] += page_data['metadata']['paragraph_count']
           document_data['document_metadata']['total_tables'] += page_data['metadata']['table_count']
           document_data['document_metadata']['total_images'] += page_data['metadata']['image_count']
      
       # Add timestamp
       from datetime import datetime
       document_data['document_metadata']['extraction_timestamp'] = datetime.now().isoformat()
      
       return document_data
  
   def save_to_json(self, document_data: Dict, output_path: str = "extracted_data.json"):
       """Save extracted data to JSON file"""
       try:
           with open(output_path, 'w', encoding='utf-8') as f:
               json.dump(document_data, f, indent=2, ensure_ascii=False)
           print(f" Data saved to {output_path}")
           return True
       except Exception as e:
           print(f"Error saving JSON: {e}")
           return False
  
   def create_database_ready_json(self, document_data: Dict, output_path: str = "db_ready_data.json"):
       """Create a database-ready version with optimized structure"""
       db_ready_data = []
      
       for page in document_data['pages']:
           page_record = {
               'page_number': page['page_number'],
               'paragraphs': page['paragraphs'],
               'tables': page['tables'],
               'images': [
                   {
                       'image_id': img['image_id'],
                       'filename': img['filename'],
                       'size_bytes': img['size_bytes'],
                       'image_hash': img['image_hash']
                       # Note: base64_data removed for DB efficiency - store separately if needed
                   } for img in page['images']
               ],
               'metadata': page['metadata']
           }
           db_ready_data.append(page_record)
      
       # Save database-ready version
       try:
           with open(output_path, 'w', encoding='utf-8') as f:
               json.dump({
                   'document_metadata': document_data['document_metadata'],
                   'pages': db_ready_data
               }, f, indent=2, ensure_ascii=False)
           print(f" Database-ready data saved to {output_path}")
           return True
       except Exception as e:
           print(f" Error saving database-ready JSON: {e}")
           return False


def process_pdf_to_json(response, output_dir="output"):
   """Main function to process PDF response and create JSON files"""
   extractor = PDFDataExtractor()
  
   # Create output directory
   output_path = Path(output_dir)
   output_path.mkdir(exist_ok=True)
  
   print(" Starting JSON extraction...")
  
   # Extract all data
   document_data = extractor.extract_full_document(response)
  
   # Save complete JSON
   complete_json_path = output_path / "extracted_data.json"
   extractor.save_to_json(document_data, str(complete_json_path))
  
   # Save database-ready JSON
   db_json_path = output_path / "db_ready_data.json"
   extractor.create_database_ready_json(document_data, str(db_json_path))
  
   # Print summary
   print("\n Extraction Summary:")
   print(f"Total Pages: {document_data['document_metadata']['total_pages']}")
   print(f"Total Paragraphs: {document_data['document_metadata']['total_paragraphs']}")
   print(f"Total Tables: {document_data['document_metadata']['total_tables']}")
   print(f"Total Images: {document_data['document_metadata']['total_images']}")
  
   return document_data


# Example usage function
def main():
   # This would be called from your main OCR script
   # Example: process_pdf_to_json(ocr_response)
   pass


if __name__ == "__main__":
   main()




Model.py

from sqlalchemy import (
   Column,
   Integer,
   String,
   Text,
   ForeignKey,
   DateTime,
)
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import func
from pgvector.sqlalchemy import Vector




Base = declarative_base()


class Document(Base):


   __tablename__ = 'documents'


   doc_id = Column(Integer, primary_key=True)
   company_name = Column(String(255))
   report_year = Column(Integer)
   file_path = Column(String(255), nullable=False)
   processed_at = Column(DateTime, server_default=func.now())




   chunks = relationship("DocumentChunk", back_populates="document", cascade="all, delete-orphan")
   tables = relationship("ExtractedTable", back_populates="document", cascade="all, delete-orphan")
   images = relationship("ExtractedImage", back_populates="document", cascade="all, delete-orphan")


   def __repr__(self):
       return f"<Document(id={self.doc_id}, name='{self.company_name} {self.report_year}')>"


class DocumentChunk(Base):


   __tablename__ = 'document_chunks'


   id = Column(Integer, primary_key=True)
   doc_id = Column(Integer, ForeignKey('documents.doc_id'), nullable=False)
   page_number = Column(Integer)
   chunk_text = Column(Text)
   embedding = Column(Vector(1536))
   document = relationship("Document", back_populates="chunks")


class ExtractedTable(Base):


   __tablename__ = 'extracted_tables'


   table_id = Column(Integer, primary_key=True)
   doc_id = Column(Integer, ForeignKey('documents.doc_id'), nullable=False)
   page_number = Column(Integer)
   table_data_json = Column(JSONB)
   table_as_text = Column(Text)
   embedding = Column(Vector(1536))
   document = relationship("Document", back_populates="tables")
class ExtractedImage(Base):


   __tablename__ = 'extracted_images'


   image_id = Column(Integer, primary_key=True)
   doc_id = Column(Integer, ForeignKey('documents.doc_id'), nullable=False)
   page_number = Column(Integer)
   image_filename = Column(String(255))
   image_path = Column(String(500))
   document = relationship("Document", back_populates="images")




insert_to_db.py

import json
import psycopg2
from psycopg2.extras import execute_values
import openai
import os
from dotenv import load_dotenv
import hashlib
from pathlib import Path
import base64
from typing import List, Dict, Any


load_dotenv()


class DocumentInserter:
   def __init__(self):
       # Database configuration
       self.db_config = {
           'host': 'localhost',
           'database': 'report_agent_11',
           'user': 'postgres',
           'password': os.getenv('PG_PASSWORD', 'your_password'),
           'port': 5432
       }
      
       # OpenAI configuration
       self.openai_client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
       self.embedding_model = "text-embedding-3-small"
      
   def get_embedding(self, text: str) -> List[float]:
       """Get embedding vector for text"""
       try:
           # Clean the text
           text = text.replace("\n", " ").strip()
           if not text:
               return None
              
           response = self.openai_client.embeddings.create(
               model=self.embedding_model,
               input=text
           )
           return response.data[0].embedding
       except Exception as e:
           print(f" Error getting embedding for text: {str(e)[:100]}...")
           return None
  
   def analyze_image_with_vision(self, base64_image: str, surrounding_text: str = "") -> Dict:
       """
       Analyze image with OpenAI Vision API for comprehensive understanding
       """
       try:
           response = self.openai_client.chat.completions.create(
               model="gpt-4.1-mini",
               messages=[
                   {
                       "role": "user",
                       "content": [
                           {
                               "type": "text",
                               "text": f"""Analyze this image in detail and provide:
                               1. Detailed description of what you see
                               2. Any text/numbers you can read (OCR)
                               3. Key data points, trends, or insights
                               4. Type of visual (chart, table, diagram, etc.)
                               5. Context: This image appears near: {surrounding_text[:300]}
                              
                               Be very detailed and specific. Extract ALL visible information.
                              
                               Format as JSON:
                               {{
                                   "detailed_description": "comprehensive description",
                                   "ocr_text": "all text found in image",
                                   "key_insights": "important findings and data",
                                   "visual_type": "chart/table/diagram/photo/etc",
                                   "data_extracted": "specific numbers, percentages, values"
                               }}"""
                           },
                           {
                               "type": "image_url",
                               "image_url": {"url": base64_image}
                           }
                       ]
                   }
               ],
               max_tokens=800
           )
          
           # Try to parse JSON response
           content = response.choices[0].message.content
           try:
               result = json.loads(content)
           except:
               # If JSON parsing fails, create structured response
               result = {
                   "detailed_description": content[:500],
                   "ocr_text": "",
                   "key_insights": content[:300],
                   "visual_type": "image",
                   "data_extracted": ""
               }
          
           return result
          
       except Exception as e:
           print(f" Error analyzing image: {e}")
           return {
               "detailed_description": f"Image from document (analysis failed: {str(e)})",
               "ocr_text": "",
               "key_insights": "",
               "visual_type": "unknown",
               "data_extracted": ""
           }
  
   def insert_document(self, filename: str, company_name: str = None, report_year: int = None) -> int:
       """Insert document record and return doc_id"""
       with psycopg2.connect(**self.db_config) as conn:
           with conn.cursor() as cur:
               cur.execute("""
                   INSERT INTO documents (company_name, report_year, file_path)
                   VALUES (%s, %s, %s) RETURNING doc_id
               """, (company_name, report_year, filename))
              
               doc_id = cur.fetchone()[0]
               conn.commit()
      
       print(f" Document inserted with doc_id: {doc_id}")
       return doc_id
  
   def process_and_insert_chunks(self, doc_id: int, db_ready_data: Dict):
       """
       Process text chunks from db_ready_data.json with advanced chunking strategy
       """
       pages = db_ready_data.get('pages', [])
       total_chunks = 0
      
       print(f" Processing {len(pages)} pages for text chunks...")
      
       for page_num, page_data in enumerate(pages, 1):
           paragraphs = page_data.get('paragraphs', [])
          
           if not paragraphs:
               continue
          
           # Strategy 1: Individual paragraphs (for specific content)
           paragraph_chunks = []
           for i, paragraph in enumerate(paragraphs):
               if len(paragraph.strip()) > 20:  # Skip very short paragraphs
                   embedding = self.get_embedding(paragraph)
                   if embedding:
                       paragraph_chunks.append((
                           doc_id, page_num, paragraph, embedding
                       ))
          
           # Strategy 2: Combined context chunks (for broader understanding)
           if len(paragraphs) > 1:
               # Combine 2-3 paragraphs for context
               for i in range(0, len(paragraphs), 2):
                   combined_text = " ".join(paragraphs[i:i+3])  # Take 2-3 paragraphs
                   if len(combined_text.strip()) > 50:
                       embedding = self.get_embedding(combined_text)
                       if embedding:
                           paragraph_chunks.append((
                               doc_id, page_num, combined_text, embedding
                           ))
          
           # Strategy 3: Full page context (for page-level queries)
           full_page_text = " ".join(paragraphs)
           if len(full_page_text.strip()) > 100:
               # Split into chunks if too long (max ~400 words)
               words = full_page_text.split()
               if len(words) > 400:
                   # Split into overlapping chunks
                   chunk_size = 300
                   overlap = 50
                   for i in range(0, len(words), chunk_size - overlap):
                       chunk_words = words[i:i + chunk_size]
                       chunk_text = " ".join(chunk_words)
                       if len(chunk_text.strip()) > 100:
                           embedding = self.get_embedding(chunk_text)
                           if embedding:
                               paragraph_chunks.append((
                                   doc_id, page_num, chunk_text, embedding
                               ))
               else:
                   embedding = self.get_embedding(full_page_text)
                   if embedding:
                       paragraph_chunks.append((
                           doc_id, page_num, full_page_text, embedding
                       ))
          
           # Bulk insert chunks for this page
           if paragraph_chunks:
               with psycopg2.connect(**self.db_config) as conn:
                   with conn.cursor() as cur:
                       execute_values(cur, """
                           INSERT INTO document_chunks (doc_id, page_number, chunk_text, embedding)
                           VALUES %s
                       """, paragraph_chunks)
                   conn.commit()
              
               total_chunks += len(paragraph_chunks)
               print(f"   Page {page_num}: {len(paragraph_chunks)} chunks inserted")
      
       print(f"🎉 Total text chunks inserted: {total_chunks}")
  
   def process_and_insert_tables(self, doc_id: int, db_ready_data: Dict):
       """
       Process tables with multiple representation strategies
       """
       pages = db_ready_data.get('pages', [])
       total_tables = 0
      
       print(f"Processing tables from {len(pages)} pages...")
      
       for page_num, page_data in enumerate(pages, 1):
           tables = page_data.get('tables', [])
          
           for table_idx, table in enumerate(tables):
               headers = table.get('headers', [])
               rows = table.get('rows', [])
              
               if not headers and not rows:
                   continue
              
               # Strategy 1: Natural language description
               table_description = f"Table from page {page_num} with columns: {', '.join(headers)}. "
              
               # Strategy 2: Row-by-row natural language
               table_text_parts = []
               for row_idx, row in enumerate(rows):
                   row_text = []
                   for col_idx, cell in enumerate(row):
                       if col_idx < len(headers) and cell:
                           row_text.append(f"{headers[col_idx]}: {cell}")
                  
                   if row_text:
                       table_text_parts.append(f"Row {row_idx + 1} - {', '.join(row_text)}")
              
               # Strategy 3: Key-value format
               key_value_text = []
               if headers and rows:
                   for row in rows:
                       for col_idx, cell in enumerate(row):
                           if col_idx < len(headers) and cell:
                               key_value_text.append(f"{headers[col_idx]} is {cell}")
              
               # Combine all strategies
               comprehensive_text = table_description
               if table_text_parts:
                   comprehensive_text += " " + ". ".join(table_text_parts)
               if key_value_text:
                   comprehensive_text += " Additional details: " + ". ".join(key_value_text[:10])  # Limit for length
              
               # Get embedding
               embedding = self.get_embedding(comprehensive_text)
              
               if embedding:
                   with psycopg2.connect(**self.db_config) as conn:
                       with conn.cursor() as cur:
                           cur.execute("""
                               INSERT INTO extracted_tables
                               (doc_id, page_number, table_data_json, table_as_text, embedding)
                               VALUES (%s, %s, %s, %s, %s)
                           """, (
                               doc_id,
                               page_num,
                               json.dumps(table),
                               comprehensive_text,
                               embedding
                           ))
                       conn.commit()
                  
                   total_tables += 1
                   print(f"   Page {page_num}, Table {table_idx + 1}: Inserted with comprehensive text")
      
       print(f"🎉 Total tables inserted: {total_tables}")
  
   def process_and_insert_images(self, doc_id: int, extracted_data: Dict):
       """
       Process images with AI analysis for comprehensive search capability
       """
       pages = extracted_data.get('pages', [])
       total_images = 0
      
       print(f" Processing images with AI analysis...")
      
       for page_num, page_data in enumerate(pages, 1):
           images = page_data.get('images', [])
           paragraphs = page_data.get('paragraphs', [])
          
           # Get surrounding text context
           surrounding_text = " ".join(paragraphs[:3]) if paragraphs else ""
          
           for image in images:
               base64_data = image.get('base64_data', '')
               if not base64_data:
                   continue
              
               print(f"   Analyzing image {image.get('image_id', 'unknown')} from page {page_num}...")
              
               # Analyze image with Vision API
               ai_analysis = self.analyze_image_with_vision(base64_data, surrounding_text)
              
               # Create comprehensive searchable text
               searchable_content = []
              
               # Add all analysis components
               if ai_analysis.get('detailed_description'):
                   searchable_content.append(f"Image description: {ai_analysis['detailed_description']}")
              
               if ai_analysis.get('ocr_text'):
                   searchable_content.append(f"Text in image: {ai_analysis['ocr_text']}")
              
               if ai_analysis.get('key_insights'):
                   searchable_content.append(f"Key insights: {ai_analysis['key_insights']}")
              
               if ai_analysis.get('data_extracted'):
                   searchable_content.append(f"Data found: {ai_analysis['data_extracted']}")
              
               # Add context
               searchable_content.append(f"Page {page_num} context: {surrounding_text[:200]}")
              
               # Combine all searchable content
               full_searchable_text = ". ".join(searchable_content)
              
               # Save image file
               image_filename = image.get('filename', f'page_{page_num:03d}_image_{total_images + 1:03d}.png')
               image_path = f"images/{image_filename}"
              
               # Create images directory if it doesn't exist
               os.makedirs('images', exist_ok=True)
              
               # Save base64 as file
               try:
                   image_bytes = base64.b64decode(base64_data.split(',')[1])
                   with open(image_path, 'wb') as f:
                       f.write(image_bytes)
               except Exception as e:
                   print(f"   Could not save image file: {e}")
                   image_path = ""
              
               # Insert into database
               with psycopg2.connect(**self.db_config) as conn:
                   with conn.cursor() as cur:
                       cur.execute("""
                           INSERT INTO extracted_images
                           (doc_id, page_number, image_filename, image_path)
                           VALUES (%s, %s, %s, %s)
                       """, (doc_id, page_num, image_filename, image_path))
                   conn.commit()
              
               # ALSO insert image analysis as text chunks for searchability
               embedding = self.get_embedding(full_searchable_text)
               if embedding:
                   with psycopg2.connect(**self.db_config) as conn:
                       with conn.cursor() as cur:
                           cur.execute("""
                               INSERT INTO document_chunks (doc_id, page_number, chunk_text, embedding)
                               VALUES (%s, %s, %s, %s)
                           """, (
                               doc_id,
                               page_num,
                               f"[IMAGE CONTENT] {full_searchable_text}",
                               embedding
                           ))
                       conn.commit()
              
               total_images += 1
               print(f"   Image processed and made searchable: {image_filename}")
      
       print(f" Total images processed: {total_images}")
  
   def insert_complete_document(self,
                               filename: str,
                               db_ready_path: str,
                               extracted_data_path: str,
                               company_name: str = None,
                               report_year: int = None):
       """
       Complete document insertion with optimal search capability
       """
       print(f" Starting complete document insertion for: {filename}")
      
       # Load both JSON files
       with open(db_ready_path, 'r', encoding='utf-8') as f:
           db_ready_data = json.load(f)
      
       with open(extracted_data_path, 'r', encoding='utf-8') as f:
           extracted_data = json.load(f)
      
       # Insert document record
       doc_id = self.insert_document(filename, company_name, report_year)
      
       # Process all content types
       print("\n Processing text chunks...")
       self.process_and_insert_chunks(doc_id, db_ready_data)
      
       print("\n Processing tables...")
       self.process_and_insert_tables(doc_id, db_ready_data)
      
       print("\n Processing images...")
       self.process_and_insert_images(doc_id, extracted_data)
      
       print(f"\n Complete document insertion finished for doc_id: {doc_id}")
      
       # Print summary
       with psycopg2.connect(**self.db_config) as conn:
           with conn.cursor() as cur:
               cur.execute("SELECT COUNT(*) FROM document_chunks WHERE doc_id = %s", (doc_id,))
               chunk_count = cur.fetchone()[0]
              
               cur.execute("SELECT COUNT(*) FROM extracted_tables WHERE doc_id = %s", (doc_id,))
               table_count = cur.fetchone()[0]
              
               cur.execute("SELECT COUNT(*) FROM extracted_images WHERE doc_id = %s", (doc_id,))
               image_count = cur.fetchone()[0]
      
       print(f"""
INSERTION SUMMARY:
  Document ID: {doc_id}
  Text Chunks: {chunk_count}
  Tables: {table_count}
  Images: {image_count}
 
Ready for 100% query coverage!
       """)
      
       return doc_id


def main():
   """
   Main execution function
   """
   inserter = DocumentInserter()
  
   # Configuration
   pdf_filename = "../pdf_holder/test3.pdf"  # Change this
   db_ready_json = "output/db_ready_data.json"    # Path to your db_ready JSON
   extracted_json = "output/extracted_data.json"  # Path to your extracted JSON
  
   company_name = "Example Corp"  # Optional
   report_year = 2023            # Optional
  
   try:
       # Insert complete document
       doc_id = inserter.insert_complete_document(
           filename=pdf_filename,
           db_ready_path=db_ready_json,
           extracted_data_path=extracted_json,
           company_name=company_name,
           report_year=report_year
       )
      
       print(f" SUCCESS! Document inserted with ID: {doc_id}")
       print("🔍 Your database is now ready for ANY query type!")
      
   except Exception as e:
       print(f" ERROR: {e}")
       import traceback
       traceback.print_exc()


if __name__ == "__main__":
   main()




rag.py

import psycopg2
import openai
import os
from dotenv import load_dotenv
import json
import re
from typing import List, Dict, Any, Tuple
import numpy as np


load_dotenv()


class EnhancedRAG:
   def __init__(self):
       self.db_config = {
           'host': 'localhost',
           'database': 'report_agent_11',
           'user': 'postgres',
           'password': os.getenv('PG_PASSWORD'),
           'port': 5432
       }
      
       self.openai_client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
      
   def get_embedding(self, text: str) -> List[float]:
       """Get embedding for text with error handling"""
       try:
           if not text or not text.strip():
               return None
          
           # Clean text
           text = text.replace('\n', ' ').strip()
          
           response = self.openai_client.embeddings.create(
               model="text-embedding-3-small",
               input=text
           )
           return response.data[0].embedding
       except Exception as e:
           print(f"Embedding error: {e}")
           return None
  
   def preprocess_query(self, query: str) -> Dict[str, Any]:
       """Analyze and preprocess the query to determine search strategy"""
       query = query.lower().strip()
      
       # Detect query patterns
       patterns = {
           'table_query': any(word in query for word in [
               'table', 'performance', 'measure', 'indicator', 'result', 'target',
               'actual', 'percentage', '%', 'score', 'rate', 'level', 'coverage'
           ]),
           'numerical_query': bool(re.search(r'\d+|number|count|amount|total|sum', query)),
           'comparison_query': any(word in query for word in [
               'compare', 'vs', 'versus', 'difference', 'change', 'increase', 'decrease',
               'better', 'worse', 'higher', 'lower'
           ]),
           'temporal_query': any(word in query for word in [
               '2020', '2021', '2022', '2023', '2024', 'year', 'fy', 'fiscal'
           ]),
           'specific_metric': any(word in query for word in [
               'service', 'accuracy', 'timeliness', 'satisfaction', 'inventory',
               'collection', 'compliance', 'resolution'
           ])
       }
      
       return {
           'original': query,
           'patterns': patterns,
           'is_complex': sum(patterns.values()) >= 2
       }
  
   def expand_query(self, query: str) -> List[str]:
       """Generate query variations for better recall"""
       variations = [query]
      
       # Add synonyms and related terms
       replacements = {
           'performance': ['result', 'outcome', 'achievement', 'metric'],
           'target': ['goal', 'objective', 'aim'],
           'actual': ['result', 'achieved', 'real'],
           'measure': ['metric', 'indicator', 'kpi'],
           'service': ['assistance', 'support', 'help'],
           'accuracy': ['correctness', 'precision'],
           'customer': ['taxpayer', 'caller', 'client']
       }
      
       for original, synonyms in replacements.items():
           if original in query.lower():
               for synonym in synonyms:
                   variations.append(query.lower().replace(original, synonym))
      
       return list(set(variations))[:5]  # Limit to 5 variations
  
   def hybrid_search(self, query: str, limit: int = 10) -> List[Dict]:
       """Enhanced hybrid search combining semantic and keyword matching"""
       query_analysis = self.preprocess_query(query)
       query_variations = self.expand_query(query)
      
       all_results = []
      
       try:
           with psycopg2.connect(**self.db_config) as conn:
               with conn.cursor() as cur:
                   # 1. Semantic search with multiple query variations
                   for q_variant in query_variations:
                       query_embedding = self.get_embedding(q_variant)
                       if not query_embedding:
                           continue
                      
                       embedding_str = '[' + ','.join(map(str, query_embedding)) + ']'
                      
                       # Search text chunks
                       cur.execute("""
                           SELECT chunk_text, page_number, doc_id,
                                  (embedding <-> %s::vector) as distance,
                                  'text' as content_type
                           FROM document_chunks
                           WHERE doc_id IS NOT NULL
                           ORDER BY distance
                           LIMIT %s
                       """, (embedding_str, limit))
                      
                       results = cur.fetchall()
                       for row in results:
                           all_results.append({
                               'content': row[0],
                               'page': row[1],
                               'doc_id': row[2],
                               'distance': row[3],
                               'type': 'text',
                               'score': 1 / (1 + row[3]),  # Convert distance to similarity score
                               'query_variant': q_variant
                           })
                  
                   # 2. Table-focused search (especially important for your data)
                   for q_variant in query_variations:
                       query_embedding = self.get_embedding(q_variant)
                       if not query_embedding:
                           continue
                      
                       embedding_str = '[' + ','.join(map(str, query_embedding)) + ']'
                      
                       cur.execute("""
                           SELECT table_as_text, page_number, doc_id, table_data_json,
                                  (embedding <-> %s::vector) as distance,
                                  'table' as content_type
                           FROM extracted_tables
                           WHERE doc_id IS NOT NULL
                           ORDER BY distance
                           LIMIT %s
                       """, (embedding_str, limit))
                      
                       table_results = cur.fetchall()
                       for row in table_results:
                           # Parse table JSON for structured data
                           table_json = {}
                           try:
                               table_json = json.loads(row[3]) if row[3] else {}
                           except:
                               pass
                          
                           all_results.append({
                               'content': row[0],
                               'page': row[1],
                               'doc_id': row[2],
                               'distance': row[4],
                               'type': 'table',
                               'score': 1 / (1 + row[4]),
                               'table_data': table_json,
                               'query_variant': q_variant
                           })
                  
                   # 3. Keyword-based fallback search
                   keywords = query.lower().split()
                   for keyword in keywords:
                       if len(keyword) > 3:  # Skip short words
                           cur.execute("""
                               SELECT chunk_text, page_number, doc_id,
                                      'keyword_text' as content_type
                               FROM document_chunks
                               WHERE LOWER(chunk_text) LIKE %s
                               LIMIT 5
                           """, (f'%{keyword}%',))
                          
                           keyword_results = cur.fetchall()
                           for row in keyword_results:
                               all_results.append({
                                   'content': row[0],
                                   'page': row[1],
                                   'doc_id': row[2],
                                   'distance': 0.5,  # Fixed distance for keyword matches
                                   'type': 'keyword_text',
                                   'score': 0.7,
                                   'matched_keyword': keyword
                               })
      
       except Exception as e:
           print(f"Search error: {e}")
           return []
      
       # Remove duplicates and rank results
       unique_results = self.deduplicate_and_rank(all_results, query_analysis)
      
       return unique_results[:limit]
  
   def deduplicate_and_rank(self, results: List[Dict], query_analysis: Dict) -> List[Dict]:
       """Remove duplicates and rank results by relevance"""
       seen = set()
       unique_results = []
      
       for result in results:
           # Create a hash based on content and page
           content_hash = hash(result['content'][:100] + str(result['page']))
           if content_hash not in seen:
               seen.add(content_hash)
              
               # Boost score based on query patterns
               boost_factor = 1.0
              
               if query_analysis['patterns']['table_query'] and result['type'] == 'table':
                   boost_factor = 1.5
               elif query_analysis['patterns']['numerical_query'] and any(c.isdigit() or c == '%' for c in result['content']):
                   boost_factor = 1.3
              
               result['final_score'] = result['score'] * boost_factor
               unique_results.append(result)
      
       # Sort by final score (descending)
       return sorted(unique_results, key=lambda x: x['final_score'], reverse=True)
  
   def format_context_for_llm(self, results: List[Dict], query: str) -> str:
       """Format search results into optimal context for LLM"""
       if not results:
           return "No relevant information found."
      
       context_parts = []
       table_count = 0
       text_count = 0
      
       for i, result in enumerate(results):
           if result['type'] == 'table':
               table_count += 1
               # Format table data specially
               table_info = f"\n--- TABLE {table_count} (Page {result['page']}) ---\n"
               table_info += result['content']
              
               # If we have structured table data, add it
               if 'table_data' in result and result['table_data']:
                   table_data = result['table_data']
                   if 'headers' in table_data and 'rows' in table_data:
                       table_info += "\n\nStructured Data:\n"
                       headers = table_data['headers']
                       for row_idx, row in enumerate(table_data['rows']):
                           row_info = []
                           for col_idx, cell in enumerate(row):
                               if col_idx < len(headers) and cell:
                                   row_info.append(f"{headers[col_idx]}: {cell}")
                           if row_info:
                               table_info += f"• {' | '.join(row_info)}\n"
              
               context_parts.append(table_info)
          
           else:
               text_count += 1
               context_parts.append(f"\n--- TEXT {text_count} (Page {result['page']}) ---\n{result['content']}")
      
       return "\n".join(context_parts)
  
   def generate_enhanced_answer(self, query: str, results: List[Dict]) -> str:
       """Generate comprehensive answer with enhanced prompting"""
       if not results:
           return "I couldn't find any relevant information in the documents to answer your question. Please try rephrasing your query or asking about different topics."
      
       context = self.format_context_for_llm(results, query)
       query_analysis = self.preprocess_query(query)
      
       # Determine response style based on query type
       if query_analysis['patterns']['table_query']:
           response_instruction = """Focus on extracting specific data points, numbers, percentages, and performance metrics.
           Present the information in a clear, structured way. If comparing values, highlight the differences clearly."""
      
       elif query_analysis['patterns']['comparison_query']:
           response_instruction = """Compare the relevant data points clearly. Show changes over time,
           highlight improvements or declines, and provide context for the changes."""
      
       else:
           response_instruction = """Provide a comprehensive answer that directly addresses the question.
           Include specific details and explain their significance."""
      
       prompt = f"""You are a document analyst. Answer the question directly and concisely.


QUESTION: {query}


RELEVANT INFORMATION:
{context}


INSTRUCTIONS:
- Give a direct, concise answer (1-3 sentences maximum)
- Include specific numbers/data if relevant
- No extra explanations unless specifically asked
- No formatting like bullet points or headers


ANSWER:"""
      
       try:
           response = self.openai_client.chat.completions.create(
               model="gpt-4o-mini",  # Using more capable model for better accuracy
               messages=[{"role": "user", "content": prompt}],
               max_tokens=800,
               temperature=0.1  # Low temperature for factual accuracy
           )
           return response.choices[0].message.content
       except Exception as e:
           return f"I found relevant information but encountered an error generating the response: {e}"
  
   def ask(self, question: str) -> str:
       """Main function to ask a question and get a direct answer"""
       # Search for relevant documents
       results = self.hybrid_search(question, limit=8)
      
       # Generate answer
       answer = self.generate_enhanced_answer(question, results)
      
       print(f"\n**Question:** {question}")
       print(f"**Answer:** {answer}")
      
       return answer
  
def main():
   """Simple Q&A loop"""
   rag = EnhancedRAG()
  
   print("RAG System Ready. Ask your questions:")
  
   while True:
       question = input("\nQuestion: ").strip()
       if not question:
           continue
      
       try:
           rag.ask(question)
       except Exception as e:
           print(f"Error: {e}")


if __name__ == "__main__":
   main()



Key Benefits: Business Impact

Let’s summarise the possible business impacts of this workflow.

Protecting Revenue

When a report lands, you don’t have hours to hunt for the right line item. This system pulls the exact numbers you ask for and shows where they came from—page and table—so budgets, forecasts, and board decks are built on verified data. Fewer copy-paste mistakes, fewer reworks, and less risk of decisions based on the wrong figure.

Shortening Time-to-Answer

Leaders ask direct questions (“What was net cash from operations in FY2024?”). Instead of a long memo, you get a clear 1–3 sentence reply with the value and its source. That means faster monthly closes, quicker board prep, and quicker responses to auditors and regulators—without spinning up a special analysis every time.

Reducing Manual Drudgery

Analysts shouldn’t spend days skimming 300-page PDFs. The AI does the reading and indexing, so your team can focus on judgment: why a metric moved, what to do next, and how to communicate it. The result is more analysis per person, less overtime, and a happier finance and ops team.

Strengthening Trust

Every answer is traceable. One click takes you back to the exact page or table in the original document. That level of auditability makes internal reviews smoother, speeds up external audits, and builds confidence with leadership that numbers are accurate and defensible.




Where It Pays Off 

The use cases span industries. Whether you work in healthcare or legal, this workflow applies to you.

Finance

Pulling key figures from annual reports, balance sheets, and invoices in seconds. Building budgets and forecasts on verified numbers with page-level citations, and answering auditor questions with evidence, not email chains.

Legal

Summarizing long contracts and agreements into clear briefs with links back to clauses. Speeding up review cycles, surfacing risky terms early, and cutting down on manual clause comparisons.

Research

Pinpointing and comparing specific sections in research papers or technical reports. Extracting tables, metrics, and conclusions with sources attached—ideal for competitive intelligence and due diligence.

Healthcare

Summarizing patient records, medical reports, and clinical trial documents while preserving traceability to the original text. Reducing review time and supporting compliance workflows that require precise citations.

Corporate Documentation

Centralizing internal reports, policies, and SOPs. Making it easy for teams to find the latest version, understand what changed, and reference the exact paragraph when creating decks or audits.

GitHub: https://github.com/piyushghughu-superteams-dotcom/pdf_agent




About Superteams.ai

Superteams.ai builds AI workflows for data-heavy teams. We turn long PDFs and messy processes into reliable, explainable systems—fast. If you want this running on your data, let’s talk.

