Skip to content

Commit

Permalink
WIP: It still works, but it also has a bunch of unused worker process…
Browse files Browse the repository at this point in the history
…ing code.

Signed-off-by: bghira <bghira@users.github.com>
  • Loading branch information
bghira committed Apr 1, 2023
1 parent face0e1 commit 4a1669b
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 6 deletions.
21 changes: 19 additions & 2 deletions discord_tron_master/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from flask_migrate import Migrate
from flask import Flask

from discord_tron_master.classes.worker_manager import WorkerManager
from discord_tron_master.classes.queue_manager import QueueManager
from discord_tron_master.classes.websocket_server import WebSocketServer
from discord_tron_master.classes.command_processor import CommandProcessor

config = AppConfig()
api = API()
from discord_tron_master.auth import Auth
Expand All @@ -24,11 +29,24 @@
from concurrent.futures import ThreadPoolExecutor

def main():
with ThreadPoolExecutor(max_workers=3) as executor:
# Create instances of the worker manager and queue manager
worker_manager = WorkerManager()
queue_manager = QueueManager(worker_manager)

# Create a command processor instance
command_processor = CommandProcessor(queue_manager, worker_manager)

# Create the WebSocket server instance
websocket_host = config.get_websocket_hub_host()
websocket_port = config.get_websocket_hub_port()
websocket_server = WebSocketServer(websocket_host, websocket_port, None, command_processor)

with ThreadPoolExecutor(max_workers=4) as executor:
tasks = [
executor.submit(run_flask_api),
executor.submit(asyncio.run, websocket_hub.run()),
executor.submit(discord_bot.run),
executor.submit(websocket_server.start),
]

for future in concurrent.futures.as_completed(tasks):
Expand All @@ -42,7 +60,6 @@ def run_flask_api():
logging.info("Startup! Begin API.")
api.run()


def create_worker_user(username: str, password: str, email = None):
from discord_tron_master.models.base import db
from discord_tron_master.models.user import User
Expand Down
35 changes: 35 additions & 0 deletions discord_tron_master/classes/command_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from discord_tron_master.classes.queue_manager import QueueManager
from discord_tron_master.classes.worker_manager import WorkerManager
from typing import Dict, Any

class CommandProcessor:
def __init__(self, queue_manager: QueueManager, worker_manager: WorkerManager):
self.queue_manager = queue_manager
self.worker_manager = worker_manager
self.command_handlers = {
"register_worker": self.register_worker,
"unregister_worker": self.unregister_worker,
# Add more command handlers as needed
}

async def process_command(self, command: str, payload: Dict[str, Any]) -> None:
handler = self.command_handlers.get(command)
if handler is None:
# No handler found for the command
return

await handler(payload)

async def register_worker(self, payload: Dict[str, Any]) -> None:
worker_id = payload["worker_id"]
supported_job_types = payload["supported_job_types"]
hardware_limits = payload["hardware_limits"]
self.worker_manager.register_worker(worker_id, supported_job_types, hardware_limits)
self.queue_manager.register_worker(worker_id)

async def unregister_worker(self, payload: Dict[str, Any]) -> None:
worker_id = payload["worker_id"]
self.worker_manager.unregister_worker(worker_id)
self.queue_manager.unregister_worker(worker_id)

# Add more command handler methods as needed
13 changes: 13 additions & 0 deletions discord_tron_master/classes/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import uuid
from typing import Dict, Any


class Job:
def __init__(self, job_type: str, payload: Dict[str, Any]):
self.id = str(uuid.uuid4())
self.job_type = job_type
self.payload = payload

def execute(self):
# Implement the logic to execute the job based on the job type and payload
pass
24 changes: 24 additions & 0 deletions discord_tron_master/classes/queue_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio
from typing import Dict, List
from discord_tron_master.classes.worker_manager import WorkerManager

class QueueManager:
def __init__(self, worker_manager: WorkerManager):
self.queues = {} # {"worker_id": {"queue": asyncio.Queue(), "supported_job_types": [...]}, ...}
self.worker_manager = worker_manager

def register_worker(self, worker_id, supported_job_types: List[str]):
self.queues[worker_id] = {"queue": asyncio.Queue(), "supported_job_types": supported_job_types}

def unregister_worker(self, worker_id):
del self.queues[worker_id]

def find_best_fit_worker(self, job_type: str) -> str:
# Logic to find the best fit worker based on job type and worker limits
pass

async def enqueue_job(self, worker_id, job):
await self.queues[worker_id]["queue"].put(job)

async def dequeue_job(self, worker_id):
return await self.queues[worker_id]["queue"].get()
21 changes: 21 additions & 0 deletions discord_tron_master/classes/websocket_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from discord_tron_master.classes.queue_manager import QueueManager

class WebSocketHandler:
def __init__(self):
self.clients = set()
self.queue_manager = QueueManager()

async def handler(self, websocket, path):
# Register client connection
self.clients.add(websocket)
try:
async for message in websocket:
# Process incoming message
await self.process_message(websocket, message)
finally:
# Unregister client connection
self.clients.remove(websocket)

