Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import StreamingResponse, Response | |
| import httpx | |
| import uvicorn | |
| import asyncio, os, json | |
| from dotenv import load_dotenv | |
| # 加载 .env 文件中的环境变量 | |
| load_dotenv() | |
| app = FastAPI() | |
| # Placeholder for Airs Platform Token (replace with actual validation logic) | |
| AIRS_PLATFORM_TOKEN = os.getenv("AIRS_PLATFORM_TOKEN", "") | |
| # Dummy function to simulate database lookup for API key | |
| async def get_api_key_from_db(user_identifier: str) -> str: | |
| """ | |
| Simulates fetching an API key from a database based on a user identifier. | |
| In a real application, this would involve actual database queries. | |
| """ | |
| # For demonstration, return a fixed API key. | |
| # In a real scenario, you might map user_identifier to a specific API key. | |
| print(f"模拟数据库查找用户: {user_identifier}") | |
| if user_identifier == "valid_user_id_from_token": # This would come from decoding the actual token | |
| return "AIzaSyDCbN6WDNIEtB8yJmNX40ScechQu6mCZoo" | |
| else: | |
| raise HTTPException(status_code=403, detail="禁止访问: 未找到该用户的有效API密钥。") | |
| async def proxy_request(url_path: str, request: Request): | |
| # --- Airs Platform Token Authorization --- | |
| auth_header = request.headers.get("Authorization") | |
| if not auth_header: | |
| raise HTTPException(status_code=401, detail="未授权: 缺少Authorization头部。") | |
| # 期望格式: "Bearer your_airs_platform_secret_token" | |
| try: | |
| scheme, token = auth_header.split() | |
| if scheme.lower() != "bearer" or token != AIRS_PLATFORM_TOKEN: | |
| raise HTTPException(status_code=401, detail="未授权: Airs平台令牌无效。") | |
| except ValueError: | |
| raise HTTPException(status_code=401, detail="未授权: Authorization头部格式无效。") | |
| print("Airs平台令牌授权成功。") | |
| # --- 从数据库检索第三方API密钥 --- | |
| # 在实际场景中,'user_identifier' 将从已验证的Airs令牌中提取 | |
| # 在此示例中,我们使用占位符。 | |
| try: | |
| third_party_api_key = await get_api_key_from_db("valid_user_id_from_token") | |
| print(f"成功从数据库获取第三方API Key: {third_party_api_key[:5]}...") # 打印部分密钥以确保安全 | |
| except HTTPException as e: | |
| raise e # 重新抛出来自get_api_key_from_db的HTTPException | |
| print(f"接收到的 url_path: {url_path}") | |
| # url_path 示例: https/open.bigmodel.cn/api/paas/v4/chat/completions | |
| # 找到第一个 '/' 的位置,分隔协议和域名 | |
| first_slash_idx = url_path.find('/') | |
| if first_slash_idx == -1: | |
| raise HTTPException(status_code=400, detail="无效的URL路径格式。期望协议/域名/路径。") | |
| protocol = url_path[:first_slash_idx] # 'https' | |
| print(f"解析出的协议: {protocol}") | |
| # 找到第二个 '/' 的位置,分隔域名和实际的路径 | |
| second_slash_idx = url_path.find('/', first_slash_idx + 1) | |
| if second_slash_idx == -1: | |
| domain = url_path[first_slash_idx + 1:] | |
| remaining_path = '' | |
| else: | |
| domain = url_path[first_slash_idx + 1:second_slash_idx] | |
| remaining_path = url_path[second_slash_idx:] | |
| target_url = f"{protocol}://{domain}{remaining_path}" | |
| print(f"\n\n\n代理请求到 {target_url}") | |
| # 转发原始请求的头部,排除 'Host' 头部以避免冲突 | |
| # FastAPI 的 request.headers 是不可变的,需要转换为字典 | |
| headers = {key: value for key, value in request.headers.items() if key.lower() != 'host' and key.lower() != 'content-length'} | |
| headers["user-agent"]="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" | |
| # 将检索到的第三方API密钥添加到目标API的请求头中 | |
| headers["authorization"] = f"Bearer {third_party_api_key}" | |
| print(f"添加第三方API Key到请求头: Authorization: Bearer {third_party_api_key[:5]}...") | |
| # 获取请求体 | |
| request_body = await request.body() | |
| # 获取查询参数 | |
| query_params = request.query_params | |
| async with httpx.AsyncClient(verify=True, follow_redirects=False) as client: | |
| try: | |
| # 使用 httpx 库向目标 URL 发送请求 | |
| resp = await client.request( | |
| method=request.method, | |
| url=target_url, | |
| headers=headers, | |
| content=request_body, # 使用 content 传递请求体 | |
| params=query_params, | |
| timeout=30.0, # 设置超时时间为30秒 | |
| ) | |
| print('headers',headers) | |
| # 打印目标 API 返回的实际状态码和响应体,用于调试 | |
| print(f"目标API响应状态码: {resp.status_code}") | |
| print(f"目标API响应体: {resp.text[:500]}...") # 打印前500个字符,避免过长 | |
| # 构建响应头部 | |
| excluded_headers = ['content-encoding'] # 保持与 Flask 版本一致 | |
| response_headers = { | |
| name: value for name, value in resp.headers.items() | |
| if name.lower() not in excluded_headers | |
| } | |
| # 返回流式响应内容 | |
| # httpx 的 .aiter_bytes() 返回异步迭代器 | |
| async def generate_response(): | |
| async for chunk in resp.aiter_bytes(chunk_size=8192): | |
| yield chunk | |
| return StreamingResponse(generate_response(), status_code=resp.status_code, headers=response_headers) | |
| except httpx.RequestError as e: | |
| error_detail = f"代理请求到 {target_url} 失败: {type(e).__name__} - {e}" | |
| print(f"代理请求失败: {error_detail}") | |
| if e.request: | |
| print(f"请求信息: {e.request.method} {e.request.url}") | |
| if hasattr(e, 'response') and e.response: | |
| print(f"响应信息: {e.response.status_code} {e.response.text[:200]}...") | |
| raise HTTPException(status_code=500, detail=error_detail) | |
| # if __name__ == '__main__': | |
| # # 提示:请确保您已激活 conda 环境 'any-api' (conda activate any-api) | |
| # # 提示:请确保已安装 FastAPI, Uvicorn 和 httpx 库 (pip install fastapi uvicorn httpx) | |
| # print(f"代理服务器正在 0.0.0.0:7860 上启动") | |
| # uvicorn.run(app, host="0.0.0.0", port=7860) | |