queues-go/app/storage/mongo/collections/tasks.go
Egor Matveev 4d2e0198f3
All checks were successful
Deploy Dev / Build (pull_request) Successful in 41s
Deploy Dev / Push (pull_request) Successful in 26s
Deploy Dev / Deploy dev (pull_request) Successful in 7s
fix
2025-03-23 11:30:46 +03:00

119 lines
2.8 KiB
Go

package tasks
import (
"context"
client "queues-go/app/storage/mongo"
errors "queues-go/app/utils"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type Task struct {
Queue string `bson:"queue"`
Payload bson.M `bson:"payload"`
PutAt time.Time `bson:"put_at"`
AvailableFrom time.Time `bson:"available_from"`
SecondsToExecute int `bson:"seconds_to_execute"`
Id primitive.ObjectID `bson:"_id"`
TakenAt *time.Time `bson:"taken_at"`
Attempts int `bson:"attempts"`
}
type InsertedTask struct {
Queue string `bson:"queue"`
Payload bson.M `bson:"payload"`
PutAt time.Time `bson:"put_at"`
AvailableFrom time.Time `bson:"available_from"`
SecondsToExecute int `bson:"seconds_to_execute"`
TakenAt *time.Time `bson:"taken_at"`
Attempts int `bson:"attempts"`
}
func Add(task InsertedTask) error {
_, err := collection().InsertOne(context.TODO(), task)
if err != nil {
return errors.ErrInternalError
}
return nil
}
func Finish(id string) error {
objectId, err := primitive.ObjectIDFromHex(id)
if err != nil {
return errors.ErrIncorrectFormat
}
_, err = collection().DeleteOne(context.TODO(), bson.M{"_id": objectId})
if err != nil {
return errors.ErrInternalError
}
return nil
}
func Take(queue string) (*Task, error) {
now := time.Now()
task, err := findTask(queue, now)
if err != nil {
return nil, errors.ErrInternalError
}
if task == nil {
return nil, nil
}
_, err = collection().UpdateByID(
context.TODO(),
task.Id,
bson.M{
"$set": bson.M{
"taken_at": now,
"attempts": task.Attempts + 1,
"available_from": now.Add(
time.Duration(task.SecondsToExecute+task.Attempts) * time.Second,
),
},
},
)
if err != nil {
println("Error updaing")
println(err.Error())
return nil, errors.ErrInternalError
}
return task, nil
}
func findTask(queue string, now time.Time) (*Task, error) {
cursor, err := collection().Find(
context.TODO(),
bson.M{
"queue": queue,
"available_from": bson.M{"$lte": now},
},
options.Find().SetSort(bson.D{{Key: "available_from", Value: 1}}).SetLimit(1),
)
if err != nil {
println("Error find")
println(err.Error())
return nil, errors.ErrInternalError
}
var results []Task
err = cursor.All(context.TODO(), &results)
if err != nil {
println("Error all")
println(err.Error())
return nil, errors.ErrInternalError
}
for _, task := range results {
return &task, nil
}
return nil, nil
}
func collection() *mongo.Collection {
return client.Database.Collection("tasks")
}