import http.server import http.client import yaml import docker import threading import time from datetime import datetime, timezone from socketserver import ThreadingMixIn import os import asyncio import websockets import hashlib import base64 # Define the target server to proxy requests to class ProxyHandler(http.server.BaseHTTPRequestHandler): def __init__(self, configuration, docker_client): global activity self.configuration = configuration self.docker_client = docker_client def __call__(self, *args, **kwargs): """Handle a request.""" super().__init__(*args, **kwargs) def log_message(self, format, *args): pass def finish(self,*args,**kw): try: if not self.wfile.closed: self.wfile.flush() self.wfile.close() except socket.error: pass self.rfile.close() def do_GET(self): self.handle_request('GET') def do_POST(self): self.handle_request('POST') def do_PUT(self): self.handle_request('PUT') def do_DELETE(self): self.handle_request('DELETE') def do_HEAD(self): self.handle_request('HEAD') def handle_request(self, method): #print(self.headers.get('Host').split(":")[0]) parsed_request_host = self.headers.get('Host') if (':' in parsed_request_host): parsed_request_host = parsed_request_host.split(":")[0] proxy_host_configuration = next(filter(lambda host: host['domain'] == parsed_request_host, self.configuration['proxy_hosts'])) starting = False for container in proxy_host_configuration['containers']: container_objects = self.docker_client.containers.list(all=True, filters = { 'name' : container['container_name'] }) if (container_objects == []): self.send_404(proxy_host_configuration['domain']) return container_object = container_objects[0] if (container_object.status != 'running'): print("starting container: {0}".format(container['container_name'])) container_object.start() starting = True if (starting == True): self.send_loading(proxy_host_configuration['proxy_load_seconds'], proxy_host_configuration['domain']) return activity[proxy_host_configuration['domain']] = datetime.now(timezone.utc) # Check if this is a WebSocket request if self.headers.get("Upgrade", "").lower() == "websocket": print("Request is WS connecting to {0}".format(container['container_name'])) print("Request is WS connecting to {0}:{1}".format(proxy_host_configuration['proxy_host'],proxy_host_configuration['proxy_port'])) activity[proxy_host_configuration['domain']] = True self.upgrade_to_websocket(proxy_host_configuration['proxy_host'], proxy_host_configuration['proxy_port']) return # Open a connection to the target server conn = http.client.HTTPConnection(proxy_host_configuration['proxy_host'], proxy_host_configuration['proxy_port']) conn.request(method, self.path, headers=self.headers) response = conn.getresponse() self.send_response(response.status) self.send_header('host', proxy_host_configuration['proxy_host']) for header, value in response.getheaders(): self.send_header(header, value) self.end_headers() self.wfile.write(response.read()) conn.close() def send_404(self, service_name): self.send_response(404) self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate') self.send_header('Pragma', 'no-cache') self.send_header('Expires', '0') self.end_headers() with open(os.path.dirname(os.path.realpath(__file__)) + '/templates/404.html', 'r') as file: html = file.read() html = html.replace("{{SERVICE}}", service_name) self.wfile.write(bytes(html,"utf-8")) self.wfile.flush() def send_loading(self, wait_time, service_name): self.send_response(201) self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_header('Cache-Control', 'no-cache, no-store, must-revalidate') self.send_header('Pragma', 'no-cache') self.send_header('Expires', '0') self.send_header('refresh', wait_time) self.end_headers() with open(os.path.dirname(os.path.realpath(__file__)) + '/templates/wait.html', 'r') as file: html = file.read() self.wfile.write(bytes(html,"utf-8")) #self.wfile.write(bytes("starting service: {0} waiting for {1}s".format(self.headers.get('Host').split(":")[0], proxy_host_configuration['proxy_timeout_seconds']),"utf-8")) #self.wfile.write(bytes("\nlast started at: {0} ".format(activity[proxy_host_configuration['domain']]),"utf-8")) self.wfile.flush() async def websocket_proxy(self, target_host, target_port): server_ws = None try: client_connection = self.connection # Establish server connection to backend server_ws = await websockets.connect(f"ws://{target_host}:{target_port}") # Bridge function to handle message forwarding async def bridge_websockets(): try: while True: # Wait for a message from the client client_message = await client_connection.recv() # Send it to the server await server_ws.send(client_message) # Wait for a message from the server server_message = await server_ws.recv() # Send it to the client await client_connection.send(server_message) except websockets.exceptions.ConnectionClosed as e: print(f"WebSocket connection closed: {e}") except Exception as e: print(f"Error during WebSocket communication: {e}") # Run the bridge coroutine await bridge_websockets() except Exception as e: print(f"WebSocket proxy encountered an error: {e}") finally: if server_ws: await server_ws.close() def upgrade_to_websocket(self, target_host, target_port): """ Handles WebSocket upgrade requests and spawns an asyncio WebSocket proxy. """ key = self.headers['Sec-WebSocket-Key'] accept_val = base64.b64encode(hashlib.sha1((key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8')).digest()).decode('utf-8') self.send_response(101) # Switching Protocols self.send_header("Upgrade", "websocket") self.send_header("Connection", "Upgrade") self.send_header("Sec-WebSocket-Accept", accept_val) self.end_headers() loop = asyncio.new_event_loop() threading.Thread(target=loop.run_until_complete, args=(self.websocket_proxy(target_host, target_port),)).start() class ThreadedHTTPServer(ThreadingMixIn, http.server.HTTPServer): """Handle requests in a separate thread.""" class BackgroundTasks(threading.Thread): def __init__(self, configuration, docker_client): super(BackgroundTasks, self).__init__() self.configuration = configuration self.docker_client = docker_client def run(self,*args,**kwargs): global activity while True: sleep_time = 900 for apps in self.configuration['proxy_hosts']: if(sleep_time > apps['proxy_timeout_seconds']): sleep_time = apps['proxy_timeout_seconds'] for container in apps['containers']: try: container_object = self.docker_client.containers.get(container['container_name']) if (container_object.status == 'running'): dt = datetime.now(timezone.utc) if (apps['domain'] in activity): dt = activity[apps['domain']] if (dt == True): continue diff_seconds = (datetime.now(timezone.utc) - dt).total_seconds() if(diff_seconds > apps['proxy_timeout_seconds']): print("stopping container: {0} ({1}) after {2}s".format(container['container_name'], container_object.id, diff_seconds)) container_object.stop() except docker.errors.NotFound: pass time.sleep(sleep_time) async def websocket_proxy(client_ws, target_host, target_port): """ Forwards WebSocket messages between the client and the target container. """ try: async with websockets.connect(f"ws://{target_host}:{target_port}") as server_ws: # Create tasks to read from both directions async def forward_client_to_server(): async for message in client_ws: await server_ws.send(message) async def forward_server_to_client(): async for message in server_ws: await client_ws.send(message) await asyncio.gather(forward_client_to_server(), forward_server_to_client()) except Exception as e: print(f"WebSocket error: {e}") finally: await client_ws.close() # MAIN # if __name__ == '__main__': activity = {} with open('config.yml', 'r') as file: configuration = yaml.safe_load(file) docker_client = docker.from_env() t = BackgroundTasks(configuration, docker_client) t.start() # Start the reverse proxy server on port 8888 server_address = ('', configuration['proxy_port']) proxy_handler = ProxyHandler(configuration, docker_client) httpd = ThreadedHTTPServer(server_address, proxy_handler) print('Reverse proxy server running on port {0}...'.format(configuration['proxy_port'])) httpd.serve_forever()