ocfastapi / proxy.py
tanbushi's picture
🔧 修复 OpenCode 服务启动问题
b020ec1
# 代理模块 - 处理 OpenCode 请求转发
import httpx
from typing import Optional
from fastapi import Request, Response, HTTPException, status
from fastapi.responses import StreamingResponse
import asyncio
import logging
from config import Config
logger = logging.getLogger(__name__)
class OpenCodeProxy:
"""OpenCode 代理类"""
def __init__(self):
self.base_url = Config.get_opencode_url()
self.client = httpx.AsyncClient(
base_url=self.base_url,
timeout=Config.REQUEST_TIMEOUT
)
async def proxy_request(
self,
request: Request,
path: str,
current_user: Optional[dict] = None
) -> Response:
"""
代理请求到 OpenCode 服务
Args:
request: FastAPI 请求对象
path: 转发路径
current_user: 当前用户信息(用于认证)
Returns:
Response: 代理响应
"""
try:
# 构建目标 URL
target_url = f"/{path}" if path else "/"
# 准备请求头
headers = dict(request.headers)
# 移除不需要转发的头
headers.pop("host", None)
headers.pop("content-length", None)
# 添加认证信息(如果需要)
if current_user:
headers["X-User"] = current_user.get("username", "")
# 获取请求体
body = await request.body()
# 发送代理请求
response = await self.client.request(
method=request.method,
url=target_url,
headers=headers,
content=body if body else None,
params=request.query_params
)
# 准备响应头
response_headers = dict(response.headers)
# 移除不需要的响应头
response_headers.pop("content-encoding", None)
response_headers.pop("transfer-encoding", None)
return Response(
content=response.content,
status_code=response.status_code,
headers=response_headers
)
except httpx.ConnectError:
logger.error(f"无法连接到 OpenCode 服务: {self.base_url}")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="OpenCode 服务不可用,请检查服务状态"
)
except httpx.TimeoutException:
logger.error(f"OpenCode 服务响应超时: {self.base_url}")
raise HTTPException(
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
detail="OpenCode 服务响应超时"
)
except Exception as e:
logger.error(f"代理请求失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="代理请求失败"
)
async def proxy_websocket(self, path: str):
"""
代理 WebSocket 连接(如果需要)
目前为占位符实现
"""
# TODO: 实现 WebSocket 代理
# 这需要更复杂的处理,包括协议升级
pass
async def check_service_health(self) -> bool:
"""检查 OpenCode 服务健康状态"""
try:
# 先检查端口是否开放
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('localhost', 3000))
sock.close()
if result != 0:
return False
# 端口开放,再检查 HTTP 响应
response = await self.client.get("/", timeout=5)
return response.status_code < 500
except Exception:
return False
async def get_service_info(self) -> dict:
"""获取 OpenCode 服务信息"""
try:
response = await self.client.get("/", timeout=5)
return {
"status": "healthy" if response.status_code < 500 else "unhealthy",
"status_code": response.status_code,
"response_time": response.elapsed.total_seconds() if hasattr(response, 'elapsed') else None
}
except Exception as e:
return {
"status": "unreachable",
"error": str(e)
}
# 全局代理实例
opencode_proxy = OpenCodeProxy()
async def proxy_to_opencode(
request: Request,
path: str,
current_user: Optional[dict] = None
) -> Response:
"""
便捷函数:代理请求到 OpenCode
"""
return await opencode_proxy.proxy_request(request, path, current_user)