Merge branch 'outbox' into 'master'

Outbox

See merge request self/Sprint!2
This commit is contained in:
Egor Matveev 2022-10-14 02:35:27 +00:00
commit 6e75621dfb
12 changed files with 258 additions and 140 deletions

View File

@ -41,30 +41,6 @@ services:
parallelism: 1
order: start-first
polling:
image: mathwave/sprint-repo:sprint
networks:
- net
environment:
DB_HOST: "pg.develop.sprinthub.ru"
RABBIT_HOST: "rabbitmq.develop.sprinthub.ru"
REDIS_HOST: "redis.develop.sprinthub.ru"
MINIO_HOST: "minio.develop.sprinthub.ru"
MINIO_SECRET_KEY: $MINIO_SECRET_KEY_DEV
RABBIT_PASSWORD: $RABBITMQ_PASSWORD_DEV
REDIS_PASSWORD: $REDIS_PASSWORD_DEV
DB_PASSWORD: $DB_PASSWORD_DEV
DEBUG: "true"
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
command: ./manage.py runserver 0.0.0.0:7998 --noreload
deploy:
mode: replicated
restart_policy:
condition: any
update_config:
parallelism: 1
order: start-first
bot:
image: mathwave/sprint-repo:sprint
networks:
@ -89,6 +65,31 @@ services:
parallelism: 1
order: stop-first
outbox:
image: mathwave/sprint-repo:sprint
networks:
- net
command: ./manage.py outbox
environment:
DB_HOST: "pg.develop.sprinthub.ru"
RABBIT_HOST: "rabbitmq.develop.sprinthub.ru"
REDIS_HOST: "redis.develop.sprinthub.ru"
MINIO_HOST: "minio.develop.sprinthub.ru"
MINIO_SECRET_KEY: $MINIO_SECRET_KEY_DEV
RABBIT_PASSWORD: $RABBITMQ_PASSWORD_DEV
REDIS_PASSWORD: $REDIS_PASSWORD_DEV
DB_PASSWORD: $DB_PASSWORD_DEV
DEBUG: "true"
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
deploy:
mode: replicated
replicas: 1
restart_policy:
condition: any
update_config:
parallelism: 1
order: start-first
checker_cleaner:
image: mathwave/sprint-repo:sprint
networks:
@ -117,7 +118,7 @@ services:
image: mathwave/sprint-repo:sprint
networks:
- net
command: ./manage.py receive
command: ./manage.py worker
environment:
DB_HOST: "pg.develop.sprinthub.ru"
RABBIT_HOST: "rabbitmq.develop.sprinthub.ru"
@ -134,7 +135,7 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
deploy:
mode: replicated
replicas: 2
replicas: 1
restart_policy:
condition: any
update_config:

View File

