97 lines
3.1 KiB
Python
97 lines
3.1 KiB
Python
import json
|
|
from json import JSONDecodeError
|
|
from time import sleep
|
|
|
|
import pika
|
|
from django import db
|
|
from BaseLib.BaseDaemon import BaseDaemon
|
|
from pika.adapters.utils.connection_workflow import AMQPConnectorException
|
|
|
|
from allinvest import settings
|
|
|
|
|
|
def blocking_connection():
    """Return a new ``pika.BlockingConnection`` to the configured RabbitMQ host.

    Host, user and password are read from ``settings.RABBITMQ_HOST``,
    ``settings.RABBITMQ_USER`` and ``settings.RABBITMQ_PASSWORD``.
    """
    credentials = pika.PlainCredentials(
        settings.RABBITMQ_USER,
        settings.RABBITMQ_PASSWORD,
    )
    parameters = pika.ConnectionParameters(
        host=settings.RABBITMQ_HOST,
        credentials=credentials,
    )
    return pika.BlockingConnection(parameters)
|
|
|
|
|
|
def send_to_queue(routing_key, payload):
    """Publish ``payload`` as UTF-8 encoded JSON under ``routing_key``.

    A connection is opened for this single call. Before publishing, the
    direct exchange ``settings.RABBITMQ_EXCHANGE`` and a queue named
    ``"<exchange>_<routing_key>"`` are declared and bound together with
    ``routing_key``.

    :param routing_key: routing key used for both the binding and the publish.
    :param payload: JSON-serializable object to send.
    """
    with blocking_connection() as conn:
        amqp_channel = conn.channel()
        exchange = settings.RABBITMQ_EXCHANGE
        target_queue = f"{exchange}_{routing_key}"

        # Declare and bind first so the message has a queue to land in.
        amqp_channel.exchange_declare(exchange=exchange, exchange_type='direct')
        amqp_channel.queue_declare(target_queue)
        amqp_channel.queue_bind(
            exchange=exchange,
            queue=target_queue,
            routing_key=routing_key,
        )

        message = json.dumps(payload).encode("UTF-8")
        amqp_channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=message,
        )
|
|
|
|
|
|
class MessagingSupport(BaseDaemon):
    """Base daemon that consumes JSON messages from a RabbitMQ queue.

    Subclasses set ``queue_name`` and implement :meth:`process`. For every
    delivered message the decoded JSON is stored in ``self.payload`` and
    :meth:`process` is called, retrying up to ``attempts`` times on failure.
    """

    # Suffix of the consumed queue: "<settings.RABBITMQ_EXCHANGE>_<queue_name>".
    queue_name = None
    # Maximum number of times process() is tried for a single message.
    attempts = 3
    # Decoded JSON of the message being processed; rebound for every message.
    payload = dict()
    # Seconds to wait before retrying a failed process() call or reconnecting.
    retry_delay = 1.5

    def process(self):
        """Handle the message stored in ``self.payload``; subclasses override."""
        raise NotImplementedError

    def consume(self, ch, method, properties, body):
        """pika ``on_message_callback``: decode ``body`` and run :meth:`process`.

        A body that is not valid UTF-8 JSON is reported and dropped. Otherwise
        :meth:`process` is tried up to ``attempts`` times, waiting
        ``retry_delay`` seconds between failed attempts. Processing errors are
        printed rather than propagated, so one bad message does not stop the
        consumer.

        :param ch: channel the message arrived on (unused).
        :param method: delivery metadata (unused).
        :param properties: message properties (unused).
        :param body: raw message bytes.
        """
        try:
            data = json.loads(body.decode('utf-8'))
        except (UnicodeDecodeError, JSONDecodeError):
            # Bytes that are not valid UTF-8 fail before the JSON parser runs.
            print("Message is not JSON decodable")
            return
        print(f"Got {data}, processing...")

        for attempt in range(1, self.attempts + 1):
            print("attempt", attempt)
            try:
                db.close_old_connections()
                self.payload = data
                self.process()
            except Exception as e:
                # Do not re-raise here: that would skip the remaining attempts
                # and propagate out of start_consuming(), ending the daemon.
                print("failed with", str(e))
                if attempt < self.attempts:
                    sleep(self.retry_delay)
            else:
                print("Process finished successfully")
                break
        else:
            # Loop ended without a successful attempt.
            print("Process failed")

    def handle(self, *args, **options):
        """Consume ``queue_name`` forever, reconnecting when the broker is lost.

        :raises NotImplementedError: if the subclass did not set ``queue_name``.
        """
        if self.queue_name is None:
            raise NotImplementedError("Queue name must be declared")
        print("start listening " + self.queue_name)
        queue = f"{settings.RABBITMQ_EXCHANGE}_{self.queue_name}"
        while True:
            try:
                with blocking_connection() as connection:
                    channel = connection.channel()
                    channel.queue_declare(queue=queue)
                    # auto_ack=True: the message is acknowledged on delivery,
                    # before consume() has processed it.
                    channel.basic_consume(queue=queue, on_message_callback=self.consume, auto_ack=True)
                    channel.start_consuming()
            except (AMQPConnectorException, pika.exceptions.AMQPConnectionError):
                # AMQPConnectionError is included so that connect failures and
                # lost connections reported by BlockingConnection also reconnect.
                print("connection to rabbit failed: reconnecting")
                # Back off so a down broker is not hammered in a tight loop.
                sleep(self.retry_delay)
|