aursalan committed on
Commit
33afddb
Β·
1 Parent(s): 20a1c36

Added files

Browse files
Files changed (3) hide show
  1. Dockerfile +15 -0
  2. main.py +228 -0
  3. requirements.txt +7 -0
Dockerfile ADDED
@@ -0,0 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# Base image: official Python 3.9 (Debian-based).
FROM python:3.9

WORKDIR /code

# Copy and install requirements first so this layer is cached
# when only application code changes.
COPY ./requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt

# Copy the application source into the image.
COPY . /code

# Create a non-root user (security best practice for HF Spaces)
RUN useradd -m -u 1000 user
USER user

# Run the application on port 7860
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7860"]
main.py ADDED
@@ -0,0 +1,228 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import datetime
import html
import logging
import os
import threading
from collections import deque
from contextlib import asynccontextmanager

import pandas as pd
import psycopg2
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.responses import HTMLResponse
from psycopg2.extras import execute_values
from sentence_transformers import SentenceTransformer
13
+
14
# --- Configuration ---
# Postgres DSN for the Supabase instance; must be supplied via environment.
SUPABASE_CONNECTION_STRING = os.getenv("SUPABASE_CONNECTION_STRING")

# --- Toggles & Tuning ---
PROCESSING_CHUNK_SIZE = 10   # candidates locked and processed per trigger
EMBEDDING_BATCH_SIZE = 32    # texts per model.encode() batch
DRY_RUN = False              # when True, no DB writes are performed