@ -41,32 +41,6 @@ services:
parallelism: 1
order: start-first
polling:
image: mathwave/sprint-repo:sprint
networks:
- net
environment:
DB_HOST: "pg.sprinthub.ru"
RABBIT_HOST: "rabbitmq.sprinthub.ru"
REDIS_HOST: "redis.sprinthub.ru"
MINIO_HOST: "minio.sprinthub.ru"
MINIO_SECRET_KEY: $MINIO_SECRET_KEY_PROD
RABBIT_PASSWORD: $RABBITMQ_PASSWORD_PROD
REDIS_PASSWORD: $REDIS_PASSWORD_PROD
DB_PASSWORD: $DB_PASSWORD_PROD
DEBUG: "false"
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
SENTRY_TOKEN: $SENTRY_TOKEN
command: ./manage.py runserver 0.0.0.0:7998 --noreload
deploy:
mode: replicated
replicas: 1
restart_policy:
condition: any
update_config:
parallelism: 1
order: start-first
bot:
image: mathwave/sprint-repo:sprint
networks:
@ -93,6 +67,32 @@ services:
parallelism: 1
order: stop-first
outbox:
image: mathwave/sprint-repo:sprint
networks:
- net
environment:
DB_HOST: "pg.sprinthub.ru"
RABBIT_HOST: "rabbitmq.sprinthub.ru"
REDIS_HOST: "redis.sprinthub.ru"
MINIO_HOST: "minio.sprinthub.ru"
MINIO_SECRET_KEY: $MINIO_SECRET_KEY_PROD
RABBIT_PASSWORD: $RABBITMQ_PASSWORD_PROD
REDIS_PASSWORD: $REDIS_PASSWORD_PROD
DB_PASSWORD: $DB_PASSWORD_PROD
DEBUG: "false"
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
SENTRY_TOKEN: $SENTRY_TOKEN
command: ./manage.py outbox
deploy:
mode: replicated
replicas: 1
restart_policy:
condition: any
update_config:
parallelism: 1
order: stop-first
checker_cleaner:
image: mathwave/sprint-repo:sprint
networks:
@ -122,7 +122,7 @@ services:
image: mathwave/sprint-repo:sprint
networks:
- net
command: ./manage.py receive
command: ./manage.py worker
environment:
DB_HOST: "pg.sprinthub.ru"
RABBIT_HOST: "rabbitmq.sprinthub.ru"

View File

@ -0,0 +1,34 @@
# Generated by Django 3.2.4 on 2022-10-13 15:55
from django.db import migrations, models
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
('Main', '0038_auto_20220516_1836'),
]
operations = [
migrations.CreateModel(
name='OutboxMessage',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('queue', models.TextField()),
('body', models.JSONField()),
('time_created', models.DateTimeField(default=django.utils.timezone.now)),
('time_sent', models.DateTimeField(default=None, null=True)),
('time_received', models.DateTimeField(default=None, null=True)),
('time_processed', models.DateTimeField(default=None, null=True)),
],
),
migrations.AddIndex(
model_name='outboxmessage',
index=models.Index(fields=['time_sent'], name='Main_outbox_time_se_4fc52a_idx'),
),
migrations.AddIndex(
model_name='outboxmessage',
index=models.Index(fields=['time_sent', 'time_processed'], name='Main_outbox_time_se_edcbac_idx'),
),
]

View File

@ -11,3 +11,4 @@ from Main.models.solution_file import SolutionFile
from Main.models.friendship import Friendship
from Main.models.language_apply import LanguageApply
from Main.models.dump import Dump
from Main.models.outbox_message import OutboxMessage

View File

@ -0,0 +1,17 @@
from django.db import models
from django.utils import timezone
class OutboxMessage(models.Model):
queue = models.TextField()
body = models.JSONField()
time_created = models.DateTimeField(default=timezone.now)
time_sent = models.DateTimeField(null=True, default=None)
time_received = models.DateTimeField(null=True, default=None)
time_processed = models.DateTimeField(null=True, default=None)
class Meta:
indexes = [
models.Index(fields=['time_sent']),
models.Index(fields=['time_sent', 'time_processed'])
]

View File

