From 4d2e0198f37870ba93a84d243024f16448525c59 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 23 Mar 2025 11:30:46 +0300 Subject: [PATCH 1/3] fix --- app/storage/mongo/collections/tasks.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/storage/mongo/collections/tasks.go b/app/storage/mongo/collections/tasks.go index 2a29c7f..05c521c 100644 --- a/app/storage/mongo/collections/tasks.go +++ b/app/storage/mongo/collections/tasks.go @@ -71,7 +71,7 @@ func Take(queue string) (*Task, error) { "taken_at": now, "attempts": task.Attempts + 1, "available_from": now.Add( - time.Duration(task.SecondsToExecute) * time.Second, + time.Duration(task.SecondsToExecute+task.Attempts) * time.Second, ), }, }, @@ -91,7 +91,7 @@ func findTask(queue string, now time.Time) (*Task, error) { "queue": queue, "available_from": bson.M{"$lte": now}, }, - options.Find().SetLimit(1), + options.Find().SetSort(bson.D{{Key: "available_from", Value: 1}}).SetLimit(1), ) if err != nil { println("Error find") From ff040e1a6d482cbc3f108ecb986b9cb8450ce693 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sat, 14 Jun 2025 04:19:59 +0300 Subject: [PATCH 2/3] fix --- .deploy/deploy-dev.yaml | 4 ++++ .deploy/deploy-prod.yaml | 4 ++++ main.go | 23 +++++++++++++++++++++++ 3 files changed, 31 insertions(+) diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 81fabf6..54393d7 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -6,9 +6,11 @@ services: image: mathwave/sprint-repo:queues networks: - queues-development + - monitoring environment: MONGO_HOST: "mongo.develop.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_DEV + STAGE: "development" deploy: mode: replicated restart_policy: @@ -22,3 +24,5 @@ services: networks: queues-development: external: true + monitoring: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 60d8d2a..3426f08 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -6,9 +6,11 @@ services: image: mathwave/sprint-repo:queues networks: - queues + - monitoring environment: MONGO_HOST: "mongo.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_PROD + STAGE: "production" deploy: mode: replicated restart_policy: @@ -22,3 +24,5 @@ services: networks: queues: external: true + monitoring: + external: true diff --git a/main.go b/main.go index 1f9650e..ed4c9d4 100644 --- a/main.go +++ b/main.go @@ -1,15 +1,37 @@ package main import ( + "bytes" "encoding/json" + "fmt" "log" "net/http" + "os" "queues-go/app/routers" client "queues-go/app/storage/mongo" + "strconv" "sync" "time" ) +func writeMetric(timestamp time.Time, endpoint string, statusCode int, responseTime int, method string) { + s := fmt.Sprintf( + `{"timestamp":"%s","service":"queues","environment":"%s","endpoint":"%s","status_code":%s,"response_time":%s,"method":"%s"}`, + timestamp.Format("2006-01-02T15:04:05Z"), + os.Getenv("STAGE"), + endpoint, + strconv.Itoa(statusCode), + strconv.Itoa(responseTime), + method, + ) + data := []byte(s) + r := bytes.NewReader(data) + _, err := http.Post("http://monitoring:1237/api/v1/metrics/endpoint", "application/json", r) + if err != nil { + log.Printf("Error sending metrics %s", err.Error()) + } +} + func handlerWrapper(f func(*http.Request) (interface{}, int)) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { start := time.Now() @@ -25,6 +47,7 @@ func handlerWrapper(f func(*http.Request) (interface{}, int)) func(http.Response } else { w.WriteHeader(status) } + go writeMetric(start, r.URL.Path, status, int(time.Since(start).Milliseconds()), r.Method) log.Printf("%s %d %s", r.URL, status, time.Since(start)) } } From 6567a1dc354d175d3446ffe24a6554fa0940add8 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 00:56:54 +0300 Subject: [PATCH 3/3] fix --- main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index ed4c9d4..c7d2fdb 100644 --- a/main.go +++ b/main.go @@ -15,9 +15,10 @@ import ( ) func writeMetric(timestamp time.Time, endpoint string, statusCode int, responseTime int, method string) { + loc, _ := time.LoadLocation("Europe/Moscow") s := fmt.Sprintf( `{"timestamp":"%s","service":"queues","environment":"%s","endpoint":"%s","status_code":%s,"response_time":%s,"method":"%s"}`, - timestamp.Format("2006-01-02T15:04:05Z"), + timestamp.In(loc).Format("2006-01-02T15:04:05Z"), os.Getenv("STAGE"), endpoint, strconv.Itoa(statusCode),