1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use std::{sync::OnceLock, time::Instant};

use clickhouse::{error::Error, query};
use db_interfaces::{clickhouse::errors::ClickhouseError, errors::DatabaseError};
use eyre::Report;
use prometheus::{CounterVec, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec};

/// Returns the process-wide histogram of initialization query durations.
///
/// The collector is created and registered the first time this function is
/// called; later calls return the same instance. Samples are labelled by
/// `table` and `block_count` and are expressed in microseconds.
///
/// # Panics
///
/// Panics on first use if the histogram cannot be registered (for example
/// when a collector with the same name is already registered).
fn query_speed() -> &'static HistogramVec {
    static QUERY_SPEED: OnceLock<HistogramVec> = OnceLock::new();
    QUERY_SPEED.get_or_init(|| {
        // Observations are recorded in microseconds, so the crate's default
        // buckets (designed for seconds) would push nearly every sample into
        // `+Inf`. Use 22 doubling buckets starting at 1us (1us ..= ~2.1s).
        let buckets = prometheus::exponential_buckets(1.0, 2.0, 22)
            .expect("exponential bucket parameters are valid constants");

        prometheus::register_histogram_vec!(
            "initialization_query_speed_us",
            "Time taken for each query during initialization in microseconds",
            &["table", "block_count"],
            buckets
        )
        .expect("failed to register initialization_query_speed_us")
    })
}

/// Returns the process-wide counter of failed initialization queries.
///
/// The counter is keyed by the `table` label. It is built and registered on
/// the first call and the same instance is handed back afterwards.
///
/// # Panics
///
/// Panics on first use if registering the counter fails.
fn query_errors() -> &'static IntCounterVec {
    static CELL: OnceLock<IntCounterVec> = OnceLock::new();

    // Kept as a named function so the one-time initializer reads on its own.
    fn register() -> IntCounterVec {
        prometheus::register_int_counter_vec!(
            "initialization_query_errors",
            "Number of query errors for each table",
            &["table"]
        )
        .unwrap()
    }

    CELL.get_or_init(register)
}

/// Returns the process-wide counter of initialization query errors broken
/// down by error category.
///
/// The counter is keyed by the `table` and `error_type` labels. It is built
/// and registered on the first call; later calls return the same instance.
///
/// # Panics
///
/// Panics on first use if registering the counter fails.
fn query_error_types() -> &'static IntCounterVec {
    // Named after what it stores; it is independent of the speed histogram.
    static QUERY_ERROR_TYPES: OnceLock<IntCounterVec> = OnceLock::new();
    QUERY_ERROR_TYPES.get_or_init(|| {
        prometheus::register_int_counter_vec!(
            "initialization_query_error_types",
            "Types of errors encountered during initialization queries",
            &["table", "error_type"]
        )
        .unwrap()
    })
}
/// Handle to the Prometheus collectors used while running initialization
/// queries.
///
/// Every field is a `'static` reference to a lazily-created collector, so
/// cloning this struct only copies references.
#[derive(Clone)]
pub struct InitializationMetrics {
    /// Histogram of query durations in microseconds, labelled by `table` and
    /// `block_count`.
    query_speed:       &'static HistogramVec,
    /// Count of failed queries, labelled by `table`.
    query_errors:      &'static IntCounterVec,
    /// Count of failed queries, labelled by `table` and `error_type`.
    query_error_types: &'static IntCounterVec,
}

impl Default for InitializationMetrics {
    fn default() -> Self {
        Self::new()
    }
}

impl InitializationMetrics {
    pub fn new() -> Self {
        let buckets = prometheus::exponential_buckets(1.0, 2.0, 22).unwrap();
        let query_speed = query_speed();
        let query_errors = query_errors();
        let query_error_types = query_error_types();

        Self { query_speed, query_errors, query_error_types }
    }

    pub fn measure_query<R>(&self, table: &str, block_count: u64, f: impl FnOnce() -> R) -> R {
        let now = Instant::now();
        let res = f();
        let elapsed = now.elapsed().as_micros();
        self.query_speed
            .with_label_values(&[table, &block_count.to_string()])
            .observe(elapsed as f64);
        res
    }

    pub fn increment_query_errors(&self, table: &str, error: &Report) {
        self.query_errors.with_label_values(&[table]).inc();

        let error_type = self.categorize_error(error);
        self.query_error_types
            .with_label_values(&[table, &error_type])
            .inc();
    }

    fn categorize_error(&self, error: &Report) -> String {
        if let Some(db_error) = error.downcast_ref::<DatabaseError>() {
            match db_error {
                DatabaseError::ClickhouseError(ClickhouseError::ClickhouseNative(native_error)) => {
                    match native_error {
                        Error::InvalidParams(_) => "InvalidParams",
                        Error::Network(_) => "Network",
                        Error::Compression(_) => "Compression",
                        Error::Decompression(_) => "Decompression",
                        Error::RowNotFound => "RowNotFound",
                        Error::SequenceMustHaveLength => "SequenceMustHaveLength",
                        Error::DeserializeAnyNotSupported => "DeserializeAnyNotSupported",
                        Error::NotEnoughData => "NotEnoughData",
                        Error::InvalidUtf8Encoding(_) => "InvalidUtf8Encoding",
                        Error::InvalidTagEncoding(_) => "InvalidTagEncoding",
                        Error::Custom(_) => "Custom",
                        Error::BadResponse(_) => "BadResponse",
                        Error::TimedOut => "TimedOut",
                        Error::TooSmallBuffer(_) => "TooSmallBuffer",
                        _ => "OtherClickhouseNative",
                    }
                    .to_string()
                }
                DatabaseError::ClickhouseError(ClickhouseError::SqlFileReadError(_)) => {
                    "SqlFileReadError".to_string()
                }
                _ => "OtherDatabaseError".to_string(),
            }
        } else if error.to_string().contains("no block times found") {
            "EmptyBlockTimes".to_string()
        } else {
            "OtherError".to_string()
        }
    }
}

/// Optional wrapper around [`InitializationMetrics`].
///
/// Holds `Some` when metrics collection is enabled and `None` when it is
/// disabled, so callers can use one API either way.
#[derive(Clone)]
pub struct InitMetrics(Option<InitializationMetrics>);

impl InitMetrics {
    /// Builds the wrapper, creating the underlying metrics only when
    /// `metrics` is `true`.
    pub fn new(metrics: bool) -> Self {
        Self(metrics.then(InitializationMetrics::new))
    }

    /// Runs `f` and returns its result, timing it when metrics are enabled.
    ///
    /// With metrics disabled, `f` is simply invoked and nothing is recorded.
    pub fn measure_query<R>(&self, table: &str, block_count: u64, f: impl FnOnce() -> R) -> R {
        match self.0.as_ref() {
            Some(inner) => inner.measure_query(table, block_count, f),
            None => f(),
        }
    }

    /// Records a failed query for `table` when metrics are enabled; does
    /// nothing otherwise.
    pub fn increment_query_errors(&self, table: &str, error: &Report) {
        let Some(inner) = self.0.as_ref() else {
            return;
        };
        inner.increment_query_errors(table, error);
    }
}