@ -1,6 +1,7 @@
from typing import Optional
from django.core.handlers.wsgi import WSGIRequest
from django.db import transaction
from django.http import HttpResponseRedirect, JsonResponse
from django.shortcuts import render
from django.utils import timezone
@ -27,56 +28,57 @@ class BaseView:
def as_view(cls):
@csrf_exempt
def execute(request):
if request.user.is_authenticated:
user_info = request.user.userinfo
user_info.last_request = timezone.now()
user_info.save()
c = cls(request)
if c.required_login is not None:
if c.required_login and not request.user.is_authenticated:
return HttpResponseRedirect("/enter")
if c.required_login and not request.user.userinfo.verified:
return HttpResponseRedirect("/set_username")
if (
not c.required_login
and request.user.is_authenticated
and request.user.userinfo.verified
):
with transaction.atomic():
if request.user.is_authenticated:
user_info = request.user.userinfo
user_info.last_request = timezone.now()
user_info.save()
c = cls(request)
if c.required_login is not None:
if c.required_login and not request.user.is_authenticated:
return HttpResponseRedirect("/enter")
if c.required_login and not request.user.userinfo.verified:
return HttpResponseRedirect("/set_username")
if (
not c.required_login
and request.user.is_authenticated
and request.user.userinfo.verified
):
return HttpResponseRedirect("/")
request_method = request.method.lower()
exec("from Main.models import *")
context = {}
for key in request.GET.keys():
if key.endswith("_id") and key not in cls.fields_except:
model_name = key.rstrip("_id")
setattr(
c,
model_name,
eval(model_name[0].upper() + model_name[1:]).objects.get(
id=int(request.GET[key])
),
)
context[model_name] = getattr(c, model_name)
if "action" in request.POST.keys():
request_method += "_" + request.POST["action"]
method = getattr(c, request_method, None)
try:
data = c.pre_handle()
if method:
if data is None:
data = method()
if type(data) == str:
return HttpResponseRedirect(data)
if type(data) == dict:
return JsonResponse(data)
if data is not None:
return data
context = {**context, **c.context}
res = render(request, c.view_file, context)
res.headers["X-Frame-Options"] = "ALLOW"
return res
except AccessError:
return HttpResponseRedirect("/")
request_method = request.method.lower()
exec("from Main.models import *")
context = {}
for key in request.GET.keys():
if key.endswith("_id") and key not in cls.fields_except:
model_name = key.rstrip("_id")
setattr(
c,
model_name,
eval(model_name[0].upper() + model_name[1:]).objects.get(
id=int(request.GET[key])
),
)
context[model_name] = getattr(c, model_name)
if "action" in request.POST.keys():
request_method += "_" + request.POST["action"]
method = getattr(c, request_method, None)
try:
data = c.pre_handle()
if method:
if data is None:
data = method()
if type(data) == str:
return HttpResponseRedirect(data)
if type(data) == dict:
return JsonResponse(data)
if data is not None:
return data
context = {**context, **c.context}
res = render(request, c.view_file, context)
res.headers["X-Frame-Options"] = "ALLOW"
return res
except AccessError:
return HttpResponseRedirect("/")
return execute

View File

@ -1,48 +1,70 @@
import json
from json import JSONDecodeError
import django.db
import django.db.utils
import pika
import psycopg2
from django.db import transaction
from django.core.exceptions import ObjectDoesNotExist
from django.core.management import BaseCommand
from django.utils import timezone
from pika.adapters.utils.connection_workflow import AMQPConnectorException
from Main.models.outbox_message import OutboxMessage
from Sprint import settings
def send_to_queue(queue_name, payload):
with pika.BlockingConnection(
pika.ConnectionParameters(
host=settings.RABBIT_HOST,
port=settings.RABBIT_PORT,
credentials=pika.PlainCredentials('guest', settings.RABBIT_PASSWORD)
)
) as connection:
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_publish(
exchange="",
routing_key=queue_name,
body=json.dumps(payload).encode('utf-8'),
)
OutboxMessage.objects.create(
queue=queue_name,
body=payload
)
class RollbackException(Exception):
...
class MessagingSupport(BaseCommand):
queue_name = None
with_transaction = True
def process(self, payload: dict):
raise NotImplementedError
def consume(self, ch, method, properties, body):
data = json.loads(body.decode('utf-8'))
try:
data = json.loads(body.decode('utf-8'))
except JSONDecodeError:
print("Message is not JSON decodable")
return
print(f"Got {data}, processing...")
try:
self.process(data)
print("Process finished successfully")
except (psycopg2.OperationalError, django.db.OperationalError, django.db.utils.OperationalError):
print("Failed to connect to database, restarting...")
send_to_queue(self.queue_name, data)
raise
outbox_message = OutboxMessage.objects.get(id=data["id"])
except ObjectDoesNotExist:
print(f"Message with id {data['id']} does not exist")
return
if outbox_message.time_processed is not None:
print("Message is already processed")
return
outbox_message.time_received = timezone.now()
outbox_message.save()
try:
if self.with_transaction:
with transaction.atomic():
self.process(data["body"])
outbox_message.refresh_from_db()
if outbox_message.time_processed is not None:
print("Message already processed, rolling back")
raise RollbackException("Just for rolling back")
else:
outbox_message.time_processed = timezone.now()
outbox_message.save()
else:
self.process(data["body"])
outbox_message.time_processed = timezone.now()
outbox_message.save()
except RollbackException:
pass
print("Process finished successfully")
def handle(self, *args, **options):
if self.queue_name is None:

