Compare commits
No commits in common. "bf152e6d139512eb1903d4a7213a41f658ada12f" and "c2659fb49c8bbb9bac8ce726b89693c1351fb1a9" have entirely different histories.
bf152e6d13
...
c2659fb49c
1
.gitignore
vendored
1
.gitignore
vendored
@ -1 +0,0 @@
|
|||||||
queues-go
|
|
@ -1,6 +1,6 @@
|
|||||||
FROM golang:alpine
|
FROM golang:alpine
|
||||||
RUN mkdir /code
|
RUN mkdir /usr/src/app
|
||||||
WORKDIR /code
|
WORKDIR /usr/src/app
|
||||||
COPY . .
|
COPY . .
|
||||||
RUN go build
|
RUN go build
|
||||||
ENTRYPOINT ["./queues-go"]
|
ENTRYPOINT ["./queues-go"]
|
@ -5,14 +5,12 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
tasks "queues-go/app/storage/mongo/collections"
|
tasks "queues-go/app/storage/mongo/collections"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type PutRequestBody struct {
|
type PutRequestBody struct {
|
||||||
Payload bson.M `json:"payload"`
|
Payload json.RawMessage `json:"payload"`
|
||||||
SecondsToExecute int `json:"seconds_to_execute"`
|
SecondsToExecute int `json:"seconds_to_execute"`
|
||||||
Delay *int `json:"delay"`
|
Delay *int `json:"delay"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func Put(w http.ResponseWriter, r *http.Request) {
|
func Put(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -23,7 +21,6 @@ func Put(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.WriteHeader(http.StatusBadRequest)
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
queue := r.Header.Get("queue")
|
queue := r.Header.Get("queue")
|
||||||
var availableFrom time.Time
|
var availableFrom time.Time
|
||||||
if body.Delay == nil {
|
if body.Delay == nil {
|
||||||
@ -31,7 +28,6 @@ func Put(w http.ResponseWriter, r *http.Request) {
|
|||||||
} else {
|
} else {
|
||||||
availableFrom = time.Now().Add(time.Second + time.Duration(*body.Delay))
|
availableFrom = time.Now().Add(time.Second + time.Duration(*body.Delay))
|
||||||
}
|
}
|
||||||
|
|
||||||
task := tasks.InsertedTask{
|
task := tasks.InsertedTask{
|
||||||
Queue: queue,
|
Queue: queue,
|
||||||
Payload: body.Payload,
|
Payload: body.Payload,
|
||||||
@ -41,12 +37,10 @@ func Put(w http.ResponseWriter, r *http.Request) {
|
|||||||
TakenAt: nil,
|
TakenAt: nil,
|
||||||
Attempts: 0,
|
Attempts: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tasks.Add(task)
|
err = tasks.Add(task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.WriteHeader(http.StatusAccepted)
|
w.WriteHeader(http.StatusAccepted)
|
||||||
}
|
}
|
||||||
|
@ -20,11 +20,9 @@ func Take(w http.ResponseWriter, r *http.Request) {
|
|||||||
queue := r.Header.Get("queue")
|
queue := r.Header.Get("queue")
|
||||||
task, err := tasks.Take(queue)
|
task, err := tasks.Take(queue)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println("Error taking")
|
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var response TakeResponse
|
var response TakeResponse
|
||||||
if task == nil {
|
if task == nil {
|
||||||
response.Task = nil
|
response.Task = nil
|
||||||
@ -35,13 +33,10 @@ func Take(w http.ResponseWriter, r *http.Request) {
|
|||||||
Payload: task.Payload,
|
Payload: task.Payload,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := json.Marshal(response)
|
data, err := json.Marshal(response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println("Error marshalling")
|
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Write(data)
|
w.Write(data)
|
||||||
}
|
}
|
||||||
|
@ -23,13 +23,13 @@ type Task struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type InsertedTask struct {
|
type InsertedTask struct {
|
||||||
Queue string `bson:"queue"`
|
Queue string `bson:"queue"`
|
||||||
Payload bson.M `bson:"payload"`
|
Payload interface{} `bson:"payload"`
|
||||||
PutAt time.Time `bson:"put_at"`
|
PutAt time.Time `bson:"put_at"`
|
||||||
AvailableFrom time.Time `bson:"available_from"`
|
AvailableFrom time.Time `bson:"available_from"`
|
||||||
SecondsToExecute int `bson:"seconds_to_execute"`
|
SecondsToExecute int `bson:"seconds_to_execute"`
|
||||||
TakenAt *time.Time `bson:"taken_at"`
|
TakenAt *time.Time `bson:"taken_at"`
|
||||||
Attempts int `bson:"attempts"`
|
Attempts int `bson:"attempts"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func Add(task InsertedTask) error {
|
func Add(task InsertedTask) error {
|
||||||
@ -64,8 +64,6 @@ func Take(queue string) (*Task, error) {
|
|||||||
}
|
}
|
||||||
_, err = collection().UpdateByID(context.TODO(), task.Id, bson.M{"$set": bson.M{"taken_at": now, "attempts": task.Attempts + 1}})
|
_, err = collection().UpdateByID(context.TODO(), task.Id, bson.M{"$set": bson.M{"taken_at": now, "attempts": task.Attempts + 1}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println("Error updaing")
|
|
||||||
println(err.Error())
|
|
||||||
return nil, errors.ErrInternalError
|
return nil, errors.ErrInternalError
|
||||||
}
|
}
|
||||||
return task, nil
|
return task, nil
|
||||||
@ -80,16 +78,12 @@ func findTask(queue string, now time.Time) (*Task, error) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println("Error find")
|
|
||||||
println(err.Error())
|
|
||||||
return nil, errors.ErrInternalError
|
return nil, errors.ErrInternalError
|
||||||
}
|
}
|
||||||
|
|
||||||
var results []Task
|
var results []Task
|
||||||
err = cursor.All(context.TODO(), &results)
|
err = cursor.All(context.TODO(), &results)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
println("Error all")
|
|
||||||
println(err.Error())
|
|
||||||
return nil, errors.ErrInternalError
|
return nil, errors.ErrInternalError
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user