1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
use std::{path::Path, time::Duration};

use brontes_core::decoding::Parser as DParser;
use brontes_database::clickhouse::cex_config::CexDownloadConfig;
use brontes_inspect::Inspectors;
use brontes_metrics::ParserMetricsListener;
use brontes_types::{
    constants::USDT_ADDRESS_STRING,
    db::cex::{trades::CexDexTradeConfig, CexExchange},
    db_write_trigger::{backup_server_heartbeat, start_hr_monitor, HeartRateMonitor},
    init_thread_pools, UnboundedYapperReceiver,
};
use clap::Parser;
use tokio::sync::mpsc::unbounded_channel;

use super::{determine_max_tasks, get_env_vars, load_clickhouse, load_database, static_object};
use crate::{
    banner::rain,
    cli::{get_tracing_provider, init_inspectors, load_tip_database},
    runner::CliContext,
    BrontesRunConfig, MevProcessor, RangeType,
};

/// Number of microseconds in one second, as an `f64` multiplier.
///
/// `TimeWindowArgs::trade_config` multiplies second-valued CLI arguments by
/// this factor to fill the `*_us` fields of `CexDexTradeConfig`.
const SECONDS_TO_US_FLOAT: f64 = 1_000_000.0;

// Command-line arguments consumed by `RunArgs::execute`.
//
// Parsing is generated by clap's `Parser` derive, so the `///` comments on
// each field double as the `--help` text of the corresponding flag. Notes
// aimed at maintainers are therefore written as plain `//` comments.
#[derive(Debug, Parser)]
pub struct RunArgs {
    /// Optional Start Block, if omitted it will run at tip until killed
    #[arg(long, short)]
    pub start_block:          Option<u64>,
    /// Optional End Block, if omitted it will run historically & at tip until
    /// killed
    #[arg(long, short)]
    pub end_block:            Option<u64>,
    /// starts running at tip from where brontes was last left at.
    #[arg(long, default_value_t = false)]
    pub from_db_tip:          bool,
    /// Optional Multiple Ranges, format: "start1-end1 start2-end2 ..."
    /// Use this if you want to specify the exact, non continuous block ranges
    /// you want to run
    // When present, `get_range_type` uses these ranges and ignores
    // `start_block` / `end_block`.
    #[arg(long, num_args = 1.., value_delimiter = ' ')]
    pub ranges:               Option<Vec<String>>,
    /// Optional Max Tasks, if omitted it will default to 80% of the number of
    /// physical cores on your machine
    #[arg(long, short)]
    pub max_tasks:            Option<u64>,
    /// Optional minimum batch size
    #[arg(long, default_value = "500")]
    pub min_batch_size:       u64,
    /// Optional quote asset, if omitted it will default to USDT
    // Kept as a string here; `execute` parses it and fails on invalid input.
    #[arg(long, short, default_value = USDT_ADDRESS_STRING)]
    pub quote_asset:          String,
    /// Inspectors to run. If omitted it defaults to running all inspectors
    #[arg(long, short, value_delimiter = ',')]
    pub inspectors:           Option<Vec<Inspectors>>,
    /// Time window arguments for cex data downloads
    #[clap(flatten)]
    pub time_window_args:     TimeWindowArgs,
    /// CEX exchanges to consider for cex-dex analysis
    #[arg(
        long,
        short,
        default_value = "Binance,Coinbase,Okex,BybitSpot,Kucoin",
        value_delimiter = ','
    )]
    pub cex_exchanges:        Vec<CexExchange>,
    /// Force DEX price calculation for every block, ignoring existing database
    /// values.
    #[arg(long, short, default_value = "false")]
    pub force_dex_pricing:    bool,
    /// Disables DEX pricing. Inspectors needing DEX prices will only calculate
    /// token PnL, not USD PnL, if DEX pricing is unavailable in the
    /// database.
    // `execute` also forces this to `true` when the only selected inspector
    // is `CexDex` or `CexDexMarkout`.
    #[arg(long, default_value = "false")]
    pub force_no_dex_pricing: bool,
    /// Number of blocks to lag behind the chain tip when processing.
    #[arg(long, default_value = "10")]
    pub behind_tip:           u64,
    /// Legacy, run in CLI only mode (no TUI) - will output progress bars to
    /// stdout
    // NOTE(review): this is a plain `bool` flag whose default is "true";
    // presumably it can never be switched off from the command line -
    // confirm against clap's handling of bool flags.
    #[arg(long, default_value = "true")]
    pub cli_only:             bool,
    /// Export metrics
    #[arg(long, default_value = "false")]
    pub with_metrics:         bool,
    /// Whether or not to use a fallback server.
    #[arg(long, default_value_t = false)]
    pub enable_fallback:      bool,
    /// Address of the fallback server.
    /// Triggers database writes if the main connection fails, preventing data
    /// loss.
    // Only consulted when `enable_fallback` is set; see
    // `try_start_fallback_server`.
    #[arg(long)]
    pub fallback_server:      Option<String>,
    /// Set a custom run ID used when inserting data into the Clickhouse
    ///
    /// If omitted, the ID will be automatically incremented from the last run
    /// stored in the Clickhouse database.
    #[arg(long, short)]
    pub run_id:               Option<u64>,

    /// shows a cool display at startup
    #[arg(long, short, default_value_t = false)]
    pub waterfall: bool,
}

