1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#[cfg(not(feature = "local-clickhouse"))]
use std::env;
use std::fmt::Debug;

use ::clickhouse::DbRow;
use serde::Deserialize;

#[cfg(feature = "local-clickhouse")]
use crate::clickhouse::Clickhouse;
use crate::{
    clickhouse::ClickhouseHandle,
    libmdbx::{
        implementation::compressed_wrappers::utils::CompressedTableRow, types::LibmdbxData,
        LibmdbxReadWriter,
    },
    CompressedTable,
};
#[cfg(feature = "local-clickhouse")]
pub async fn load_clickhouse() -> Clickhouse {
    Clickhouse::new_default(None).await
}

#[cfg(not(feature = "local-clickhouse"))]
pub async fn load_clickhouse() -> crate::clickhouse::ClickhouseHttpClient {
    let clickhouse_api = env::var("CLICKHOUSE_API").expect("No CLICKHOUSE_API in .env");
    let clickhouse_api_key = env::var("CLICKHOUSE_API_KEY").ok();
    crate::clickhouse::ClickhouseHttpClient::new(clickhouse_api, clickhouse_api_key).await
}

pub async fn clickhouse_data<T, D, CH: ClickhouseHandle>(
    clickhouse: &CH,
    block_range: Option<(u64, u64)>,
) -> eyre::Result<Vec<CompressedTableRow<T>>>
where
    T: CompressedTable,
    T::Value: From<T::DecompressedValue> + Into<T::DecompressedValue>,
    D: LibmdbxData<T> + DbRow + for<'de> Deserialize<'de> + Send + Sync + Debug + Unpin + 'static,
{
    let data = if let Some(rang) = block_range {
        clickhouse.query_many_range::<T, D>(rang.0, rang.1).await?
    } else {
        clickhouse.query_many::<T, D>().await?
    };

    let table_row = data
        .into_iter()
        .map(|val| {
            let key_val = val.into_key_val();
            CompressedTableRow(key_val.key, key_val.value)
        })
        .collect();

    Ok(table_row)
}

pub async fn clickhouse_arbitrary_data<T, D, CH: ClickhouseHandle>(
    clickhouse: &CH,
    block_range: &'static [u64],
) -> eyre::Result<Vec<CompressedTableRow<T>>>
where
    T: CompressedTable,
    T::Value: From<T::DecompressedValue> + Into<T::DecompressedValue>,
    D: LibmdbxData<T> + DbRow + for<'de> Deserialize<'de> + Send + Sync + Debug + Unpin + 'static,
{
    let data = clickhouse.query_many_arbitrary::<T, D>(block_range).await?;

    let table_row = data
        .into_iter()
        .map(|val| {
            let key_val = val.into_key_val();
            CompressedTableRow(key_val.key, key_val.value)
        })
        .collect();

    Ok(table_row)
}

pub async fn compare_clickhouse_libmdbx_data<T, D, CH: ClickhouseHandle>(
    _clickhouse: &CH,
    libmdbx: &LibmdbxReadWriter,
    _block_range: Option<(u64, u64)>,
) -> eyre::Result<()>
where
    T: CompressedTable,
    T::Value: From<T::DecompressedValue> + Into<T::DecompressedValue>,
    D: LibmdbxData<T> + DbRow + for<'de> Deserialize<'de> + Send + Sync + Debug + Unpin + 'static,
{
    let _clickhouse_data = clickhouse_data::<T, D, CH>(_clickhouse, _block_range).await?;

    let tx = libmdbx.db.ro_tx()?;
    let _libmdbx_data = tx
        .cursor_read::<T>()?
        .walk_range(..)?
        .collect::<Result<Vec<_>, _>>()?;

    Ok(())
}

pub async fn compare_clickhouse_libmdbx_arbitrary_data<T, D, CH: ClickhouseHandle>(
    _clickhouse: &CH,
    libmdbx: &LibmdbxReadWriter,
    _block_range: &'static [u64],
) -> eyre::Result<()>
where
    T: CompressedTable,
    T::Value: From<T::DecompressedValue> + Into<T::DecompressedValue>,
    D: LibmdbxData<T> + DbRow + for<'de> Deserialize<'de> + Send + Sync + Debug + Unpin + 'static,
{
    let _clickhouse_data = clickhouse_arbitrary_data::<T, D, CH>(_clickhouse, _block_range).await?;

    let tx = libmdbx.db.ro_tx()?;
    let _libmdbx_data = tx
        .cursor_read::<T>()?
        .walk_range(..)?
        .collect::<Result<Vec<_>, _>>()?;

    Ok(())
}