Merge pull request 'master' (#6) from master into prod

Reviewed-on: #6
This commit is contained in:
emmatveev 2024-12-31 03:04:59 +03:00
commit bf152e6d13
6 changed files with 30 additions and 12 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
queues-go

View File

@ -1,6 +1,6 @@
FROM golang:alpine FROM golang:alpine
RUN mkdir /usr/src/app RUN mkdir /code
WORKDIR /usr/src/app WORKDIR /code
COPY . . COPY . .
RUN go build RUN go build
ENTRYPOINT ["./queues-go"] ENTRYPOINT ["./queues-go"]

View File

@ -5,12 +5,14 @@ import (
"net/http" "net/http"
tasks "queues-go/app/storage/mongo/collections" tasks "queues-go/app/storage/mongo/collections"
"time" "time"
"go.mongodb.org/mongo-driver/bson"
) )
type PutRequestBody struct { type PutRequestBody struct {
Payload json.RawMessage `json:"payload"` Payload bson.M `json:"payload"`
SecondsToExecute int `json:"seconds_to_execute"` SecondsToExecute int `json:"seconds_to_execute"`
Delay *int `json:"delay"` Delay *int `json:"delay"`
} }
func Put(w http.ResponseWriter, r *http.Request) { func Put(w http.ResponseWriter, r *http.Request) {
@ -21,6 +23,7 @@ func Put(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
queue := r.Header.Get("queue") queue := r.Header.Get("queue")
var availableFrom time.Time var availableFrom time.Time
if body.Delay == nil { if body.Delay == nil {
@ -28,6 +31,7 @@ func Put(w http.ResponseWriter, r *http.Request) {
} else { } else {
availableFrom = time.Now().Add(time.Second + time.Duration(*body.Delay)) availableFrom = time.Now().Add(time.Second + time.Duration(*body.Delay))
} }
task := tasks.InsertedTask{ task := tasks.InsertedTask{
Queue: queue, Queue: queue,
Payload: body.Payload, Payload: body.Payload,
@ -37,10 +41,12 @@ func Put(w http.ResponseWriter, r *http.Request) {
TakenAt: nil, TakenAt: nil,
Attempts: 0, Attempts: 0,
} }
err = tasks.Add(task) err = tasks.Add(task)
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
w.WriteHeader(http.StatusAccepted) w.WriteHeader(http.StatusAccepted)
} }

View File

@ -20,9 +20,11 @@ func Take(w http.ResponseWriter, r *http.Request) {
queue := r.Header.Get("queue") queue := r.Header.Get("queue")
task, err := tasks.Take(queue) task, err := tasks.Take(queue)
if err != nil { if err != nil {
println("Error taking")
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
var response TakeResponse var response TakeResponse
if task == nil { if task == nil {
response.Task = nil response.Task = nil
@ -33,10 +35,13 @@ func Take(w http.ResponseWriter, r *http.Request) {
Payload: task.Payload, Payload: task.Payload,
} }
} }
data, err := json.Marshal(response) data, err := json.Marshal(response)
if err != nil { if err != nil {
println("Error marshalling")
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
w.Write(data) w.Write(data)
} }

View File

@ -23,13 +23,13 @@ type Task struct {
} }
type InsertedTask struct { type InsertedTask struct {
Queue string `bson:"queue"` Queue string `bson:"queue"`
Payload interface{} `bson:"payload"` Payload bson.M `bson:"payload"`
PutAt time.Time `bson:"put_at"` PutAt time.Time `bson:"put_at"`
AvailableFrom time.Time `bson:"available_from"` AvailableFrom time.Time `bson:"available_from"`
SecondsToExecute int `bson:"seconds_to_execute"` SecondsToExecute int `bson:"seconds_to_execute"`
TakenAt *time.Time `bson:"taken_at"` TakenAt *time.Time `bson:"taken_at"`
Attempts int `bson:"attempts"` Attempts int `bson:"attempts"`
} }
func Add(task InsertedTask) error { func Add(task InsertedTask) error {
@ -64,6 +64,8 @@ func Take(queue string) (*Task, error) {
} }
_, err = collection().UpdateByID(context.TODO(), task.Id, bson.M{"$set": bson.M{"taken_at": now, "attempts": task.Attempts + 1}}) _, err = collection().UpdateByID(context.TODO(), task.Id, bson.M{"$set": bson.M{"taken_at": now, "attempts": task.Attempts + 1}})
if err != nil { if err != nil {
println("Error updaing")
println(err.Error())
return nil, errors.ErrInternalError return nil, errors.ErrInternalError
} }
return task, nil return task, nil
@ -78,12 +80,16 @@ func findTask(queue string, now time.Time) (*Task, error) {
}, },
) )
if err != nil { if err != nil {
println("Error find")
println(err.Error())
return nil, errors.ErrInternalError return nil, errors.ErrInternalError
} }
var results []Task var results []Task
err = cursor.All(context.TODO(), &results) err = cursor.All(context.TODO(), &results)
if err != nil { if err != nil {
println("Error all")
println(err.Error())
return nil, errors.ErrInternalError return nil, errors.ErrInternalError
} }

BIN
queues-go

Binary file not shown.