impl RunArgs {
    /// Runs brontes with the parsed arguments.
    ///
    /// The steps, in order:
    /// 1. validate `start_block` / `end_block` and optionally show the startup
    ///    banner,
    /// 2. read the environment, parse the quote asset and size the thread
    ///    pools,
    /// 3. spawn the metrics listener and the optional fallback machinery,
    /// 4. open the libmdbx, tip and Clickhouse databases,
    /// 5. build the inspectors, the tracing provider and the parser,
    /// 6. spawn the `"run init"` critical task, which builds the
    ///    `BrontesRunConfig` and awaits the resulting future.
    ///
    /// `brontes_db_path` is the location handed to `load_database`; `ctx`
    /// supplies the task executor used for every spawned task.
    ///
    /// # Errors
    ///
    /// Propagates failures from range validation, `get_env_vars`, quote asset
    /// parsing, the three database loaders, `get_range_type`, and from
    /// awaiting the spawned `"run init"` task.
    pub async fn execute(mut self, brontes_db_path: String, ctx: CliContext) -> eyre::Result<()> {
        self.check_proper_range()?;

        if self.waterfall {
            rain();
        }

        // Snapshot mode is a compile-time decision: it is on whenever the
        // `local-clickhouse` feature is NOT enabled.
        let snapshot_mode = !cfg!(feature = "local-clickhouse");
        tracing::info!(%snapshot_mode);

        // Fetch required environment variables; the value is later used as the
        // path given to the tracing provider.
        let reth_db_path = get_env_vars()?;
        tracing::info!(target: "brontes", "got env vars");
        let quote_asset = self.quote_asset.parse()?;
        tracing::info!(target: "brontes", "parsed quote asset");
        let task_executor = ctx.task_executor;

        let max_tasks = determine_max_tasks(self.max_tasks);
        init_thread_pools(max_tasks as usize);

        // Parser metrics travel over an unbounded channel to a listener that
        // runs as its own critical task.
        let (metrics_tx, metrics_rx) = unbounded_channel();
        let metrics_listener = ParserMetricsListener::new(UnboundedYapperReceiver::new(
            metrics_rx,
            10_000,
            "metrics".to_string(),
        ));

        task_executor.spawn_critical("metrics", metrics_listener);

        // `Some` only when fallback is enabled without a fallback server
        // address; the monitor is handed to the database loader below.
        let hr = self.try_start_fallback_server().await;

        tracing::info!(target: "brontes", "starting database initialization at: '{}'", brontes_db_path);
        let libmdbx =
            static_object(load_database(&task_executor, brontes_db_path, hr, self.run_id).await?);

        let tip = static_object(load_tip_database(libmdbx)?);
        tracing::info!(target: "brontes", "initialized libmdbx database");

        let load_window = self.load_time_window();

        let cex_download_config = CexDownloadConfig::new(
            // The run time window. The same (largest) window is passed for
            // both entries of the tuple.
            (load_window as u64, load_window as u64),
            self.cex_exchanges.clone(),
        );

        // Resolved before `self` is partially moved into `init_inspectors`.
        let range_type = self.get_range_type()?;
        let clickhouse = static_object(load_clickhouse(cex_download_config, self.run_id).await?);
        tracing::info!(target: "brontes", "Databases initialized");

        // True when exactly one inspector was requested and it is one of the
        // cex-dex inspectors.
        // NOTE(review): selecting both `CexDex` and `CexDexMarkout` (len == 2)
        // does not count as cex-dex-only here - confirm this is intended.
        let only_cex_dex = self
            .inspectors
            .as_ref()
            .map(|f| {
                f.len() == 1
                    && (f.contains(&Inspectors::CexDex) || f.contains(&Inspectors::CexDexMarkout))
            })
            .unwrap_or(false);

        // A cex-dex-only run overrides the user's flag and disables DEX
        // pricing.
        if only_cex_dex {
            self.force_no_dex_pricing = true;
        }

        let trade_config = self.time_window_args.trade_config();

        let inspectors = init_inspectors(
            quote_asset,
            libmdbx,
            self.inspectors,
            self.cex_exchanges,
            trade_config,
            self.with_metrics,
        );

        let tracer =
            get_tracing_provider(Path::new(&reth_db_path), max_tasks, task_executor.clone());
        let parser = static_object(DParser::new(metrics_tx, libmdbx, tracer.clone()).await);

        let executor = task_executor.clone();
        let result = executor
            .clone()
            .spawn_critical_with_graceful_shutdown_signal("run init", |shutdown| async move {
                // A failed build is logged and the task simply ends; the error
                // is not returned from this closure.
                if let Ok(brontes) = BrontesRunConfig::<_, _, _, MevProcessor>::new(
                    range_type,
                    max_tasks,
                    self.min_batch_size,
                    quote_asset,
                    self.force_dex_pricing,
                    self.force_no_dex_pricing,
                    inspectors,
                    clickhouse,
                    parser,
                    libmdbx,
                    tip,
                    self.cli_only,
                    self.with_metrics,
                    snapshot_mode,
                    load_window,
                )
                .build(task_executor, shutdown)
                .await
                .map_err(|e| {
                    tracing::error!(%e);
                    e
                }) {
                    brontes.await;
                }
            });

        // Wait for the spawned task to finish before returning.
        result.await?;

        Ok(())
    }

