import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
from sentence_transformers import SentenceTransformer
import os
import datetime
import logging
from collections import deque
from fastapi import FastAPI, BackgroundTasks, HTTPException
from contextlib import asynccontextmanager
from fastapi.responses import HTMLResponse
import threading

# --- Configuration ---
SUPABASE_CONNECTION_STRING = os.getenv("SUPABASE_CONNECTION_STRING")

# --- Toggles & Tuning ---
PROCESSING_CHUNK_SIZE = 32
EMBEDDING_BATCH_SIZE = 32
DRY_RUN = False

# --- Global State ---
model = None
execution_logs = deque(maxlen=50)
is_processing = False
processing_lock = threading.Lock()


# --- Lifespan Manager ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    global model
    print("⏳ Loading Model...")
    model = SentenceTransformer('Alibaba-NLP/gte-modernbert-base', trust_remote_code=True)
    print("✅ Model Loaded.")
    yield
    print("🛑 Shutting down...")


app = FastAPI(lifespan=lifespan)


# --- Helper Functions ---

def fetch_and_lock_chunk(conn, chunk_size):
    """
    Fetches candidates from the denormalized table where embeddings are missing.
    """
    query = """
        SELECT id, name, summary, work_experience, projects, education,
               achievements, certifications, volunteering, skills, languages
        FROM public.candidates
        WHERE
            -- Condition 1: Embedding is missing (New Job)
            embeddings IS NULL
            OR
            -- Condition 2: Job created after the last embedding (Retry/Update Logic)
            -- Note: Since there is no 'updated_at' column, we rely on created_at vs embeddings_created_at
            (embeddings_created_at IS NOT NULL AND created_at > embeddings_created_at)
        LIMIT %s
        FOR UPDATE SKIP LOCKED
    """
    # Note: If you add an 'updated_at' column later, change WHERE to:
    # WHERE embeddings IS NULL OR updated_at > embeddings_created_at
    return pd.read_sql_query(query, conn, params=(chunk_size,))
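
# --- Illustrative row shape (hypothetical example) ---
# fetch_and_lock_chunk() returns a DataFrame whose rows look roughly like the
# dict below, which clean_and_format_text() then flattens into embedding text.
# The keys mirror the "Schema keys" comments in clean_and_format_text(); the
# values are made up purely for documentation and are never used by the worker.
_EXAMPLE_CANDIDATE_ROW = {
    "id": "00000000-0000-0000-0000-000000000000",
    "name": "Jane Doe",
    "summary": "Backend engineer focused on search infrastructure.",
    "skills": ["Python", "PostgreSQL"],  # Postgres text[] arrives as a Python list
    "work_experience": [  # JSONB list of dicts
        {"role": "Engineer", "company": "Acme", "description": "Built ETL pipelines", "duration": "2 years"},
    ],
    "projects": [{"title": "Resume parser", "description": "PDF-to-JSON extraction", "link": ""}],
    "education": [{"degree": "BSc Computer Science", "institution": "Example University", "year": "2020"}],
    "certifications": [{"name": "Cloud Practitioner", "issuer": "AWS"}],
    "achievements": [{"title": "Hackathon winner", "description": "First place, 2023"}],
}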

def clean_and_format_text(row):
    """
    Parses the JSONB and Array columns from the new schema
    to create a rich text representation for embedding.
    """
    text_parts = []

    # 1. Basic Info
    if row.get('name'):
        text_parts.append(f"Name: {row['name']}")
    if row.get('summary'):
        text_parts.append(f"Summary: {row['summary']}")

    # 2. Skills (Postgres Array -> Python List)
    if row.get('skills') and isinstance(row['skills'], list):
        # Filter out empty strings/None
        valid_skills = [s for s in row['skills'] if s]
        if valid_skills:
            text_parts.append(f"Skills: {', '.join(valid_skills)}")

    # 3. Work Experience (JSONB List of Dicts)
    # Schema keys: role, company, description, duration
    if row.get('work_experience') and isinstance(row['work_experience'], list):
        exps = []
        for item in row['work_experience']:
            if isinstance(item, dict):
                role = item.get('role', '')
                company = item.get('company', '')
                desc = item.get('description', '')
                # Format: "Role at Company: Description"
                entry = f"{role} at {company}".strip()
                if desc:
                    entry += f": {desc}"
                exps.append(entry)
        if exps:
            text_parts.append("Work Experience:\n" + "\n".join(exps))

    # 4. Projects (JSONB List of Dicts)
    # Schema keys: title, description, link
    if row.get('projects') and isinstance(row['projects'], list):
        projs = []
        for item in row['projects']:
            if isinstance(item, dict):
                title = item.get('title', '')
                desc = item.get('description', '')
                entry = f"{title}".strip()
                if desc:
                    entry += f": {desc}"
                projs.append(entry)
        if projs:
            text_parts.append("Projects:\n" + "\n".join(projs))

    # 5. Education (JSONB List of Dicts)
    # Schema keys: degree, institution, year
    if row.get('education') and isinstance(row['education'], list):
        edus = []
        for item in row['education']:
            if isinstance(item, dict):
                degree = item.get('degree', '')
                inst = item.get('institution', '')
                entry = f"{degree} from {inst}".strip()
                edus.append(entry)
        if edus:
            text_parts.append("Education: " + ", ".join(edus))

    # 6. Certifications (JSONB List of Dicts)
    # Schema keys: name, issuer
    if row.get('certifications') and isinstance(row['certifications'], list):
        certs = []
        for item in row['certifications']:
            if isinstance(item, dict):
                name = item.get('name', '')
                issuer = item.get('issuer', '')
                entry = f"{name} by {issuer}".strip()
                certs.append(entry)
        if certs:
            text_parts.append("Certifications: " + ", ".join(certs))

    # 7. Achievements (JSONB List of Dicts)
    if row.get('achievements') and isinstance(row['achievements'], list):
        achievements = []
        for item in row['achievements']:
            if isinstance(item, dict):
                title = item.get('title', '')
                desc = item.get('description', '')
                entry = f"{title}: {desc}".strip()
                achievements.append(entry)
        if achievements:
            text_parts.append("Achievements: " + "; ".join(achievements))

    return "\n\n".join(text_parts)


def update_db_batch(conn, updates):
    if DRY_RUN:
        return

    # Updated to target public.candidates and cast ID to UUID
    query = """
        UPDATE public.candidates AS c
        SET embeddings = data.vector::vector,
            embeddings_created_at = NOW()
        FROM (VALUES %s) AS data (id, vector)
        WHERE c.id = data.id::uuid
    """
    cursor = conn.cursor()
    try:
        execute_values(cursor, query, updates)
        conn.commit()
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        cursor.close()


def run_worker_logic():
    """
    The core logic that runs a single processing batch.
    """
    log_buffer = []
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_buffer.append(f"BATCH RUN: {timestamp}")

    conn = None
    try:
        conn = psycopg2.connect(SUPABASE_CONNECTION_STRING, sslmode='require')

        # 1. Fetch & Lock
        df = fetch_and_lock_chunk(conn, PROCESSING_CHUNK_SIZE)

        if df.empty:
            conn.rollback()
            log_buffer.append("💤 No pending candidates found.")
            execution_logs.appendleft("\n".join(log_buffer))
            return "No data"

        log_buffer.append(f"🔒 Locked & Processing {len(df)} candidates...")

        # 2. Clean Text
        df['full_text'] = df.apply(clean_and_format_text, axis=1)

        # 3. Log Inputs (For the Root API view)
        for index, row in df.iterrows():
            log_buffer.append("----------------------------------------")
            # row['id'] is now the UUID
            log_buffer.append(f"ID: {row['id']} ({row.get('name', 'Unknown')})")
            log_buffer.append(f"\n{row['full_text']}\n")
            log_buffer.append("----------------------------------------")

        # 4. Generate Embeddings
        embeddings = model.encode(
            df['full_text'].tolist(),
            batch_size=EMBEDDING_BATCH_SIZE,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=True
        )

        # 5. Update DB
        # Ensure ID is converted to string for the tuple list if it isn't already
        updates = list(zip(df['id'].astype(str).tolist(), embeddings.tolist()))

        if not DRY_RUN:
            update_db_batch(conn, updates)
            log_buffer.append(f"✅ Successfully updated {len(df)} profiles.")
        else:
            conn.rollback()
            log_buffer.append("⚠️ Dry Run: No DB updates made.")

    except Exception as e:
        if conn:
            conn.rollback()
        log_buffer.append(f"❌ ERROR: {str(e)}")
        print(f"Error: {e}")
    finally:
        if conn:
            conn.close()

    execution_logs.appendleft("\n".join(log_buffer))

# --- API Endpoints ---

@app.get("/", response_class=HTMLResponse)
async def read_root():
    html_content = """
    <html>
    <head><title>Embedding Worker Logs</title></head>
    <body>
        <h1>📜 Candidates Embedding Worker</h1>
        <p>Most recent batches shown first.</p>
    """

    if not execution_logs:
        html_content += "<p>No logs yet. Hit the /trigger-batch endpoint to start processing.</p>"

    for entry in execution_logs:
        html_content += f"<pre>{entry}</pre>"

    html_content += "</body></html>"
    return html_content


@app.get("/trigger-batch")
async def trigger_processing(background_tasks: BackgroundTasks):
    if processing_lock.locked():
        return {"status": "busy", "message": "Worker is currently processing a previous batch."}

    background_tasks.add_task(wrapped_worker)
    return {"status": "started", "message": "Batch processing started in background."}


def wrapped_worker():
    if processing_lock.acquire(blocking=False):
        try:
            run_worker_logic()
        finally:
            processing_lock.release()