From 010f89c7d9f217f014a7b1c935d9ae30c4f9b89b Mon Sep 17 00:00:00 2001 From: Dustella Date: Thu, 20 Feb 2025 15:44:27 +0800 Subject: [PATCH] feat: new ping --- backend.py | 4 +- modules/balloon/__init__.py | 5 -- ping.py | 130 ++++++++++++++++++++++++++++++++++++ 3 files changed, 133 insertions(+), 6 deletions(-) create mode 100644 ping.py diff --git a/backend.py b/backend.py index 8735f5d..7842928 100644 --- a/backend.py +++ b/backend.py @@ -10,6 +10,7 @@ from quart_cors import cors from typing import get_args import sys import matplotlib.font_manager as fm +import ping app = Quart(__name__) app = cors(app, send_origin_wildcard=True, allow_origin="*") @@ -35,7 +36,7 @@ def auth(): @app.route("/ping") -def ping(): +def dping(): return "pong" @@ -45,6 +46,7 @@ app.register_blueprint(saber.saber_module, url_prefix="/saber") app.register_blueprint(tidi.tidi_module, url_prefix="/tidi") app.register_blueprint(cosmic.cosmic_module, url_prefix="/cosmic") # allow cors +ping.setup_websocket(app) if __name__ == '__main__': diff --git a/modules/balloon/__init__.py b/modules/balloon/__init__.py index 66fca2a..171d098 100644 --- a/modules/balloon/__init__.py +++ b/modules/balloon/__init__.py @@ -26,11 +26,6 @@ def get_dataframe_between_year(start_year, end_year): balloon_module = Blueprint("Balloon", __name__) -@balloon_module.route('/') -def home(): - return jsonify(message="Welcome to the Flask balloon_module!") - - @balloon_module.route("/metadata/modes") def supermeta(): return jsonify({ diff --git a/ping.py b/ping.py new file mode 100644 index 0000000..44a5689 --- /dev/null +++ b/ping.py @@ -0,0 +1,130 @@ +from quart import Quart, websocket +import asyncio +import json +import time +from typing import Dict, Set, Optional, Callable, Any +from dataclasses import dataclass +from functools import partial + + +@dataclass +class WebSocketConfig: + """WebSocket配置类""" + heartbeat_timeout: float = 45.0 # 心跳超时时间(秒) + heartbeat_interval: float = 15.0 # 心跳检查间隔(秒) + on_connect: Optional[Callable[[str], Any]] = None # 连接回调 + on_disconnect: Optional[Callable[[str], Any]] = None # 断开回调 + on_message: Optional[Callable[[str, dict], Any]] = None # 消息回调 + + +def setup_websocket( + app: Quart, + url_prefix: str = '/ping/ws', + config: Optional[WebSocketConfig] = None +) -> None: + """ + 设置WebSocket服务 + + Args: + app: Quart应用实例 + url_prefix: WebSocket路由前缀 + config: WebSocket配置 + """ + if config is None: + config = WebSocketConfig() + + # 存储活跃连接 + connected_clients: Set[str] = set() + last_heartbeat: Dict[str, float] = {} + + async def heartbeat_check() -> None: + """心跳检查任务""" + while True: + current_time = time.time() + disconnected = [] + for client_id in connected_clients: + if current_time - last_heartbeat.get(client_id, 0) > config.heartbeat_timeout: + disconnected.append(client_id) + + for client_id in disconnected: + connected_clients.remove(client_id) + last_heartbeat.pop(client_id, None) + if config.on_disconnect: + await config.on_disconnect(client_id) + print(f"Client {client_id} timed out") + + await asyncio.sleep(config.heartbeat_interval) + + @app.websocket(url_prefix) + async def ws(): + """WebSocket路由处理函数""" + try: + client_id = str(id(websocket)) + connected_clients.add(client_id) + last_heartbeat[client_id] = time.time() + + if config.on_connect: + await config.on_connect(client_id) + + print(f"Client {client_id} connected") + + # 发送连接成功消息 + await websocket.send(json.dumps({ + "type": "connected", + "message": "Successfully connected to server", + "client_id": client_id + })) + + while True: + data = await websocket.receive() + try: + message = json.loads(data) + + # 处理心跳 + if message.get("type") == "ping": + last_heartbeat[client_id] = time.time() + await websocket.send(json.dumps({ + "type": "pong", + "timestamp": time.time() + })) + + # 处理其他消息 + elif config.on_message: + await config.on_message(client_id, message) + + except json.JSONDecodeError: + await websocket.send(json.dumps({ + "type": "error", + "message": "Invalid JSON format" + })) + + except asyncio.CancelledError: + print(f"Client {client_id} disconnected") + if config.on_disconnect: + await config.on_disconnect(client_id) + connected_clients.discard(client_id) + last_heartbeat.pop(client_id, None) + raise + + except Exception as e: + print(f"Error handling client {client_id}: {str(e)}") + if config.on_disconnect: + await config.on_disconnect(client_id) + connected_clients.discard(client_id) + last_heartbeat.pop(client_id, None) + await websocket.close(1011) + + @app.before_serving + async def startup(): + """服务启动时启动心跳检查""" + app.heartbeat_task = asyncio.create_task(heartbeat_check()) + + @app.after_serving + async def shutdown(): + """服务关闭时清理""" + if hasattr(app, 'heartbeat_task'): + app.heartbeat_task.cancel() + try: + await app.heartbeat_task + except asyncio.CancelledError: + pass