import json from json import JSONDecodeError import pika from django.db import transaction from django.core.exceptions import ObjectDoesNotExist from django.core.management import BaseCommand from django.utils import timezone from django import db from pika.adapters.utils.connection_workflow import AMQPConnectorException from Main.models.outbox_message import OutboxMessage from Sprint import settings def send_to_queue(queue_name, payload): OutboxMessage.objects.create( queue=queue_name, body=payload ) class RollbackException(Exception): ... class MessagingSupport(BaseCommand): queue_name = None with_transaction = True def process(self, payload: dict): raise NotImplementedError def consume(self, ch, method, properties, body): try: data = json.loads(body.decode('utf-8')) except JSONDecodeError: print("Message is not JSON decodable") return print(f"Got {data}, processing...") db.close_old_connections() try: outbox_message = OutboxMessage.objects.get(id=data["id"]) except ObjectDoesNotExist: print(f"Message with id {data['id']} does not exist") return if outbox_message.time_processed is not None: print("Message is already processed") return outbox_message.time_received = timezone.now() outbox_message.save() try: if self.with_transaction: with transaction.atomic(): self.process(data["body"]) outbox_message.refresh_from_db() if outbox_message.time_processed is not None: print("Message already processed, rolling back") raise RollbackException("Just for rolling back") else: outbox_message.time_processed = timezone.now() outbox_message.save() else: self.process(data["body"]) outbox_message.time_processed = timezone.now() outbox_message.save() except RollbackException: pass print("Process finished successfully") def handle(self, *args, **options): if self.queue_name is None: raise NotImplementedError("Queue name must be declared") print("start listening " + self.queue_name) while True: try: with pika.BlockingConnection( pika.ConnectionParameters( host=settings.RABBIT_HOST, credentials=pika.PlainCredentials('guest', settings.RABBIT_PASSWORD) ) ) as connection: channel = connection.channel() channel.queue_declare(queue=self.queue_name) channel.basic_consume(queue=self.queue_name, on_message_callback=self.consume, auto_ack=True) channel.start_consuming() except AMQPConnectorException: print("connection to rabbit failed: reconnecting")