# b-jokes/helpers/events.py
# 2022-10-07 01:08:58 +03:00
#
# 62 lines
# 1.9 KiB
# Python

import csv
import datetime
import logging
from io import StringIO
from typing import Generator
from helpers.mongo import mongo
class Events:
    """Daily per-metric event counters backed by ``mongo.events_collection``.

    Documents written and read by this class have the shape
    ``{"date": <naive midnight datetime>, "event": <metric name>,
    "count": <int>}``.
    """

    # How many days before today the reporting window reaches back.  Both
    # endpoints are included, so the window spans HISTORY_DAYS + 1 days.
    HISTORY_DAYS = 30

    def __init__(self):
        self.collection = mongo.events_collection
        # Only these metric names are accepted by inc() and reported.
        self.metrics = ("launch", "joke", "ping")

    @property
    def today(self) -> datetime.datetime:
        """Return the current local date as a naive datetime at 00:00:00."""
        now = datetime.datetime.now()
        return datetime.datetime(year=now.year, month=now.month, day=now.day)

    @property
    def date_range(self) -> Generator[datetime.datetime, None, None]:
        """Yield midnight datetimes from today back to 30 days ago.

        Values are produced newest first and both endpoints are included,
        so 31 datetimes are yielded in total.
        """
        today = self.today
        for offset in range(self.HISTORY_DAYS + 1):
            yield today - datetime.timedelta(days=offset)

    def inc(self, event: str) -> None:
        """Increment today's counter for *event* by one.

        A metric name that is not in ``self.metrics`` is logged as an error
        and otherwise ignored.
        """
        if event not in self.metrics:
            logging.error("metric is not allowed")
            return
        # One upsert replaces the former find_one -> insert_one/update_one
        # sequence: the document is created with count == 1 when missing and
        # incremented otherwise, in a single server-side operation, so two
        # callers can no longer both observe "missing" and both insert.
        # NOTE(review): concurrent upserts can still produce duplicates unless
        # (date, event) carries a unique index -- confirm the index exists.
        self.collection.update_one(
            {"date": self.today, "event": event},
            {"$inc": {"count": 1}},
            upsert=True,
        )

    def form_data(self) -> str:
        """Return the counters for every day in ``date_range`` as CSV text.

        The header row is ``Date`` followed by the metric names.  Each later
        row holds the date formatted with ``%y-%m-%d`` (two-digit year) and
        one count per metric; a missing document is reported as ``0``.
        """
        buffer = StringIO()
        writer = csv.writer(buffer)
        writer.writerow(["Date", *self.metrics])
        for date in self.date_range:
            row = [date.strftime("%y-%m-%d")]
            for metric in self.metrics:
                doc = self.collection.find_one({"date": date, "event": metric})
                row.append(str(doc["count"] if doc else 0))
            writer.writerow(row)
        return buffer.getvalue()

    def today_json(self) -> dict:
        """Return today's count for every known metric as a plain dict.

        Metrics with no stored document (or a falsy stored count) map to 0.
        """
        stored = {
            doc["event"]: doc["count"]
            for doc in self.collection.find({"date": self.today})
        }
        return {metric: stored.get(metric) or 0 for metric in self.metrics}
# Shared module-level instance. Constructing it reads mongo.events_collection,
# so that lookup runs when this module is imported.
events = Events()