use std::time::{Duration, Instant};
use prometheus::{Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec};
use reth_interfaces::db::DatabaseError;
#[derive(Clone)]
pub struct LibmdbxWriterMetrics {
initialized_blocks: IntGaugeVec,
commit_latency: HistogramVec,
write_latency: HistogramVec,
write_latency_batch: Histogram,
write_errors: IntCounterVec,
write_error_types: IntCounterVec,
queue_size: IntGauge,
}
impl Default for LibmdbxWriterMetrics {
fn default() -> Self {
Self::new()
}
}
impl LibmdbxWriterMetrics {
pub fn new() -> Self {
let initialized_blocks = prometheus::register_int_gauge_vec!(
"libmdbx_initialized_blocks",
"Number of initialized blocks for each table",
&["table"]
)
.unwrap();
let commit_latency = prometheus::register_histogram_vec!(
"libmdbx_commit_latency_ms",
"Time taken from receiving data to completing the write operation",
&["msg_type"],
prometheus::exponential_buckets(0.001, 2.0, 25).unwrap()
)
.unwrap();
let write_latency = prometheus::register_histogram_vec!(
"libmdbx_write_latency_ms",
"Latency of a single-element write operation",
&["table"],
prometheus::exponential_buckets(0.001, 2.0, 25).unwrap()
)
.unwrap();
let write_latency_batch = prometheus::register_histogram!(
"libmdbx_write_latency_batch_ms",
"Latency of a batch write operation",
prometheus::exponential_buckets(0.001, 2.0, 25).unwrap()
)
.unwrap();
let write_errors = prometheus::register_int_counter_vec!(
"libmdbx_write_errors",
"Number of write errors for each table",
&["table"]
)
.unwrap();
let write_error_types = prometheus::register_int_counter_vec!(
"libmdbx_write_error_types",
"Types of errors encountered during write operations",
&["table", "error_type"]
)
.unwrap();
let queue_size = prometheus::register_int_gauge!(
"libmdbx_queue_size",
"Current size of the write queue"
)
.unwrap();
Self {
initialized_blocks,
commit_latency,
write_latency,
write_latency_batch,
write_errors,
write_error_types,
queue_size,
}
}
pub fn increment_initialized_blocks(&self, table: &str, count: i64) {
self.initialized_blocks
.with_label_values(&[table])
.add(count);
}
pub fn observe_commit_latency(
&self,
msg_type: &str,
start_time: Instant,
end_time: Option<Instant>,
) {
let final_time = end_time.unwrap_or_else(Instant::now);
let t_total = final_time - start_time;
self.commit_latency
.with_label_values(&[msg_type])
.observe(t_total.as_secs_f64() * 1000_f64);
}
pub fn observe_write_latency(&self, table: &str, duration: Duration) {
self.write_latency
.with_label_values(&[table])
.observe(duration.as_secs_f64() * 1000_f64);
}
pub fn observe_write_latency_batch(&self, duration: Duration) {
self.write_latency_batch
.observe(duration.as_secs_f64() * 1000_f64);
}
pub fn increment_write_errors(&self, table: &str, error: &DatabaseError) {
self.write_errors.with_label_values(&[table]).inc();
let error_type = match error {
DatabaseError::Open(_) => "Open",
DatabaseError::CreateTable(_) => "CreateTable",
DatabaseError::Write(_) => "Write",
DatabaseError::Read(_) => "Read",
DatabaseError::Delete(_) => "Delete",
DatabaseError::Commit(_) => "Commit",
DatabaseError::InitTx(_) => "InitTx",
DatabaseError::InitCursor(_) => "InitCursor",
DatabaseError::Decode => "Decode",
DatabaseError::Stats(_) => "Stats",
DatabaseError::LogLevelUnavailable(_) => "LogLevelUnavailable",
};
self.write_error_types
.with_label_values(&[table, error_type])
.inc();
}
pub fn set_queue_size(&self, size: usize) {
let s = size.try_into().unwrap_or(i64::MAX);
self.queue_size.set(s);
}
}
#[derive(Clone)]
pub struct WriterMetrics(Option<LibmdbxWriterMetrics>);
impl WriterMetrics {
pub fn new(metrics: bool) -> Self {
if metrics {
Self(Some(LibmdbxWriterMetrics::new()))
} else {
Self(None)
}
}
pub fn increment_initialized_blocks(&self, table: &str, count: i64) {
if let Some(metrics) = &self.0 {
metrics.increment_initialized_blocks(table, count);
}
}
pub fn observe_commit_latency(
&self,
msg_type: &str,
start_time: Instant,
end_time: Option<Instant>,
) {
if let Some(metrics) = &self.0 {
metrics.observe_commit_latency(msg_type, start_time, end_time);
}
}
pub fn observe_write_latency(&self, table: &str, duration: Duration) {
if let Some(metrics) = &self.0 {
metrics.observe_write_latency(table, duration);
}
}
pub fn observe_write_latency_batch(&self, duration: Duration) {
if let Some(metrics) = &self.0 {
metrics.observe_write_latency_batch(duration);
}
}
pub fn increment_write_errors(&self, table: &str, error: &DatabaseError) {
if let Some(metrics) = &self.0 {
metrics.increment_write_errors(table, error);
}
}
pub fn set_queue_size(&self, size: usize) {
if let Some(metrics) = &self.0 {
metrics.set_queue_size(size);
}
}
}