From ea63cd794cacf57790a2510b557cfe5fa85b0fd3 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sat, 21 Jun 2025 12:04:27 +0300 Subject: [PATCH] fix --- app/routers/metrics/endpoint.go | 9 +- app/routers/metrics/increment.go | 5 +- app/routers/metrics/task.go | 5 +- app/storage/clickhouse/client.go | 147 ++++++++++++++++++++ app/storage/clickhouse/tables/endpoints.go | 42 +----- app/storage/clickhouse/tables/increments.go | 33 +---- app/storage/clickhouse/tables/tasks.go | 35 +---- main.go | 5 +- 8 files changed, 173 insertions(+), 108 deletions(-) diff --git a/app/routers/metrics/endpoint.go b/app/routers/metrics/endpoint.go index 5ffee7d..71971ab 100644 --- a/app/routers/metrics/endpoint.go +++ b/app/routers/metrics/endpoint.go @@ -3,23 +3,24 @@ package routers import ( "encoding/json" "log" + client "monitoring/app/storage/clickhouse" endpoints "monitoring/app/storage/clickhouse/tables" "net/http" ) -func AddEndpointMetric (r *http.Request) (interface{}, int) { +func AddEndpointMetric(r *http.Request) (interface{}, int) { d := json.NewDecoder(r.Body) - body := endpoints.EndpointMetric{} + body := client.EndpointMetric{} err := d.Decode(&body) if err != nil { return nil, http.StatusBadRequest } - err = endpoints.AddEndpointMetric(body) + endpoints.AddEndpointMetric(body) if err != nil { log.Print(err.Error()) return nil, http.StatusInternalServerError } return nil, http.StatusAccepted -} \ No newline at end of file +} diff --git a/app/routers/metrics/increment.go b/app/routers/metrics/increment.go index 5e3502b..a716736 100644 --- a/app/routers/metrics/increment.go +++ b/app/routers/metrics/increment.go @@ -3,19 +3,20 @@ package routers import ( "encoding/json" "log" + client "monitoring/app/storage/clickhouse" increments "monitoring/app/storage/clickhouse/tables" "net/http" ) func AddIncrementMetric(r *http.Request) (interface{}, int) { d := json.NewDecoder(r.Body) - body := increments.IncrementMetric{} + body := client.IncrementMetric{} err := d.Decode(&body) if err != nil { return nil, http.StatusBadRequest } - err = increments.AddIncrementMetric(body) + increments.AddIncrementMetric(body) if err != nil { log.Print(err.Error()) return nil, http.StatusInternalServerError diff --git a/app/routers/metrics/task.go b/app/routers/metrics/task.go index 409bfcb..2fd2ee1 100644 --- a/app/routers/metrics/task.go +++ b/app/routers/metrics/task.go @@ -3,19 +3,20 @@ package routers import ( "encoding/json" "log" + client "monitoring/app/storage/clickhouse" tasks "monitoring/app/storage/clickhouse/tables" "net/http" ) func AddTaskMetric(r *http.Request) (interface{}, int) { d := json.NewDecoder(r.Body) - body := tasks.TaskMetric{} + body := client.TaskMetric{} err := d.Decode(&body) if err != nil { return nil, http.StatusBadRequest } - err = tasks.AddTaskMetric(body) + tasks.AddTaskMetric(body) if err != nil { log.Print(err.Error()) return nil, http.StatusInternalServerError diff --git a/app/storage/clickhouse/client.go b/app/storage/clickhouse/client.go index f2f067e..e62a979 100644 --- a/app/storage/clickhouse/client.go +++ b/app/storage/clickhouse/client.go @@ -5,12 +5,47 @@ import ( "fmt" "log" "os" + "sync" + "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" ) +type EndpointMetric struct { + Timestamp time.Time `json:"timestamp"` + Service string `json:"service"` + Environment string `json:"environment"` + Endpoint string `json:"endpoint"` + StatusCode int `json:"status_code"` + ResponseTime int `json:"response_time"` + Method string `json:"method"` +} + +type IncrementMetric struct { + Timestamp time.Time `json:"timestamp"` + Service string `json:"service"` + Environment string `json:"environment"` + Name string `json:"name"` + Count int `json:"count"` +} + +type TaskMetric struct { + Timestamp time.Time `json:"timestamp"` + Service string `json:"service"` + Environment string `json:"environment"` + Queue string `json:"queue"` + Success bool `json:"success"` + ExecutionTimeMs int `json:"execution_time_ms"` +} + var Connection driver.Conn +var EndpointsCol []EndpointMetric +var EndpointsMutex sync.Mutex +var IncrementsCol []IncrementMetric +var IncrementsMutex sync.Mutex +var TasksCol []TaskMetric +var TasksMutex sync.Mutex func Connect() error { conn, err := connect() @@ -18,6 +53,13 @@ func Connect() error { return err } Connection = *conn + EndpointsCol = make([]EndpointMetric, 0) + IncrementsCol = make([]IncrementMetric, 0) + TasksCol = make([]TaskMetric, 0) + EndpointsMutex = sync.Mutex{} + IncrementsMutex = sync.Mutex{} + TasksMutex = sync.Mutex{} + go pushMetrics() return nil } @@ -52,6 +94,111 @@ func connect() (*driver.Conn, error) { return &conn, nil } +func pushEndpoints() error { + if len(EndpointsCol) == 0 { + return nil + } + + EndpointsMutex.Lock() + newCollection := EndpointsCol + EndpointsCol = make([]EndpointMetric, 0) + EndpointsMutex.Unlock() + + batch, err := Connection.PrepareBatch(context.Background(), "INSERT INTO endpoints") + if err != nil { + return err + } + + for _, metric := range newCollection { + err := batch.Append( + metric.Timestamp, + metric.Service, + metric.Environment, + metric.Endpoint, + metric.StatusCode, + metric.ResponseTime, + metric.Method, + ) + if err != nil { + return err + } + } + + return batch.Send() +} + +func pushIncrements() error { + if len(IncrementsCol) == 0 { + return nil + } + + IncrementsMutex.Lock() + newCollection := IncrementsCol + IncrementsCol = make([]IncrementMetric, 0) + IncrementsMutex.Unlock() + + batch, err := Connection.PrepareBatch(context.Background(), "INSERT INTO increments") + if err != nil { + return err + } + + for _, metric := range newCollection { + err := batch.Append( + metric.Timestamp, + metric.Service, + metric.Environment, + metric.Name, + metric.Count, + ) + if err != nil { + return err + } + } + + return batch.Send() +} + +func pushTasks() error { + if len(IncrementsCol) == 0 { + return nil + } + + TasksMutex.Lock() + newCollection := TasksCol + TasksCol = make([]TaskMetric, 0) + TasksMutex.Unlock() + + batch, err := Connection.PrepareBatch(context.Background(), "INSERT INTO tasks") + if err != nil { + return err + } + + for _, metric := range newCollection { + err := batch.Append( + metric.Timestamp, + metric.Service, + metric.Environment, + metric.Queue, + metric.Success, + metric.ExecutionTimeMs, + ) + if err != nil { + return err + } + } + + return batch.Send() +} + +func pushMetrics() { + for { + pushEndpoints() + pushIncrements() + pushTasks() + time.Sleep(time.Second) + } +} + func Migrate() error { err := Connection.Exec( context.TODO(), diff --git a/app/storage/clickhouse/tables/endpoints.go b/app/storage/clickhouse/tables/endpoints.go index 530b11e..0f1fd76 100644 --- a/app/storage/clickhouse/tables/endpoints.go +++ b/app/storage/clickhouse/tables/endpoints.go @@ -1,45 +1,11 @@ package storage import ( - "context" client "monitoring/app/storage/clickhouse" - "time" - - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" ) -type EndpointMetric struct { - Timestamp time.Time `json:"timestamp"` - Service string `json:"service"` - Environment string `json:"environment"` - Endpoint string `json:"endpoint"` - StatusCode int `json:"status_code"` - ResponseTime int `json:"response_time"` - Method string `json:"method"` -} - -func AddEndpointMetric(metric EndpointMetric) error { - batch, err := connection().PrepareBatch(context.Background(), "INSERT INTO endpoints") - if err != nil { - return err - } - - err = batch.Append( - metric.Timestamp, - metric.Service, - metric.Environment, - metric.Endpoint, - metric.StatusCode, - metric.ResponseTime, - metric.Method, - ) - if err != nil { - return err - } - - return batch.Send() -} - -func connection() driver.Conn { - return client.Connection +func AddEndpointMetric(metric client.EndpointMetric) { + client.EndpointsMutex.Lock() + defer client.EndpointsMutex.Unlock() + client.EndpointsCol = append(client.EndpointsCol, metric) } diff --git a/app/storage/clickhouse/tables/increments.go b/app/storage/clickhouse/tables/increments.go index 4f0123b..6613b14 100644 --- a/app/storage/clickhouse/tables/increments.go +++ b/app/storage/clickhouse/tables/increments.go @@ -1,34 +1,11 @@ package storage import ( - "context" - "time" + client "monitoring/app/storage/clickhouse" ) -type IncrementMetric struct { - Timestamp time.Time `json:"timestamp"` - Service string `json:"service"` - Environment string `json:"environment"` - Name string `json:"name"` - Count int `json:"count"` -} - -func AddIncrementMetric(metric IncrementMetric) error { - batch, err := connection().PrepareBatch(context.Background(), "INSERT INTO increments") - if err != nil { - return err - } - - err = batch.Append( - metric.Timestamp, - metric.Service, - metric.Environment, - metric.Name, - metric.Count, - ) - if err != nil { - return err - } - - return batch.Send() +func AddIncrementMetric(metric client.IncrementMetric) { + client.IncrementsMutex.Lock() + defer client.IncrementsMutex.Unlock() + client.IncrementsCol = append(client.IncrementsCol, metric) } diff --git a/app/storage/clickhouse/tables/tasks.go b/app/storage/clickhouse/tables/tasks.go index 8da7ada..5d2d593 100644 --- a/app/storage/clickhouse/tables/tasks.go +++ b/app/storage/clickhouse/tables/tasks.go @@ -1,36 +1,11 @@ package storage import ( - "context" - "time" + client "monitoring/app/storage/clickhouse" ) -type TaskMetric struct { - Timestamp time.Time `json:"timestamp"` - Service string `json:"service"` - Environment string `json:"environment"` - Queue string `json:"queue"` - Success bool `json:"success"` - ExecutionTimeMs int `json:"execution_time_ms"` -} - -func AddTaskMetric(metric TaskMetric) error { - batch, err := connection().PrepareBatch(context.Background(), "INSERT INTO tasks") - if err != nil { - return err - } - - err = batch.Append( - metric.Timestamp, - metric.Service, - metric.Environment, - metric.Queue, - metric.Success, - metric.ExecutionTimeMs, - ) - if err != nil { - return err - } - - return batch.Send() +func AddTaskMetric(metric client.TaskMetric) { + client.TasksMutex.Lock() + defer client.TasksMutex.Unlock() + client.TasksCol = append(client.TasksCol, metric) } diff --git a/main.go b/main.go index e8ed6f7..a13c282 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,7 @@ import ( ) func writeMetric(timestamp time.Time, endpoint string, statusCode int, responseTime int, method string) { - err := endpoints.AddEndpointMetric(endpoints.EndpointMetric{ + endpoints.AddEndpointMetric(client.EndpointMetric{ Timestamp: timestamp.Add(time.Hour * 3), Service: "monitoring", Environment: os.Getenv("STAGE"), @@ -23,9 +23,6 @@ func writeMetric(timestamp time.Time, endpoint string, statusCode int, responseT ResponseTime: responseTime, Method: method, }) - if err != nil { - log.Printf("Error sending metrics %s", err.Error()) - } } func handlerWrapper(f func(*http.Request) (interface{}, int)) func(http.ResponseWriter, *http.Request) {