    /// Builds the [`RangeType`] described by the command-line arguments.
    ///
    /// When `--ranges` was supplied, its entries are parsed into
    /// `RangeType::MultipleRanges`. Otherwise a `RangeType::SingleRange` is
    /// assembled from `start_block`, `end_block`, `behind_tip` and
    /// `from_db_tip`.
    ///
    /// # Errors
    ///
    /// Returns the message produced by `parse_ranges` when any of the supplied
    /// ranges is malformed.
    pub fn get_range_type(&self) -> eyre::Result<RangeType> {
        match self.ranges.as_deref() {
            // Explicit ranges take precedence over the start/end arguments.
            Some(raw_ranges) => parse_ranges(raw_ranges)
                .map(RangeType::MultipleRanges)
                .map_err(|e| eyre::eyre!(e)),
            None => {
                let single = RangeType::SingleRange {
                    start_block:   self.start_block,
                    end_block:     self.end_block,
                    back_from_tip: self.behind_tip,
                    from_db_tip:   self.from_db_tip,
                };
                Ok(single)
            }
        }
    }

    /// Starts the optional fallback machinery and returns a heart rate
    /// monitor when one was created.
    ///
    /// * Fallback disabled: nothing is started and `None` is returned.
    /// * Fallback enabled with a `fallback_server` address:
    ///   `backup_server_heartbeat` is awaited with a 4 second duration and
    ///   `None` is returned.
    /// * Fallback enabled without an address: `start_hr_monitor` is awaited
    ///   and a `HeartRateMonitor` with a 7 second duration is returned.
    ///
    /// A failure of `start_hr_monitor` is only logged; the monitor is still
    /// returned in that case.
    async fn try_start_fallback_server(&self) -> Option<HeartRateMonitor> {
        if !self.enable_fallback {
            return None
        }

        match self.fallback_server.clone() {
            Some(server_addr) => {
                tracing::info!("starting heartbeat");
                backup_server_heartbeat(server_addr, Duration::from_secs(4)).await;
                None
            }
            None => {
                tracing::info!("starting monitor");
                let (beat_tx, beat_rx) = tokio::sync::mpsc::channel(10);
                if let Err(e) = start_hr_monitor(beat_tx).await {
                    tracing::error!(err=%e);
                }
                tracing::info!("monitor server started");
                Some(HeartRateMonitor::new(Duration::from_secs(7), beat_rx))
            }
        }
    }

