master #26
@ -6,9 +6,11 @@ services:
|
||||
image: mathwave/sprint-repo:queues
|
||||
networks:
|
||||
- queues-development
|
||||
- monitoring
|
||||
environment:
|
||||
MONGO_HOST: "mongo.develop.sprinthub.ru"
|
||||
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
|
||||
STAGE: "development"
|
||||
deploy:
|
||||
mode: replicated
|
||||
restart_policy:
|
||||
@ -22,3 +24,5 @@ services:
|
||||
networks:
|
||||
queues-development:
|
||||
external: true
|
||||
monitoring:
|
||||
external: true
|
||||
|
@ -6,9 +6,11 @@ services:
|
||||
image: mathwave/sprint-repo:queues
|
||||
networks:
|
||||
- queues
|
||||
- monitoring
|
||||
environment:
|
||||
MONGO_HOST: "mongo.sprinthub.ru"
|
||||
MONGO_PASSWORD: $MONGO_PASSWORD_PROD
|
||||
STAGE: "production"
|
||||
deploy:
|
||||
mode: replicated
|
||||
restart_policy:
|
||||
@ -22,3 +24,5 @@ services:
|
||||
networks:
|
||||
queues:
|
||||
external: true
|
||||
monitoring:
|
||||
external: true
|
||||
|
@ -71,7 +71,7 @@ func Take(queue string) (*Task, error) {
|
||||
"taken_at": now,
|
||||
"attempts": task.Attempts + 1,
|
||||
"available_from": now.Add(
|
||||
time.Duration(task.SecondsToExecute) * time.Second,
|
||||
time.Duration(task.SecondsToExecute+task.Attempts) * time.Second,
|
||||
),
|
||||
},
|
||||
},
|
||||
@ -91,7 +91,7 @@ func findTask(queue string, now time.Time) (*Task, error) {
|
||||
"queue": queue,
|
||||
"available_from": bson.M{"$lte": now},
|
||||
},
|
||||
options.Find().SetLimit(1),
|
||||
options.Find().SetSort(bson.D{{Key: "available_from", Value: 1}}).SetLimit(1),
|
||||
)
|
||||
if err != nil {
|
||||
println("Error find")
|
||||
|
24
main.go
24
main.go
@ -1,15 +1,38 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"queues-go/app/routers"
|
||||
client "queues-go/app/storage/mongo"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func writeMetric(timestamp time.Time, endpoint string, statusCode int, responseTime int, method string) {
|
||||
loc, _ := time.LoadLocation("Europe/Moscow")
|
||||
s := fmt.Sprintf(
|
||||
`{"timestamp":"%s","service":"queues","environment":"%s","endpoint":"%s","status_code":%s,"response_time":%s,"method":"%s"}`,
|
||||
timestamp.In(loc).Format("2006-01-02T15:04:05Z"),
|
||||
os.Getenv("STAGE"),
|
||||
endpoint,
|
||||
strconv.Itoa(statusCode),
|
||||
strconv.Itoa(responseTime),
|
||||
method,
|
||||
)
|
||||
data := []byte(s)
|
||||
r := bytes.NewReader(data)
|
||||
_, err := http.Post("http://monitoring:1237/api/v1/metrics/endpoint", "application/json", r)
|
||||
if err != nil {
|
||||
log.Printf("Error sending metrics %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func handlerWrapper(f func(*http.Request) (interface{}, int)) func(http.ResponseWriter, *http.Request) {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
@ -25,6 +48,7 @@ func handlerWrapper(f func(*http.Request) (interface{}, int)) func(http.Response
|
||||
} else {
|
||||
w.WriteHeader(status)
|
||||
}
|
||||
go writeMetric(start, r.URL.Path, status, int(time.Since(start).Milliseconds()), r.Method)
|
||||
log.Printf("%s %d %s", r.URL, status, time.Since(start))
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user