sprint/SprintLib/queue.py
2022-10-14 02:35:27 +00:00

87 lines
3.0 KiB
Python

import json
from json import JSONDecodeError
import pika
from django.db import transaction
from django.core.exceptions import ObjectDoesNotExist
from django.core.management import BaseCommand
from django.utils import timezone
from pika.adapters.utils.connection_workflow import AMQPConnectorException
from Main.models.outbox_message import OutboxMessage
from Sprint import settings
def send_to_queue(queue_name, payload):
OutboxMessage.objects.create(
queue=queue_name,
body=payload
)
class RollbackException(Exception):
...
class MessagingSupport(BaseCommand):
queue_name = None
with_transaction = True
def process(self, payload: dict):
raise NotImplementedError
def consume(self, ch, method, properties, body):
try:
data = json.loads(body.decode('utf-8'))
except JSONDecodeError:
print("Message is not JSON decodable")
return
print(f"Got {data}, processing...")
try:
outbox_message = OutboxMessage.objects.get(id=data["id"])
except ObjectDoesNotExist:
print(f"Message with id {data['id']} does not exist")
return
if outbox_message.time_processed is not None:
print("Message is already processed")
return
outbox_message.time_received = timezone.now()
outbox_message.save()
try:
if self.with_transaction:
with transaction.atomic():
self.process(data["body"])
outbox_message.refresh_from_db()
if outbox_message.time_processed is not None:
print("Message already processed, rolling back")
raise RollbackException("Just for rolling back")
else:
outbox_message.time_processed = timezone.now()
outbox_message.save()
else:
self.process(data["body"])
outbox_message.time_processed = timezone.now()
outbox_message.save()
except RollbackException:
pass
print("Process finished successfully")
def handle(self, *args, **options):
if self.queue_name is None:
raise NotImplementedError("Queue name must be declared")
print("start listening " + self.queue_name)
while True:
try:
with pika.BlockingConnection(
pika.ConnectionParameters(
host=settings.RABBIT_HOST,
credentials=pika.PlainCredentials('guest', settings.RABBIT_PASSWORD)
)
) as connection:
channel = connection.channel()
channel.queue_declare(queue=self.queue_name)
channel.basic_consume(queue=self.queue_name, on_message_callback=self.consume, auto_ack=True)
channel.start_consuming()
except AMQPConnectorException:
print("connection to rabbit failed: reconnecting")