From f601bb3fc8d64af61acb3547e24ad7234a4a7b15 Mon Sep 17 00:00:00 2001 From: Jonatan Rek Date: Mon, 2 Mar 2026 12:16:00 +0100 Subject: [PATCH] Progress --- Dockerfile | 2 +- app.py | 165 ++++++----------------------------- compose.yaml | 50 ++++++++++- config.py | 31 +++++++ config.yml | 15 ++-- docker_manager.py | 58 +++++++++++++ middleware.py | 214 ++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 - 8 files changed, 385 insertions(+), 151 deletions(-) create mode 100644 config.py create mode 100644 docker_manager.py create mode 100644 middleware.py diff --git a/Dockerfile b/Dockerfile index 5e33fc6..9d6b718 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,6 +20,6 @@ COPY . /app RUN pip install --no-cache-dir -r /app/requirements.txt # Start Anonimization -CMD ["python", "-u", "/app/app.py"] +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "80"] WORKDIR /app \ No newline at end of file diff --git a/app.py b/app.py index 1c83e57..b6f00a9 100644 --- a/app.py +++ b/app.py @@ -1,148 +1,35 @@ -# main.py import asyncio -from fastapi import FastAPI, Request, Response, status -from fastapi.responses import JSONResponse -import httpx -import yaml -import docker -from starlette.middleware.base import BaseHTTPMiddleware -from starlette.types import ASGIApp -from threading import Thread -from time import time, sleep -import websockets -from starlette.types import Receive, Scope, Send -from starlette.websockets import WebSocket -from starlette.requests import HTTPConnection + +from config import load_config +from docker_manager import DockerManager +from middleware import ProxyMiddleware -# --- Configuration Loading --- -class Route: - def __init__(self, path_prefix: str, target: str, container: str | None = None): - self.path_prefix = path_prefix - self.target = target - self.container = container - -def load_config(path: str): - with open(path, 'r') as f: - data = yaml.safe_load(f) - return [Route(**r) for r in data.get('routes', [])] +docker_mgr = DockerManager() +routes = load_config("config.yml") -# --- Docker Management --- -idle_containers = {} -idle_timeout = {} -idle_check_interval = 5 +class _LifespanApp: + """Minimal ASGI app that only handles lifespan events.""" -def idle_watcher(): - client = docker.from_env() - while True: - now = time() - for container, until in list(idle_containers.items()): - if until and now > until: - try: - c = client.containers.get(container) - c.stop() - print(f"[watcher] Stopped idle container: {container}") - except Exception as e: - print(f"[watcher] Failed to stop {container}: {e}") - finally: - idle_containers.pop(container, None) - sleep(idle_check_interval) - -Thread(target=idle_watcher, daemon=True).start() - - -# --- Proxy Middleware --- -class ReverseProxyMiddleware(BaseHTTPMiddleware): - def __init__(self, app: ASGIApp, routes: list[Route]): - super().__init__(app) - self.routes = routes - self.docker = docker.from_env() - - async def dispatch(self, request: Request, call_next): - path = request.url.path - route = next((r for r in self.routes if path.startswith(r.path_prefix)), None) - - if not route: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content={"detail": "Not Found"}) - - if route.container: - try: - container = self.docker.containers.get(route.container) - if container.status != 'running': - container.start() - print(f"Started container {route.container}") - timeout = idle_timeout.get(route.container) - if timeout: - idle_containers[route.container] = time() + timeout - else: - idle_containers[route.container] = None - except Exception as e: - print(f"[proxy] Failed to ensure container '{route.container}': {e}") - return JSONResponse(status_code=status.HTTP_502_BAD_GATEWAY, content={"detail": "Container Error"}) - - # WebSocket upgrade detection - if request.headers.get("upgrade", "").lower() == "websocket": - return await self.handle_websocket_upgrade(request, route) - - new_url = route.target.rstrip('/') + path[len(route.path_prefix):] - async with httpx.AsyncClient() as client: - try: - proxied = await client.request( - method=request.method, - url=new_url, - headers=request.headers.raw, - content=await request.body(), - timeout=30.0 - ) - return Response(content=proxied.content, status_code=proxied.status_code, headers=proxied.headers) - except Exception as e: - print(f"[proxy] Failed proxying to {new_url}: {e}") - return JSONResponse(status_code=status.HTTP_502_BAD_GATEWAY, content={"detail": "Upstream Error"}) - - async def handle_websocket_upgrade(self, request: Request, route: Route): - scope: Scope = request.scope - receive: Receive = request.receive - send: Send = request._send # Unsafe, but needed for ASGI hijack - ws = WebSocket(scope, receive=receive, send=send) - await ws.accept() - - target_ws_url = route.target.rstrip('/') + request.url.path[len(route.path_prefix):] - if target_ws_url.startswith("http"): - target_ws_url = target_ws_url.replace("http", "ws", 1) - - try: - async with websockets.connect(target_ws_url) as backend: - async def to_backend(): + async def __call__(self, scope, receive, send): + if scope["type"] != "lifespan": + return + task = None + while True: + event = await receive() + if event["type"] == "lifespan.startup": + task = asyncio.create_task(docker_mgr.idle_watcher()) + await send({"type": "lifespan.startup.complete"}) + elif event["type"] == "lifespan.shutdown": + if task: + task.cancel() try: - while True: - data = await ws.receive_text() - await backend.send(data) - except Exception: - await backend.close() - - async def from_backend(): - try: - while True: - data = await backend.recv() - await ws.send_text(data) - except Exception: - await ws.close() - - await asyncio.gather(to_backend(), from_backend()) - except Exception as e: - print(f"[ws] Proxy error: {e}") - await ws.close(code=1011) - return Response(status_code=502, content=b"WebSocket proxy error") + await task + except asyncio.CancelledError: + pass + await send({"type": "lifespan.shutdown.complete"}) + return -# --- App Setup --- -app = FastAPI() -config = load_config("config.yml") -app.add_middleware(ReverseProxyMiddleware, routes=config) - - -# Optional: Health check -@app.get("/health") -async def health(): - return {"status": "ok"} +app = ProxyMiddleware(_LifespanApp(), routes=routes, docker=docker_mgr) diff --git a/compose.yaml b/compose.yaml index 6ad23ed..4974a0c 100644 --- a/compose.yaml +++ b/compose.yaml @@ -5,8 +5,52 @@ services: context: . restart: unless-stopped volumes: - - ./config.yaml:/app/config.yaml + - ./config.yml:/app/config.yml - /var/run/docker.sock:/var/run/docker.sock ports: - - 80:80 -networks: {} \ No newline at end of file + - "80:80" + networks: + - proxy_net + depends_on: + - db + + web-socket-test: + image: ksdn117/web-socket-test + container_name: web-socket-test + restart: unless-stopped + networks: + - proxy_net + + wordpress: + image: wordpress:latest + container_name: wordpress + restart: unless-stopped + environment: + WORDPRESS_DB_HOST: db + WORDPRESS_DB_USER: wp + WORDPRESS_DB_PASSWORD: wp + WORDPRESS_DB_NAME: wordpress + depends_on: + - db + networks: + - proxy_net + + db: + image: mariadb:11 + container_name: db + restart: unless-stopped + environment: + MARIADB_DATABASE: wordpress + MARIADB_USER: wp + MARIADB_PASSWORD: wp + MARIADB_ROOT_PASSWORD: root + volumes: + - db_data:/var/lib/mysql + networks: + - proxy_net + +networks: + proxy_net: + +volumes: + db_data: diff --git a/config.py b/config.py new file mode 100644 index 0000000..3b0d196 --- /dev/null +++ b/config.py @@ -0,0 +1,31 @@ +from dataclasses import dataclass, field +import yaml + + +@dataclass +class ProxyRoute: + domain: str + target_host: str + target_port: int + containers: list[str] = field(default_factory=list) + timeout_seconds: int = 0 # idle timeout before stopping container (0 = never) + load_seconds: int = 0 # seconds to wait after starting container + + +def load_config(path: str) -> dict[str, ProxyRoute]: + """Parse config.yml and return dict of domain -> ProxyRoute.""" + with open(path) as f: + data = yaml.safe_load(f) + routes: dict[str, ProxyRoute] = {} + for h in data.get('proxy_hosts', []): + containers = [c['container_name'] for c in h.get('containers', [])] + route = ProxyRoute( + domain=h['domain'], + target_host=h['proxy_host'], + target_port=h['proxy_port'], + containers=containers, + timeout_seconds=h.get('proxy_timeout_seconds', 0), + load_seconds=h.get('proxy_load_seconds', 0), + ) + routes[route.domain] = route + return routes diff --git a/config.yml b/config.yml index c268573..c358e9b 100644 --- a/config.yml +++ b/config.yml @@ -3,15 +3,16 @@ proxy_hosts: - domain: ws.local containers: - container_name: web-socket-test - proxy_host: 10.201.0.128 + proxy_host: web-socket-test proxy_port: 8010 - proxy_timeout_seconds: 10 - proxy_load_seconds: 5 + proxy_timeout_seconds: 60 + proxy_load_seconds: 3 - domain: wp.local containers: + - container_name: wordpress - container_name: db - proxy_host: 10.201.0.128 - proxy_port: 8888 - proxy_timeout_seconds: 10 - proxy_load_seconds: 5 \ No newline at end of file + proxy_host: wordpress + proxy_port: 80 + proxy_timeout_seconds: 60 + proxy_load_seconds: 10 \ No newline at end of file diff --git a/docker_manager.py b/docker_manager.py new file mode 100644 index 0000000..798b2cc --- /dev/null +++ b/docker_manager.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import asyncio +import docker +from time import time + + +class DockerManager: + def __init__(self): + self._client = docker.from_env() + self._locks: dict[str, asyncio.Lock] = {} + self._idle_until: dict[str, float | None] = {} + + def _lock(self, name: str) -> asyncio.Lock: + if name not in self._locks: + self._locks[name] = asyncio.Lock() + return self._locks[name] + + async def is_running(self, name: str) -> bool: + """Return True if the container exists and is currently running.""" + loop = asyncio.get_running_loop() + try: + c = await loop.run_in_executor(None, self._client.containers.get, name) + return c.status == "running" + except Exception: + return False + + async def ensure_running(self, name: str, load_seconds: int = 0): + """Start container if not running, then wait load_seconds.""" + async with self._lock(name): + loop = asyncio.get_running_loop() + c = await loop.run_in_executor(None, self._client.containers.get, name) + if c.status != 'running': + await loop.run_in_executor(None, c.start) + print(f"[docker] Started: {name}") + if load_seconds > 0: + await asyncio.sleep(load_seconds) + + def reset_idle(self, name: str, timeout_seconds: int): + """Reset idle countdown for a container.""" + self._idle_until[name] = (time() + timeout_seconds) if timeout_seconds > 0 else None + + async def idle_watcher(self, interval: int = 5): + """Background async task that stops idle containers.""" + loop = asyncio.get_running_loop() + while True: + await asyncio.sleep(interval) + now = time() + for name, until in list(self._idle_until.items()): + if until and now > until: + try: + c = await loop.run_in_executor(None, self._client.containers.get, name) + await loop.run_in_executor(None, c.stop) + print(f"[docker] Stopped idle: {name}") + except Exception as e: + print(f"[docker] Failed to stop {name}: {e}") + finally: + self._idle_until.pop(name, None) diff --git a/middleware.py b/middleware.py new file mode 100644 index 0000000..78cdb96 --- /dev/null +++ b/middleware.py @@ -0,0 +1,214 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path + +import httpx +import websockets + +from config import ProxyRoute +from docker_manager import DockerManager + +# Headers that must not be forwarded between proxy and backend +HOP_BY_HOP = { + "connection", "keep-alive", "proxy-authenticate", "proxy-authorization", + "te", "trailers", "transfer-encoding", "upgrade", +} + +TEMPLATES = Path(__file__).parent / "templates" + + +def _load_template(name: str, **replacements: str) -> bytes: + text = (TEMPLATES / name).read_text(encoding="utf-8") + for key, value in replacements.items(): + text = text.replace("{{" + key + "}}", value) + return text.encode() + + +class ProxyMiddleware: + def __init__(self, app, routes: dict[str, ProxyRoute], docker: DockerManager): + self.app = app + self.routes = routes + self.docker = docker + + async def __call__(self, scope, receive, send): + if scope["type"] == "http": + await self._http(scope, receive, send) + elif scope["type"] == "websocket": + await self._websocket(scope, receive, send) + else: + # Pass lifespan and other event types to the inner app + await self.app(scope, receive, send) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + def _find_route(self, scope) -> ProxyRoute | None: + headers = dict(scope.get("headers", [])) + host = headers.get(b"host", b"").decode().split(":")[0] + return self.routes.get(host) + + async def _ensure_containers(self, route: ProxyRoute): + for name in route.containers: + await self.docker.ensure_running(name, route.load_seconds) + self.docker.reset_idle(name, route.timeout_seconds) + + # ------------------------------------------------------------------ + # HTTP proxy + # ------------------------------------------------------------------ + + async def _http(self, scope, receive, send): + # Health check handled directly so it works regardless of Host header + if scope["path"] == "/health": + await send({"type": "http.response.start", "status": 200, + "headers": [(b"content-type", b"application/json")]}) + await send({"type": "http.response.body", "body": b'{"status":"ok"}'}) + return + + route = self._find_route(scope) + if not route: + headers = dict(scope.get("headers", [])) + host = headers.get(b"host", b"").decode().split(":")[0] + body = _load_template("404.html", SERVICE=host) + await send({"type": "http.response.start", "status": 404, + "headers": [(b"content-type", b"text/html; charset=utf-8")]}) + await send({"type": "http.response.body", "body": body}) + return + + # If any container is not yet running, show wait page and start it + # in the background. The browser will auto-refresh via the Refresh header. + for name in route.containers: + if not await self.docker.is_running(name): + asyncio.create_task(self._ensure_containers(route)) + body = _load_template("wait.html") + await send({"type": "http.response.start", "status": 503, + "headers": [ + (b"content-type", b"text/html; charset=utf-8"), + (b"refresh", b"3"), + (b"content-length", str(len(body)).encode()), + ]}) + await send({"type": "http.response.body", "body": body}) + return + + # All containers running – reset idle timers and proxy the request. + for name in route.containers: + self.docker.reset_idle(name, route.timeout_seconds) + + path = scope["path"] + query = scope.get("query_string", b"").decode() + url = f"http://{route.target_host}:{route.target_port}{path}" + if query: + url += f"?{query}" + + req_headers = [ + (k, v) for k, v in scope["headers"] + if k.lower().decode() not in HOP_BY_HOP + ] + + body = b"" + while True: + msg = await receive() + body += msg.get("body", b"") + if not msg.get("more_body", False): + break + + async with httpx.AsyncClient() as client: + try: + resp = await client.request( + method=scope["method"], + url=url, + headers=req_headers, + content=body, + timeout=30.0, + ) + # Strip hop-by-hop headers + content-encoding/length (httpx + # decompresses automatically, so the original values are wrong). + skip = HOP_BY_HOP | {"content-encoding", "content-length"} + resp_headers = [ + (k.lower(), v) for k, v in resp.headers.raw + if k.lower().decode() not in skip + ] + resp_headers.append( + (b"content-length", str(len(resp.content)).encode()) + ) + await send({"type": "http.response.start", + "status": resp.status_code, "headers": resp_headers}) + await send({"type": "http.response.body", "body": resp.content}) + except Exception as e: + print(f"[http] Upstream error for {url}: {e}") + await send({"type": "http.response.start", "status": 502, + "headers": [(b"content-type", b"text/plain")]}) + await send({"type": "http.response.body", "body": b"Bad Gateway"}) + + # ------------------------------------------------------------------ + # WebSocket proxy + # ------------------------------------------------------------------ + + async def _websocket(self, scope, receive, send): + route = self._find_route(scope) + if not route: + # Reject before accept + await send({"type": "websocket.close", "code": 4004}) + return + + try: + await self._ensure_containers(route) + except Exception as e: + print(f"[ws] Container error: {e}") + await send({"type": "websocket.close", "code": 1011}) + return + + path = scope["path"] + query = scope.get("query_string", b"").decode() + url = f"ws://{route.target_host}:{route.target_port}{path}" + if query: + url += f"?{query}" + + # Accept the client WebSocket connection + await send({"type": "websocket.accept"}) + + async def client_to_backend(backend_ws): + try: + while True: + msg = await receive() + if msg["type"] == "websocket.receive": + if msg.get("text") is not None: + await backend_ws.send(msg["text"]) + elif msg.get("bytes") is not None: + await backend_ws.send(msg["bytes"]) + elif msg["type"] == "websocket.disconnect": + break + except Exception as e: + print(f"[ws] client→backend error: {e}") + + async def backend_to_client(backend_ws): + try: + async for msg in backend_ws: + if isinstance(msg, str): + await send({"type": "websocket.send", "text": msg}) + else: + await send({"type": "websocket.send", "bytes": msg}) + except Exception as e: + print(f"[ws] backend→client error: {e}") + + try: + async with websockets.connect(url) as backend_ws: + t1 = asyncio.ensure_future(client_to_backend(backend_ws)) + t2 = asyncio.ensure_future(backend_to_client(backend_ws)) + _done, pending = await asyncio.wait( + [t1, t2], return_when=asyncio.FIRST_COMPLETED + ) + for task in pending: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + except Exception as e: + print(f"[ws] Connection error to {url}: {e}") + finally: + try: + await send({"type": "websocket.close", "code": 1000}) + except Exception: + pass diff --git a/requirements.txt b/requirements.txt index 9d3fee6..e37e4c7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ docker PyYAML websockets -fastapi uvicorn[standard] httpx \ No newline at end of file