From db0853886fc52c7349e0fc8948a9349090625492 Mon Sep 17 00:00:00 2001 From: Jonatan Rek Date: Mon, 2 Mar 2026 14:51:15 +0100 Subject: [PATCH] progress --- app.py | 44 ++++++++++++------------ docker_manager.py | 22 +++++++++--- middleware.py | 85 ++++++++++++++++++++++++----------------------- 3 files changed, 84 insertions(+), 67 deletions(-) diff --git a/app.py b/app.py index b6f00a9..75d20fe 100644 --- a/app.py +++ b/app.py @@ -1,35 +1,37 @@ import asyncio +import httpx + from config import load_config from docker_manager import DockerManager from middleware import ProxyMiddleware +# Single shared HTTP client โ€“ reuses TCP connections via connection pooling. +_http_client = httpx.AsyncClient(timeout=30.0) docker_mgr = DockerManager() routes = load_config("config.yml") -class _LifespanApp: - """Minimal ASGI app that only handles lifespan events.""" - - async def __call__(self, scope, receive, send): - if scope["type"] != "lifespan": +async def _lifespan(scope, receive, send): + if scope["type"] != "lifespan": + return + task = None + while True: + event = await receive() + if event["type"] == "lifespan.startup": + task = asyncio.create_task(docker_mgr.idle_watcher()) + await send({"type": "lifespan.startup.complete"}) + elif event["type"] == "lifespan.shutdown": + if task: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + await _http_client.aclose() + await send({"type": "lifespan.shutdown.complete"}) return - task = None - while True: - event = await receive() - if event["type"] == "lifespan.startup": - task = asyncio.create_task(docker_mgr.idle_watcher()) - await send({"type": "lifespan.startup.complete"}) - elif event["type"] == "lifespan.shutdown": - if task: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - await send({"type": "lifespan.shutdown.complete"}) - return -app = ProxyMiddleware(_LifespanApp(), routes=routes, docker=docker_mgr) +app = ProxyMiddleware(_lifespan, routes=routes, docker=docker_mgr, http_client=_http_client) diff --git a/docker_manager.py b/docker_manager.py index 798b2cc..9aca83e 100644 --- a/docker_manager.py +++ b/docker_manager.py @@ -5,11 +5,16 @@ import docker from time import time +_STATUS_TTL = 1.0 # seconds to cache container running-status + + class DockerManager: def __init__(self): self._client = docker.from_env() self._locks: dict[str, asyncio.Lock] = {} self._idle_until: dict[str, float | None] = {} + # Cache: container_name -> (is_running, timestamp) + self._status_cache: dict[str, tuple[bool, float]] = {} def _lock(self, name: str) -> asyncio.Lock: if name not in self._locks: @@ -17,22 +22,30 @@ class DockerManager: return self._locks[name] async def is_running(self, name: str) -> bool: - """Return True if the container exists and is currently running.""" + """Return True if the container is running. + Result is cached for _STATUS_TTL seconds to reduce Docker API calls.""" + now = time() + cached = self._status_cache.get(name) + if cached and now - cached[1] < _STATUS_TTL: + return cached[0] loop = asyncio.get_running_loop() try: c = await loop.run_in_executor(None, self._client.containers.get, name) - return c.status == "running" + result = c.status == "running" except Exception: - return False + result = False + self._status_cache[name] = (result, now) + return result async def ensure_running(self, name: str, load_seconds: int = 0): """Start container if not running, then wait load_seconds.""" async with self._lock(name): loop = asyncio.get_running_loop() c = await loop.run_in_executor(None, self._client.containers.get, name) - if c.status != 'running': + if c.status != "running": await loop.run_in_executor(None, c.start) print(f"[docker] Started: {name}") + self._status_cache[name] = (True, time()) if load_seconds > 0: await asyncio.sleep(load_seconds) @@ -52,6 +65,7 @@ class DockerManager: c = await loop.run_in_executor(None, self._client.containers.get, name) await loop.run_in_executor(None, c.stop) print(f"[docker] Stopped idle: {name}") + self._status_cache[name] = (False, time()) except Exception as e: print(f"[docker] Failed to stop {name}: {e}") finally: diff --git a/middleware.py b/middleware.py index 78cdb96..0288c01 100644 --- a/middleware.py +++ b/middleware.py @@ -9,27 +9,32 @@ import websockets from config import ProxyRoute from docker_manager import DockerManager -# Headers that must not be forwarded between proxy and backend -HOP_BY_HOP = { - "connection", "keep-alive", "proxy-authenticate", "proxy-authorization", - "te", "trailers", "transfer-encoding", "upgrade", +# Hop-by-hop headers stored as bytes โ€“ avoids .decode() on every header (RFC 2616 ยง13.5.1) +_HOP_BY_HOP = { + b"connection", b"keep-alive", b"proxy-authenticate", b"proxy-authorization", + b"te", b"trailers", b"transfer-encoding", b"upgrade", } -TEMPLATES = Path(__file__).parent / "templates" +_TEMPLATES_DIR = Path(__file__).parent / "templates" +_template_cache: dict[str, str] = {} # filename -> raw text, cached on first disk read def _load_template(name: str, **replacements: str) -> bytes: - text = (TEMPLATES / name).read_text(encoding="utf-8") + if name not in _template_cache: + _template_cache[name] = (_TEMPLATES_DIR / name).read_text(encoding="utf-8") + text = _template_cache[name] for key, value in replacements.items(): text = text.replace("{{" + key + "}}", value) return text.encode() class ProxyMiddleware: - def __init__(self, app, routes: dict[str, ProxyRoute], docker: DockerManager): + def __init__(self, app, routes: dict[str, ProxyRoute], docker: DockerManager, + http_client: httpx.AsyncClient): self.app = app self.routes = routes self.docker = docker + self._client = http_client async def __call__(self, scope, receive, send): if scope["type"] == "http": @@ -37,7 +42,6 @@ class ProxyMiddleware: elif scope["type"] == "websocket": await self._websocket(scope, receive, send) else: - # Pass lifespan and other event types to the inner app await self.app(scope, receive, send) # ------------------------------------------------------------------ @@ -59,7 +63,6 @@ class ProxyMiddleware: # ------------------------------------------------------------------ async def _http(self, scope, receive, send): - # Health check handled directly so it works regardless of Host header if scope["path"] == "/health": await send({"type": "http.response.start", "status": 200, "headers": [(b"content-type", b"application/json")]}) @@ -72,7 +75,8 @@ class ProxyMiddleware: host = headers.get(b"host", b"").decode().split(":")[0] body = _load_template("404.html", SERVICE=host) await send({"type": "http.response.start", "status": 404, - "headers": [(b"content-type", b"text/html; charset=utf-8")]}) + "headers": [(b"content-type", b"text/html; charset=utf-8"), + (b"content-length", str(len(body)).encode())]}) await send({"type": "http.response.body", "body": body}) return @@ -101,9 +105,10 @@ class ProxyMiddleware: if query: url += f"?{query}" + # bytes comparison โ€“ no .decode() needed per header req_headers = [ (k, v) for k, v in scope["headers"] - if k.lower().decode() not in HOP_BY_HOP + if k.lower() not in _HOP_BY_HOP ] body = b"" @@ -113,33 +118,31 @@ class ProxyMiddleware: if not msg.get("more_body", False): break - async with httpx.AsyncClient() as client: - try: - resp = await client.request( - method=scope["method"], - url=url, - headers=req_headers, - content=body, - timeout=30.0, - ) - # Strip hop-by-hop headers + content-encoding/length (httpx - # decompresses automatically, so the original values are wrong). - skip = HOP_BY_HOP | {"content-encoding", "content-length"} - resp_headers = [ - (k.lower(), v) for k, v in resp.headers.raw - if k.lower().decode() not in skip - ] - resp_headers.append( - (b"content-length", str(len(resp.content)).encode()) - ) - await send({"type": "http.response.start", - "status": resp.status_code, "headers": resp_headers}) - await send({"type": "http.response.body", "body": resp.content}) - except Exception as e: - print(f"[http] Upstream error for {url}: {e}") - await send({"type": "http.response.start", "status": 502, - "headers": [(b"content-type", b"text/plain")]}) - await send({"type": "http.response.body", "body": b"Bad Gateway"}) + try: + resp = await self._client.request( + method=scope["method"], + url=url, + headers=req_headers, + content=body, + ) + # Strip hop-by-hop + content-encoding/length (httpx decompresses + # automatically so the original values would be wrong). + skip = _HOP_BY_HOP | {b"content-encoding", b"content-length"} + resp_headers = [ + (k.lower(), v) for k, v in resp.headers.raw + if k.lower() not in skip + ] + resp_headers.append( + (b"content-length", str(len(resp.content)).encode()) + ) + await send({"type": "http.response.start", + "status": resp.status_code, "headers": resp_headers}) + await send({"type": "http.response.body", "body": resp.content}) + except Exception as e: + print(f"[http] Upstream error for {url}: {e}") + await send({"type": "http.response.start", "status": 502, + "headers": [(b"content-type", b"text/plain")]}) + await send({"type": "http.response.body", "body": b"Bad Gateway"}) # ------------------------------------------------------------------ # WebSocket proxy @@ -148,7 +151,6 @@ class ProxyMiddleware: async def _websocket(self, scope, receive, send): route = self._find_route(scope) if not route: - # Reject before accept await send({"type": "websocket.close", "code": 4004}) return @@ -165,7 +167,6 @@ class ProxyMiddleware: if query: url += f"?{query}" - # Accept the client WebSocket connection await send({"type": "websocket.accept"}) async def client_to_backend(backend_ws): @@ -194,8 +195,8 @@ class ProxyMiddleware: try: async with websockets.connect(url) as backend_ws: - t1 = asyncio.ensure_future(client_to_backend(backend_ws)) - t2 = asyncio.ensure_future(backend_to_client(backend_ws)) + t1 = asyncio.create_task(client_to_backend(backend_ws)) + t2 = asyncio.create_task(backend_to_client(backend_ws)) _done, pending = await asyncio.wait( [t1, t2], return_when=asyncio.FIRST_COMPLETED )