# --- Global State ---
model = None  # SentenceTransformer instance, populated by lifespan() at startup
execution_logs = deque(maxlen=50) # Stores the last 50 batch logs in RAM
is_processing = False # Lock to prevent overlapping pings
# NOTE(review): `is_processing` appears unused in this file — the actual
# overlap guard is `processing_lock` below; confirm before removing.
processing_lock = threading.Lock()
27
+
28
+ # --- Lifespan Manager (Loads Model on Startup) ---
29
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler: load the embedding model once at startup.

    The model is stored in the module-level `model` global so endpoint and
    worker code can reuse the single loaded instance.
    """
    global model
    print("⏳ Loading Model...")
    # Load model once when the API starts
    model = SentenceTransformer('Alibaba-NLP/gte-modernbert-base', trust_remote_code=True)
    print("βœ… Model Loaded.")
    yield
    # Code after `yield` runs at application shutdown.
    print("πŸ›‘ Shutting down...")
38
+
39
# Application instance wired to the lifespan handler above so the model
# is loaded exactly once per process.
app = FastAPI(lifespan=lifespan)
40
+
41
+ # --- Helper Functions ---
42
+
43
def fetch_and_lock_chunk(conn, chunk_size):
    """Select and row-lock up to `chunk_size` candidates that need (re)embedding.

    A candidate qualifies when it has no embedding yet or its profile was
    updated after the embedding was last written. `FOR UPDATE SKIP LOCKED`
    lets multiple workers run concurrently without grabbing the same rows;
    the row locks are held until the caller commits or rolls back `conn`.

    Returns a pandas DataFrame with one row per candidate, including
    json-aggregated profile fields (roles, skills, projects, education,
    certifications, achievements) gathered via correlated subqueries.
    """
    query = """
    WITH locked_candidates AS (
        SELECT candidate_id, candidate_name
        FROM candidate_profiles
        WHERE
            candidate_embeddings IS NULL
            OR updated_at > candidate_embeddings_updated_at
        LIMIT %s
        FOR UPDATE SKIP LOCKED
    )
    SELECT
        lc.candidate_id,
        lc.candidate_name,
        (SELECT json_agg(DISTINCT role) FROM candidate_experiences WHERE candidate_id = lc.candidate_id AND role IS NOT NULL) AS experience_roles,
        (SELECT json_agg(DISTINCT experience_description) FROM candidate_experiences WHERE candidate_id = lc.candidate_id AND experience_description IS NOT NULL) AS experience_descriptions,
        (SELECT json_agg(DISTINCT s.skill_name) FROM candidate_skill_map csm JOIN skills s ON csm.skill_id = s.skill_id WHERE csm.candidate_id = lc.candidate_id) AS skills,
        (SELECT json_agg(DISTINCT project_description) FROM candidate_projects WHERE candidate_id = lc.candidate_id AND project_description IS NOT NULL) AS project_descriptions,
        (SELECT json_agg(DISTINCT degree) FROM candidate_education WHERE candidate_id = lc.candidate_id AND degree IS NOT NULL) AS degrees,
        (SELECT json_agg(DISTINCT coursework) FROM candidate_education WHERE candidate_id = lc.candidate_id AND coursework IS NOT NULL) AS courseworks,
        (SELECT json_agg(DISTINCT university) FROM candidate_education WHERE candidate_id = lc.candidate_id AND university IS NOT NULL) AS universities,
        (SELECT json_agg(DISTINCT certificate_name) FROM candidate_certifications WHERE candidate_id = lc.candidate_id AND certificate_name IS NOT NULL) AS certifications,
        (SELECT json_agg(DISTINCT achievement_description) FROM candidate_achievements WHERE candidate_id = lc.candidate_id AND achievement_description IS NOT NULL) AS achievements
    FROM locked_candidates lc;
    """
    # NOTE(review): pandas warns when handed a raw DBAPI connection (it
    # officially supports SQLAlchemy connectables); this works with psycopg2
    # today but should be re-checked on pandas upgrades.
    return pd.read_sql_query(query, conn, params=(chunk_size,))
69
+
70
def clean_and_format_text(row):
    """Build the embedding-input text for one candidate row.

    Each configured field is rendered as "<Label>: <comma-joined values>" on
    its own line. List values are stripped and de-noised (None/blank entries
    dropped); plain string values are stripped. Missing or empty fields are
    omitted entirely, and the surviving lines are newline-joined.
    """
    sections = (
        ('skills', 'Skills'),
        ('experience_roles', 'Past Roles'),
        ('project_descriptions', 'Projects'),
        ('experience_descriptions', 'Experience Details'),
        ('degrees', 'Education - Degrees'),
        ('certifications', 'Certifications'),
        ('achievements', 'Achievements'),
    )

    lines = []
    for key, label in sections:
        if key not in row:
            continue
        value = row[key]
        if isinstance(value, list):
            # Keep only non-None entries that are non-blank after stripping.
            items = [str(v).strip() for v in value if v is not None and str(v).strip()]
            if items:
                lines.append(f"{label}: " + ", ".join(items))
        elif isinstance(value, str) and value.strip():
            lines.append(f"{label}: {value.strip()}")

    return "\n".join(lines)
94
+
95
def update_db_batch(conn, updates):
    """Bulk-write embeddings back to `candidate_profiles`.

    Args:
        conn: open psycopg2 connection (transaction is committed here).
        updates: sequence of (candidate_id, vector) pairs; the vector is a
            plain Python list that is cast to pgvector via `::vector`.

    Commits on success; rolls back and re-raises on any failure.
    No-op when DRY_RUN is set.
    """
    if DRY_RUN:
        return

    query = """
        UPDATE candidate_profiles AS cp
        SET candidate_embeddings = data.vector::vector,
            candidate_embeddings_updated_at = NOW()
        FROM (VALUES %s) AS data (id, vector)
        WHERE cp.candidate_id = data.id
    """
    try:
        # psycopg2 cursors are context managers: closed automatically on exit.
        with conn.cursor() as cursor:
            execute_values(cursor, query, updates)
        conn.commit()
    except Exception:
        conn.rollback()
        # Bare `raise` preserves the original traceback.
        raise
114
+
115
def run_worker_logic():
    """
    Run one single batch: lock pending candidates, build their embedding
    text, encode it, and write the vectors back to the database.

    Always appends an HTML log entry for this run (success, idle, or error)
    to the in-memory `execution_logs` deque rendered by the `/` endpoint.
    """
    log_buffer = []  # Local buffer so this run's log lines stay contiguous
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    log_buffer.append(f"<b>BATCH RUN: {timestamp}</b>")

    conn = None
    try:
        conn = psycopg2.connect(SUPABASE_CONNECTION_STRING, sslmode='require')

        # 1. Fetch & Lock (row locks held until commit/rollback on `conn`)
        df = fetch_and_lock_chunk(conn, PROCESSING_CHUNK_SIZE)

        if df.empty:
            conn.rollback()
            log_buffer.append("πŸ’€ No pending candidates found.")
            # Add to global logs and exit
            execution_logs.appendleft("<br>".join(log_buffer))
            return "No data"

        log_buffer.append(f"πŸ”’ Locked & Processing {len(df)} candidates...")

        # 2. Clean Text
        df['full_text'] = df.apply(clean_and_format_text, axis=1)

        # 3. Log Inputs (For the Root API view).
        # DB-sourced text is HTML-escaped: it is rendered verbatim on the `/`
        # page, so unescaped names/descriptions could inject markup (XSS).
        for _, row in df.iterrows():
            name = html.escape(str(row.get('candidate_name', 'Unknown')))
            log_buffer.append("<div style='border:1px solid #ccc; margin:5px; padding:5px; background:#f9f9f9'>")
            log_buffer.append(f"<strong>ID: {row['candidate_id']} ({name})</strong>")
            log_buffer.append(f"<pre style='white-space: pre-wrap;'>{html.escape(row['full_text'])}</pre>")
            log_buffer.append("</div>")

        # 4. Generate Embeddings (normalized, so cosine similarity == dot product)
        embeddings = model.encode(
            df['full_text'].tolist(),
            batch_size=EMBEDDING_BATCH_SIZE,
            show_progress_bar=False,
            convert_to_numpy=True,
            normalize_embeddings=True
        )

        # 5. Update DB
        updates = list(zip(df['candidate_id'].tolist(), embeddings.tolist()))

        if not DRY_RUN:
            update_db_batch(conn, updates)
            log_buffer.append(f"βœ… Successfully updated {len(df)} profiles.")
        else:
            conn.rollback()
            log_buffer.append("⚠️ Dry Run: No DB updates made.")

    except Exception as e:
        if conn: conn.rollback()
        log_buffer.append(f"❌ ERROR: {str(e)}")
        print(f"Error: {e}")
    finally:
        if conn: conn.close()
        # Push the local buffer to the global execution log
        execution_logs.appendleft("<br>".join(log_buffer))
177
+
178
+ # --- API Endpoints ---
179
+
180
@app.get("/", response_class=HTMLResponse)
async def read_root():
    """
    Root endpoint: renders the in-memory batch-processing logs as an HTML
    page, most recent batch first.
    """
    header = """
    <html>
    <head>
        <title>Embedding Worker Logs</title>
        <style>
            body { font-family: monospace; padding: 20px; }
            h1 { color: #333; }
            .log-entry { margin-bottom: 20px; border-bottom: 2px solid #333; padding-bottom: 20px; }
        </style>
    </head>
    <body>
        <h1>πŸ“œ Background Worker Execution Logs</h1>
        <p><i>Most recent batches shown first.</i></p>
        <hr>
    """

    # Assemble the page from parts and join once at the end.
    pieces = [header]

    if not execution_logs:
        pieces.append("<p>No logs yet. Hit the <code>/trigger-batch</code> endpoint to start processing.</p>")

    pieces.extend(f"<div class='log-entry'>{entry}</div>" for entry in execution_logs)

    pieces.append("</body></html>")
    return "".join(pieces)
209
+
210
@app.get("/trigger-batch")
async def trigger_processing(background_tasks: BackgroundTasks):
    """
    External pinger endpoint: schedules one batch of processing unless a
    batch is already in flight.
    """
    busy = processing_lock.locked()
    if busy:
        # A previous batch is still running; report busy rather than queue up.
        return {"status": "busy", "message": "Worker is currently processing a previous batch."}

    # Hand the work to a background task so the HTTP response returns fast.
    background_tasks.add_task(wrapped_worker)
    return {"status": "started", "message": "Batch processing started in background."}
221
+
222
def wrapped_worker():
    """Run one batch under the module lock; silently skip if already busy."""
    acquired = processing_lock.acquire(blocking=False)
    if not acquired:
        # Another batch is in progress — non-blocking acquire failed.
        return
    try:
        run_worker_logic()
    finally:
        processing_lock.release()
requirements.txt ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ fastapi
2
+ uvicorn
3
+ psycopg2-binary
4
+ pandas
5
+ sentence-transformers
6
+ einops
7
+ accelerate