async def process_message(self, websocket, message):
# Process the message here, e.g., send to appropriate queue based on the message type
pass
41 changes: 41 additions & 0 deletions discord_tron_master/classes/websocket_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import asyncio
import websockets
from typing import Dict, Any
from discord_tron_master.classes.queue_manager import QueueManager
from discord_tron_master.classes.worker_manager import WorkerManager
from discord_tron_master.classes.command_processor import CommandProcessor

class WebSocketServer:
def __init__(self, host: str, port: int, ssl_context, command_processor: CommandProcessor):
self.host = host
self.port = port
self.ssl_context = ssl_context
self.command_processor = command_processor

async def handler(self, websocket, path):
try:
async for message in websocket:
data = json.loads(message)
command = data["command"]
payload = data.get("payload", {})
await self.command_processor.process_command(command, payload)
except websockets.exceptions.ConnectionClosedError:
# Handle connection closed errors, if needed
pass

async def start(self):
server = await websockets.serve(self.handler, self.host, self.port, ssl=self.ssl_context)
await server.wait_closed()


async def main():
# Initialize instances of required classes and start the WebSocket server
queue_manager = QueueManager()
worker_manager = WorkerManager()
command_processor = CommandProcessor(queue_manager, worker_manager)
websocket_server = WebSocketServer("0.0.0.0", 8080, None, command_processor)
await websocket_server.start()


if __name__ == "__main__":
asyncio.run(main())
25 changes: 25 additions & 0 deletions discord_tron_master/classes/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import threading
from queue import Queue
from discord_tron_master.classes.job import Job

class Worker:
def __init__(self, worker_id: str, supported_job_types: list[str]):
self.worker_id = worker_id
self.supported_job_types = supported_job_types
self.job_queue = Queue()

def add_job(self, job: Job):
if job.job_type not in self.supported_job_types:
raise ValueError(f"Unsupported job type: {job.job_type}")
self.job_queue.put(job)

def process_jobs(self):
while True:
job = self.job_queue.get()
if job is None:
break
job.execute()

def start(self):
worker_thread = threading.Thread(target=self.process_jobs)
worker_thread.start()
21 changes: 21 additions & 0 deletions discord_tron_master/classes/worker_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import Dict, Any, List

class WorkerManager:
def __init__(self):
self.workers = {} # {"worker_id": {"supported_job_types": [...], "hardware_limits": {...}}, ...}

def register_worker(self, worker_id, supported_job_types: List[str], hardware_limits: Dict[str, Any]):
self.workers[worker_id] = {"supported_job_types": supported_job_types, "hardware_limits": hardware_limits}

def unregister_worker(self, worker_id):
del self.workers[worker_id]

def get_worker_supported_job_types(self, worker_id: str) -> List[str]:
return self.workers[worker_id]["supported_job_types"]

def get_worker_hardware_limits(self, worker_id: str) -> Dict[str, Any]:
return self.workers[worker_id]["hardware_limits"]

def find_best_fit_worker(self, job_type: str) -> str:
# Logic to find the best fit worker based on job type and worker limits
pass
43 changes: 43 additions & 0 deletions discord_tron_master/example_websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from discord_tron_master.classes.command_processor import CommandProcessor
from discord_tron_master.classes.job import Job
from discord_tron_master.classes.queue_manager import QueueManager
from discord_tron_master.classes.websocket_handler import WebSocketHandler
from discord_tron_master.classes.websocket_server import WebSocketServer
from discord_tron_master.classes.worker import Worker
from discord_tron_master.classes.worker_manager import WorkerManager
import asyncio

# Initialize the worker manager and add a few workers
worker_manager = WorkerManager()
worker1 = Worker("worker1", ["job_type_1", "job_type_2"])
worker2 = Worker("worker2", ["job_type_2", "job_type_3"])
worker_manager.add_worker(worker1)
worker_manager.add_worker(worker2)

# Initialize the command processor
command_processor = CommandProcessor(worker_manager)

# Initialize the WebSocket handler
websocket_handler = WebSocketHandler(command_processor)

# Initialize the WebSocket server
websocket_server = WebSocketServer(websocket_handler)

# Add some example jobs
job1 = Job("job_type_1", {"data": "Job 1 data"})
job2 = Job("job_type_2", {"data": "Job 2 data"})

# Start the workers and WebSocket server
async def main():
worker1.start()
worker2.start()

# Add jobs to the queue
worker1.add_job(job1)
worker2.add_job(job2)

# Start the WebSocket server
await websocket_server.start()

# Run the example
asyncio.run(main())
14 changes: 10 additions & 4 deletions discord_tron_master/websocket_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@ async def handler(self, websocket, path):
if not access_token or not self.auth.validate_access_token(access_token):
await websocket.close(code=4001, reason="Invalid access token")
return
self.connected_clients.add(websocket)
# Add the client to the set of clients
self.clients.add(websocket)
try:
# Process incoming messages
async for message in websocket:
await self.broadcast(message)
# Handle messages here
pass
finally:
self.connected_clients.remove(websocket)
# Remove the client from the set of clients
self.clients.remove(websocket)


async def broadcast(self, message):
for client in self.connected_clients:
await client.send(message)

async def run(self, host="0.0.0.0", port=6789):
server = await websockets.serve(self.handler, host, port)
await server.wait_closed()
asyncio.get_event_loop().run_until_complete(server)
asyncio.get_event_loop().run_forever()

0 comments on commit 4a1669b

Please sign in to comment.