Spaces:
Build error
Build error
| import os | |
| import base64 | |
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.responses import JSONResponse | |
| import cv2 | |
| import numpy as np | |
| from realesrgan import RealESRGANer | |
| from basicsr.archs.rrdbnet_arch import RRDBNet | |
| from mediapipe import solutions as mp_solutions | |
# Application object for the service.
app = FastAPI(title="Face Beautification API")

# --- Model Loading ---
# Mediapipe face detector. It starts as None so that a failed construction
# leaves the module importable; later code checks `mp_face` before using it.
mp_face = None
try:
    mp_face = mp_solutions.face_detection.FaceDetection(
        model_selection=1,
        min_detection_confidence=0.5,
    )
except Exception as exc:
    # Best-effort load: report the failure and keep `mp_face` as None.
    print(f"Error loading Mediapipe face detector: {exc}")
# Real-ESRGAN upsampler setup.
# The weights file is looked up under the local 'weights' directory (the
# original comment states it is pre-downloaded there by the Dockerfile).
model_path = os.path.join("weights", "RealESRGAN_x4plus.pth")
upsampler = None
if not (os.path.exists(model_path) and mp_face):
    # Either the weights file is missing or the face detector did not load;
    # in both cases the upsampler is left as None.
    print("Model weights not found or face detector failed to load.")
else:
    try:
        # Network architecture the weights are loaded into.
        model = RRDBNet(
            num_in_ch=3,
            num_out_ch=3,
            num_feat=64,
            num_block=23,
            num_grow_ch=32,
            scale=4,
        )
        # half=False keeps inference in full precision, which the original
        # comment calls out as important for CPU-only execution.
        # NOTE(review): the original comment says gpu_id=None selects the
        # CPU explicitly - confirm against RealESRGANer's device handling.
        upsampler = RealESRGANer(
            scale=4,
            model_path=model_path,
            dni_weight=None,
            model=model,
            tile=0,
            tile_pad=10,
            pre_pad=0,
            half=False,
            gpu_id=None,
        )
    except Exception as exc:
        # Best-effort load: `upsampler` is still None because the
        # assignment above never completed.
        print(f"Error loading Real-ESRGAN model: {exc}")
| # --- API Endpoints --- | |
async def root():
    """Report that the API process is up."""
    # NOTE(review): no route decorator is visible in this view - confirm this
    # coroutine is actually registered on `app`.
    payload = {"message": "Free Face Beautification API is running!"}
    return payload
async def beautify(image: UploadFile = File(...)):
    """Enhance every detected face in an uploaded image.

    The upload is decoded with OpenCV, faces are located with the Mediapipe
    detector, and each face crop is upscaled with Real-ESRGAN, smoothed with
    a bilateral filter, resized back to the crop size and written over the
    original region.

    Args:
        image: The uploaded image file.

    Returns:
        A JSONResponse. On success it carries ``status``, ``message`` and
        ``image_base64`` (the JPEG-encoded result, Base64 text). Failures
        carry an ``error`` message with status 503 (models not loaded),
        400 (empty or undecodable upload, or no face detected) or 500
        (any other failure).
    """
    # NOTE(review): no route decorator is visible in this view - confirm this
    # coroutine is actually registered on `app`.
    # NOTE(review): detection and upscaling run synchronously inside this
    # coroutine; presumably that stalls other requests while it works -
    # confirm whether it should be moved to a worker thread.
    if not upsampler or not mp_face:
        return JSONResponse(
            {"error": "Model not loaded, API is not operational."},
            status_code=503,
        )
    try:
        # 1. Read and decode the uploaded image.
        contents = await image.read()
        if not contents:
            # An empty buffer makes cv2.imdecode raise instead of returning
            # None, which would otherwise be reported as a 500.
            return JSONResponse({"error": "Invalid image file."}, status_code=400)
        npimg = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(npimg, cv2.IMREAD_COLOR)
        if img is None:
            return JSONResponse({"error": "Invalid image file."}, status_code=400)

        # 2. Detect faces using Mediapipe (it is fed an RGB copy).
        results = mp_face.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        if not results.detections:
            return JSONResponse(
                {"error": "No face detected in the image."},
                status_code=400,
            )

        # The image size does not change inside the loop, so read it once.
        height, width = img.shape[:2]

        # 3. Process each detected face.
        for detection in results.detections:
            bbox = detection.location_data.relative_bounding_box
            # Clamp both corners into [0, size]. The far corner is also kept
            # at or beyond the near one: a negative x2/y2 would otherwise be
            # read as a from-the-end slice index and select the wrong region.
            x1 = min(width, max(0, int(bbox.xmin * width)))
            y1 = min(height, max(0, int(bbox.ymin * height)))
            x2 = max(x1, min(width, int((bbox.xmin + bbox.width) * width)))
            y2 = max(y1, min(height, int((bbox.ymin + bbox.height) * height)))
            if x2 <= x1 or y2 <= y1:
                # Nothing of this face lies inside the image.
                continue
            face = img[y1:y2, x1:x2]

            # 4. Enhance the face using Real-ESRGAN; only the first element
            # of the returned pair (the image) is used.
            face_upscaled, _ = upsampler.enhance(face)

            # 5. Apply a smoothing filter for the "beautification" effect.
            face_smooth = cv2.bilateralFilter(
                face_upscaled, d=9, sigmaColor=75, sigmaSpace=75
            )

            # 6. Resize back to the crop size and overwrite the original
            # region (a hard replace, no feathering at the edges).
            face_smooth_resized = cv2.resize(face_smooth, (x2 - x1, y2 - y1))
            img[y1:y2, x1:x2] = face_smooth_resized

        # 7. Encode the final image as JPEG, then Base64 for the JSON body.
        encoded_ok, buffer = cv2.imencode(".jpg", img)
        if not encoded_ok:
            return JSONResponse(
                {"error": "Failed to encode the processed image."},
                status_code=500,
            )
        img_base64 = base64.b64encode(buffer).decode("utf-8")
        return JSONResponse({
            "status": "success",
            "message": "Beautification complete!",
            "image_base64": img_base64,
        })
    except Exception as e:
        # Top-level boundary for the request: report the failure as a 500.
        return JSONResponse(
            {"error": f"An unexpected error occurred: {str(e)}"},
            status_code=500,
        )
async def health():
    """Report whether both models are loaded."""
    # NOTE(review): no route decorator is visible in this view - confirm this
    # coroutine is actually registered on `app`.
    # Ready only when the upsampler and the face detector are both truthy.
    is_ready = bool(upsampler and mp_face)
    return {"ready": is_ready}