sprint/SprintLib/queue.py
2024-08-18 17:43:34 +03:00

89 lines
3.1 KiB
Python

import json
from json import JSONDecodeError
import pika
from django.db import transaction
from django.core.exceptions import ObjectDoesNotExist
from django.core.management import BaseCommand
from django.utils import timezone
from django import db
from pika.adapters.utils.connection_workflow import AMQPConnectorException
from Main.models.outbox_message import OutboxMessage
from Sprint import settings
def send_to_queue(queue_name, payload):
    """Store a message addressed to ``queue_name`` as an ``OutboxMessage`` row.

    This function only creates the database record; it does not talk to the
    broker itself.

    Args:
        queue_name: Value stored in the row's ``queue`` field.
        payload: Value stored in the row's ``body`` field.
    """
    message_fields = {"queue": queue_name, "body": payload}
    OutboxMessage.objects.create(**message_fields)
class RollbackException(Exception):
    """Raised inside an atomic block purely to make the transaction roll back."""
class MessagingSupport(BaseCommand):
    """Base management command that consumes outbox messages from an AMQP queue.

    Subclasses set ``queue_name`` and implement :meth:`process`. Each incoming
    message is expected to be a JSON object with an ``id`` key (primary key of
    an ``OutboxMessage`` row) and a ``body`` key (the payload passed to
    :meth:`process`).
    """

    # Name of the queue to consume from; subclasses must override it.
    queue_name = None
    # When true, ``process`` and the "processed" bookkeeping share one
    # database transaction so a duplicate delivery can be rolled back.
    with_transaction = True

    def process(self, payload: dict):
        """Handle the ``body`` of a single message; subclasses must override."""
        raise NotImplementedError

    def consume(self, ch, method, properties, body):
        """pika ``on_message_callback``: decode, de-duplicate and process a message.

        Undecodable or malformed messages are logged and dropped instead of
        raising, so that one bad message does not terminate the consumer.

        Args:
            ch: Channel the message arrived on (unused).
            method: Delivery metadata (unused).
            properties: Message properties (unused).
            body: Raw message bytes, expected to be UTF-8 encoded JSON.
        """
        try:
            data = json.loads(body.decode('utf-8'))
        except (UnicodeDecodeError, JSONDecodeError):
            # UnicodeDecodeError is not a JSONDecodeError, so it has to be
            # listed explicitly to keep non-UTF-8 input from crashing us.
            print("Message is not JSON decodable")
            return
        print(f"Got {data}, processing...")

        # Valid JSON is not necessarily the object we need (it could be a
        # list, a number, or an object missing keys); reject it up front.
        if not isinstance(data, dict) or "id" not in data or "body" not in data:
            print("Message must be a JSON object with 'id' and 'body' keys")
            return

        db.close_old_connections()
        try:
            outbox_message = OutboxMessage.objects.get(id=data["id"])
        except ObjectDoesNotExist:
            print(f"Message with id {data['id']} does not exist")
            return
        if outbox_message.time_processed is not None:
            print("Message is already processed")
            return

        outbox_message.time_received = timezone.now()
        outbox_message.save()
        try:
            if self.with_transaction:
                with transaction.atomic():
                    self.process(data["body"])
                    # Re-read the row: if it was marked as processed while we
                    # were working, undo our own work by aborting the
                    # transaction.
                    outbox_message.refresh_from_db()
                    if outbox_message.time_processed is not None:
                        print("Message already processed, rolling back")
                        raise RollbackException("Just for rolling back")
                    outbox_message.time_processed = timezone.now()
                    outbox_message.save()
            else:
                self.process(data["body"])
                outbox_message.time_processed = timezone.now()
                outbox_message.save()
        except RollbackException:
            # The work was rolled back, so do not report success below.
            return
        print("Process finished successfully")

    def handle(self, *args, **options):
        """Connect to the broker and consume ``queue_name`` forever.

        Raises:
            NotImplementedError: If the subclass did not set ``queue_name``.
        """
        if self.queue_name is None:
            raise NotImplementedError("Queue name must be declared")
        print("start listening " + self.queue_name)
        while True:
            try:
                with pika.BlockingConnection(
                    pika.ConnectionParameters(
                        host=settings.RABBIT_HOST,
                        credentials=pika.PlainCredentials('guest', settings.RABBIT_PASSWORD)
                    )
                ) as connection:
                    channel = connection.channel()
                    channel.queue_declare(queue=self.queue_name)
                    # auto_ack=True: messages are acknowledged on delivery,
                    # before ``consume`` has run.
                    channel.basic_consume(queue=self.queue_name, on_message_callback=self.consume, auto_ack=True)
                    channel.start_consuming()
            except AMQPConnectorException:
                # NOTE(review): only this exception type triggers a reconnect;
                # any other error propagates and stops the command - confirm
                # that is intended.
                print("connection to rabbit failed: reconnecting")