import json from json import JSONDecodeError from time import sleep import pika from django import db from BaseLib.BaseDaemon import BaseDaemon from pika.adapters.utils.connection_workflow import AMQPConnectorException from allinvest import settings def blocking_connection(): return pika.BlockingConnection( pika.ConnectionParameters( host=settings.RABBITMQ_HOST, credentials=pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD) ) ) def send_to_queue(routing_key, payload): with blocking_connection() as connection: channel = connection.channel() queue_name = f"{settings.RABBITMQ_EXCHANGE}_{routing_key}" channel.exchange_declare( exchange=settings.RABBITMQ_EXCHANGE, exchange_type='direct' ) channel.queue_declare(queue_name) channel.queue_bind( exchange=settings.RABBITMQ_EXCHANGE, queue=queue_name, routing_key=routing_key ) channel.basic_publish( exchange=settings.RABBITMQ_EXCHANGE, routing_key=routing_key, body=json.dumps(payload).encode("UTF-8") ) class MessagingSupport(BaseDaemon): queue_name = None attempts = 3 payload = dict() def process(self): raise NotImplementedError def consume(self, ch, method, properties, body): try: data = json.loads(body.decode('utf-8')) except JSONDecodeError: print("Message is not JSON decodable") return print(f"Got {data}, processing...") finished = False for attempt in range(1, self.attempts + 1): print("attempt", attempt) try: db.close_old_connections() self.payload = data self.process() finished = True except Exception as e: print("failed with", str(e)) raise sleep(1.5) if finished: break if finished: print("Process finished successfully") else: print("Process failed") def handle(self, *args, **options): if self.queue_name is None: raise NotImplementedError("Queue name must be declared") print("start listening " + self.queue_name) while True: try: with pika.BlockingConnection( pika.ConnectionParameters( host=settings.RABBITMQ_HOST, credentials=pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD) ) ) as connection: channel = connection.channel() queue = f"{settings.RABBITMQ_EXCHANGE}_{self.queue_name}" channel.queue_declare(queue=queue) channel.basic_consume(queue=queue, on_message_callback=self.consume, auto_ack=True) channel.start_consuming() except AMQPConnectorException: print("connection to rabbit failed: reconnecting")