Merge pull request 'master' (#11) from master into prod
Reviewed-on: #11
This commit is contained in:
commit
6d305f7e98
@ -5,43 +5,12 @@ services:
|
|||||||
|
|
||||||
bot:
|
bot:
|
||||||
image: mathwave/sprint-repo:roulette-bot
|
image: mathwave/sprint-repo:roulette-bot
|
||||||
command: bot
|
|
||||||
environment:
|
environment:
|
||||||
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
|
|
||||||
MONGO_HOST: "mongo.develop.sprinthub.ru"
|
MONGO_HOST: "mongo.develop.sprinthub.ru"
|
||||||
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
|
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
|
||||||
MINIO_HOST: "minio.develop.sprinthub.ru"
|
|
||||||
MINIO_SECRET_KEY: $MINIO_SECRET_KEY_DEV
|
|
||||||
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
|
|
||||||
STAGE: "development"
|
STAGE: "development"
|
||||||
REDIS_HOST: "redis.develop.sprinthub.ru"
|
|
||||||
REDIS_PASSWORD: $REDIS_PASSWORD_DEV
|
|
||||||
networks:
|
networks:
|
||||||
- net
|
- queues-development
|
||||||
deploy:
|
|
||||||
mode: replicated
|
|
||||||
restart_policy:
|
|
||||||
condition: any
|
|
||||||
update_config:
|
|
||||||
parallelism: 1
|
|
||||||
order: start-first
|
|
||||||
|
|
||||||
roulette-nginx:
|
|
||||||
image: mathwave/sprint-repo:roulette-bot
|
|
||||||
command: api
|
|
||||||
environment:
|
|
||||||
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
|
|
||||||
MONGO_HOST: "mongo.develop.sprinthub.ru"
|
|
||||||
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
|
|
||||||
MINIO_HOST: "minio.develop.sprinthub.ru"
|
|
||||||
MINIO_SECRET_KEY: $MINIO_SECRET_KEY_DEV
|
|
||||||
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
|
|
||||||
STAGE: "development"
|
|
||||||
REDIS_HOST: "redis.develop.sprinthub.ru"
|
|
||||||
REDIS_PASSWORD: $REDIS_PASSWORD_DEV
|
|
||||||
networks:
|
|
||||||
- net
|
|
||||||
- common-infra-nginx
|
|
||||||
deploy:
|
deploy:
|
||||||
mode: replicated
|
mode: replicated
|
||||||
restart_policy:
|
restart_policy:
|
||||||
@ -51,7 +20,5 @@ services:
|
|||||||
order: start-first
|
order: start-first
|
||||||
|
|
||||||
networks:
|
networks:
|
||||||
net:
|
queues-development:
|
||||||
driver: overlay
|
|
||||||
common-infra-nginx:
|
|
||||||
external: true
|
external: true
|
@ -5,43 +5,12 @@ services:
|
|||||||
|
|
||||||
bot:
|
bot:
|
||||||
image: mathwave/sprint-repo:roulette-bot
|
image: mathwave/sprint-repo:roulette-bot
|
||||||
command: bot
|
|
||||||
networks:
|
|
||||||
- net
|
|
||||||
environment:
|
environment:
|
||||||
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
|
|
||||||
MONGO_HOST: "mongo.sprinthub.ru"
|
MONGO_HOST: "mongo.sprinthub.ru"
|
||||||
MONGO_PASSWORD: $MONGO_PASSWORD_PROD
|
MONGO_PASSWORD: $MONGO_PASSWORD_PROD
|
||||||
MINIO_HOST: "minio.sprinthub.ru"
|
|
||||||
MINIO_SECRET_KEY: $MINIO_SECRET_KEY_PROD
|
|
||||||
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
|
|
||||||
STAGE: "production"
|
STAGE: "production"
|
||||||
REDIS_HOST: "redis.sprinthub.ru"
|
|
||||||
REDIS_PASSWORD: $REDIS_PASSWORD_PROD
|
|
||||||
deploy:
|
|
||||||
mode: replicated
|
|
||||||
restart_policy:
|
|
||||||
condition: any
|
|
||||||
update_config:
|
|
||||||
parallelism: 1
|
|
||||||
order: start-first
|
|
||||||
|
|
||||||
roulette-nginx:
|
|
||||||
image: mathwave/sprint-repo:roulette-bot
|
|
||||||
command: api
|
|
||||||
networks:
|
networks:
|
||||||
- net
|
- queues
|
||||||
- common-infra-nginx
|
|
||||||
environment:
|
|
||||||
TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
|
|
||||||
MONGO_HOST: "mongo.sprinthub.ru"
|
|
||||||
MONGO_PASSWORD: $MONGO_PASSWORD_PROD
|
|
||||||
MINIO_HOST: "minio.sprinthub.ru"
|
|
||||||
MINIO_SECRET_KEY: $MINIO_SECRET_KEY_PROD
|
|
||||||
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
|
|
||||||
STAGE: "production"
|
|
||||||
REDIS_HOST: "redis.sprinthub.ru"
|
|
||||||
REDIS_PASSWORD: $REDIS_PASSWORD_PROD
|
|
||||||
deploy:
|
deploy:
|
||||||
mode: replicated
|
mode: replicated
|
||||||
restart_policy:
|
restart_policy:
|
||||||
@ -51,7 +20,5 @@ services:
|
|||||||
order: start-first
|
order: start-first
|
||||||
|
|
||||||
networks:
|
networks:
|
||||||
net:
|
queues:
|
||||||
driver: overlay
|
|
||||||
common-infra-nginx:
|
|
||||||
external: true
|
external: true
|
@ -28,7 +28,7 @@ jobs:
|
|||||||
run: docker push mathwave/sprint-repo:roulette-bot
|
run: docker push mathwave/sprint-repo:roulette-bot
|
||||||
deploy-dev:
|
deploy-dev:
|
||||||
name: Deploy dev
|
name: Deploy dev
|
||||||
runs-on: [dev]
|
runs-on: [prod]
|
||||||
needs: push
|
needs: push
|
||||||
steps:
|
steps:
|
||||||
- name: login
|
- name: login
|
||||||
@ -39,9 +39,5 @@ jobs:
|
|||||||
ref: dev
|
ref: dev
|
||||||
- name: deploy
|
- name: deploy
|
||||||
env:
|
env:
|
||||||
TELEGRAM_TOKEN_DEV: ${{ secrets.TELEGRAM_TOKEN_DEV }}
|
|
||||||
MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }}
|
MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }}
|
||||||
PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }}
|
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml roulette-bot-development
|
||||||
MINIO_SECRET_KEY_DEV: ${{ secrets.MINIO_SECRET_KEY_DEV }}
|
|
||||||
REDIS_PASSWORD_DEV: ${{ secrets.REDIS_PASSWORD_DEV }}
|
|
||||||
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml roulette-bot
|
|
||||||
|
@ -39,9 +39,5 @@ jobs:
|
|||||||
ref: prod
|
ref: prod
|
||||||
- name: deploy
|
- name: deploy
|
||||||
env:
|
env:
|
||||||
TELEGRAM_TOKEN_PROD: ${{ secrets.TELEGRAM_TOKEN_PROD }}
|
|
||||||
MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }}
|
MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }}
|
||||||
PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }}
|
|
||||||
MINIO_SECRET_KEY_PROD: ${{ secrets.MINIO_SECRET_KEY_PROD }}
|
|
||||||
REDIS_PASSWORD_PROD: ${{ secrets.REDIS_PASSWORD_PROD }}
|
|
||||||
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml roulette-bot
|
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml roulette-bot
|
||||||
|
@ -1,43 +0,0 @@
|
|||||||
stages:
|
|
||||||
- build
|
|
||||||
- deploy-dev
|
|
||||||
- deploy-prod
|
|
||||||
|
|
||||||
build:
|
|
||||||
stage: build
|
|
||||||
tags:
|
|
||||||
- dev
|
|
||||||
before_script:
|
|
||||||
- docker login -u mathwave -p $DOCKERHUB_PASSWORD
|
|
||||||
script:
|
|
||||||
- docker build -t mathwave/sprint-repo:roulette-bot .
|
|
||||||
- docker push mathwave/sprint-repo:roulette-bot
|
|
||||||
|
|
||||||
.deploy:
|
|
||||||
before_script:
|
|
||||||
- docker login -u mathwave -p $DOCKERHUB_PASSWORD
|
|
||||||
|
|
||||||
deploy-dev:
|
|
||||||
extends:
|
|
||||||
- .deploy
|
|
||||||
stage: deploy-dev
|
|
||||||
tags:
|
|
||||||
- dev
|
|
||||||
rules:
|
|
||||||
- if: '$CI_COMMIT_BRANCH == "main"'
|
|
||||||
when: on_success
|
|
||||||
- when: manual
|
|
||||||
script:
|
|
||||||
- docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml roulette-bot
|
|
||||||
|
|
||||||
deploy-prod:
|
|
||||||
extends:
|
|
||||||
- .deploy
|
|
||||||
stage: deploy-prod
|
|
||||||
tags:
|
|
||||||
- prod
|
|
||||||
only:
|
|
||||||
- main
|
|
||||||
when: manual
|
|
||||||
script:
|
|
||||||
- docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml roulette-bot
|
|
@ -5,4 +5,4 @@ COPY requirements.txt requirements.txt
|
|||||||
RUN pip install -r requirements.txt
|
RUN pip install -r requirements.txt
|
||||||
COPY . .
|
COPY . .
|
||||||
ENV PYTHONUNBUFFERED 1
|
ENV PYTHONUNBUFFERED 1
|
||||||
ENTRYPOINT ["python", "main.py"]
|
ENTRYPOINT ["python", "bot.py"]
|
24
api.py
24
api.py
@ -1,24 +0,0 @@
|
|||||||
from bson import ObjectId
|
|
||||||
from flask import Flask, request
|
|
||||||
|
|
||||||
from tools.mongo import mongo
|
|
||||||
|
|
||||||
app = Flask("roulette")
|
|
||||||
|
|
||||||
|
|
||||||
@app.route('/dialogs')
|
|
||||||
def main():
|
|
||||||
html = "<html><head></head><body>"
|
|
||||||
for d in mongo.dialogs_collection.find({}).sort([('started_at', -1)]):
|
|
||||||
html += f'<a href="/dialog?id={d["_id"]}">{d["_id"]}</a><br>'
|
|
||||||
html += "</body></html>"
|
|
||||||
return html
|
|
||||||
|
|
||||||
|
|
||||||
@app.route('/dialog')
|
|
||||||
def dialog():
|
|
||||||
html = "<html><head></head><body>"
|
|
||||||
for message in mongo.messages_collection.find({"dialog_id": ObjectId(request.args.get('id'))}).sort([('sent_at', 1)]):
|
|
||||||
html += f'{message["sender"]}: {message["text"]}<br>'
|
|
||||||
html += "</body></html>"
|
|
||||||
return html
|
|
111
bot.py
111
bot.py
@ -1,20 +1,17 @@
|
|||||||
import os
|
import json
|
||||||
import uuid
|
|
||||||
|
|
||||||
import requests
|
|
||||||
import telebot
|
|
||||||
from telebot.types import Message, ReplyKeyboardRemove
|
from telebot.types import Message, ReplyKeyboardRemove
|
||||||
|
|
||||||
from tools.minio import minio_client as minio
|
|
||||||
from tools.mongo import mongo
|
from tools.mongo import mongo
|
||||||
from tools.sprint_platform import platform
|
from tools.queues import TasksHandlerMixin, set_task
|
||||||
from tools.redis import redis_client as redis
|
|
||||||
|
|
||||||
bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN"))
|
|
||||||
|
|
||||||
|
|
||||||
class Core:
|
class Core(TasksHandlerMixin):
|
||||||
def __init__(self, message: Message):
|
@property
|
||||||
|
def queue_name(self):
|
||||||
|
return 'roulette_bot_worker'
|
||||||
|
|
||||||
|
def process(self, payload):
|
||||||
|
message: Message = Message.de_json(json.dumps(payload))
|
||||||
self.message = message
|
self.message = message
|
||||||
self.chat_id = message.chat.id
|
self.chat_id = message.chat.id
|
||||||
self.message_text = message.text or message.caption or ""
|
self.message_text = message.text or message.caption or ""
|
||||||
@ -30,8 +27,6 @@ class Core:
|
|||||||
doc = user
|
doc = user
|
||||||
self.doc = doc
|
self.doc = doc
|
||||||
self.state = doc['state']
|
self.state = doc['state']
|
||||||
|
|
||||||
def process(self):
|
|
||||||
if self.message_text.startswith('/'):
|
if self.message_text.startswith('/'):
|
||||||
self.exec_command()
|
self.exec_command()
|
||||||
return
|
return
|
||||||
@ -68,12 +63,42 @@ class Core:
|
|||||||
def handle_state_search(self):
|
def handle_state_search(self):
|
||||||
self.send_message('🤖 Поиски собеседника продолжаются')
|
self.send_message('🤖 Поиски собеседника продолжаются')
|
||||||
|
|
||||||
def send_message(self, text, chat_id=None, reply_markup=None, remove_keyboard=True, **kwargs):
|
def send_message(self, text, chat_id=None, reply_markup=None, remove_keyboard=True):
|
||||||
if not text:
|
if not text:
|
||||||
return
|
return
|
||||||
if reply_markup is None and remove_keyboard:
|
if reply_markup is None and remove_keyboard:
|
||||||
reply_markup = ReplyKeyboardRemove()
|
reply_markup = ReplyKeyboardRemove()
|
||||||
bot.send_message(chat_id or self.chat_id, text, reply_markup=reply_markup, **kwargs)
|
body = {
|
||||||
|
'chat_id': chat_id or self.chat_id,
|
||||||
|
'text': text,
|
||||||
|
}
|
||||||
|
if reply_markup:
|
||||||
|
body['reply_markup'] = reply_markup.to_json()
|
||||||
|
set_task(
|
||||||
|
'botalka_mailbox',
|
||||||
|
{
|
||||||
|
'project': 'roulette-bot',
|
||||||
|
'name': 'telegram-bot',
|
||||||
|
'method': 'send_message',
|
||||||
|
'body': body,
|
||||||
|
},
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
|
||||||
|
def send(self, chat_id, method, **kwargs):
|
||||||
|
set_task(
|
||||||
|
'botalka_mailbox',
|
||||||
|
{
|
||||||
|
'project': 'roulette-bot',
|
||||||
|
'name': 'telegram-bot',
|
||||||
|
'method': method,
|
||||||
|
'body': {
|
||||||
|
'chat_id': chat_id,
|
||||||
|
**kwargs,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
|
||||||
def set_state(self, state, chat_ids=None):
|
def set_state(self, state, chat_ids=None):
|
||||||
mongo.chats_collection.update_many({"chat_id": {"$in": chat_ids or [self.chat_id]}}, {"$set": {"state": state}})
|
mongo.chats_collection.update_many({"chat_id": {"$in": chat_ids or [self.chat_id]}}, {"$set": {"state": state}})
|
||||||
@ -87,36 +112,16 @@ class Core:
|
|||||||
def handle_state_dialog(self):
|
def handle_state_dialog(self):
|
||||||
current_dialog = mongo.get_current_dialog(self.chat_id)
|
current_dialog = mongo.get_current_dialog(self.chat_id)
|
||||||
chat_to_send = current_dialog['chat_id_2'] if current_dialog['chat_id_1'] == self.chat_id else current_dialog['chat_id_1']
|
chat_to_send = current_dialog['chat_id_2'] if current_dialog['chat_id_1'] == self.chat_id else current_dialog['chat_id_1']
|
||||||
saves = platform.get_config('save')
|
|
||||||
if saves['messages']:
|
|
||||||
res = mongo.create_message(self.message.content_type, self.message_text, current_dialog['_id'], self.chat_id).inserted_id
|
|
||||||
else:
|
|
||||||
res = uuid.uuid4()
|
|
||||||
if self.message.photo:
|
if self.message.photo:
|
||||||
if saves['photos']:
|
self.send(chat_to_send, 'send_photo', photo=self.message.photo[-1].file_id)
|
||||||
photo = requests.get(bot.get_file_url(self.message.photo[-1].file_id)).content
|
|
||||||
minio.put_object(f"photos/{res}", photo)
|
|
||||||
bot.send_photo(chat_to_send, self.message.photo[-1].file_id)
|
|
||||||
if self.message.sticker:
|
if self.message.sticker:
|
||||||
if saves['stickers']:
|
self.send(chat_to_send, 'send_data', data=self.message.sticker.file_id, data_type='sticker')
|
||||||
sticker = requests.get(bot.get_file_url(self.message.sticker.file_id)).content
|
|
||||||
minio.put_object(f"stickers/{res}", sticker)
|
|
||||||
bot.send_sticker(chat_to_send, self.message.sticker.file_id)
|
|
||||||
if self.message.voice:
|
if self.message.voice:
|
||||||
if saves['voices']:
|
self.send(chat_to_send, 'send_voice', voice=self.message.voice.file_id)
|
||||||
voice = requests.get(bot.get_file_url(self.message.voice.file_id)).content
|
|
||||||
minio.put_object(f"voices/{res}", voice)
|
|
||||||
bot.send_voice(chat_to_send, self.message.voice.file_id)
|
|
||||||
if self.message.video_note:
|
if self.message.video_note:
|
||||||
if saves['video_notes']:
|
self.send(chat_to_send, 'send_video_note', data=self.message.video_note.file_id)
|
||||||
video_note = requests.get(bot.get_file_url(self.message.video_note.file_id)).content
|
|
||||||
minio.put_object(f"video_notes/{res}", video_note)
|
|
||||||
bot.send_video_note(chat_to_send, self.message.video_note.file_id)
|
|
||||||
if self.message.animation:
|
if self.message.animation:
|
||||||
if saves['gifs']:
|
self.send(chat_to_send, 'send_animation', data=self.message.animation.file_id)
|
||||||
video_note = requests.get(bot.get_file_url(self.message.animation.file_id)).content
|
|
||||||
minio.put_object(f"gifs/{res}", video_note)
|
|
||||||
bot.send_animation(chat_to_send, self.message.animation.file_id)
|
|
||||||
self.send_message(self.message_text, chat_to_send)
|
self.send_message(self.message_text, chat_to_send)
|
||||||
|
|
||||||
def start_new_dialog(self, chat_ids):
|
def start_new_dialog(self, chat_ids):
|
||||||
@ -136,25 +141,5 @@ class Core:
|
|||||||
self.set_state('dialog', [chat, next_chat['chat_id']])
|
self.set_state('dialog', [chat, next_chat['chat_id']])
|
||||||
|
|
||||||
|
|
||||||
def run_bot():
|
if __name__ == '__main__':
|
||||||
@bot.message_handler(content_types=[
|
Core().poll()
|
||||||
# 'audio',
|
|
||||||
'photo',
|
|
||||||
'voice',
|
|
||||||
'video_note',
|
|
||||||
# 'document',
|
|
||||||
'text',
|
|
||||||
'animation',
|
|
||||||
# 'location',
|
|
||||||
# 'contact',
|
|
||||||
'sticker'
|
|
||||||
]
|
|
||||||
)
|
|
||||||
def do_action(message: Message):
|
|
||||||
try:
|
|
||||||
Core(message).process()
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
|
|
||||||
print('bot is starting')
|
|
||||||
bot.polling()
|
|
||||||
|
@ -1,14 +0,0 @@
|
|||||||
{
|
|
||||||
"configs": {
|
|
||||||
"save": {
|
|
||||||
"photos": false,
|
|
||||||
"voices": false,
|
|
||||||
"messages": false,
|
|
||||||
"stickers": false,
|
|
||||||
"video_notes": false,
|
|
||||||
"gifs": false
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"experiments": {},
|
|
||||||
"platform_staff": {}
|
|
||||||
}
|
|
11
main.py
11
main.py
@ -1,11 +0,0 @@
|
|||||||
import sys
|
|
||||||
|
|
||||||
|
|
||||||
if sys.argv[-1] == "bot":
|
|
||||||
from bot import run_bot
|
|
||||||
run_bot()
|
|
||||||
elif sys.argv[-1] == "api":
|
|
||||||
from api import app
|
|
||||||
app.run(host="0.0.0.0", port=1238)
|
|
||||||
else:
|
|
||||||
raise NotImplementedError
|
|
@ -1,45 +0,0 @@
|
|||||||
import io
|
|
||||||
|
|
||||||
from minio import Minio
|
|
||||||
from minio.error import MinioException
|
|
||||||
|
|
||||||
import settings
|
|
||||||
|
|
||||||
|
|
||||||
class Client:
|
|
||||||
|
|
||||||
def __init__(self, host: str, access_key: str, secret_key: str, bucket_name: str):
|
|
||||||
self.bucket_name = bucket_name
|
|
||||||
self.cli = Minio(
|
|
||||||
host,
|
|
||||||
access_key=access_key,
|
|
||||||
secret_key=secret_key,
|
|
||||||
secure=False
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
self.cli.make_bucket(bucket_name)
|
|
||||||
except MinioException:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def put_object(self, name: str, data: bytes):
|
|
||||||
self.cli.put_object(self.bucket_name, name, io.BytesIO(data), len(data))
|
|
||||||
|
|
||||||
def get_object(self, name: str) -> bytes:
|
|
||||||
try:
|
|
||||||
return self.cli.get_object(self.bucket_name, name).data
|
|
||||||
except MinioException:
|
|
||||||
return b""
|
|
||||||
|
|
||||||
def delete_object(self, name: str):
|
|
||||||
try:
|
|
||||||
self.cli.remove_object(self.bucket_name, name)
|
|
||||||
except MinioException:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
minio_client = Client(
|
|
||||||
settings.MINIO_HOST,
|
|
||||||
settings.MINIO_ACCESS_KEY,
|
|
||||||
settings.MINIO_SECRET_KEY,
|
|
||||||
settings.MINIO_BUCKET_NAME
|
|
||||||
)
|
|
52
tools/queues.py
Normal file
52
tools/queues.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
import os
|
||||||
|
import requests
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
stage = os.getenv("STAGE", 'local')
|
||||||
|
if stage == 'local':
|
||||||
|
QUEUES_URL = 'http://localhost:1239'
|
||||||
|
else:
|
||||||
|
QUEUES_URL = 'http://queues:1239'
|
||||||
|
|
||||||
|
|
||||||
|
class QueuesException(Exception):
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
class TasksHandlerMixin:
|
||||||
|
def poll(self):
|
||||||
|
while True:
|
||||||
|
response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
|
||||||
|
task = response.get('task')
|
||||||
|
if not task:
|
||||||
|
time.sleep(0.2)
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
self.process(task['payload'])
|
||||||
|
except Exception as exc:
|
||||||
|
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
|
||||||
|
if resp.status_code != 202:
|
||||||
|
raise QueuesException
|
||||||
|
except:
|
||||||
|
print(f'Failed to finish task id={task["id"]}')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def queue_name(self):
|
||||||
|
raise NotImplemented
|
||||||
|
|
||||||
|
def process(self, payload):
|
||||||
|
raise NotImplemented
|
||||||
|
|
||||||
|
|
||||||
|
def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None):
|
||||||
|
resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={
|
||||||
|
'payload': payload,
|
||||||
|
'seconds_to_execute': seconds_to_execute,
|
||||||
|
'delay': delay,
|
||||||
|
})
|
||||||
|
if resp.status_code != 202:
|
||||||
|
raise QueuesException
|
@ -1,30 +0,0 @@
|
|||||||
import contextlib
|
|
||||||
|
|
||||||
import redis
|
|
||||||
|
|
||||||
import settings
|
|
||||||
|
|
||||||
|
|
||||||
class RedisClient:
|
|
||||||
|
|
||||||
def __init__(self, host, password=None):
|
|
||||||
kwargs = {
|
|
||||||
"host": host,
|
|
||||||
}
|
|
||||||
if password:
|
|
||||||
kwargs['password'] = password
|
|
||||||
self.cli = redis.Redis(**kwargs)
|
|
||||||
|
|
||||||
def get(self, key):
|
|
||||||
with self.cli as cli:
|
|
||||||
return cli.get(f"ruletka_{key}")
|
|
||||||
|
|
||||||
def set(self, key, value):
|
|
||||||
with self.cli as cli:
|
|
||||||
cli.set(f"ruletka_{key}", value)
|
|
||||||
|
|
||||||
|
|
||||||
redis_client = RedisClient(
|
|
||||||
settings.REDIS_HOST,
|
|
||||||
settings.REDIS_PASSWORD
|
|
||||||
)
|
|
@ -1,98 +0,0 @@
|
|||||||
import json
|
|
||||||
import os
|
|
||||||
import typing
|
|
||||||
import urllib.parse
|
|
||||||
from threading import Thread
|
|
||||||
from time import sleep
|
|
||||||
|
|
||||||
from requests import get
|
|
||||||
|
|
||||||
|
|
||||||
class PlatformClient:
|
|
||||||
def __init__(self, platform_security_token: str, app_name: str, stage: str, need_poll: bool = True):
|
|
||||||
self.platform_security_token = platform_security_token
|
|
||||||
self.app_name = app_name
|
|
||||||
self.stage = stage
|
|
||||||
self.endpoint = 'https://platform.sprinthub.ru/'
|
|
||||||
self.configs_url = urllib.parse.urljoin(self.endpoint, 'configs/get')
|
|
||||||
self.experiments_url = urllib.parse.urljoin(self.endpoint, 'experiments/get')
|
|
||||||
self.staff_url = urllib.parse.urljoin(self.endpoint, 'is_staff')
|
|
||||||
self.fetch_url = urllib.parse.urljoin(self.endpoint, 'fetch')
|
|
||||||
self.config_storage = {}
|
|
||||||
self.experiment_storage = {}
|
|
||||||
self.staff_storage = {}
|
|
||||||
self.poll_data()
|
|
||||||
if need_poll:
|
|
||||||
self.poll_data_in_thread()
|
|
||||||
|
|
||||||
def poll_data_in_thread(self):
|
|
||||||
def inner():
|
|
||||||
while True:
|
|
||||||
sleep(30)
|
|
||||||
self.fetch()
|
|
||||||
|
|
||||||
Thread(target=inner, daemon=True).start()
|
|
||||||
|
|
||||||
def poll_data(self):
|
|
||||||
self.fetch(with_exception=True)
|
|
||||||
|
|
||||||
def request_with_retries(self, url, params, with_exception=False, retries_count=3):
|
|
||||||
exception_to_throw = None
|
|
||||||
for _ in range(retries_count):
|
|
||||||
try:
|
|
||||||
response = get(
|
|
||||||
url,
|
|
||||||
headers={'X-Security-Token': self.platform_security_token},
|
|
||||||
params=params
|
|
||||||
)
|
|
||||||
if response.status_code == 200:
|
|
||||||
return response.json()
|
|
||||||
print(f'Failed to request {url}, status_code={response.status_code}')
|
|
||||||
exception_to_throw = Exception('Not 200 status')
|
|
||||||
except Exception as exc:
|
|
||||||
print(exc)
|
|
||||||
exception_to_throw = exc
|
|
||||||
sleep(1)
|
|
||||||
print(f'Failed fetching with retries: {url}, {params}')
|
|
||||||
if with_exception:
|
|
||||||
raise exception_to_throw
|
|
||||||
|
|
||||||
def fetch(self, with_exception=False):
|
|
||||||
if self.stage == 'local':
|
|
||||||
local_platform = json.loads(open('local_platform.json', 'r').read())
|
|
||||||
self.config_storage = local_platform['configs']
|
|
||||||
self.experiment_storage = local_platform['experiments']
|
|
||||||
self.staff_storage = {
|
|
||||||
key: set(value)
|
|
||||||
for key, value in local_platform['platform_staff'].items()
|
|
||||||
}
|
|
||||||
return
|
|
||||||
response_data = self.request_with_retries(self.fetch_url, {
|
|
||||||
'project': self.app_name,
|
|
||||||
'stage': self.stage,
|
|
||||||
}, with_exception)
|
|
||||||
self.config_storage = response_data['configs']
|
|
||||||
self.experiment_storage = response_data['experiments']
|
|
||||||
self.staff_storage = {
|
|
||||||
key: set(value)
|
|
||||||
for key, value in response_data['platform_staff'].items()
|
|
||||||
}
|
|
||||||
|
|
||||||
def is_staff(self, **kwargs):
|
|
||||||
for key, value in kwargs.items():
|
|
||||||
if value in self.staff_storage[key]:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def get_config(self, name):
|
|
||||||
return self.config_storage[name]
|
|
||||||
|
|
||||||
def get_experiment(self, name):
|
|
||||||
return self.experiment_storage[name]
|
|
||||||
|
|
||||||
|
|
||||||
platform = PlatformClient(
|
|
||||||
os.getenv('PLATFORM_SECURITY_TOKEN'),
|
|
||||||
'Ruletka',
|
|
||||||
os.getenv('STAGE', 'local')
|
|
||||||
)
|
|
Loading…
Reference in New Issue
Block a user