use std::{path::Path, sync::Arc};
use brontes_database::{libmdbx::LibmdbxInit, Tables};
use brontes_types::{db::cex::CexExchange, init_thread_pools};
use clap::Parser;
use indicatif::MultiProgress;
use itertools::Itertools;
use crate::{
cli::{get_env_vars, get_tracing_provider, load_clickhouse, load_database, static_object},
runner::CliContext,
};
#[derive(Debug, Parser)]
pub struct Init {
/// Initialize the local Libmdbx DB
#[arg(long, short)]
pub init_libmdbx: bool,
/// Comma-separated list of Libmdbx tables to initialize. If omitted, all
/// tables are initialized. Available tables:
/// TokenDecimals
/// AddressToTokens
/// AddressToProtocol
/// CexPrice
/// Metadata
/// PoolState
/// DexPrice
/// CexTrades
#[arg(long, short, requires = "init_libmdbx", value_delimiter = ',')]
pub tables_to_init: Option<Vec<Tables>>,
/// The sliding time window (BEFORE) for cex quotes relative to the block
/// time
#[arg(long = "price-tw-before", default_value_t = 3)]
pub quotes_time_window_before: u64,
/// The sliding time window (AFTER) for cex quotes relative to the block
/// time
#[arg(long = "price-tw-after", default_value_t = 3)]
pub quotes_time_window_after: u64,
/// The sliding time window (BEFORE) for cex trades relative to the block
/// time
#[arg(long = "trades-tw-before", default_value_t = 3)]
pub trades_time_window_before: u64,
/// The sliding time window (AFTER) for cex trades relative to the block
/// time
#[arg(long = "trades-tw-after", default_value_t = 3)]
pub trades_time_window_after: u64,
/// Centralized exchanges that the cex-dex inspector will consider
#[arg(
long,
short,
default_value = "Binance,Coinbase,Okex,BybitSpot,Kucoin",
value_delimiter = ','
)]
pub cex_exchanges: Vec<CexExchange>,
/// Start block of the range to download metadata for from Sorella's MEV DB.
/// The range is only applied when `end_block` is also set
#[arg(long, short)]
pub start_block: Option<u64>,
/// End block of the range to download metadata for from Sorella's MEV DB.
/// The range is only applied when `start_block` is also set
#[arg(long, short)]
pub end_block: Option<u64>,
/// Download dex prices from Sorella's MEV DB for the given block range. If
/// false, dex pricing is run locally using raw on-chain data
#[arg(long, short, default_value = "false")]
pub download_dex_pricing: bool,
}
impl Init {
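/// Loads the Libmdbx database, the Clickhouse client and the tracing
/// provider, then initializes the requested tables if `init_libmdbx` is set.
pub async fn execute(self, brontes_db_path: String, ctx: CliContext) -> eyre::Result<()> {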
let db_path = get_env_vars()?;
init_thread_pools(10);
let task_executor = ctx.task_executor;
let libmdbx =
static_object(load_database(&task_executor, brontes_db_path, None, None).await?);
let clickhouse = static_object(load_clickhouse(Default::default(), None).await?);
let tracer = Arc::new(get_tracing_provider(Path::new(&db_path), 10, task_executor.clone()));
if self.init_libmdbx {
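// Initializes the tables listed in `tables_to_init`, or all tables if
// none were given. A block range is used only when both bounds are set.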
let range = self.start_block.zip(self.end_block);
task_executor
.spawn_critical("init", async move {
let tables = Tables::ALL.to_vec();
let multi = MultiProgress::default();
let tables_with_progress = Arc::new(
tables
.clone()
.into_iter()
.map(|table| {
(table, table.build_init_state_progress_bar(&multi, 1_000_000_000))
})
.collect_vec(),
);
futures::future::join_all(
self.tables_to_init
.unwrap_or(tables)
.into_iter()
.map(|table| {
let tracer = tracer.clone();
let tables_with_progress = tables_with_progress.clone();
async move {
libmdbx
.initialize_table(
clickhouse,
tracer,
table,
false,
range,
tables_with_progress,
true,
)
.await
.expect("failed to initialize libmdbx table");
}
}),
)
.await;
})
.await
.unwrap();
}
Ok(())
}
}