diff --git a/app/routers/take.go b/app/routers/take.go index 1e5bd02..e2477ba 100644 --- a/app/routers/take.go +++ b/app/routers/take.go @@ -1,9 +1,15 @@ package routers import ( + "bytes" + "fmt" + "log" "net/http" + "os" tasks "queues-go/app/storage/mongo/collections" + "strconv" "sync" + "time" ) type TaskResponse struct { @@ -18,6 +24,24 @@ type TakeResponse struct { var MutexMap map[string]*sync.Mutex +func sendLatency(timestamp time.Time, latency int) error { + loc, _ := time.LoadLocation("Europe/Moscow") + + s := fmt.Sprintf( + `{"timestamp":"%s","service":"queues","environment":"%s","name":"latency","count":%s}`, + timestamp.In(loc).Format("2006-01-02T15:04:05Z"), + os.Getenv("STAGE"), + strconv.Itoa(latency), + ) + data := []byte(s) + r := bytes.NewReader(data) + _, err := http.Post("http://monitoring:1237/api/v1/metrics/increment", "application/json", r) + if err != nil { + log.Printf("ERROR %s", err.Error()) + } + return err +} + func Take(r *http.Request) (interface{}, int) { queue := r.Header.Get("queue") mutex, ok := MutexMap[queue] @@ -41,6 +65,8 @@ func Take(r *http.Request) (interface{}, int) { Attempt: task.Attempts, Payload: task.Payload, } + now := time.Now() + go sendLatency(now, int(now.Sub(task.PutAt).Milliseconds())) } return response, http.StatusOK