import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
from sentence_transformers import SentenceTransformer
import os
import datetime
import logging
from collections import deque
from fastapi import FastAPI, BackgroundTasks, HTTPException
from contextlib import asynccontextmanager
from fastapi.responses import HTMLResponse
import threading
# --- Configuration ---
SUPABASE_CONNECTION_STRING = os.getenv("SUPABASE_CONNECTION_STRING")
# --- Toggles & Tuning ---
PROCESSING_CHUNK_SIZE = 32
EMBEDDING_BATCH_SIZE = 32
DRY_RUN = False
# --- Global State ---
model = None
execution_logs = deque(maxlen=50)
is_processing = False
processing_lock = threading.Lock()
# --- Lifespan Manager ---
@asynccontextmanager
async def lifespan(app: FastAPI):
global model
print("⏳ Loading Model...")
model = SentenceTransformer('Alibaba-NLP/gte-modernbert-base', trust_remote_code=True)
print("✅ Model Loaded.")
yield
print("🛑 Shutting down...")
app = FastAPI(lifespan=lifespan)
# --- Helper Functions ---
def fetch_and_lock_chunk(conn, chunk_size):
"""
Fetches candidates from the denormalized table where embeddings are missing.
"""
query = """
SELECT
id,
name,
summary,
work_experience,
projects,
education,
achievements,
certifications,
volunteering,
skills,
languages
FROM public.candidates
WHERE
        -- Condition 1: Embedding is missing (new candidate)
embeddings IS NULL
OR
        -- Condition 2: Candidate created after the last embedding (retry/update logic)
        -- Note: since there is no 'updated_at' column, we compare created_at to embeddings_created_at
(embeddings_created_at IS NOT NULL AND created_at > embeddings_created_at)
    LIMIT %s
    FOR UPDATE SKIP LOCKED
"""
# Note: If you add an 'updated_at' column later, change WHERE to:
# WHERE embeddings IS NULL OR updated_at > embeddings_created_at
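    # One possible (assumed, untested) migration to maintain such a column automatically:
    #   ALTER TABLE public.candidates ADD COLUMN updated_at timestamptz NOT NULL DEFAULT now();
    #   CREATE OR REPLACE FUNCTION public.touch_updated_at() RETURNS trigger AS $$
    #   BEGIN
    #     NEW.updated_at := now();
    #     RETURN NEW;
    #   END;
    #   $$ LANGUAGE plpgsql;
    #   CREATE TRIGGER candidates_touch_updated_at
    #     BEFORE UPDATE ON public.candidates
    #     FOR EACH ROW EXECUTE FUNCTION public.touch_updated_at();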
return pd.read_sql_query(query, conn, params=(chunk_size,))
def clean_and_format_text(row):
"""
Parses the JSONB and Array columns from the new schema to create a
rich text representation for embedding.
"""
text_parts = []
# 1. Basic Info
if row.get('name'):
text_parts.append(f"Name: {row['name']}")
if row.get('summary'):
text_parts.append(f"Summary: {row['summary']}")
# 2. Skills (Postgres Array -> Python List)
if row.get('skills') and isinstance(row['skills'], list):
# Filter out empty strings/None
valid_skills = [s for s in row['skills'] if s]
if valid_skills:
text_parts.append(f"Skills: {', '.join(valid_skills)}")
# 3. Work Experience (JSONB List of Dicts)
# Schema keys: role, company, description, duration
if row.get('work_experience') and isinstance(row['work_experience'], list):
exps = []
for item in row['work_experience']:
if isinstance(item, dict):
role = item.get('role', '')
company = item.get('company', '')
desc = item.get('description', '')
# Format: "Role at Company: Description"
entry = f"{role} at {company}".strip()
if desc:
entry += f": {desc}"
exps.append(entry)
if exps:
text_parts.append("Work Experience:\n" + "\n".join(exps))
# 4. Projects (JSONB List of Dicts)
# Schema keys: title, description, link
if row.get('projects') and isinstance(row['projects'], list):
projs = []
for item in row['projects']:
if isinstance(item, dict):
title = item.get('title', '')
desc = item.get('description', '')
entry = f"{title}".strip()
if desc:
entry += f": {desc}"
projs.append(entry)
if projs:
text_parts.append("Projects:\n" + "\n".join(projs))
# 5. Education (JSONB List of Dicts)
# Schema keys: degree, institution, year
if row.get('education') and isinstance(row['education'], list):
edus = []
for item in row['education']:
if isinstance(item, dict):
degree = item.get('degree', '')
inst = item.get('institution', '')
entry = f"{degree} from {inst}".strip()
edus.append(entry)
if edus:
text_parts.append("Education: " + ", ".join(edus))
# 6. Certifications (JSONB List of Dicts)
# Schema keys: name, issuer
if row.get('certifications') and isinstance(row['certifications'], list):
certs = []
for item in row['certifications']:
if isinstance(item, dict):
name = item.get('name', '')
issuer = item.get('issuer', '')
entry = f"{name} by {issuer}".strip()
certs.append(entry)
if certs:
text_parts.append("Certifications: " + ", ".join(certs))
# 7. Achievements (JSONB List of Dicts)
if row.get('achievements') and isinstance(row['achievements'], list):
achievements = []
for item in row['achievements']:
            if isinstance(item, dict):
                title = item.get('title', '')
                desc = item.get('description', '')
                entry = f"{title}".strip()
                if desc:
                    entry += f": {desc}"
                achievements.append(entry)
if achievements:
text_parts.append("Achievements: " + "; ".join(achievements))
return "\n\n".join(text_parts)
def update_db_batch(conn, updates):
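    """
    Bulk-writes a batch of embeddings to public.candidates.
    `updates` is a list of (id, vector) pairs: id is the candidate UUID (a string is
    fine, it is cast to uuid in SQL) and vector is a pgvector-compatible value,
    e.g. the text literal "[0.1, 0.2, ...]".
    """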
if DRY_RUN: return
# Updated to target public.candidates and cast ID to UUID
query = """
UPDATE public.candidates AS c
SET embeddings = data.vector::vector,
embeddings_created_at = NOW()
FROM (VALUES %s) AS data (id, vector)
WHERE c.id = data.id::uuid
"""
cursor = conn.cursor()
try:
execute_values(cursor, query, updates)
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
def run_worker_logic():
"""
    The core logic for processing a single batch of candidates.
"""
log_buffer = []
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_buffer.append(f"BATCH RUN: {timestamp}")
conn = None
try:
conn = psycopg2.connect(SUPABASE_CONNECTION_STRING, sslmode='require')
# 1. Fetch & Lock
df = fetch_and_lock_chunk(conn, PROCESSING_CHUNK_SIZE)
if df.empty:
conn.rollback()
log_buffer.append("💤 No pending candidates found.")
execution_logs.appendleft("
".join(log_buffer))
return "No data"
log_buffer.append(f"🔒 Locked & Processing {len(df)} candidates...")
# 2. Clean Text
df['full_text'] = df.apply(clean_and_format_text, axis=1)
# 3. Log Inputs (For the Root API view)
for index, row in df.iterrows():
log_buffer.append(f"
{row['full_text']}")
log_buffer.append("Most recent batches shown first.
No logs yet. Hit the /trigger-batch endpoint to start processing.