Compare commits

..

No commits in common. "ecc9bef1727e51faf0292ed07b81dc68d6084a8e" and "a9f14411ced9e140eeb3675c1941993b48192adf" have entirely different histories.

3 changed files with 27 additions and 20 deletions

View File

@ -26,6 +26,13 @@ jobs:
steps: steps:
- name: push - name: push
run: docker push mathwave/sprint-repo:queues run: docker push mathwave/sprint-repo:queues
create_dir:
name: Create dir
runs-on: [ dev ]
needs: build
steps:
- name: create_dir
run: mkdir /sprint-data/queues-mongo || true
deploy-dev: deploy-dev:
name: Deploy dev name: Deploy dev
runs-on: [prod] runs-on: [prod]

View File

@ -26,6 +26,13 @@ jobs:
steps: steps:
- name: push - name: push
run: docker push mathwave/sprint-repo:queues run: docker push mathwave/sprint-repo:queues
create_dir:
name: Create dir
runs-on: [ prod ]
needs: build
steps:
- name: create_dir
run: mkdir /sprint-data/queues-mongo || true
deploy-prod: deploy-prod:
name: Deploy prod name: Deploy prod
runs-on: [prod] runs-on: [prod]

View File

@ -9,7 +9,6 @@ import (
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
) )
type Task struct { type Task struct {
@ -63,19 +62,7 @@ func Take(queue string) (*Task, error) {
if task == nil { if task == nil {
return nil, nil return nil, nil
} }
_, err = collection().UpdateByID( _, err = collection().UpdateByID(context.TODO(), task.Id, bson.M{"$set": bson.M{"taken_at": now, "attempts": task.Attempts + 1}})
context.TODO(),
task.Id,
bson.M{
"$set": bson.M{
"taken_at": now,
"attempts": task.Attempts + 1,
"available_from": now.Add(
time.Duration(task.SecondsToExecute) * time.Second,
),
},
},
)
if err != nil { if err != nil {
println("Error updaing") println("Error updaing")
println(err.Error()) println(err.Error())
@ -91,7 +78,6 @@ func findTask(queue string, now time.Time) (*Task, error) {
"queue": queue, "queue": queue,
"available_from": bson.M{"$lte": now}, "available_from": bson.M{"$lte": now},
}, },
options.Find().SetLimit(1),
) )
if err != nil { if err != nil {
println("Error find") println("Error find")
@ -108,8 +94,15 @@ func findTask(queue string, now time.Time) (*Task, error) {
} }
for _, task := range results { for _, task := range results {
if task.TakenAt == nil {
return &task, nil return &task, nil
} }
takenAt := *task.TakenAt
if takenAt.Add(time.Second * time.Duration(task.SecondsToExecute)).Before(now) {
return &task, nil
}
}
return nil, nil return nil, nil
} }