platform/BaseLib/queue.py
Administrator 9e7fc7b4c1 initial
2023-09-23 16:13:11 +03:00

97 lines
3.1 KiB
Python

import json
from json import JSONDecodeError
from time import sleep
import pika
from django import db
from BaseLib.BaseDaemon import BaseDaemon
from pika.adapters.utils.connection_workflow import AMQPConnectorException
from allinvest import settings
def blocking_connection():
    """Open a blocking pika connection to the RabbitMQ broker from settings.

    The host is read from ``settings.RABBITMQ_HOST`` and the connection is
    authenticated with ``settings.RABBITMQ_USER`` /
    ``settings.RABBITMQ_PASSWORD``.

    Returns:
        A new ``pika.BlockingConnection``; the caller is responsible for
        closing it (it can be used as a context manager).
    """
    broker_host = settings.RABBITMQ_HOST
    login = pika.PlainCredentials(
        settings.RABBITMQ_USER,
        settings.RABBITMQ_PASSWORD,
    )
    parameters = pika.ConnectionParameters(host=broker_host, credentials=login)
    return pika.BlockingConnection(parameters)
def send_to_queue(routing_key, payload):
    """Publish ``payload`` as JSON to the queue bound to ``routing_key``.

    A fresh connection is opened for every call. Before publishing, the
    direct exchange named ``settings.RABBITMQ_EXCHANGE`` and the queue
    ``"<exchange>_<routing_key>"`` are declared and bound together, so the
    message is routed to that queue.

    Args:
        routing_key: Routing key used both for the binding and the publish.
        payload: JSON-serialisable object; sent as UTF-8 encoded JSON.
    """
    with blocking_connection() as conn:
        ch = conn.channel()
        exchange = settings.RABBITMQ_EXCHANGE
        target_queue = f"{exchange}_{routing_key}"

        # Declare both ends and bind them before publishing.
        ch.exchange_declare(exchange=exchange, exchange_type='direct')
        ch.queue_declare(target_queue)
        ch.queue_bind(exchange=exchange, queue=target_queue, routing_key=routing_key)

        message = json.dumps(payload).encode("UTF-8")
        ch.basic_publish(exchange=exchange, routing_key=routing_key, body=message)
class MessagingSupport(BaseDaemon):
    """Base daemon that consumes JSON messages from a single RabbitMQ queue.

    Subclasses set ``queue_name`` and implement ``process()``, which finds the
    decoded message in ``self.payload``. ``handle()`` runs the consume loop.
    """

    # Suffix of the queue to consume ("<exchange>_<queue_name>"); subclasses
    # must override it.
    queue_name = None
    # Number of times process() is tried for one message before giving up.
    attempts = 3
    # Decoded body of the current message; consume() rebinds it on the
    # instance before every process() call.
    payload = dict()
    # Seconds to wait between two processing attempts of the same message.
    retry_delay = 1.5
    # Seconds to wait before reconnecting after the broker connection failed.
    reconnect_delay = 1.5

    def process(self):
        """Handle the message stored in ``self.payload``.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def consume(self, ch, method, properties, body):
        """pika ``on_message_callback``: decode ``body`` and run ``process()``.

        The body is expected to be UTF-8 encoded JSON; anything else is
        reported and dropped. ``process()`` is tried up to ``self.attempts``
        times, waiting ``self.retry_delay`` seconds between attempts. A
        message that still fails after the last attempt is reported and
        dropped so the consumer keeps running.

        Args:
            ch: Channel the message arrived on (unused).
            method: Delivery metadata (unused).
            properties: Message properties (unused).
            body: Raw message bytes.
        """
        try:
            data = json.loads(body.decode('utf-8'))
        except (UnicodeDecodeError, JSONDecodeError):
            # Invalid UTF-8 is just as undecodable as invalid JSON.
            print("Message is not JSON decodable")
            return
        print(f"Got {data}, processing...")
        for attempt in range(1, self.attempts + 1):
            print("attempt", attempt)
            try:
                # Drop stale Django DB connections before touching the ORM.
                db.close_old_connections()
                self.payload = data
                self.process()
            except Exception as e:
                # Boundary handler: one bad message must not kill the daemon.
                print("failed with", str(e))
                if attempt < self.attempts:
                    sleep(self.retry_delay)
            else:
                print("Process finished successfully")
                return
        print("Process failed")

    def handle(self, *args, **options):
        """Consume ``"<exchange>_<queue_name>"`` forever, reconnecting on failure.

        Messages are consumed with ``auto_ack=True``, so a message is
        considered delivered as soon as the broker hands it over.

        Raises:
            NotImplementedError: If the subclass did not set ``queue_name``.
        """
        if self.queue_name is None:
            raise NotImplementedError("Queue name must be declared")
        print("start listening " + self.queue_name)
        queue = f"{settings.RABBITMQ_EXCHANGE}_{self.queue_name}"
        while True:
            try:
                with blocking_connection() as connection:
                    channel = connection.channel()
                    channel.queue_declare(queue=queue)
                    channel.basic_consume(queue=queue, on_message_callback=self.consume, auto_ack=True)
                    channel.start_consuming()
            except (AMQPConnectorException, pika.exceptions.AMQPConnectionError):
                # BlockingConnection reports refused/lost connections as
                # AMQPConnectionError; pause so a broker outage does not turn
                # this loop into a busy spin.
                print("connection to rabbit failed: reconnecting")
                sleep(self.reconnect_delay)