-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WIP: It still works, but it also has a bunch of unused worker process…
…ing code. Signed-off-by: bghira <bghira@users.github.com>
- Loading branch information
bghira
committed
Apr 1, 2023
1 parent
face0e1
commit 4a1669b
Showing
10 changed files
with
252 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from discord_tron_master.classes.queue_manager import QueueManager | ||
from discord_tron_master.classes.worker_manager import WorkerManager | ||
from typing import Dict, Any | ||
|
||
class CommandProcessor: | ||
def __init__(self, queue_manager: QueueManager, worker_manager: WorkerManager): | ||
self.queue_manager = queue_manager | ||
self.worker_manager = worker_manager | ||
self.command_handlers = { | ||
"register_worker": self.register_worker, | ||
"unregister_worker": self.unregister_worker, | ||
# Add more command handlers as needed | ||
} | ||
|
||
async def process_command(self, command: str, payload: Dict[str, Any]) -> None: | ||
handler = self.command_handlers.get(command) | ||
if handler is None: | ||
# No handler found for the command | ||
return | ||
|
||
await handler(payload) | ||
|
||
async def register_worker(self, payload: Dict[str, Any]) -> None: | ||
worker_id = payload["worker_id"] | ||
supported_job_types = payload["supported_job_types"] | ||
hardware_limits = payload["hardware_limits"] | ||
self.worker_manager.register_worker(worker_id, supported_job_types, hardware_limits) | ||
self.queue_manager.register_worker(worker_id) | ||
|
||
async def unregister_worker(self, payload: Dict[str, Any]) -> None: | ||
worker_id = payload["worker_id"] | ||
self.worker_manager.unregister_worker(worker_id) | ||
self.queue_manager.unregister_worker(worker_id) | ||
|
||
# Add more command handler methods as needed |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import uuid | ||
from typing import Dict, Any | ||
|
||
|
||
class Job: | ||
def __init__(self, job_type: str, payload: Dict[str, Any]): | ||
self.id = str(uuid.uuid4()) | ||
self.job_type = job_type | ||
self.payload = payload | ||
|
||
def execute(self): | ||
# Implement the logic to execute the job based on the job type and payload | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
import asyncio | ||
from typing import Dict, List | ||
from discord_tron_master.classes.worker_manager import WorkerManager | ||
|
||
class QueueManager: | ||
def __init__(self, worker_manager: WorkerManager): | ||
self.queues = {} # {"worker_id": {"queue": asyncio.Queue(), "supported_job_types": [...]}, ...} | ||
self.worker_manager = worker_manager | ||
|
||
def register_worker(self, worker_id, supported_job_types: List[str]): | ||
self.queues[worker_id] = {"queue": asyncio.Queue(), "supported_job_types": supported_job_types} | ||
|
||
def unregister_worker(self, worker_id): | ||
del self.queues[worker_id] | ||
|
||
def find_best_fit_worker(self, job_type: str) -> str: | ||
# Logic to find the best fit worker based on job type and worker limits | ||
pass | ||
|
||
async def enqueue_job(self, worker_id, job): | ||
await self.queues[worker_id]["queue"].put(job) | ||
|
||
async def dequeue_job(self, worker_id): | ||
return await self.queues[worker_id]["queue"].get() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from discord_tron_master.classes.queue_manager import QueueManager | ||
|
||
class WebSocketHandler: | ||
def __init__(self): | ||
self.clients = set() | ||
self.queue_manager = QueueManager() | ||
|
||
async def handler(self, websocket, path): | ||
# Register client connection | ||
self.clients.add(websocket) | ||
try: | ||
async for message in websocket: | ||
# Process incoming message | ||
await self.process_message(websocket, message) | ||
finally: | ||
# Unregister client connection | ||
self.clients.remove(websocket) | ||
|
||
async def process_message(self, websocket, message): | ||
# Process the message here, e.g., send to appropriate queue based on the message type | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import asyncio | ||
import websockets | ||
from typing import Dict, Any | ||
from discord_tron_master.classes.queue_manager import QueueManager | ||
from discord_tron_master.classes.worker_manager import WorkerManager | ||
from discord_tron_master.classes.command_processor import CommandProcessor | ||
|
||
class WebSocketServer: | ||
def __init__(self, host: str, port: int, ssl_context, command_processor: CommandProcessor): | ||
self.host = host | ||
self.port = port | ||
self.ssl_context = ssl_context | ||
self.command_processor = command_processor | ||
|
||
async def handler(self, websocket, path): | ||
try: | ||
async for message in websocket: | ||
data = json.loads(message) | ||
command = data["command"] | ||
payload = data.get("payload", {}) | ||
await self.command_processor.process_command(command, payload) | ||
except websockets.exceptions.ConnectionClosedError: | ||
# Handle connection closed errors, if needed | ||
pass | ||
|
||
async def start(self): | ||
server = await websockets.serve(self.handler, self.host, self.port, ssl=self.ssl_context) | ||
await server.wait_closed() | ||
|
||
|
||
async def main(): | ||
# Initialize instances of required classes and start the WebSocket server | ||
queue_manager = QueueManager() | ||
worker_manager = WorkerManager() | ||
command_processor = CommandProcessor(queue_manager, worker_manager) | ||
websocket_server = WebSocketServer("0.0.0.0", 8080, None, command_processor) | ||
await websocket_server.start() | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import threading | ||
from queue import Queue | ||
from discord_tron_master.classes.job import Job | ||
|
||
class Worker: | ||
def __init__(self, worker_id: str, supported_job_types: list[str]): | ||
self.worker_id = worker_id | ||
self.supported_job_types = supported_job_types | ||
self.job_queue = Queue() | ||
|
||
def add_job(self, job: Job): | ||
if job.job_type not in self.supported_job_types: | ||
raise ValueError(f"Unsupported job type: {job.job_type}") | ||
self.job_queue.put(job) | ||
|
||
def process_jobs(self): | ||
while True: | ||
job = self.job_queue.get() | ||
if job is None: | ||
break | ||
job.execute() | ||
|
||
def start(self): | ||
worker_thread = threading.Thread(target=self.process_jobs) | ||
worker_thread.start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from typing import Dict, Any, List | ||
|
||
class WorkerManager: | ||
def __init__(self): | ||
self.workers = {} # {"worker_id": {"supported_job_types": [...], "hardware_limits": {...}}, ...} | ||
|
||
def register_worker(self, worker_id, supported_job_types: List[str], hardware_limits: Dict[str, Any]): | ||
self.workers[worker_id] = {"supported_job_types": supported_job_types, "hardware_limits": hardware_limits} | ||
|
||
def unregister_worker(self, worker_id): | ||
del self.workers[worker_id] | ||
|
||
def get_worker_supported_job_types(self, worker_id: str) -> List[str]: | ||
return self.workers[worker_id]["supported_job_types"] | ||
|
||
def get_worker_hardware_limits(self, worker_id: str) -> Dict[str, Any]: | ||
return self.workers[worker_id]["hardware_limits"] | ||
|
||
def find_best_fit_worker(self, job_type: str) -> str: | ||
# Logic to find the best fit worker based on job type and worker limits | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from discord_tron_master.classes.command_processor import CommandProcessor | ||
from discord_tron_master.classes.job import Job | ||
from discord_tron_master.classes.queue_manager import QueueManager | ||
from discord_tron_master.classes.websocket_handler import WebSocketHandler | ||
from discord_tron_master.classes.websocket_server import WebSocketServer | ||
from discord_tron_master.classes.worker import Worker | ||
from discord_tron_master.classes.worker_manager import WorkerManager | ||
import asyncio | ||
|
||
# Initialize the worker manager and add a few workers | ||
worker_manager = WorkerManager() | ||
worker1 = Worker("worker1", ["job_type_1", "job_type_2"]) | ||
worker2 = Worker("worker2", ["job_type_2", "job_type_3"]) | ||
worker_manager.add_worker(worker1) | ||
worker_manager.add_worker(worker2) | ||
|
||
# Initialize the command processor | ||
command_processor = CommandProcessor(worker_manager) | ||
|
||
# Initialize the WebSocket handler | ||
websocket_handler = WebSocketHandler(command_processor) | ||
|
||
# Initialize the WebSocket server | ||
websocket_server = WebSocketServer(websocket_handler) | ||
|
||
# Add some example jobs | ||
job1 = Job("job_type_1", {"data": "Job 1 data"}) | ||
job2 = Job("job_type_2", {"data": "Job 2 data"}) | ||
|
||
# Start the workers and WebSocket server | ||
async def main(): | ||
worker1.start() | ||
worker2.start() | ||
|
||
# Add jobs to the queue | ||
worker1.add_job(job1) | ||
worker2.add_job(job2) | ||
|
||
# Start the WebSocket server | ||
await websocket_server.start() | ||
|
||
# Run the example | ||
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters