From d8cfa99eb18f3a0a05bd69ab81191c23ea6de920 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Tue, 15 Mar 2022 01:20:59 +0300 Subject: [PATCH] messaging --- SprintLib/queue.py | 59 ++++++------------- daemons/management/commands/file_generator.py | 9 +++ daemons/management/commands/receive.py | 52 ++++++---------- 3 files changed, 45 insertions(+), 75 deletions(-) create mode 100644 daemons/management/commands/file_generator.py diff --git a/SprintLib/queue.py b/SprintLib/queue.py index 2a8c8c5..670bcbe 100644 --- a/SprintLib/queue.py +++ b/SprintLib/queue.py @@ -1,8 +1,6 @@ -import json -from enum import Enum, auto -from typing import Union - import pika +from django.core.management import BaseCommand +from pika.adapters.utils.connection_workflow import AMQPConnectorException from Sprint import settings @@ -22,43 +20,24 @@ def send_testing(solution): ) -class Queue(str, Enum): - test = auto() - notification = auto() +class MessagingSupport(BaseCommand): + queue_name = None + def consume(self, ch, method, properties, body): + raise NotImplementedError -class QueueAccessor: - - def publish(self, queue: Union[Queue, str], message: Union[bytes, dict]): - if isinstance(message, dict): - message = json.dumps(message).encode("UTF-8") - if isinstance(queue, str): - queue = Queue(queue) - with pika.BlockingConnection( - pika.ConnectionParameters(host=settings.RABBIT_HOST, port=settings.RABBIT_PORT) - ) as connection: - channel = connection.channel() - channel.queue_declare(queue=queue.name) - channel.basic_publish( - exchange="", - routing_key=queue.name, - body=message, - ) - - -def message_handler(queue: Union[Queue, str]): - if isinstance(queue, str): - queue = Queue(queue) - - def decorator(func): - def new_func(*args, **kwargs): - print("Enter listener for queue", queue) - with pika.BlockingConnection( - pika.ConnectionParameters(host=settings.RABBIT_HOST) - ) as connection: + def handle(self, *args, **options): + if self.queue_name is None: + raise NotImplementedError("Queue name must be declared") + print("start listening " + self.queue_name) + while True: + try: + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=settings.RABBIT_HOST) + ) channel = connection.channel() - channel.queue_declare(queue=queue.name) - channel.basic_consume(queue=queue.name, on_message_callback=func, auto_ack=True) + channel.queue_declare(queue=self.queue_name) + channel.basic_consume(queue=self.queue_name, on_message_callback=self.consume, auto_ack=True) channel.start_consuming() - return new_func - return decorator + except AMQPConnectorException: + print("connection to rabbit failed: reconnecting") diff --git a/daemons/management/commands/file_generator.py b/daemons/management/commands/file_generator.py new file mode 100644 index 0000000..ee55b4f --- /dev/null +++ b/daemons/management/commands/file_generator.py @@ -0,0 +1,9 @@ +from SprintLib.queue import MessagingSupport + + +class Command(MessagingSupport): + help = "starts file generator" + queue_name = "files" + + def consume(self, ch, method, properties, body): + ... \ No newline at end of file diff --git a/daemons/management/commands/receive.py b/daemons/management/commands/receive.py index d1c17ec..d98cd95 100644 --- a/daemons/management/commands/receive.py +++ b/daemons/management/commands/receive.py @@ -1,44 +1,26 @@ from os.path import join, exists from shutil import rmtree -import pika -from django.core.management.base import BaseCommand -from pika.adapters.utils.connection_workflow import AMQPConnectorException - from Main.models import Solution -from Sprint import settings +from SprintLib.queue import MessagingSupport from SprintLib.testers import * -class Command(BaseCommand): +class Command(MessagingSupport): help = "Tests solution" + queue_name = "test" - def handle(self, *args, **options): - print("Enter worker") - while True: - try: - connection = pika.BlockingConnection( - pika.ConnectionParameters(host=settings.RABBIT_HOST) - ) - channel = connection.channel() - channel.queue_declare(queue="test") - - def callback(ch, method, properties, body): - id = int(str(body, encoding="utf-8")) - print(f"Received id {id}") - solution = Solution.objects.get(id=id) - try: - eval(solution.language.work_name + "Tester")(solution).execute() - except Exception as e: - print(e) - solution.result = "TE" - solution.save() - finally: - path = join("solutions", str(id)) - if exists(path): - rmtree(path) - - channel.basic_consume(queue="test", on_message_callback=callback, auto_ack=True) - channel.start_consuming() - except AMQPConnectorException: - print("connection to rabbit failed: reconnecting") + def consume(self, ch, method, properties, body): + id = int(str(body, encoding="utf-8")) + print(f"Received id {id}") + solution = Solution.objects.get(id=id) + try: + eval(solution.language.work_name + "Tester")(solution).execute() + except Exception as e: + print(e) + solution.result = "TE" + solution.save() + finally: + path = join("solutions", str(id)) + if exists(path): + rmtree(path)