From ea09334ab10cc5362f19115250dabe830e338be0 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Fri, 14 Oct 2022 02:35:27 +0000 Subject: [PATCH] Outbox --- .deploy/deploy-dev.yaml | 53 +++++----- .deploy/deploy-prod.yaml | 54 +++++----- Main/migrations/0039_auto_20221013_1855.py | 34 ++++++ Main/models/__init__.py | 1 + Main/models/outbox_message.py | 17 +++ SprintLib/BaseView.py | 100 +++++++++--------- SprintLib/queue.py | 70 +++++++----- SprintLib/utils.py | 10 +- daemons/management/commands/outbox.py | 45 ++++++++ .../commands/{receive.py => worker.py} | 1 + nginx/nginx.conf | 9 -- requirements.txt | 4 +- 12 files changed, 258 insertions(+), 140 deletions(-) create mode 100644 Main/migrations/0039_auto_20221013_1855.py create mode 100644 Main/models/outbox_message.py create mode 100644 daemons/management/commands/outbox.py rename daemons/management/commands/{receive.py => worker.py} (95%) diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index be9a918..037f365 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -41,30 +41,6 @@ services: parallelism: 1 order: start-first - polling: - image: mathwave/sprint-repo:sprint - networks: - - net - environment: - DB_HOST: "pg.develop.sprinthub.ru" - RABBIT_HOST: "rabbitmq.develop.sprinthub.ru" - REDIS_HOST: "redis.develop.sprinthub.ru" - MINIO_HOST: "minio.develop.sprinthub.ru" - MINIO_SECRET_KEY: $MINIO_SECRET_KEY_DEV - RABBIT_PASSWORD: $RABBITMQ_PASSWORD_DEV - REDIS_PASSWORD: $REDIS_PASSWORD_DEV - DB_PASSWORD: $DB_PASSWORD_DEV - DEBUG: "true" - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV - command: ./manage.py runserver 0.0.0.0:7998 --noreload - deploy: - mode: replicated - restart_policy: - condition: any - update_config: - parallelism: 1 - order: start-first - bot: image: mathwave/sprint-repo:sprint networks: @@ -89,6 +65,31 @@ services: parallelism: 1 order: stop-first + outbox: + image: mathwave/sprint-repo:sprint + networks: + - net + command: ./manage.py outbox + environment: + DB_HOST: "pg.develop.sprinthub.ru" + RABBIT_HOST: "rabbitmq.develop.sprinthub.ru" + REDIS_HOST: "redis.develop.sprinthub.ru" + MINIO_HOST: "minio.develop.sprinthub.ru" + MINIO_SECRET_KEY: $MINIO_SECRET_KEY_DEV + RABBIT_PASSWORD: $RABBITMQ_PASSWORD_DEV + REDIS_PASSWORD: $REDIS_PASSWORD_DEV + DB_PASSWORD: $DB_PASSWORD_DEV + DEBUG: "true" + TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV + deploy: + mode: replicated + replicas: 1 + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + checker_cleaner: image: mathwave/sprint-repo:sprint networks: @@ -117,7 +118,7 @@ services: image: mathwave/sprint-repo:sprint networks: - net - command: ./manage.py receive + command: ./manage.py worker environment: DB_HOST: "pg.develop.sprinthub.ru" RABBIT_HOST: "rabbitmq.develop.sprinthub.ru" @@ -134,7 +135,7 @@ services: - /var/run/docker.sock:/var/run/docker.sock deploy: mode: replicated - replicas: 2 + replicas: 1 restart_policy: condition: any update_config: diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 7c2a73f..3dc4b2d 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -41,32 +41,6 @@ services: parallelism: 1 order: start-first - polling: - image: mathwave/sprint-repo:sprint - networks: - - net - environment: - DB_HOST: "pg.sprinthub.ru" - RABBIT_HOST: "rabbitmq.sprinthub.ru" - REDIS_HOST: "redis.sprinthub.ru" - MINIO_HOST: "minio.sprinthub.ru" - MINIO_SECRET_KEY: $MINIO_SECRET_KEY_PROD - RABBIT_PASSWORD: $RABBITMQ_PASSWORD_PROD - REDIS_PASSWORD: $REDIS_PASSWORD_PROD - DB_PASSWORD: $DB_PASSWORD_PROD - DEBUG: "false" - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD - SENTRY_TOKEN: $SENTRY_TOKEN - command: ./manage.py runserver 0.0.0.0:7998 --noreload - deploy: - mode: replicated - replicas: 1 - restart_policy: - condition: any - update_config: - parallelism: 1 - order: start-first - bot: image: mathwave/sprint-repo:sprint networks: @@ -93,6 +67,32 @@ services: parallelism: 1 order: stop-first + outbox: + image: mathwave/sprint-repo:sprint + networks: + - net + environment: + DB_HOST: "pg.sprinthub.ru" + RABBIT_HOST: "rabbitmq.sprinthub.ru" + REDIS_HOST: "redis.sprinthub.ru" + MINIO_HOST: "minio.sprinthub.ru" + MINIO_SECRET_KEY: $MINIO_SECRET_KEY_PROD + RABBIT_PASSWORD: $RABBITMQ_PASSWORD_PROD + REDIS_PASSWORD: $REDIS_PASSWORD_PROD + DB_PASSWORD: $DB_PASSWORD_PROD + DEBUG: "false" + TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD + SENTRY_TOKEN: $SENTRY_TOKEN + command: ./manage.py outbox + deploy: + mode: replicated + replicas: 1 + restart_policy: + condition: any + update_config: + parallelism: 1 + order: stop-first + checker_cleaner: image: mathwave/sprint-repo:sprint networks: @@ -122,7 +122,7 @@ services: image: mathwave/sprint-repo:sprint networks: - net - command: ./manage.py receive + command: ./manage.py worker environment: DB_HOST: "pg.sprinthub.ru" RABBIT_HOST: "rabbitmq.sprinthub.ru" diff --git a/Main/migrations/0039_auto_20221013_1855.py b/Main/migrations/0039_auto_20221013_1855.py new file mode 100644 index 0000000..faa2dc7 --- /dev/null +++ b/Main/migrations/0039_auto_20221013_1855.py @@ -0,0 +1,34 @@ +# Generated by Django 3.2.4 on 2022-10-13 15:55 + +from django.db import migrations, models +import django.utils.timezone + + +class Migration(migrations.Migration): + + dependencies = [ + ('Main', '0038_auto_20220516_1836'), + ] + + operations = [ + migrations.CreateModel( + name='OutboxMessage', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('queue', models.TextField()), + ('body', models.JSONField()), + ('time_created', models.DateTimeField(default=django.utils.timezone.now)), + ('time_sent', models.DateTimeField(default=None, null=True)), + ('time_received', models.DateTimeField(default=None, null=True)), + ('time_processed', models.DateTimeField(default=None, null=True)), + ], + ), + migrations.AddIndex( + model_name='outboxmessage', + index=models.Index(fields=['time_sent'], name='Main_outbox_time_se_4fc52a_idx'), + ), + migrations.AddIndex( + model_name='outboxmessage', + index=models.Index(fields=['time_sent', 'time_processed'], name='Main_outbox_time_se_edcbac_idx'), + ), + ] diff --git a/Main/models/__init__.py b/Main/models/__init__.py index bc4b339..9caa974 100644 --- a/Main/models/__init__.py +++ b/Main/models/__init__.py @@ -11,3 +11,4 @@ from Main.models.solution_file import SolutionFile from Main.models.friendship import Friendship from Main.models.language_apply import LanguageApply from Main.models.dump import Dump +from Main.models.outbox_message import OutboxMessage diff --git a/Main/models/outbox_message.py b/Main/models/outbox_message.py new file mode 100644 index 0000000..19d74ea --- /dev/null +++ b/Main/models/outbox_message.py @@ -0,0 +1,17 @@ +from django.db import models +from django.utils import timezone + + +class OutboxMessage(models.Model): + queue = models.TextField() + body = models.JSONField() + time_created = models.DateTimeField(default=timezone.now) + time_sent = models.DateTimeField(null=True, default=None) + time_received = models.DateTimeField(null=True, default=None) + time_processed = models.DateTimeField(null=True, default=None) + + class Meta: + indexes = [ + models.Index(fields=['time_sent']), + models.Index(fields=['time_sent', 'time_processed']) + ] diff --git a/SprintLib/BaseView.py b/SprintLib/BaseView.py index b33b78b..bcab092 100644 --- a/SprintLib/BaseView.py +++ b/SprintLib/BaseView.py @@ -1,6 +1,7 @@ from typing import Optional from django.core.handlers.wsgi import WSGIRequest +from django.db import transaction from django.http import HttpResponseRedirect, JsonResponse from django.shortcuts import render from django.utils import timezone @@ -27,56 +28,57 @@ class BaseView: def as_view(cls): @csrf_exempt def execute(request): - if request.user.is_authenticated: - user_info = request.user.userinfo - user_info.last_request = timezone.now() - user_info.save() - c = cls(request) - if c.required_login is not None: - if c.required_login and not request.user.is_authenticated: - return HttpResponseRedirect("/enter") - if c.required_login and not request.user.userinfo.verified: - return HttpResponseRedirect("/set_username") - if ( - not c.required_login - and request.user.is_authenticated - and request.user.userinfo.verified - ): + with transaction.atomic(): + if request.user.is_authenticated: + user_info = request.user.userinfo + user_info.last_request = timezone.now() + user_info.save() + c = cls(request) + if c.required_login is not None: + if c.required_login and not request.user.is_authenticated: + return HttpResponseRedirect("/enter") + if c.required_login and not request.user.userinfo.verified: + return HttpResponseRedirect("/set_username") + if ( + not c.required_login + and request.user.is_authenticated + and request.user.userinfo.verified + ): + return HttpResponseRedirect("/") + request_method = request.method.lower() + exec("from Main.models import *") + context = {} + for key in request.GET.keys(): + if key.endswith("_id") and key not in cls.fields_except: + model_name = key.rstrip("_id") + setattr( + c, + model_name, + eval(model_name[0].upper() + model_name[1:]).objects.get( + id=int(request.GET[key]) + ), + ) + context[model_name] = getattr(c, model_name) + if "action" in request.POST.keys(): + request_method += "_" + request.POST["action"] + method = getattr(c, request_method, None) + try: + data = c.pre_handle() + if method: + if data is None: + data = method() + if type(data) == str: + return HttpResponseRedirect(data) + if type(data) == dict: + return JsonResponse(data) + if data is not None: + return data + context = {**context, **c.context} + res = render(request, c.view_file, context) + res.headers["X-Frame-Options"] = "ALLOW" + return res + except AccessError: return HttpResponseRedirect("/") - request_method = request.method.lower() - exec("from Main.models import *") - context = {} - for key in request.GET.keys(): - if key.endswith("_id") and key not in cls.fields_except: - model_name = key.rstrip("_id") - setattr( - c, - model_name, - eval(model_name[0].upper() + model_name[1:]).objects.get( - id=int(request.GET[key]) - ), - ) - context[model_name] = getattr(c, model_name) - if "action" in request.POST.keys(): - request_method += "_" + request.POST["action"] - method = getattr(c, request_method, None) - try: - data = c.pre_handle() - if method: - if data is None: - data = method() - if type(data) == str: - return HttpResponseRedirect(data) - if type(data) == dict: - return JsonResponse(data) - if data is not None: - return data - context = {**context, **c.context} - res = render(request, c.view_file, context) - res.headers["X-Frame-Options"] = "ALLOW" - return res - except AccessError: - return HttpResponseRedirect("/") return execute diff --git a/SprintLib/queue.py b/SprintLib/queue.py index c1f1afa..8dc5d26 100644 --- a/SprintLib/queue.py +++ b/SprintLib/queue.py @@ -1,48 +1,70 @@ import json +from json import JSONDecodeError -import django.db -import django.db.utils import pika -import psycopg2 +from django.db import transaction +from django.core.exceptions import ObjectDoesNotExist from django.core.management import BaseCommand +from django.utils import timezone from pika.adapters.utils.connection_workflow import AMQPConnectorException +from Main.models.outbox_message import OutboxMessage from Sprint import settings def send_to_queue(queue_name, payload): - with pika.BlockingConnection( - pika.ConnectionParameters( - host=settings.RABBIT_HOST, - port=settings.RABBIT_PORT, - credentials=pika.PlainCredentials('guest', settings.RABBIT_PASSWORD) - ) - ) as connection: - channel = connection.channel() - channel.queue_declare(queue=queue_name) - channel.basic_publish( - exchange="", - routing_key=queue_name, - body=json.dumps(payload).encode('utf-8'), - ) + OutboxMessage.objects.create( + queue=queue_name, + body=payload + ) + + +class RollbackException(Exception): + ... class MessagingSupport(BaseCommand): queue_name = None + with_transaction = True def process(self, payload: dict): raise NotImplementedError def consume(self, ch, method, properties, body): - data = json.loads(body.decode('utf-8')) + try: + data = json.loads(body.decode('utf-8')) + except JSONDecodeError: + print("Message is not JSON decodable") + return print(f"Got {data}, processing...") try: - self.process(data) - print("Process finished successfully") - except (psycopg2.OperationalError, django.db.OperationalError, django.db.utils.OperationalError): - print("Failed to connect to database, restarting...") - send_to_queue(self.queue_name, data) - raise + outbox_message = OutboxMessage.objects.get(id=data["id"]) + except ObjectDoesNotExist: + print(f"Message with id {data['id']} does not exist") + return + if outbox_message.time_processed is not None: + print("Message is already processed") + return + outbox_message.time_received = timezone.now() + outbox_message.save() + try: + if self.with_transaction: + with transaction.atomic(): + self.process(data["body"]) + outbox_message.refresh_from_db() + if outbox_message.time_processed is not None: + print("Message already processed, rolling back") + raise RollbackException("Just for rolling back") + else: + outbox_message.time_processed = timezone.now() + outbox_message.save() + else: + self.process(data["body"]) + outbox_message.time_processed = timezone.now() + outbox_message.save() + except RollbackException: + pass + print("Process finished successfully") def handle(self, *args, **options): if self.queue_name is None: diff --git a/SprintLib/utils.py b/SprintLib/utils.py index be4343e..9c2a703 100644 --- a/SprintLib/utils.py +++ b/SprintLib/utils.py @@ -4,7 +4,7 @@ from random import choice from time import sleep from django.core.management import BaseCommand -from minio import Minio +from minio import Minio, S3Error from Sprint import settings from SprintLib.queue import send_to_queue @@ -23,8 +23,12 @@ client = Minio( @lock('write_bytes') def write_bytes(data: bytes): - obj = client.get_object(BUCKET_NAME, 'meta.txt') - num = int(obj.data.decode('utf-8')) + 1 + try: + obj = client.get_object(BUCKET_NAME, 'meta.txt') + num = int(obj.data.decode('utf-8')) + 1 + except S3Error: + client.put_object(BUCKET_NAME, 'meta.txt', io.BytesIO(b"1"), 1) + num = 1 b_num = str(num).encode('utf-8') client.put_object(BUCKET_NAME, str(num), io.BytesIO(data), len(data)) client.put_object(BUCKET_NAME, 'meta.txt', io.BytesIO(b_num), len(b_num)) diff --git a/daemons/management/commands/outbox.py b/daemons/management/commands/outbox.py new file mode 100644 index 0000000..b9b6eeb --- /dev/null +++ b/daemons/management/commands/outbox.py @@ -0,0 +1,45 @@ +import datetime +import json + +import pika +from django.db import transaction + +from django.db.models import Q +from django.utils import timezone + +from Main.models.outbox_message import OutboxMessage +from Sprint import settings +from SprintLib.utils import LoopWorker + + +class Command(LoopWorker): + sleep_period = 0.1 + + def send_message(self, message): + with pika.BlockingConnection( + pika.ConnectionParameters( + host=settings.RABBIT_HOST, + port=settings.RABBIT_PORT, + credentials=pika.PlainCredentials("guest", settings.RABBIT_PASSWORD), + ) + ) as connection: + channel = connection.channel() + channel.queue_declare(queue=message.queue) + channel.basic_publish( + exchange="", + routing_key=message.queue, + body=json.dumps({"id": message.id, "body": message.body}).encode("utf-8"), + ) + + def go(self): + messages = OutboxMessage.objects.filter( + Q(time_sent__isnull=True) | + Q(time_sent__lte=(timezone.now() - datetime.timedelta(minutes=5)), time_processed__isnull=True) + ).order_by( + "time_created" + ) + for message in messages: + with transaction.atomic(): + message.time_sent = timezone.now() + message.save() + self.send_message(message) diff --git a/daemons/management/commands/receive.py b/daemons/management/commands/worker.py similarity index 95% rename from daemons/management/commands/receive.py rename to daemons/management/commands/worker.py index 20aa437..9c46175 100644 --- a/daemons/management/commands/receive.py +++ b/daemons/management/commands/worker.py @@ -6,6 +6,7 @@ from SprintLib.testers import * class Command(MessagingSupport): help = "Tests solution" queue_name = "test" + with_transaction = False def process(self, payload: dict): id = payload['id'] diff --git a/nginx/nginx.conf b/nginx/nginx.conf index a1f9cc4..cc1d039 100644 --- a/nginx/nginx.conf +++ b/nginx/nginx.conf @@ -3,15 +3,6 @@ events {} http { server { listen 1235; - - location /checker/ { - proxy_pass http://polling:7998/checker/; - } - - location /polling/ { - proxy_pass http://polling:7998/polling/; - } - location / { proxy_pass http://web:8000/; } diff --git a/requirements.txt b/requirements.txt index 8245000..e3faaaf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,8 +20,8 @@ minio==7.1.11 pika==1.2.0 Pillow==8.3.1 prompt-toolkit==3.0.18 -psycopg2==2.9.1 -psycopg2-binary==2.9.1 +psycopg2==2.9.3 +psycopg2-binary==2.9.3 pyTelegramBotAPI==4.1.1 python-dateutil==2.8.1 pytz==2021.1