fastapi-proxy / proxyserver-fastapi.py
airsltd's picture
update
e02e6ae
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse, Response
import httpx
import uvicorn
import asyncio, os, json
from dotenv import load_dotenv
# 加载 .env 文件中的环境变量
load_dotenv()
app = FastAPI()
# Placeholder for Airs Platform Token (replace with actual validation logic)
AIRS_PLATFORM_TOKEN = os.getenv("AIRS_PLATFORM_TOKEN", "")
# Dummy function to simulate database lookup for API key
async def get_api_key_from_db(user_identifier: str) -> str:
"""
Simulates fetching an API key from a database based on a user identifier.
In a real application, this would involve actual database queries.
"""
# For demonstration, return a fixed API key.
# In a real scenario, you might map user_identifier to a specific API key.
print(f"模拟数据库查找用户: {user_identifier}")
if user_identifier == "valid_user_id_from_token": # This would come from decoding the actual token
return "AIzaSyDCbN6WDNIEtB8yJmNX40ScechQu6mCZoo"
else:
raise HTTPException(status_code=403, detail="禁止访问: 未找到该用户的有效API密钥。")
@app.api_route("/v1/{url_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
async def proxy_request(url_path: str, request: Request):
# --- Airs Platform Token Authorization ---
auth_header = request.headers.get("Authorization")
if not auth_header:
raise HTTPException(status_code=401, detail="未授权: 缺少Authorization头部。")
# 期望格式: "Bearer your_airs_platform_secret_token"
try:
scheme, token = auth_header.split()
if scheme.lower() != "bearer" or token != AIRS_PLATFORM_TOKEN:
raise HTTPException(status_code=401, detail="未授权: Airs平台令牌无效。")
except ValueError:
raise HTTPException(status_code=401, detail="未授权: Authorization头部格式无效。")
print("Airs平台令牌授权成功。")
# --- 从数据库检索第三方API密钥 ---
# 在实际场景中,'user_identifier' 将从已验证的Airs令牌中提取
# 在此示例中,我们使用占位符。
try:
third_party_api_key = await get_api_key_from_db("valid_user_id_from_token")
print(f"成功从数据库获取第三方API Key: {third_party_api_key[:5]}...") # 打印部分密钥以确保安全
except HTTPException as e:
raise e # 重新抛出来自get_api_key_from_db的HTTPException
print(f"接收到的 url_path: {url_path}")
# url_path 示例: https/open.bigmodel.cn/api/paas/v4/chat/completions
# 找到第一个 '/' 的位置,分隔协议和域名
first_slash_idx = url_path.find('/')
if first_slash_idx == -1:
raise HTTPException(status_code=400, detail="无效的URL路径格式。期望协议/域名/路径。")
protocol = url_path[:first_slash_idx] # 'https'
print(f"解析出的协议: {protocol}")
# 找到第二个 '/' 的位置,分隔域名和实际的路径
second_slash_idx = url_path.find('/', first_slash_idx + 1)
if second_slash_idx == -1:
domain = url_path[first_slash_idx + 1:]
remaining_path = ''
else:
domain = url_path[first_slash_idx + 1:second_slash_idx]
remaining_path = url_path[second_slash_idx:]
target_url = f"{protocol}://{domain}{remaining_path}"
print(f"\n\n\n代理请求到 {target_url}")
# 转发原始请求的头部,排除 'Host' 头部以避免冲突
# FastAPI 的 request.headers 是不可变的,需要转换为字典
headers = {key: value for key, value in request.headers.items() if key.lower() != 'host' and key.lower() != 'content-length'}
headers["user-agent"]="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
# 将检索到的第三方API密钥添加到目标API的请求头中
headers["authorization"] = f"Bearer {third_party_api_key}"
print(f"添加第三方API Key到请求头: Authorization: Bearer {third_party_api_key[:5]}...")
# 获取请求体
request_body = await request.body()
# 获取查询参数
query_params = request.query_params
async with httpx.AsyncClient(verify=True, follow_redirects=False) as client:
try:
# 使用 httpx 库向目标 URL 发送请求
resp = await client.request(
method=request.method,
url=target_url,
headers=headers,
content=request_body, # 使用 content 传递请求体
params=query_params,
timeout=30.0, # 设置超时时间为30秒
)
print('headers',headers)
# 打印目标 API 返回的实际状态码和响应体,用于调试
print(f"目标API响应状态码: {resp.status_code}")
print(f"目标API响应体: {resp.text[:500]}...") # 打印前500个字符,避免过长
# 构建响应头部
excluded_headers = ['content-encoding'] # 保持与 Flask 版本一致
response_headers = {
name: value for name, value in resp.headers.items()
if name.lower() not in excluded_headers
}
# 返回流式响应内容
# httpx 的 .aiter_bytes() 返回异步迭代器
async def generate_response():
async for chunk in resp.aiter_bytes(chunk_size=8192):
yield chunk
return StreamingResponse(generate_response(), status_code=resp.status_code, headers=response_headers)
except httpx.RequestError as e:
error_detail = f"代理请求到 {target_url} 失败: {type(e).__name__} - {e}"
print(f"代理请求失败: {error_detail}")
if e.request:
print(f"请求信息: {e.request.method} {e.request.url}")
if hasattr(e, 'response') and e.response:
print(f"响应信息: {e.response.status_code} {e.response.text[:200]}...")
raise HTTPException(status_code=500, detail=error_detail)
# if __name__ == '__main__':
# # 提示:请确保您已激活 conda 环境 'any-api' (conda activate any-api)
# # 提示:请确保已安装 FastAPI, Uvicorn 和 httpx 库 (pip install fastapi uvicorn httpx)
# print(f"代理服务器正在 0.0.0.0:7860 上启动")
# uvicorn.run(app, host="0.0.0.0", port=7860)