1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use std::{
    pin::Pin,
    task::{Context, Poll},
};

use brontes_classifier::discovery_only::DiscoveryOnlyClassifier;
use brontes_core::decoding::{Parser, TracingProvider};
use brontes_database::libmdbx::{DBWriter, LibmdbxReader};
use futures::{pin_mut, stream::FuturesUnordered, Future, StreamExt};
use reth_tasks::shutdown::GracefulShutdown;

use crate::executors::ProgressBar;

/// Upper bound on the number of per-block futures the executor keeps in flight
/// at once: a new block is only queued while fewer than this many are running.
const MAX_PENDING_TREE_BUILDING: usize = 5;

/// Executor that only runs discovery over a range of blocks.
///
/// For every block it asks the [`Parser`] for that block's discovery data and,
/// when some is returned, hands it to the [`DiscoveryOnlyClassifier`]. The
/// executor is itself a [`Future`] that resolves once the range is exhausted
/// and every queued block has finished.
pub struct DiscoveryExecutor<T, DB>
where
    T: TracingProvider,
    DB: DBWriter + LibmdbxReader,
{
    /// Next block number to be queued; advanced by one each time a block is
    /// scheduled.
    current_block: u64,
    /// Block number at which scheduling stops. The block equal to `end_block`
    /// itself is never queued.
    end_block:     u64,
    /// Parser used to fetch the discovery traces and header of each block.
    parser:        &'static Parser<T, DB>,
    /// Classifier that is cloned into every per-block future.
    classifier:    DiscoveryOnlyClassifier<'static, T, DB>,
    /// Per-block futures that have been queued and not yet completed.
    running:       FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
    /// Progress indicator, bumped by one for each block future that completes
    /// while the executor is being polled.
    progress_bar:  ProgressBar,
}

impl<T, DB> DiscoveryExecutor<T, DB>
where
    T: TracingProvider,
    DB: LibmdbxReader + DBWriter,
{
    /// Builds an executor that starts scheduling at `start_block` and stops
    /// once its block counter reaches `end_block`.
    ///
    /// The classifier is created from `db` and the tracer exposed by `parser`.
    /// Nothing is queued until the executor is first polled.
    pub fn new(
        start_block: u64,
        end_block: u64,
        db: &'static DB,
        parser: &'static Parser<T, DB>,
        progress_bar: ProgressBar,
    ) -> Self {
        Self {
            current_block: start_block,
            end_block,
            parser,
            classifier: DiscoveryOnlyClassifier::new(db, parser.get_tracer()),
            running: FuturesUnordered::new(),
            progress_bar,
        }
    }

    /// Drives the executor until it completes or `shutdown` resolves,
    /// whichever happens first.
    ///
    /// In both cases every block future that is still in flight is awaited
    /// before returning, so already-started work is never abandoned. The value
    /// yielded by `shutdown` (if that branch won) is held until the drain has
    /// finished and only then dropped.
    // NOTE(review): presumably dropping the guard is what tells the task
    // manager that shutdown work is done — confirm against `reth_tasks`.
    pub async fn run_until_graceful_shutdown(self, shutdown: GracefulShutdown) {
        let executor = self;
        pin_mut!(executor, shutdown);

        // `None` when the executor ran to completion, `Some(guard)` when the
        // shutdown signal arrived first.
        let shutdown_guard = tokio::select! {
            _ = &mut executor => None,
            guard = shutdown => Some(guard),
        };

        // Let the blocks that were already queued finish.
        while let Some(()) = executor.running.next().await {}

        drop(shutdown_guard);
    }

    /// Processes a single block: fetches its discovery data from `parser` and,
    /// if the parser returned any, runs the classifier's discovery on the
    /// traces and header. A block for which the parser yields `None` is
    /// skipped silently.
    async fn process_next(
        block: u64,
        parser: &'static Parser<T, DB>,
        classifier: DiscoveryOnlyClassifier<'static, T, DB>,
    ) {
        let discovered = parser.execute_discovery(block).await;

        // The first tuple element is not needed for discovery.
        if let Some((_, traces, header)) = discovered {
            classifier.run_discovery(traces, header).await;
        }
    }
}

/// Polling the executor queues at most one new block per call (as long as the
/// range is not exhausted and fewer than `MAX_PENDING_TREE_BUILDING` blocks are
/// in flight), then drains every block future that has already completed.
///
/// The future resolves once no block remains to be queued and the set of
/// running futures is empty.
impl<T: TracingProvider, DB: LibmdbxReader + DBWriter> Future for DiscoveryExecutor<T, DB> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `<` rather than `!=`: if the executor was built with a start block
        // past the end block, an inequality test would never become false and
        // the executor would keep queuing blocks beyond the range forever.
        if self.current_block < self.end_block && self.running.len() < MAX_PENDING_TREE_BUILDING {
            // Only one block is queued per poll; ask to be polled again right
            // away so the remaining capacity gets filled.
            cx.waker().wake_by_ref();
            let fut = Box::pin(Self::process_next(
                self.current_block,
                self.parser,
                self.classifier.clone(),
            ));
            self.running.push(fut);

            self.current_block += 1;
        }

        loop {
            match self.running.poll_next_unpin(cx) {
                // A block finished: record it and look for further completions.
                Poll::Ready(Some(())) => {
                    self.progress_bar.inc(1);
                }
                // Something is still in flight; its waker is registered.
                Poll::Pending => return Poll::Pending,
                // Nothing in flight and nothing left to queue: done.
                Poll::Ready(None) if self.current_block >= self.end_block => {
                    return Poll::Ready(())
                }
                // Nothing in flight but blocks remain. An empty set registers
                // no waker, so schedule the next poll explicitly.
                Poll::Ready(None) => {
                    cx.waker().wake_by_ref();
                    return Poll::Pending
                }
            }
        }
    }
}