1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use std::{path::Path, sync::Arc};

use brontes_database::{libmdbx::LibmdbxInit, Tables};
use brontes_types::{db::cex::CexExchange, init_thread_pools};
use clap::Parser;
use indicatif::MultiProgress;
use itertools::Itertools;

use crate::{
    cli::{get_env_vars, get_tracing_provider, load_clickhouse, load_database, static_object},
    runner::CliContext,
};

// Command-line arguments for the libmdbx initialization command, parsed via
// clap's `Parser` derive.
//
// Maintainer notes in this struct are deliberately written as plain `//`
// comments: clap's derive turns the `///` doc comments below into the help
// text of each flag, so editing a `///` line changes the `--help` output.
//
// Of these fields, the `execute` method in this file reads only
// `init_libmdbx`, `tables_to_init`, `start_block` and `end_block`.
#[derive(Debug, Parser)]
pub struct Init {
    /// Initialize the local Libmdbx DB
    #[arg(long, short)]
    pub init_libmdbx:              bool,
    // Comma-separated list of tables; clap rejects this flag unless
    // `init_libmdbx` is also supplied (`requires`). When it is absent,
    // `execute` falls back to `Tables::ALL`.
    /// Libmdbx tables to initialize:
    ///     TokenDecimals
    ///     AddressToTokens
    ///     AddressToProtocol
    ///     CexPrice
    ///     Metadata
    ///     PoolState
    ///     DexPrice
    ///     CexTrades
    #[arg(long, short, requires = "init_libmdbx", value_delimiter = ',')]
    pub tables_to_init:            Option<Vec<Tables>>,
    // NOTE(review): the unit of the four time-window values below is not
    // stated in this file (seconds?) — TODO confirm.
    /// The sliding time window (BEFORE) for cex quotes relative to the block
    /// time
    #[arg(long = "price-tw-before", default_value_t = 3)]
    pub quotes_time_window_before: u64,
    /// The sliding time window (AFTER) for cex quotes relative to the block
    /// time
    #[arg(long = "price-tw-after", default_value_t = 3)]
    pub quotes_time_window_after:  u64,
    // NOTE(review): the two trades windows say "block number" while the
    // quotes windows above say "block time" — confirm which wording is
    // intended before touching the help text.
    /// The sliding time window (BEFORE) for cex trades relative to the block
    /// number
    #[arg(long = "trades-tw-before", default_value_t = 3)]
    pub trades_time_window_before: u64,
    /// The sliding time window (AFTER) for cex trades relative to the block
    /// number
    #[arg(long = "trades-tw-after", default_value_t = 3)]
    pub trades_time_window_after:  u64,
    // Comma-separated list; defaults to the five exchanges named in
    // `default_value`.
    /// Centralized exchanges that the cex-dex inspector will consider
    #[arg(
        long,
        short,
        default_value = "Binance,Coinbase,Okex,BybitSpot,Kucoin",
        value_delimiter = ','
    )]
    pub cex_exchanges:             Vec<CexExchange>,
    // `execute` only forwards a block range when BOTH `start_block` and
    // `end_block` are set; if either is missing it passes `None`.
    /// Start Block to download metadata from Sorella's MEV DB
    #[arg(long, short)]
    pub start_block:               Option<u64>,
    /// End Block to download metadata from Sorella's MEV DB
    #[arg(long, short)]
    pub end_block:                 Option<u64>,
    /// Download Dex Prices from Sorella's MEV DB for the given block range. If
    /// false it will run the dex pricing locally using raw on-chain data
    #[arg(long, short, default_value = "false")]
    pub download_dex_pricing:      bool,
}

impl Init {
    /// Runs the init command.
    ///
    /// Loads the libmdbx database at `brontes_db_path`, a clickhouse handle
    /// and a tracing provider, and, when `init_libmdbx` is set, initializes
    /// the selected tables concurrently inside a critical task spawned on the
    /// executor taken from `ctx`.
    ///
    /// The tables initialized are `tables_to_init` when provided, otherwise
    /// every entry of `Tables::ALL`. A block range is forwarded only when both
    /// `start_block` and `end_block` are set; otherwise `None` is passed.
    ///
    /// Note that the database, clickhouse handle and tracing provider are
    /// loaded even when `init_libmdbx` is not set.
    ///
    /// # Errors
    ///
    /// Returns an error if `get_env_vars`, `load_database` or
    /// `load_clickhouse` fails, or if awaiting the spawned init task yields an
    /// error.
    ///
    /// # Panics
    ///
    /// The spawned task panics if initializing any table fails; the panic
    /// message names the table. How that panic surfaces to the caller depends
    /// on the task executor — TODO confirm.
    pub async fn execute(self, brontes_db_path: String, ctx: CliContext) -> eyre::Result<()> {
        let db_path = get_env_vars()?;

        init_thread_pools(10);
        let task_executor = ctx.task_executor;

        let libmdbx =
            static_object(load_database(&task_executor, brontes_db_path, None, None).await?);
        let clickhouse = static_object(load_clickhouse(Default::default(), None).await?);

        let tracer = Arc::new(get_tracing_provider(Path::new(&db_path), 10, task_executor.clone()));

        if self.init_libmdbx {
            // A range is only used when both bounds were supplied; a lone
            // start or end block results in `None`.
            let range = self.start_block.zip(self.end_block);

            task_executor
                .spawn_critical("init", async move {
                    let tables = Tables::ALL.to_vec();

                    // Progress bars are created for every table, regardless of
                    // which subset was requested, and shared across all the
                    // per-table futures.
                    let multi = MultiProgress::default();
                    let tables_with_progress = Arc::new(
                        tables
                            .clone()
                            .into_iter()
                            .map(|table| {
                                (table, table.build_init_state_progress_bar(&multi, 1000000000))
                            })
                            .collect_vec(),
                    );

                    // Initialize the requested tables (all tables when none
                    // were requested) concurrently.
                    futures::future::join_all(
                        self.tables_to_init
                            .unwrap_or(tables)
                            .into_iter()
                            .map(|table| {
                                let tracer = tracer.clone();
                                let tables_with_progress = tables_with_progress.clone();
                                async move {
                                    libmdbx
                                        .initialize_table(
                                            clickhouse,
                                            tracer,
                                            table,
                                            false,
                                            range,
                                            tables_with_progress,
                                            true,
                                        )
                                        .await
                                        .unwrap_or_else(|err| {
                                            panic!(
                                                "failed to initialize table {:?}: {:?}",
                                                table, err
                                            )
                                        });
                                }
                            }),
                    )
                    .await;
                })
                .await
                // Surface a failed join as an error instead of panicking: this
                // function already returns `eyre::Result`.
                .map_err(|err| eyre::eyre!("libmdbx init task failed: {:?}", err))?;
        }

        Ok(())
    }
}