Files
PY_PROXY/docker_manager.py
Jonatan Rek f601bb3fc8 Progress
2026-03-02 12:16:00 +01:00

59 lines
2.3 KiB
Python

from __future__ import annotations
import asyncio
import docker
from time import time
class DockerManager:
def __init__(self):
self._client = docker.from_env()
self._locks: dict[str, asyncio.Lock] = {}
self._idle_until: dict[str, float | None] = {}
def _lock(self, name: str) -> asyncio.Lock:
if name not in self._locks:
self._locks[name] = asyncio.Lock()
return self._locks[name]
async def is_running(self, name: str) -> bool:
"""Return True if the container exists and is currently running."""
loop = asyncio.get_running_loop()
try:
c = await loop.run_in_executor(None, self._client.containers.get, name)
return c.status == "running"
except Exception:
return False
async def ensure_running(self, name: str, load_seconds: int = 0):
"""Start container if not running, then wait load_seconds."""
async with self._lock(name):
loop = asyncio.get_running_loop()
c = await loop.run_in_executor(None, self._client.containers.get, name)
if c.status != 'running':
await loop.run_in_executor(None, c.start)
print(f"[docker] Started: {name}")
if load_seconds > 0:
await asyncio.sleep(load_seconds)
def reset_idle(self, name: str, timeout_seconds: int):
"""Reset idle countdown for a container."""
self._idle_until[name] = (time() + timeout_seconds) if timeout_seconds > 0 else None
async def idle_watcher(self, interval: int = 5):
"""Background async task that stops idle containers."""
loop = asyncio.get_running_loop()
while True:
await asyncio.sleep(interval)
now = time()
for name, until in list(self._idle_until.items()):
if until and now > until:
try:
c = await loop.run_in_executor(None, self._client.containers.get, name)
await loop.run_in_executor(None, c.stop)
print(f"[docker] Stopped idle: {name}")
except Exception as e:
print(f"[docker] Failed to stop {name}: {e}")
finally:
self._idle_until.pop(name, None)