    /// Returns the time window, in whole seconds, used for downloading cex
    /// data.
    ///
    /// The result is the largest of the four maximum windows in
    /// `time_window_args` (`max_vwap_pre`, `max_vwap_post`,
    /// `max_optimistic_pre`, `max_optimistic_post`), rounded up to the next
    /// whole second.
    fn load_time_window(&self) -> usize {
        let windows = &self.time_window_args;
        let widest = windows
            .max_vwap_pre
            .max(windows.max_vwap_post)
            .max(windows.max_optimistic_pre)
            .max(windows.max_optimistic_post);

        // Round up before casting: a bare `as usize` truncates toward zero, so
        // a fractional window such as 20.5 s would download only 20 s of data
        // while the trade config still uses the full 20.5 s.
        widest.ceil() as usize
    }

    /// Validates that `start_block` does not come after `end_block`.
    ///
    /// The check only applies when both values were supplied; a start block
    /// equal to the end block is accepted.
    ///
    /// # Errors
    ///
    /// Returns an error naming both blocks when `start_block > end_block`.
    fn check_proper_range(&self) -> eyre::Result<()> {
        match (self.start_block, self.end_block) {
            // Same wording as `parse_ranges`: equality is allowed, so the
            // message says "less than or equal to".
            (Some(start), Some(end)) if start > end => Err(eyre::eyre!(
                "start block {} must be less than or equal to end block {}",
                start,
                end
            )),
            _ => Ok(()),
        }
    }
}

/// Parses `"start-end"` strings into `(start, end)` block pairs.
///
/// Each entry is split at its first `-`; both halves must parse as `u64` and
/// the start must not exceed the end. The pairs are returned in input order.
///
/// # Errors
///
/// Returns a message describing the first entry that has no `-`, has a
/// non-numeric start or end, or has a start greater than its end.
fn parse_ranges(ranges: &[String]) -> Result<Vec<(u64, u64)>, String> {
    let mut parsed = Vec::with_capacity(ranges.len());

    for raw in ranges {
        let (lo_text, hi_text) = match raw.split_once('-') {
            Some(halves) => halves,
            None => return Err(format!("invalid range: {}", raw)),
        };

        let lo = match lo_text.parse::<u64>() {
            Ok(block) => block,
            Err(_) => return Err(format!("invalid start block: {}", lo_text)),
        };
        let hi = match hi_text.parse::<u64>() {
            Ok(block) => block,
            Err(_) => return Err(format!("invalid end block: {}", hi_text)),
        };

        if lo > hi {
            return Err(format!(
                "start block {} must be less than or equal to end block {}",
                lo, hi
            ))
        }

        parsed.push((lo, hi));
    }

    Ok(parsed)
}

// Time window and weighting arguments, flattened into `RunArgs`.
//
// The window, scaling, step and offset values are given in seconds;
// `trade_config` multiplies them by `SECONDS_TO_US_FLOAT` to obtain
// microseconds. The decay weights and the boolean switches are passed through
// unchanged. The `///` comments below double as clap help text, so maintainer
// notes are written as plain `//` comments.
#[derive(Debug, Parser)]
pub struct TimeWindowArgs {
    /// The initial sliding time window (BEFORE) for cex prices or trades
    /// relative to the block timestamp
    #[arg(long = "initial-pre", default_value = "0.05")]
    pub initial_vwap_pre: f64,

    /// The initial sliding time window (AFTER) for cex prices or trades
    /// relative to the block timestamp
    #[arg(long = "initial-post", default_value = "0.05")]
    pub initial_vwap_post: f64,

    /// The maximum sliding time window (BEFORE) for cex prices or trades
    /// relative to the block timestamp
    #[arg(long = "max-vwap-pre", short = 'b', default_value = "10.0")]
    pub max_vwap_pre: f64,

    /// The maximum sliding time window (AFTER) for cex prices or trades
    /// relative to the block timestamp
    #[arg(long = "max-vwap-post", short = 'a', default_value = "20.0")]
    pub max_vwap_post: f64,

    /// Defines how much to extend the post-block time window before the
    /// pre-block.
    #[arg(long = "vwap-scaling-diff", default_value = "0.3")]
    pub vwap_scaling_diff: f64,

    /// Size of each extension to the vwap calculations time window
    #[arg(long = "vwap-time-step", default_value = "0.01")]
    pub vwap_time_step: f64,

    /// Use block time weights to favour prices closer to the block time
    #[arg(long = "weights-vwap", default_value = "true")]
    pub block_time_weights_vwap: bool,

    /// Rate of decay of bi-exponential decay function see calculate_weight in
    /// brontes_types::db::cex
    #[arg(long = "weights-pre-vwap", default_value = "-0.0000005")]
    pub pre_decay_weight_vwap: f64,

