monitoring/app/storage/clickhouse/client.go
Egor Matveev ea63cd794c
All checks were successful
Deploy Dev / Build (pull_request) Successful in 1m4s
Deploy Dev / Push (pull_request) Successful in 36s
Deploy Dev / Deploy dev (pull_request) Successful in 10s
Deploy Prod / Build (pull_request) Successful in 1m0s
Deploy Prod / Push (pull_request) Successful in 36s
Deploy Prod / Deploy prod (pull_request) Successful in 16s
fix
2025-06-21 12:04:27 +03:00

262 lines
5.4 KiB
Go

package storage
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
type EndpointMetric struct {
Timestamp time.Time `json:"timestamp"`
Service string `json:"service"`
Environment string `json:"environment"`
Endpoint string `json:"endpoint"`
StatusCode int `json:"status_code"`
ResponseTime int `json:"response_time"`
Method string `json:"method"`
}
type IncrementMetric struct {
Timestamp time.Time `json:"timestamp"`
Service string `json:"service"`
Environment string `json:"environment"`
Name string `json:"name"`
Count int `json:"count"`
}
type TaskMetric struct {
Timestamp time.Time `json:"timestamp"`
Service string `json:"service"`
Environment string `json:"environment"`
Queue string `json:"queue"`
Success bool `json:"success"`
ExecutionTimeMs int `json:"execution_time_ms"`
}
var Connection driver.Conn
var EndpointsCol []EndpointMetric
var EndpointsMutex sync.Mutex
var IncrementsCol []IncrementMetric
var IncrementsMutex sync.Mutex
var TasksCol []TaskMetric
var TasksMutex sync.Mutex
func Connect() error {
conn, err := connect()
if err != nil {
return err
}
Connection = *conn
EndpointsCol = make([]EndpointMetric, 0)
IncrementsCol = make([]IncrementMetric, 0)
TasksCol = make([]TaskMetric, 0)
EndpointsMutex = sync.Mutex{}
IncrementsMutex = sync.Mutex{}
TasksMutex = sync.Mutex{}
go pushMetrics()
return nil
}
func Close() {
Connection.Close()
}
func connect() (*driver.Conn, error) {
var (
ctx = context.Background()
conn, err = clickhouse.Open(&clickhouse.Options{
Addr: []string{"clickhouse:9000"},
Auth: clickhouse.Auth{
Database: "monitoring",
Username: "default",
Password: os.Getenv("CLICKHOUSE_PASSWORD"),
},
TLS: nil,
})
)
if err != nil {
return nil, err
}
if err := conn.Ping(ctx); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
fmt.Printf("Exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
}
return nil, err
}
return &conn, nil
}
func pushEndpoints() error {
if len(EndpointsCol) == 0 {
return nil
}
EndpointsMutex.Lock()
newCollection := EndpointsCol
EndpointsCol = make([]EndpointMetric, 0)
EndpointsMutex.Unlock()
batch, err := Connection.PrepareBatch(context.Background(), "INSERT INTO endpoints")
if err != nil {
return err
}
for _, metric := range newCollection {
err := batch.Append(
metric.Timestamp,
metric.Service,
metric.Environment,
metric.Endpoint,
metric.StatusCode,
metric.ResponseTime,
metric.Method,
)
if err != nil {
return err
}
}
return batch.Send()
}
func pushIncrements() error {
if len(IncrementsCol) == 0 {
return nil
}
IncrementsMutex.Lock()
newCollection := IncrementsCol
IncrementsCol = make([]IncrementMetric, 0)
IncrementsMutex.Unlock()
batch, err := Connection.PrepareBatch(context.Background(), "INSERT INTO increments")
if err != nil {
return err
}
for _, metric := range newCollection {
err := batch.Append(
metric.Timestamp,
metric.Service,
metric.Environment,
metric.Name,
metric.Count,
)
if err != nil {
return err
}
}
return batch.Send()
}
func pushTasks() error {
if len(IncrementsCol) == 0 {
return nil
}
TasksMutex.Lock()
newCollection := TasksCol
TasksCol = make([]TaskMetric, 0)
TasksMutex.Unlock()
batch, err := Connection.PrepareBatch(context.Background(), "INSERT INTO tasks")
if err != nil {
return err
}
for _, metric := range newCollection {
err := batch.Append(
metric.Timestamp,
metric.Service,
metric.Environment,
metric.Queue,
metric.Success,
metric.ExecutionTimeMs,
)
if err != nil {
return err
}
}
return batch.Send()
}
func pushMetrics() {
for {
pushEndpoints()
pushIncrements()
pushTasks()
time.Sleep(time.Second)
}
}
func Migrate() error {
err := Connection.Exec(
context.TODO(),
`CREATE TABLE IF NOT EXISTS endpoints (
timestamp DateTime,
service LowCardinality(String),
environment LowCardinality(String),
endpoint LowCardinality(String),
status_code UInt16,
response_time_ms UInt32,
method LowCardinality(String)
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (service, environment, endpoint, method, timestamp);`,
)
if err != nil {
log.Fatal(err)
return err
}
err = Connection.Exec(
context.TODO(),
`CREATE TABLE IF NOT EXISTS tasks (
timestamp DateTime,
service LowCardinality(String),
environment LowCardinality(String),
queue LowCardinality(String),
success Bool,
execution_time_ms UInt32
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (service, environment, queue, timestamp);`,
)
if err != nil {
log.Fatal(err)
return err
}
err = Connection.Exec(
context.TODO(),
`CREATE TABLE IF NOT EXISTS increments (
timestamp DateTime,
service LowCardinality(String),
environment LowCardinality(String),
name LowCardinality(String),
count UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (service, environment, name, timestamp);`,
)
if err != nil {
log.Fatal(err)
return err
}
return nil
}