from __future__ import annotations

import asyncio
from time import time

import docker

_STATUS_TTL = 1.0  # seconds to cache container running-status


class DockerManager:
    """Start, stop and query Docker containers from asyncio code.

    Blocking Docker SDK calls are pushed to the event loop's default
    executor. Start and idle-stop operations for a given container are
    serialised through a per-container ``asyncio.Lock``.
    """

    def __init__(self):
        """Create the Docker client from the environment and empty state."""
        self._client = docker.from_env()
        # container_name -> lock serialising start/stop of that container
        self._locks: dict[str, asyncio.Lock] = {}
        # container_name -> epoch deadline after which the container is idle;
        # None means "no idle countdown"
        self._idle_until: dict[str, float | None] = {}
        # Cache: container_name -> (is_running, timestamp)
        self._status_cache: dict[str, tuple[bool, float]] = {}

    def _lock(self, name: str) -> asyncio.Lock:
        """Return the lock for container *name*, creating it on first use."""
        if name not in self._locks:
            self._locks[name] = asyncio.Lock()
        return self._locks[name]

    async def is_running(self, name: str) -> bool:
        """Return True if the container is running.

        The result is cached for ``_STATUS_TTL`` seconds to reduce Docker
        API calls. Any error while looking the container up is reported
        as "not running" (and that answer is cached as well).
        """
        now = time()
        cached = self._status_cache.get(name)
        if cached and now - cached[1] < _STATUS_TTL:
            return cached[0]

        loop = asyncio.get_running_loop()
        try:
            c = await loop.run_in_executor(None, self._client.containers.get, name)
            result = c.status == "running"
        except Exception:
            # Deliberate best effort: a failed lookup counts as not running.
            result = False
        self._status_cache[name] = (result, now)
        return result

    async def ensure_running(self, name: str, load_seconds: int = 0) -> None:
        """Start container *name* if it is not running, then wait.

        Args:
            name: Container name passed to ``containers.get``.
            load_seconds: Seconds to sleep after a fresh start; values
                ``<= 0`` skip the wait.

        Errors raised by the Docker lookup or start propagate to the caller.
        """
        async with self._lock(name):
            loop = asyncio.get_running_loop()
            c = await loop.run_in_executor(None, self._client.containers.get, name)
            started = c.status != "running"
            if started:
                await loop.run_in_executor(None, c.start)
                print(f"[docker] Started: {name}")
            # Either it was already running or we just started it.
            self._status_cache[name] = (True, time())
            # NOTE(review): the warm-up wait is applied only after a fresh
            # start, and while the lock is still held so concurrent callers
            # also wait for it -- confirm this is the intended behaviour.
            if started and load_seconds > 0:
                await asyncio.sleep(load_seconds)

    def reset_idle(self, name: str, timeout_seconds: int) -> None:
        """Reset the idle countdown for a container.

        A ``timeout_seconds`` of zero or less stores ``None``, which the
        idle watcher ignores.
        """
        self._idle_until[name] = (time() + timeout_seconds) if timeout_seconds > 0 else None

    async def idle_watcher(self, interval: int = 5) -> None:
        """Background async task that stops idle containers.

        Loops forever: every *interval* seconds it scans the idle
        deadlines and stops each container whose deadline has passed.
        """
        while True:
            await asyncio.sleep(interval)
            now = time()
            # Iterate over a snapshot: entries are removed while stopping.
            for name, until in list(self._idle_until.items()):
                if until and now > until:
                    await self._stop_if_idle(name, until)

    async def _stop_if_idle(self, name: str, expired: float) -> None:
        """Stop *name* unless its idle deadline changed since *expired*."""
        loop = asyncio.get_running_loop()
        # Share the lock with ensure_running() so a stop cannot interleave
        # with a concurrent status check / start of the same container.
        async with self._lock(name):
            if self._idle_until.get(name) != expired:
                # reset_idle() ran while we waited for the lock.
                return
            try:
                c = await loop.run_in_executor(None, self._client.containers.get, name)
                await loop.run_in_executor(None, c.stop)
                print(f"[docker] Stopped idle: {name}")
                self._status_cache[name] = (False, time())
            except Exception as e:
                print(f"[docker] Failed to stop {name}: {e}")
            finally:
                # Drop the entry only if it is still the expired one; a
                # deadline set by reset_idle() during the stop is kept.
                if self._idle_until.get(name) == expired:
                    self._idle_until.pop(name, None)