queues-go/app/storage/mongo/collections/tasks.go
Egor Matveev 932f263373
All checks were successful
Deploy Dev / Build (pull_request) Successful in 38s
Deploy Dev / Push (pull_request) Successful in 25s
Deploy Dev / Deploy dev (pull_request) Successful in 8s
fix
2024-12-31 02:54:57 +03:00

112 lines
2.7 KiB
Go

package tasks
import (
"context"
client "queues-go/app/storage/mongo"
errors "queues-go/app/utils"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)
type Task struct {
Queue string `bson:"queue"`
Payload bson.M `bson:"payload"`
PutAt time.Time `bson:"put_at"`
AvailableFrom time.Time `bson:"available_from"`
SecondsToExecute int `bson:"seconds_to_execute"`
Id primitive.ObjectID `bson:"_id"`
TakenAt *time.Time `bson:"taken_at"`
Attempts int `bson:"attempts"`
}
type InsertedTask struct {
Queue string `bson:"queue"`
Payload bson.M `bson:"payload"`
PutAt time.Time `bson:"put_at"`
AvailableFrom time.Time `bson:"available_from"`
SecondsToExecute int `bson:"seconds_to_execute"`
TakenAt *time.Time `bson:"taken_at"`
Attempts int `bson:"attempts"`
}
func Add(task InsertedTask) error {
_, err := collection().InsertOne(context.TODO(), task)
if err != nil {
return errors.ErrInternalError
}
return nil
}
func Finish(id string) error {
objectId, err := primitive.ObjectIDFromHex(id)
if err != nil {
return errors.ErrIncorrectFormat
}
_, err = collection().DeleteOne(context.TODO(), bson.M{"_id": objectId})
if err != nil {
return errors.ErrInternalError
}
return nil
}
func Take(queue string) (*Task, error) {
now := time.Now()
task, err := findTask(queue, now)
if err != nil {
return nil, errors.ErrInternalError
}
if task == nil {
return nil, nil
}
_, err = collection().UpdateByID(context.TODO(), task.Id, bson.M{"$set": bson.M{"taken_at": now, "attempts": task.Attempts + 1}})
if err != nil {
println("Error updaing")
println(err.Error())
return nil, errors.ErrInternalError
}
return task, nil
}
func findTask(queue string, now time.Time) (*Task, error) {
cursor, err := collection().Find(
context.TODO(),
bson.M{
"queue": queue,
"available_from": bson.M{"$lte": now},
},
)
if err != nil {
println("Error find")
println(err.Error())
return nil, errors.ErrInternalError
}
var results []Task
err = cursor.All(context.TODO(), &results)
if err != nil {
println("Error all")
println(err.Error())
return nil, errors.ErrInternalError
}
for _, task := range results {
if task.TakenAt == nil {
return &task, nil
}
takenAt := *task.TakenAt
if takenAt.Add(time.Second * time.Duration(task.SecondsToExecute)).Before(now) {
return &task, nil
}
}
return nil, nil
}
func collection() *mongo.Collection {
return client.Database.Collection("tasks")
}