master #28

Merged
emmatveev merged 2 commits from master into dev 2024-12-08 11:22:29 +03:00
10 changed files with 81 additions and 65 deletions

2
.gitignore vendored
View File

@ -119,3 +119,5 @@ GitHub.sublime-settings
.history .history
local_platform.json local_platform.json
*pb2*

View File

@ -4,5 +4,6 @@ WORKDIR /usr/src/app
COPY requirements.txt requirements.txt COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
COPY . . COPY . .
RUN make gen
ENV PYTHONUNBUFFERED 1 ENV PYTHONUNBUFFERED 1
ENTRYPOINT ["python", "main.py"] ENTRYPOINT ["python", "main.py"]

2
Makefile Normal file
View File

@ -0,0 +1,2 @@
gen:
python -m grpc_tools.protoc --proto_path schemas --python_out=. --pyi_out=. --grpc_python_out=. ./schemas/tasks.proto

View File

@ -1,3 +1,18 @@
import os
import grpc
import tasks_pb2_grpc
stage = os.getenv("STAGE", 'local')
if stage == 'local':
QUEUES_URL = 'localhost:50051'
else:
QUEUES_URL = 'queues-grpc:50051'
class Daemon: class Daemon:
def __init__(self):
self.channel = grpc.insecure_channel(QUEUES_URL)
self.stub = tasks_pb2_grpc.TasksStub(channel=self.channel)
def execute(self): def execute(self):
raise NotImplemented raise NotImplemented

View File

@ -5,7 +5,6 @@ from telebot import apihelper
from daemons import base from daemons import base
from utils import platform from utils import platform
from utils import queues from utils import queues
from utils import locks
class Message(pydantic.BaseModel): class Message(pydantic.BaseModel):
@ -23,9 +22,6 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin):
def queue_name(self): def queue_name(self):
return 'botalka_mailbox' return 'botalka_mailbox'
def before_execute(self, task: dict):
locks.acquire(task['id'], 60)
def process(self, payload: dict): def process(self, payload: dict):
message = Message.model_validate(payload) message = Message.model_validate(payload)
bot = platform.platform_client.get_config('bots')[message.project][message.name] bot = platform.platform_client.get_config('bots')[message.project][message.name]

View File

@ -47,7 +47,7 @@ class Daemon(base.Daemon):
def start_polling(self, bot: telebot.TeleBot, queue: str) -> threading.Thread: def start_polling(self, bot: telebot.TeleBot, queue: str) -> threading.Thread:
@bot.message_handler(content_types=['audio', 'photo', 'voice', 'video', 'document', 'animation', 'text', 'location', 'contact', 'sticker', 'video_note']) @bot.message_handler(content_types=['audio', 'photo', 'voice', 'video', 'document', 'animation', 'text', 'location', 'contact', 'sticker', 'video_note'])
def do_action(message: telebot.types.Message): def do_action(message: telebot.types.Message):
queues.set_task(queue, message.json, 1) queues.set_task(self.stub, queue, message.json, 1)
thread = threading.Thread(target=bot.polling) thread = threading.Thread(target=bot.polling)
thread.start() thread.start()
return thread return thread

View File

@ -1,10 +1,14 @@
annotated-types==0.7.0 annotated-types==0.7.0
certifi==2024.8.30 certifi==2024.8.30
charset-normalizer==3.4.0 charset-normalizer==3.4.0
grpcio==1.68.1
grpcio-tools==1.68.1
idna==3.10 idna==3.10
protobuf==5.29.1
pydantic==2.10.2 pydantic==2.10.2
pydantic_core==2.27.1 pydantic_core==2.27.1
pyTelegramBotAPI==4.1.1 pyTelegramBotAPI==4.1.1
requests==2.32.3 requests==2.32.3
setuptools==75.6.0
typing_extensions==4.12.2 typing_extensions==4.12.2
urllib3==2.2.3 urllib3==2.2.3

40
schemas/tasks.proto Normal file
View File

@ -0,0 +1,40 @@
syntax = "proto3";
package queues;
import "google/protobuf/struct.proto";
service Tasks {
rpc Put (PutRequest) returns (EmptyResponse) {}
rpc Take (TakeRequest) returns (TakeResponse) {}
rpc Finish (FinishRequest) returns (EmptyResponse) {}
}
message Task {
string id = 1;
int64 attempt = 2;
google.protobuf.Struct payload = 3;
}
message PutRequest {
string queue = 1;
int64 seconds_to_execute = 2;
optional int64 delay = 3;
google.protobuf.Struct payload = 4;
}
message TakeRequest {
string queue = 1;
}
message FinishRequest {
string id = 1;
}
message EmptyResponse {}
message TakeResponse {
optional Task task = 1;
}

View File

@ -1,27 +0,0 @@
import requests
LOCKS_URL = 'http://locks'
class Conflict(Exception):
pass
def acquire(name: str, ttl: int = 1):
resp = requests.post(f'{LOCKS_URL}/api/v1/acquire', json={
'name': name,
'ttl': ttl,
})
if resp.status_code == 409:
raise Conflict
if resp.status_code != 202:
raise Exception
def release(name: str):
resp = requests.post(f'{LOCKS_URL}/api/v1/release', json={
'name': name,
})
if resp.status_code != 202:
raise Exception

View File

@ -1,13 +1,6 @@
import os
import requests
import time import time
import tasks_pb2_grpc
import tasks_pb2
stage = os.getenv("STAGE", 'local')
if stage == 'local':
QUEUES_URL = 'http://localhost:1239'
else:
QUEUES_URL = 'http://queues:1239'
class QueuesException(Exception): class QueuesException(Exception):
@ -17,30 +10,20 @@ class QueuesException(Exception):
class TasksHandlerMixin: class TasksHandlerMixin:
def poll(self): def poll(self):
while True: while True:
response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(self.queue_name))
task = response.get('task') task: tasks_pb2.Task = response.task
if not task: if not task:
time.sleep(0.2) time.sleep(0.2)
continue continue
try: try:
self.before_execute(task) self.process(task.payload)
self.process(task['payload'])
except Exception as exc: except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
continue continue
try: try:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) self.stub.Finish(tasks_pb2.FinishRequest(task.id))
if resp.status_code != 202:
raise QueuesException
self.after_execute(task)
except: except:
print(f'Failed to finish task id={task["id"]}') print(f'Failed to finish task id={task.id}')
def before_execute(self, task: dict):
pass
def after_execute(self, task: dict):
pass
@property @property
def queue_name(self): def queue_name(self):
@ -49,12 +32,12 @@ class TasksHandlerMixin:
def process(self, payload): def process(self, payload):
raise NotImplemented raise NotImplemented
def set_task(stub: tasks_pb2_grpc.TasksStub, queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None):
def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): stub.Put(
resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ tasks_pb2.PutRequest(
'payload': payload, queue=queue_name,
'seconds_to_execute': seconds_to_execute, seconds_to_execute=seconds_to_execute,
'delay': delay, delay=delay,
}) payload=payload
if resp.status_code != 202: )
raise QueuesException )