diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e1514fd --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +queues-go \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 90334fd..9058b0c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM golang:alpine -RUN mkdir /usr/src/app -WORKDIR /usr/src/app +RUN mkdir /code +WORKDIR /code COPY . . RUN go build ENTRYPOINT ["./queues-go"] \ No newline at end of file diff --git a/app/routers/put.go b/app/routers/put.go index 7a2759b..cfd9834 100644 --- a/app/routers/put.go +++ b/app/routers/put.go @@ -5,12 +5,14 @@ import ( "net/http" tasks "queues-go/app/storage/mongo/collections" "time" + + "go.mongodb.org/mongo-driver/bson" ) type PutRequestBody struct { - Payload json.RawMessage `json:"payload"` - SecondsToExecute int `json:"seconds_to_execute"` - Delay *int `json:"delay"` + Payload bson.M `json:"payload"` + SecondsToExecute int `json:"seconds_to_execute"` + Delay *int `json:"delay"` } func Put(w http.ResponseWriter, r *http.Request) { @@ -21,6 +23,7 @@ func Put(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } + queue := r.Header.Get("queue") var availableFrom time.Time if body.Delay == nil { @@ -28,6 +31,7 @@ func Put(w http.ResponseWriter, r *http.Request) { } else { availableFrom = time.Now().Add(time.Second + time.Duration(*body.Delay)) } + task := tasks.InsertedTask{ Queue: queue, Payload: body.Payload, @@ -37,10 +41,12 @@ func Put(w http.ResponseWriter, r *http.Request) { TakenAt: nil, Attempts: 0, } + err = tasks.Add(task) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } + w.WriteHeader(http.StatusAccepted) } diff --git a/app/routers/take.go b/app/routers/take.go index 8fa92ea..90e4545 100644 --- a/app/routers/take.go +++ b/app/routers/take.go @@ -20,9 +20,11 @@ func Take(w http.ResponseWriter, r *http.Request) { queue := r.Header.Get("queue") task, err := tasks.Take(queue) if err != nil { + println("Error taking") w.WriteHeader(http.StatusInternalServerError) return } + var response TakeResponse if task == nil { response.Task = nil @@ -33,10 +35,13 @@ func Take(w http.ResponseWriter, r *http.Request) { Payload: task.Payload, } } + data, err := json.Marshal(response) if err != nil { + println("Error marshalling") w.WriteHeader(http.StatusInternalServerError) return } + w.Write(data) } diff --git a/app/storage/mongo/collections/tasks.go b/app/storage/mongo/collections/tasks.go index a479d4c..a0733b2 100644 --- a/app/storage/mongo/collections/tasks.go +++ b/app/storage/mongo/collections/tasks.go @@ -23,13 +23,13 @@ type Task struct { } type InsertedTask struct { - Queue string `bson:"queue"` - Payload interface{} `bson:"payload"` - PutAt time.Time `bson:"put_at"` - AvailableFrom time.Time `bson:"available_from"` - SecondsToExecute int `bson:"seconds_to_execute"` - TakenAt *time.Time `bson:"taken_at"` - Attempts int `bson:"attempts"` + Queue string `bson:"queue"` + Payload bson.M `bson:"payload"` + PutAt time.Time `bson:"put_at"` + AvailableFrom time.Time `bson:"available_from"` + SecondsToExecute int `bson:"seconds_to_execute"` + TakenAt *time.Time `bson:"taken_at"` + Attempts int `bson:"attempts"` } func Add(task InsertedTask) error { @@ -64,6 +64,8 @@ func Take(queue string) (*Task, error) { } _, err = collection().UpdateByID(context.TODO(), task.Id, bson.M{"$set": bson.M{"taken_at": now, "attempts": task.Attempts + 1}}) if err != nil { + println("Error updaing") + println(err.Error()) return nil, errors.ErrInternalError } return task, nil @@ -78,12 +80,16 @@ func findTask(queue string, now time.Time) (*Task, error) { }, ) if err != nil { + println("Error find") + println(err.Error()) return nil, errors.ErrInternalError } var results []Task err = cursor.All(context.TODO(), &results) if err != nil { + println("Error all") + println(err.Error()) return nil, errors.ErrInternalError } diff --git a/queues-go b/queues-go deleted file mode 100755 index 4fd67a1..0000000 Binary files a/queues-go and /dev/null differ