    /// Rate of decay of bi-exponential decay function see calculate_weight in
    /// brontes_types::db::cex
    #[arg(long = "weights-post-vwap", default_value = "-0.0000002")]
    pub post_decay_weight_vwap: f64,

    /// The initial time window (BEFORE) for cex prices or trades relative to
    /// the block timestamp for fully optimistic calculations
    #[arg(long = "initial-op-pre", default_value = "0.05")]
    pub initial_optimistic_pre: f64,

    /// The initial time window (AFTER) for cex prices or trades relative to the
    /// block timestamp for fully optimistic calculations
    #[arg(long = "initial-op-post", default_value = "0.3")]
    pub initial_optimistic_post: f64,

    /// The maximum time window (BEFORE) for cex prices or trades relative to
    /// the block timestamp for fully optimistic calculations
    #[arg(long = "max-op-pre", default_value = "5.0")]
    pub max_optimistic_pre: f64,

    /// The maximum time window (AFTER) for cex prices or trades relative to the
    /// block timestamp for fully optimistic calculations
    #[arg(long = "max-op-post", default_value = "10.0")]
    pub max_optimistic_post: f64,

    /// Defines how much to extend the post-block time window before the
    /// pre-block.
    #[arg(long = "optimistic-scaling-diff", default_value = "0.2")]
    pub optimistic_scaling_diff: f64,

    /// Size of each extension to the optimistic calculations time window
    #[arg(long = "optimistic-time-step", default_value = "0.1")]
    pub optimistic_time_step: f64,

    /// Use block time weights to favour prices closer to the block time
    #[arg(long = "weights-op", default_value = "true")]
    pub block_time_weights_optimistic: bool,

    /// Rate of decay of bi-exponential decay function see calculate_weight in
    /// brontes_types::db::cex
    #[arg(long = "weights-pre-op", default_value = "-0.0000003")]
    pub pre_decay_weight_optimistic: f64,

    /// Rate of decay of bi-exponential decay function see calculate_weight in
    /// brontes_types::db::cex
    #[arg(long = "weights-post-op", default_value = "-0.00000012")]
    pub post_decay_weight_optimistic: f64,

    /// Cex Dex Quotes price time offset from block timestamp
    #[arg(long = "quote-offset", default_value = "0.0")]
    pub quote_offset: f64,
}

impl TimeWindowArgs {
    /// Converts the command-line time window arguments into a
    /// [`CexDexTradeConfig`].
    ///
    /// Every window, scaling, step and offset value is multiplied by
    /// `SECONDS_TO_US_FLOAT` and cast to `u64` to fill the matching `*_us`
    /// field. The decay weights and the block-time-weight switches are copied
    /// over unchanged.
    fn trade_config(&self) -> CexDexTradeConfig {
        // Single place for the seconds -> microseconds conversion; the
        // float-to-integer cast drops any fractional microsecond.
        let to_us = |seconds: f64| (seconds * SECONDS_TO_US_FLOAT) as u64;

        CexDexTradeConfig {
            // Volume-weighted (vwap) windows.
            initial_vwap_pre_block_us: to_us(self.initial_vwap_pre),
            initial_vwap_post_block_us: to_us(self.initial_vwap_post),
            max_vwap_pre_block_us: to_us(self.max_vwap_pre),
            max_vwap_post_block_us: to_us(self.max_vwap_post),
            vwap_scaling_diff_us: to_us(self.vwap_scaling_diff),
            vwap_time_step_us: to_us(self.vwap_time_step),
            use_block_time_weights_vwap: self.block_time_weights_vwap,
            pre_decay_weight_vwap: self.pre_decay_weight_vwap,
            post_decay_weight_vwap: self.post_decay_weight_vwap,

            // Optimistic windows.
            initial_optimistic_pre_block_us: to_us(self.initial_optimistic_pre),
            initial_optimistic_post_block_us: to_us(self.initial_optimistic_post),
            max_optimistic_pre_block_us: to_us(self.max_optimistic_pre),
            max_optimistic_post_block_us: to_us(self.max_optimistic_post),
            optimistic_scaling_diff_us: to_us(self.optimistic_scaling_diff),
            optimistic_time_step_us: to_us(self.optimistic_time_step),
            use_block_time_weights_optimistic: self.block_time_weights_optimistic,
            pre_decay_weight_op: self.pre_decay_weight_optimistic,
            post_decay_weight_op: self.post_decay_weight_optimistic,

            // Quote offset.
            quote_offset_from_block_us: to_us(self.quote_offset),
        }
    }
}