Spaces:
Sleeping
Sleeping
File size: 6,498 Bytes
ebf2eae d28701b b8edb83 ebf2eae 0f7bf78 ebf2eae b8edb83 ebf2eae b8edb83 ebf2eae bee22e6 e02e6ae ebf2eae b8edb83 725a289 b8edb83 a67593a c2a4f15 ebf2eae c2a4f15 ebf2eae c2a4f15 ebf2eae |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, Response
import httpx
import uvicorn
import asyncio, os, json
from dotenv import load_dotenv
# 加载 .env 文件中的环境变量
load_dotenv()
app = FastAPI()
# Placeholder for Airs Platform Token (replace with actual validation logic)
AIRS_PLATFORM_TOKEN = os.getenv("AIRS_PLATFORM_TOKEN", "")
# Dummy function to simulate database lookup for API key
async def get_api_key_from_db(user_identifier: str) -> str:
"""
Simulates fetching an API key from a database based on a user identifier.
In a real application, this would involve actual database queries.
"""
# For demonstration, return a fixed API key.
# In a real scenario, you might map user_identifier to a specific API key.
print(f"模拟数据库查找用户: {user_identifier}")
if user_identifier == "valid_user_id_from_token": # This would come from decoding the actual token
return "AIzaSyDCbN6WDNIEtB8yJmNX40ScechQu6mCZoo"
else:
raise HTTPException(status_code=403, detail="禁止访问: 未找到该用户的有效API密钥。")
@app.api_route("/v1/{url_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
async def proxy_request(url_path: str, request: Request):
# --- Airs Platform Token Authorization ---
auth_header = request.headers.get("Authorization")
if not auth_header:
raise HTTPException(status_code=401, detail="未授权: 缺少Authorization头部。")
# 期望格式: "Bearer your_airs_platform_secret_token"
try:
scheme, token = auth_header.split()
if scheme.lower() != "bearer" or token != AIRS_PLATFORM_TOKEN:
raise HTTPException(status_code=401, detail="未授权: Airs平台令牌无效。")
except ValueError:
raise HTTPException(status_code=401, detail="未授权: Authorization头部格式无效。")
print("Airs平台令牌授权成功。")
# --- 从数据库检索第三方API密钥 ---
# 在实际场景中,'user_identifier' 将从已验证的Airs令牌中提取
# 在此示例中,我们使用占位符。
try:
third_party_api_key = await get_api_key_from_db("valid_user_id_from_token")
print(f"成功从数据库获取第三方API Key: {third_party_api_key[:5]}...") # 打印部分密钥以确保安全
except HTTPException as e:
raise e # 重新抛出来自get_api_key_from_db的HTTPException
print(f"接收到的 url_path: {url_path}")
# url_path 示例: https/open.bigmodel.cn/api/paas/v4/chat/completions
# 找到第一个 '/' 的位置,分隔协议和域名
first_slash_idx = url_path.find('/')
if first_slash_idx == -1:
raise HTTPException(status_code=400, detail="无效的URL路径格式。期望协议/域名/路径。")
protocol = url_path[:first_slash_idx] # 'https'
print(f"解析出的协议: {protocol}")
# 找到第二个 '/' 的位置,分隔域名和实际的路径
second_slash_idx = url_path.find('/', first_slash_idx + 1)
if second_slash_idx == -1:
domain = url_path[first_slash_idx + 1:]
remaining_path = ''
else:
domain = url_path[first_slash_idx + 1:second_slash_idx]
remaining_path = url_path[second_slash_idx:]
target_url = f"{protocol}://{domain}{remaining_path}"
print(f"\n\n\n代理请求到 {target_url}")
# 转发原始请求的头部,排除 'Host' 头部以避免冲突
# FastAPI 的 request.headers 是不可变的,需要转换为字典
headers = {key: value for key, value in request.headers.items() if key.lower() != 'host' and key.lower() != 'content-length'}
headers["user-agent"]="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
# 将检索到的第三方API密钥添加到目标API的请求头中
headers["authorization"] = f"Bearer {third_party_api_key}"
print(f"添加第三方API Key到请求头: Authorization: Bearer {third_party_api_key[:5]}...")
# 获取请求体
request_body = await request.body()
# 获取查询参数
query_params = request.query_params
async with httpx.AsyncClient(verify=True, follow_redirects=False) as client:
try:
# 使用 httpx 库向目标 URL 发送请求
resp = await client.request(
method=request.method,
url=target_url,
headers=headers,
content=request_body, # 使用 content 传递请求体
params=query_params,
timeout=30.0, # 设置超时时间为30秒
)
print('headers',headers)
# 打印目标 API 返回的实际状态码和响应体,用于调试
print(f"目标API响应状态码: {resp.status_code}")
print(f"目标API响应体: {resp.text[:500]}...") # 打印前500个字符,避免过长
# 构建响应头部
excluded_headers = ['content-encoding'] # 保持与 Flask 版本一致
response_headers = {
name: value for name, value in resp.headers.items()
if name.lower() not in excluded_headers
}
# 返回流式响应内容
# httpx 的 .aiter_bytes() 返回异步迭代器
async def generate_response():
async for chunk in resp.aiter_bytes(chunk_size=8192):
yield chunk
return StreamingResponse(generate_response(), status_code=resp.status_code, headers=response_headers)
except httpx.RequestError as e:
error_detail = f"代理请求到 {target_url} 失败: {type(e).__name__} - {e}"
print(f"代理请求失败: {error_detail}")
if e.request:
print(f"请求信息: {e.request.method} {e.request.url}")
if hasattr(e, 'response') and e.response:
print(f"响应信息: {e.response.status_code} {e.response.text[:200]}...")
raise HTTPException(status_code=500, detail=error_detail)
# if __name__ == '__main__':
# # 提示:请确保您已激活 conda 环境 'any-api' (conda activate any-api)
# # 提示:请确保已安装 FastAPI, Uvicorn 和 httpx 库 (pip install fastapi uvicorn httpx)
# print(f"代理服务器正在 0.0.0.0:7860 上启动")
# uvicorn.run(app, host="0.0.0.0", port=7860)
|