from fastapi import FastAPI, Request, HTTPException from fastapi.responses import StreamingResponse, Response import httpx import uvicorn import asyncio, os, json from dotenv import load_dotenv # 加载 .env 文件中的环境变量 load_dotenv() app = FastAPI() # Placeholder for Airs Platform Token (replace with actual validation logic) AIRS_PLATFORM_TOKEN = os.getenv("AIRS_PLATFORM_TOKEN", "") # Dummy function to simulate database lookup for API key async def get_api_key_from_db(user_identifier: str) -> str: """ Simulates fetching an API key from a database based on a user identifier. In a real application, this would involve actual database queries. """ # For demonstration, return a fixed API key. # In a real scenario, you might map user_identifier to a specific API key. print(f"模拟数据库查找用户: {user_identifier}") if user_identifier == "valid_user_id_from_token": # This would come from decoding the actual token return "AIzaSyDCbN6WDNIEtB8yJmNX40ScechQu6mCZoo" else: raise HTTPException(status_code=403, detail="禁止访问: 未找到该用户的有效API密钥。") @app.api_route("/v1/{url_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]) async def proxy_request(url_path: str, request: Request): # --- Airs Platform Token Authorization --- auth_header = request.headers.get("Authorization") if not auth_header: raise HTTPException(status_code=401, detail="未授权: 缺少Authorization头部。") # 期望格式: "Bearer your_airs_platform_secret_token" try: scheme, token = auth_header.split() if scheme.lower() != "bearer" or token != AIRS_PLATFORM_TOKEN: raise HTTPException(status_code=401, detail="未授权: Airs平台令牌无效。") except ValueError: raise HTTPException(status_code=401, detail="未授权: Authorization头部格式无效。") print("Airs平台令牌授权成功。") # --- 从数据库检索第三方API密钥 --- # 在实际场景中,'user_identifier' 将从已验证的Airs令牌中提取 # 在此示例中,我们使用占位符。 try: third_party_api_key = await get_api_key_from_db("valid_user_id_from_token") print(f"成功从数据库获取第三方API Key: {third_party_api_key[:5]}...") # 打印部分密钥以确保安全 except HTTPException as e: raise e # 重新抛出来自get_api_key_from_db的HTTPException print(f"接收到的 url_path: {url_path}") # url_path 示例: https/open.bigmodel.cn/api/paas/v4/chat/completions # 找到第一个 '/' 的位置,分隔协议和域名 first_slash_idx = url_path.find('/') if first_slash_idx == -1: raise HTTPException(status_code=400, detail="无效的URL路径格式。期望协议/域名/路径。") protocol = url_path[:first_slash_idx] # 'https' print(f"解析出的协议: {protocol}") # 找到第二个 '/' 的位置,分隔域名和实际的路径 second_slash_idx = url_path.find('/', first_slash_idx + 1) if second_slash_idx == -1: domain = url_path[first_slash_idx + 1:] remaining_path = '' else: domain = url_path[first_slash_idx + 1:second_slash_idx] remaining_path = url_path[second_slash_idx:] target_url = f"{protocol}://{domain}{remaining_path}" print(f"\n\n\n代理请求到 {target_url}") # 转发原始请求的头部,排除 'Host' 头部以避免冲突 # FastAPI 的 request.headers 是不可变的,需要转换为字典 headers = {key: value for key, value in request.headers.items() if key.lower() != 'host' and key.lower() != 'content-length'} headers["user-agent"]="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" # 将检索到的第三方API密钥添加到目标API的请求头中 headers["authorization"] = f"Bearer {third_party_api_key}" print(f"添加第三方API Key到请求头: Authorization: Bearer {third_party_api_key[:5]}...") # 获取请求体 request_body = await request.body() # 获取查询参数 query_params = request.query_params async with httpx.AsyncClient(verify=True, follow_redirects=False) as client: try: # 使用 httpx 库向目标 URL 发送请求 resp = await client.request( method=request.method, url=target_url, headers=headers, content=request_body, # 使用 content 传递请求体 params=query_params, timeout=30.0, # 设置超时时间为30秒 ) print('headers',headers) # 打印目标 API 返回的实际状态码和响应体,用于调试 print(f"目标API响应状态码: {resp.status_code}") print(f"目标API响应体: {resp.text[:500]}...") # 打印前500个字符,避免过长 # 构建响应头部 excluded_headers = ['content-encoding'] # 保持与 Flask 版本一致 response_headers = { name: value for name, value in resp.headers.items() if name.lower() not in excluded_headers } # 返回流式响应内容 # httpx 的 .aiter_bytes() 返回异步迭代器 async def generate_response(): async for chunk in resp.aiter_bytes(chunk_size=8192): yield chunk return StreamingResponse(generate_response(), status_code=resp.status_code, headers=response_headers) except httpx.RequestError as e: error_detail = f"代理请求到 {target_url} 失败: {type(e).__name__} - {e}" print(f"代理请求失败: {error_detail}") if e.request: print(f"请求信息: {e.request.method} {e.request.url}") if hasattr(e, 'response') and e.response: print(f"响应信息: {e.response.status_code} {e.response.text[:200]}...") raise HTTPException(status_code=500, detail=error_detail) # if __name__ == '__main__': # # 提示:请确保您已激活 conda 环境 'any-api' (conda activate any-api) # # 提示:请确保已安装 FastAPI, Uvicorn 和 httpx 库 (pip install fastapi uvicorn httpx) # print(f"代理服务器正在 0.0.0.0:7860 上启动") # uvicorn.run(app, host="0.0.0.0", port=7860)