queues-go/app/storage/mongo/collections/tasks.go
Egor Matveev bc9c8ad85d
All checks were successful
Deploy Dev / Build (pull_request) Successful in 53s
Deploy Dev / Push (pull_request) Successful in 26s
Deploy Dev / Deploy dev (pull_request) Successful in 9s
Deploy Prod / Build (pull_request) Successful in 1m25s
Deploy Prod / Push (pull_request) Successful in 45s
Deploy Prod / Deploy prod (pull_request) Successful in 8s
fix
2025-01-13 13:41:45 +03:00

119 lines
2.7 KiB
Go

package tasks
import (
"context"
client "queues-go/app/storage/mongo"
errors "queues-go/app/utils"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Task is the shape of a document read back from the "tasks" collection
// (findTask decodes query results into this type).
type Task struct {
// Queue is the name of the queue the task belongs to; findTask filters on it.
Queue string `bson:"queue"`
// Payload is an arbitrary BSON document; nothing in this file interprets it.
Payload bson.M `bson:"payload"`
// PutAt is presumably the time the task was enqueued; it is never written
// or read by the functions in this file, so confirm against the caller of Add.
PutAt time.Time `bson:"put_at"`
// AvailableFrom is the earliest time the task may be taken: findTask only
// matches tasks whose available_from is <= now, and Take pushes it forward.
AvailableFrom time.Time `bson:"available_from"`
// SecondsToExecute is the number of seconds Take adds to the current time
// when it computes the new AvailableFrom.
SecondsToExecute int `bson:"seconds_to_execute"`
// Id is the MongoDB document identifier (_id).
Id primitive.ObjectID `bson:"_id"`
// TakenAt is set by Take to the time the task was taken. It is a pointer,
// presumably so a never-taken task can carry a null value; confirm with producers.
TakenAt *time.Time `bson:"taken_at"`
// Attempts is the counter Take increases by one each time the task is taken.
Attempts int `bson:"attempts"`
}
// InsertedTask is the document shape Add writes to the "tasks" collection.
// It has the same fields as Task except for the _id field.
// NOTE(review): omitting _id presumably lets the driver/server generate it;
// confirm against the MongoDB driver documentation.
type InsertedTask struct {
// Queue is the name of the queue the task is put into.
Queue string `bson:"queue"`
// Payload is an arbitrary BSON document stored as-is.
Payload bson.M `bson:"payload"`
// PutAt is presumably the enqueue time; the caller of Add supplies it.
PutAt time.Time `bson:"put_at"`
// AvailableFrom is the earliest time the task may be taken (findTask
// matches on available_from <= now).
AvailableFrom time.Time `bson:"available_from"`
// SecondsToExecute is the number of seconds Take adds to the take time
// when it reschedules AvailableFrom.
SecondsToExecute int `bson:"seconds_to_execute"`
// TakenAt is written by Take; the caller of Add decides the initial value
// (presumably nil for a new task).
TakenAt *time.Time `bson:"taken_at"`
// Attempts is the take counter; the caller of Add decides the initial value
// (presumably 0 for a new task).
Attempts int `bson:"attempts"`
}
// Add stores task as a new document in the tasks collection.
//
// Any failure reported by the driver is collapsed into
// errors.ErrInternalError; the underlying error is not exposed to the caller.
func Add(task InsertedTask) error {
	if _, insertErr := collection().InsertOne(context.TODO(), task); insertErr != nil {
		return errors.ErrInternalError
	}
	return nil
}
// Finish removes the task whose _id equals the hex-encoded ObjectID in id.
//
// It returns errors.ErrIncorrectFormat when id is not a valid ObjectID hex
// string and errors.ErrInternalError when the delete command fails. The
// number of deleted documents is not inspected, so finishing a task that
// does not exist is reported as success.
func Finish(id string) error {
	taskID, parseErr := primitive.ObjectIDFromHex(id)
	if parseErr != nil {
		return errors.ErrIncorrectFormat
	}

	byID := bson.M{"_id": taskID}
	if _, deleteErr := collection().DeleteOne(context.TODO(), byID); deleteErr != nil {
		return errors.ErrInternalError
	}
	return nil
}
// Take claims one currently available task from queue and returns it.
//
// A task is available when its available_from is not after the current time.
// Claiming a task sets taken_at to now, increases attempts by one and moves
// available_from forward by the task's SecondsToExecute, so the task is
// hidden from other consumers for that long.
//
// Finding the task and updating it are two separate operations, so the
// update is written as a compare-and-swap: it only applies while the
// document still carries the available_from value that was read. If another
// consumer claimed (or finished) the task in between, nothing matches and
// the lookup is retried, instead of handing the same task to two consumers.
//
// Returns:
//   - (task, nil) with the task as it was read BEFORE the claim was applied
//     (TakenAt, Attempts and AvailableFrom hold the previous values);
//   - (nil, nil) when no task is available, or when every retry lost the
//     race to another consumer;
//   - (nil, errors.ErrInternalError) when a database operation fails.
func Take(queue string) (*Task, error) {
	// Bound the retries so a heavily contended queue cannot keep one caller
	// spinning forever; the caller can simply poll again.
	const maxClaimAttempts = 5

	for attempt := 0; attempt < maxClaimAttempts; attempt++ {
		now := time.Now()
		task, err := findTask(queue, now)
		if err != nil {
			return nil, errors.ErrInternalError
		}
		if task == nil {
			return nil, nil
		}

		result, err := collection().UpdateOne(
			context.TODO(),
			bson.M{
				"_id": task.Id,
				// Guard: only claim the task if nobody rescheduled it
				// since it was read.
				"available_from": task.AvailableFrom,
			},
			bson.M{
				"$set": bson.M{
					"taken_at": now,
					"attempts": task.Attempts + 1,
					"available_from": now.Add(
						time.Duration(task.SecondsToExecute) * time.Second,
					),
				},
			},
		)
		if err != nil {
			println("Error updaing")
			println(err.Error())
			return nil, errors.ErrInternalError
		}
		if result.MatchedCount > 0 {
			return task, nil
		}
		// Lost the race for this task: look for another one.
	}
	return nil, nil
}
// findTask looks up at most one task of the given queue whose
// available_from is not later than now.
//
// No sort order is requested, so which of several available tasks is
// returned is up to the server. It returns (nil, nil) when nothing matches
// and (nil, errors.ErrInternalError) when the query or the decoding of its
// results fails; the driver error is printed with println before it is
// replaced.
func findTask(queue string, now time.Time) (*Task, error) {
	availableInQueue := bson.M{
		"queue":          queue,
		"available_from": bson.M{"$lte": now},
	}
	onlyOne := options.Find().SetLimit(1)

	cur, findErr := collection().Find(context.TODO(), availableInQueue, onlyOne)
	if findErr != nil {
		println("Error find")
		println(findErr.Error())
		return nil, errors.ErrInternalError
	}

	var matched []Task
	if decodeErr := cur.All(context.TODO(), &matched); decodeErr != nil {
		println("Error all")
		println(decodeErr.Error())
		return nil, errors.ErrInternalError
	}

	if len(matched) == 0 {
		return nil, nil
	}
	// Return a pointer to a copy so the result does not alias the slice.
	first := matched[0]
	return &first, nil
}
// collection returns a handle to the "tasks" collection of the shared
// client.Database. The handle is looked up on every call.
func collection() *mongo.Collection {
	const name = "tasks"
	db := client.Database
	return db.Collection(name)
}