aursalan committed
Commit f5f1dc9 · Parent: 33afddb

Added update

Files changed (2):
  1. __pycache__/main.cpython-313.pyc +0 -0
  2. main.py +125 -64
__pycache__/main.cpython-313.pyc ADDED
Binary file (12.7 kB)
 
main.py CHANGED
@@ -21,16 +21,15 @@ DRY_RUN = False
 
 # --- Global State ---
 model = None
-execution_logs = deque(maxlen=50)  # Stores the last 50 batch logs in RAM
-is_processing = False  # Lock to prevent overlapping pings
+execution_logs = deque(maxlen=50)
+is_processing = False
 processing_lock = threading.Lock()
 
-# --- Lifespan Manager (Loads Model on Startup) ---
+# --- Lifespan Manager ---
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     global model
     print("⏳ Loading Model...")
-    # Load model once when the API starts
     model = SentenceTransformer('Alibaba-NLP/gte-modernbert-base', trust_remote_code=True)
     print("✅ Model Loaded.")
     yield
@@ -41,66 +40,136 @@ app = FastAPI(lifespan=lifespan)
 # --- Helper Functions ---
 
 def fetch_and_lock_chunk(conn, chunk_size):
+    """
+    Fetches candidates from the denormalized table where embeddings are missing.
+    """
     query = """
-    WITH locked_candidates AS (
-        SELECT candidate_id, candidate_name
-        FROM candidate_profiles
-        WHERE
-            candidate_embeddings IS NULL
-            OR updated_at > candidate_embeddings_updated_at
-        LIMIT %s
-        FOR UPDATE SKIP LOCKED
-    )
     SELECT
-        lc.candidate_id,
-        lc.candidate_name,
-        (SELECT json_agg(DISTINCT role) FROM candidate_experiences WHERE candidate_id = lc.candidate_id AND role IS NOT NULL) AS experience_roles,
-        (SELECT json_agg(DISTINCT experience_description) FROM candidate_experiences WHERE candidate_id = lc.candidate_id AND experience_description IS NOT NULL) AS experience_descriptions,
-        (SELECT json_agg(DISTINCT s.skill_name) FROM candidate_skill_map csm JOIN skills s ON csm.skill_id = s.skill_id WHERE csm.candidate_id = lc.candidate_id) AS skills,
-        (SELECT json_agg(DISTINCT project_description) FROM candidate_projects WHERE candidate_id = lc.candidate_id AND project_description IS NOT NULL) AS project_descriptions,
-        (SELECT json_agg(DISTINCT degree) FROM candidate_education WHERE candidate_id = lc.candidate_id AND degree IS NOT NULL) AS degrees,
-        (SELECT json_agg(DISTINCT coursework) FROM candidate_education WHERE candidate_id = lc.candidate_id AND coursework IS NOT NULL) AS courseworks,
-        (SELECT json_agg(DISTINCT university) FROM candidate_education WHERE candidate_id = lc.candidate_id AND university IS NOT NULL) AS universities,
-        (SELECT json_agg(DISTINCT certificate_name) FROM candidate_certifications WHERE candidate_id = lc.candidate_id AND certificate_name IS NOT NULL) AS certifications,
-        (SELECT json_agg(DISTINCT achievement_description) FROM candidate_achievements WHERE candidate_id = lc.candidate_id AND achievement_description IS NOT NULL) AS achievements
-    FROM locked_candidates lc;
+        id,
+        name,
+        summary,
+        work_experience,
+        projects,
+        education,
+        achievements,
+        certifications,
+        volunteering,
+        skills,
+        languages
+    FROM public.candidates
+    WHERE
+        embeddings IS NULL
+    FOR UPDATE SKIP LOCKED
+    LIMIT %s
     """
+    # Note: If you add an 'updated_at' column later, change WHERE to:
+    # WHERE embeddings IS NULL OR updated_at > embeddings_created_at
+
     return pd.read_sql_query(query, conn, params=(chunk_size,))
 
 def clean_and_format_text(row):
-    field_config = [
-        ('skills', 'Skills'),
-        ('experience_roles', 'Past Roles'),
-        ('project_descriptions', 'Projects'),
-        ('experience_descriptions', 'Experience Details'),
-        ('degrees', 'Education - Degrees'),
-        ('certifications', 'Certifications'),
-        ('achievements', 'Achievements'),
-    ]
-
+    """
+    Parses the JSONB and Array columns from the new schema to create a
+    rich text representation for embedding.
+    """
     text_parts = []
+
+    # 1. Basic Info
+    if row.get('name'):
+        text_parts.append(f"Name: {row['name']}")
+
+    if row.get('summary'):
+        text_parts.append(f"Summary: {row['summary']}")
+
+    # 2. Skills (Postgres Array -> Python List)
+    if row.get('skills') and isinstance(row['skills'], list):
+        # Filter out empty strings/None
+        valid_skills = [s for s in row['skills'] if s]
+        if valid_skills:
+            text_parts.append(f"Skills: {', '.join(valid_skills)}")
+
+    # 3. Work Experience (JSONB List of Dicts)
+    # Schema keys: role, company, description, duration
+    if row.get('work_experience') and isinstance(row['work_experience'], list):
+        exps = []
+        for item in row['work_experience']:
+            if isinstance(item, dict):
+                role = item.get('role', '')
+                company = item.get('company', '')
+                desc = item.get('description', '')
+                # Format: "Role at Company: Description"
+                entry = f"{role} at {company}".strip()
+                if desc:
+                    entry += f": {desc}"
+                exps.append(entry)
+        if exps:
+            text_parts.append("Work Experience:\n" + "\n".join(exps))
+
+    # 4. Projects (JSONB List of Dicts)
+    # Schema keys: title, description, link
+    if row.get('projects') and isinstance(row['projects'], list):
+        projs = []
+        for item in row['projects']:
+            if isinstance(item, dict):
+                title = item.get('title', '')
+                desc = item.get('description', '')
+                entry = f"{title}".strip()
+                if desc:
+                    entry += f": {desc}"
+                projs.append(entry)
+        if projs:
+            text_parts.append("Projects:\n" + "\n".join(projs))
+
+    # 5. Education (JSONB List of Dicts)
+    # Schema keys: degree, institution, year
+    if row.get('education') and isinstance(row['education'], list):
+        edus = []
+        for item in row['education']:
+            if isinstance(item, dict):
+                degree = item.get('degree', '')
+                inst = item.get('institution', '')
+                entry = f"{degree} from {inst}".strip()
+                edus.append(entry)
+        if edus:
+            text_parts.append("Education: " + ", ".join(edus))
+
+    # 6. Certifications (JSONB List of Dicts)
+    # Schema keys: name, issuer
+    if row.get('certifications') and isinstance(row['certifications'], list):
+        certs = []
+        for item in row['certifications']:
+            if isinstance(item, dict):
+                name = item.get('name', '')
+                issuer = item.get('issuer', '')
+                entry = f"{name} by {issuer}".strip()
+                certs.append(entry)
+        if certs:
+            text_parts.append("Certifications: " + ", ".join(certs))
 
-    for col_name, tag in field_config:
-        if col_name in row:
-            data = row[col_name]
-            if isinstance(data, list) and len(data) > 0:
-                clean_items = [str(item).strip() for item in data if item is not None and str(item).strip()]
-                if clean_items:
-                    text_parts.append(f"{tag}: " + ", ".join(clean_items))
-            elif isinstance(data, str) and data.strip():
-                text_parts.append(f"{tag}: {data.strip()}")
-
-    return "\n".join(text_parts)
+    # 7. Achievements (JSONB List of Dicts)
+    if row.get('achievements') and isinstance(row['achievements'], list):
+        achievements = []
+        for item in row['achievements']:
+            if isinstance(item, dict):
+                title = item.get('title', '')
+                desc = item.get('description', '')
+                entry = f"{title}: {desc}".strip()
+                achievements.append(entry)
+        if achievements:
+            text_parts.append("Achievements: " + "; ".join(achievements))
+
+    return "\n\n".join(text_parts)
 
 def update_db_batch(conn, updates):
     if DRY_RUN: return
 
+    # Updated to target public.candidates and cast ID to UUID
    query = """
-    UPDATE candidate_profiles AS cp
-    SET candidate_embeddings = data.vector::vector,
-        candidate_embeddings_updated_at = NOW()
+    UPDATE public.candidates AS c
+    SET embeddings = data.vector::vector,
+        embeddings_created_at = NOW()
     FROM (VALUES %s) AS data (id, vector)
-    WHERE cp.candidate_id = data.id
+    WHERE c.id = data.id::uuid
     """
     cursor = conn.cursor()
     try:
@@ -116,7 +185,7 @@ def run_worker_logic():
     """
     The core logic that runs one single batch processing.
     """
-    log_buffer = []  # Local buffer to capture logs for this specific run
+    log_buffer = []
     timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
 
     log_buffer.append(f"<b>BATCH RUN: {timestamp}</b>")
@@ -131,7 +200,6 @@ def run_worker_logic():
     if df.empty:
         conn.rollback()
         log_buffer.append("💀 No pending candidates found.")
-        # Add to global logs and exit
         execution_logs.appendleft("<br>".join(log_buffer))
         return "No data"
 
@@ -143,7 +211,8 @@ def run_worker_logic():
     # 3. Log Inputs (For the Root API view)
     for index, row in df.iterrows():
         log_buffer.append(f"<div style='border:1px solid #ccc; margin:5px; padding:5px; background:#f9f9f9'>")
-        log_buffer.append(f"<strong>ID: {row['candidate_id']} ({row.get('candidate_name', 'Unknown')})</strong>")
+        # row['id'] is now the UUID
+        log_buffer.append(f"<strong>ID: {row['id']} ({row.get('name', 'Unknown')})</strong>")
         log_buffer.append(f"<pre style='white-space: pre-wrap;'>{row['full_text']}</pre>")
         log_buffer.append("</div>")
 
@@ -157,7 +226,8 @@ def run_worker_logic():
     )
 
     # 5. Update DB
-    updates = list(zip(df['candidate_id'].tolist(), embeddings.tolist()))
+    # Ensure ID is converted to string for the tuple list if it isn't already
+    updates = list(zip(df['id'].astype(str).tolist(), embeddings.tolist()))
 
     if not DRY_RUN:
         update_db_batch(conn, updates)
@@ -172,16 +242,12 @@ def run_worker_logic():
         print(f"Error: {e}")
     finally:
         if conn: conn.close()
-        # Push the local buffer to the global execution log
         execution_logs.appendleft("<br>".join(log_buffer))
 
 # --- API Endpoints ---
 
 @app.get("/", response_class=HTMLResponse)
 async def read_root():
-    """
-    Root endpoint: Displays the logs of recent processing batches.
-    """
     html_content = """
     <html>
     <head>
@@ -193,7 +259,7 @@ async def read_root():
     </style>
     </head>
     <body>
-        <h1>📜 Background Worker Execution Logs</h1>
+        <h1>📜 Candidates Embedding Worker</h1>
         <p><i>Most recent batches shown first.</i></p>
         <hr>
     """
@@ -209,18 +275,13 @@
 
 @app.get("/trigger-batch")
 async def trigger_processing(background_tasks: BackgroundTasks):
-    """
-    External Pinger: Hits this endpoint to trigger one batch of processing.
-    """
     if processing_lock.locked():
         return {"status": "busy", "message": "Worker is currently processing a previous batch."}
 
-    # We run the worker in a background task so the API response is fast
     background_tasks.add_task(wrapped_worker)
     return {"status": "started", "message": "Batch processing started in background."}
 
 def wrapped_worker():
-    """Thread-safe wrapper for the worker logic"""
     if processing_lock.acquire(blocking=False):
         try:
             run_worker_logic()
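
A note on the batch write: the `FROM (VALUES %s)` placeholder in `update_db_batch` is the template convention of psycopg2's `execute_values`, and the body of the `try:` block falls outside the hunks above. Below is a minimal sketch of how such a batch UPDATE is typically driven, assuming psycopg2 and pgvector's array-to-vector cast; the commit itself does not show these lines.

```python
# Hypothetical sketch (not from this commit): drive the batch UPDATE with
# psycopg2's execute_values, which expands the single %s in "(VALUES %s)"
# into one row per (id, vector) tuple.
from psycopg2.extras import execute_values

def apply_updates(conn, updates):
    # updates: list of (uuid_string, embedding_list) tuples, as built in
    # run_worker_logic(). Each embedding list is adapted to a Postgres array,
    # which the ::vector cast converts (requires pgvector's array casts).
    query = """
        UPDATE public.candidates AS c
        SET embeddings = data.vector::vector,
            embeddings_created_at = NOW()
        FROM (VALUES %s) AS data (id, vector)
        WHERE c.id = data.id::uuid
    """
    cur = conn.cursor()
    try:
        execute_values(cur, query, updates)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```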
 
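The new `clean_and_format_text` only reads fields via `.get()`, so it can be smoke-tested without a database. Here is a self-contained check with an invented row that mirrors the JSONB/array shapes the new query returns (importing `main` pulls in its dependencies but does not load the model, which only happens in the lifespan hook):

```python
# Hypothetical smoke test (not part of the commit); the row dict stands in
# for one row of the public.candidates query.
from main import clean_and_format_text

row = {
    "name": "Ada Lovelace",
    "summary": "Mathematician and early programmer.",
    "skills": ["Python", "PostgreSQL", ""],  # the empty string is filtered out
    "work_experience": [
        {"role": "Analyst", "company": "Babbage & Co",
         "description": "Wrote the first published algorithm."},
    ],
    "projects": [{"title": "Engine Notes", "description": "Annotated translation."}],
    "education": [{"degree": "BSc Mathematics", "institution": "University of London"}],
    "certifications": [],  # empty lists are skipped entirely
    "achievements": [{"title": "Note G", "description": "First published program"}],
}

print(clean_and_format_text(row))
# Expected sections, each separated by a blank line:
#   Name: Ada Lovelace
#   Summary: Mathematician and early programmer.
#   Skills: Python, PostgreSQL
#   Work Experience:
#   Analyst at Babbage & Co: Wrote the first published algorithm.
#   Projects:
#   Engine Notes: Annotated translation.
#   Education: BSc Mathematics from University of London
#   Achievements: Note G: First published program
```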
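Finally, since `/trigger-batch` is meant to be hit by an external pinger and answers "busy" while `processing_lock` is held, a scheduler can poll it blindly. A minimal sketch of such a pinger; the base URL is an assumption (port 7860 is the usual default for a Space):

```python
# Hypothetical external pinger (not part of the commit): triggers one batch
# per interval. Overlapping pings are safe because the endpoint returns
# {"status": "busy"} while a previous batch is still running.
import time
import requests

BASE_URL = "http://localhost:7860"  # assumed; replace with the deployed URL

while True:
    resp = requests.get(f"{BASE_URL}/trigger-batch", timeout=10)
    print(resp.json())  # {"status": "started", ...} or {"status": "busy", ...}
    time.sleep(300)  # one batch every 5 minutes
```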