View File

@ -4,7 +4,7 @@ from random import choice
from time import sleep
from django.core.management import BaseCommand
from minio import Minio
from minio import Minio, S3Error
from Sprint import settings
from SprintLib.queue import send_to_queue
@ -23,8 +23,12 @@ client = Minio(
@lock('write_bytes')
def write_bytes(data: bytes):
obj = client.get_object(BUCKET_NAME, 'meta.txt')
num = int(obj.data.decode('utf-8')) + 1
try:
obj = client.get_object(BUCKET_NAME, 'meta.txt')
num = int(obj.data.decode('utf-8')) + 1
except S3Error:
client.put_object(BUCKET_NAME, 'meta.txt', io.BytesIO(b"1"), 1)
num = 1
b_num = str(num).encode('utf-8')
client.put_object(BUCKET_NAME, str(num), io.BytesIO(data), len(data))
client.put_object(BUCKET_NAME, 'meta.txt', io.BytesIO(b_num), len(b_num))

View File

@ -0,0 +1,45 @@
import datetime
import json
import pika
from django.db import transaction
from django.db.models import Q
from django.utils import timezone
from Main.models.outbox_message import OutboxMessage
from Sprint import settings
from SprintLib.utils import LoopWorker
class Command(LoopWorker):
sleep_period = 0.1
def send_message(self, message):
with pika.BlockingConnection(
pika.ConnectionParameters(
host=settings.RABBIT_HOST,
port=settings.RABBIT_PORT,
credentials=pika.PlainCredentials("guest", settings.RABBIT_PASSWORD),
)
) as connection:
channel = connection.channel()
channel.queue_declare(queue=message.queue)
channel.basic_publish(
exchange="",
routing_key=message.queue,
body=json.dumps({"id": message.id, "body": message.body}).encode("utf-8"),
)
def go(self):
messages = OutboxMessage.objects.filter(
Q(time_sent__isnull=True) |
Q(time_sent__lte=(timezone.now() - datetime.timedelta(minutes=5)), time_processed__isnull=True)
).order_by(
"time_created"
)
for message in messages:
with transaction.atomic():
message.time_sent = timezone.now()
message.save()
self.send_message(message)

View File

@ -6,6 +6,7 @@ from SprintLib.testers import *
class Command(MessagingSupport):
help = "Tests solution"
queue_name = "test"
with_transaction = False
def process(self, payload: dict):
id = payload['id']

View File

@ -3,15 +3,6 @@ events {}
http {
server {
listen 1235;
location /checker/ {
proxy_pass http://polling:7998/checker/;
}
location /polling/ {
proxy_pass http://polling:7998/polling/;
}
location / {
proxy_pass http://web:8000/;
}

View File

@ -20,8 +20,8 @@ minio==7.1.11
pika==1.2.0
Pillow==8.3.1
prompt-toolkit==3.0.18
psycopg2==2.9.1
psycopg2-binary==2.9.1
psycopg2==2.9.3
psycopg2-binary==2.9.3
pyTelegramBotAPI==4.1.1
python-dateutil==2.8.1
pytz==2021.1