// Package routers holds request handlers of the queues service.
package routers

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"

	tasks "queues-go/app/storage/mongo/collections"
)

// metricsURL is the monitoring endpoint that latency samples are posted to.
const metricsURL = "http://monitoring:1237/api/v1/metrics/increment"

// metricsTimeout bounds a single metrics request so that a stalled monitoring
// service cannot pile up sender goroutines indefinitely.
const metricsTimeout = 5 * time.Second

// TaskResponse is the JSON representation of a single task handed to a client.
type TaskResponse struct {
	Id      string      `json:"id"`
	Attempt int         `json:"attempt"`
	Payload interface{} `json:"payload"`
}

// TakeResponse is the JSON body returned by Take. Task is nil (encoded as
// null) when tasks.Take returned no task.
type TakeResponse struct {
	Task *TaskResponse `json:"task"`
}

// MutexMap maps a queue name to the mutex that serializes tasks.Take calls
// for that queue. Access it only through queueMutex: plain Go maps are not
// safe for concurrent use.
var MutexMap = make(map[string]*sync.Mutex)

// mutexMapMu guards every read and write of MutexMap.
var mutexMapMu sync.Mutex

// metricsClient is shared by all metric submissions so connections are reused.
var metricsClient = &http.Client{Timeout: metricsTimeout}

// Cached result of loading the time zone used for metric timestamps.
var (
	metricsLocOnce sync.Once
	metricsLoc     *time.Location
	metricsLocErr  error
)

// metricsLocation returns the Europe/Moscow location, loading it at most once.
func metricsLocation() (*time.Location, error) {
	metricsLocOnce.Do(func() {
		metricsLoc, metricsLocErr = time.LoadLocation("Europe/Moscow")
	})
	return metricsLoc, metricsLocErr
}

// queueMutex returns the mutex associated with queue, creating it on first
// use. It is safe to call from multiple goroutines.
//
// NOTE(review): the queue name comes straight from a request header in Take,
// so the map grows by one entry per distinct name ever seen.
func queueMutex(queue string) *sync.Mutex {
	mutexMapMu.Lock()
	defer mutexMapMu.Unlock()

	// Tolerate MutexMap having been reset to nil by code outside this file.
	if MutexMap == nil {
		MutexMap = make(map[string]*sync.Mutex)
	}
	mu, ok := MutexMap[queue]
	if !ok {
		mu = &sync.Mutex{}
		MutexMap[queue] = mu
	}
	return mu
}

// sendLatency posts a "latency" metric with the given count to the monitoring
// service. The environment field is taken from the STAGE environment variable.
//
// Any failure is logged with the "ERROR" prefix and also returned, because
// Take runs this function in a goroutine and discards the result.
//
// NOTE(review): the layout appends a literal "Z" to a timestamp rendered in
// Europe/Moscow local time — confirm this is what the monitoring service
// expects.
func sendLatency(timestamp time.Time, latency int) error {
	loc, err := metricsLocation()
	if err != nil {
		// Without this check a missing tz database would make In(nil) panic.
		err = fmt.Errorf("loading metrics time zone: %w", err)
		log.Printf("ERROR %s", err.Error())
		return err
	}

	s := fmt.Sprintf(
		`{"timestamp":"%s","service":"queues","environment":"%s","name":"latency","count":%s}`,
		timestamp.In(loc).Format("2006-01-02T15:04:05Z"),
		os.Getenv("STAGE"),
		strconv.Itoa(latency),
	)

	resp, err := metricsClient.Post(metricsURL, "application/json", bytes.NewReader([]byte(s)))
	if err != nil {
		log.Printf("ERROR %s", err.Error())
		return err
	}
	defer resp.Body.Close()

	// Drain the body so the transport can reuse the connection. The content
	// is not needed, and a read error here does not affect the outcome.
	_, _ = io.Copy(io.Discard, resp.Body)

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		err = fmt.Errorf("sending latency metric: unexpected status %s", resp.Status)
		log.Printf("ERROR %s", err.Error())
		return err
	}
	return nil
}

// Take handles a request for the next task of the queue named in the "queue"
// request header.
//
// It returns the response body together with an HTTP status code:
// (nil, 500) when tasks.Take fails, otherwise a TakeResponse with 200. When a
// task was obtained, the time elapsed since task.PutAt is reported to the
// monitoring service in the background.
func Take(r *http.Request) (interface{}, int) {
	queue := r.Header.Get("queue")

	mutex := queueMutex(queue)
	mutex.Lock()
	// Deferred so that a panic inside tasks.Take cannot leave the queue locked.
	// The remainder of the function does no blocking work.
	defer mutex.Unlock()

	task, err := tasks.Take(queue)
	if err != nil {
		return nil, http.StatusInternalServerError
	}

	var response TakeResponse
	if task == nil {
		return response, http.StatusOK
	}

	response.Task = &TaskResponse{
		Id:      task.Id.Hex(),
		Attempt: task.Attempts,
		Payload: task.Payload,
	}

	// Fire-and-forget: sendLatency logs its own failures and is bounded by
	// the metrics client timeout.
	now := time.Now()
	go sendLatency(now, int(now.Sub(task.PutAt).Milliseconds()))

	return response, http.StatusOK
}