1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#[cfg(feature = "local-clickhouse")]
use std::sync::Arc;

use brontes_database::libmdbx::{DBWriter, LibmdbxReader};
use brontes_inspect::{
    composer::{run_block_inspection, ComposerResults},
    Inspector,
};
#[cfg(feature = "local-clickhouse")]
use brontes_types::frontend_prunes::{
    remove_burn_transfers, remove_collect_transfers, remove_mint_transfers, remove_swap_transfers,
};
#[cfg(feature = "local-clickhouse")]
use brontes_types::normalized_actions::Action;
#[cfg(feature = "local-clickhouse")]
use brontes_types::tree::BlockTree;
use brontes_types::{
    db::block_analysis::BlockAnalysis,
    execute_on,
    mev::{Bundle, MevBlock, MevType},
    BlockData, MultiBlockData,
};
use tracing::debug;

use crate::Processor;

#[derive(Debug, Clone, Copy)]
pub struct MevProcessor;

impl Processor for MevProcessor {
    type InspectType = Vec<Bundle>;

    #[allow(unused_variables)]
    async fn process_results<DB: DBWriter + LibmdbxReader>(
        db: &'static DB,
        inspectors: &'static [&dyn Inspector<Result = Self::InspectType>],
        data: MultiBlockData,
    ) {
        let last = data.get_most_recent_block().clone();
        let BlockData { metadata, tree } = last;
        if let Err(e) = db
            .write_dex_quotes(metadata.block_num, metadata.dex_quotes.clone())
            .await
        {
            tracing::error!(err=%e, block_num=metadata.block_num, "failed to insert dex pricing and state into db");
        }

        #[cfg(feature = "local-clickhouse")]
        {
            let inner_tree = Arc::unwrap_or_clone(tree.clone());
            insert_tree(db, inner_tree, metadata.block_num).await;
        }

        let ComposerResults { block_details, mev_details, block_analysis, .. } =
            execute_on!(async_inspect, { run_block_inspection(inspectors, data, db) }).await;

        insert_mev_results(db, block_details, mev_details, block_analysis).await;
    }
}

#[cfg(feature = "local-clickhouse")]
async fn insert_tree<DB: DBWriter + LibmdbxReader>(
    db: &DB,
    mut tree_owned: BlockTree<Action>,
    block_num: u64,
) {
    remove_swap_transfers(&mut tree_owned);
    remove_mint_transfers(&mut tree_owned);
    remove_burn_transfers(&mut tree_owned);
    remove_collect_transfers(&mut tree_owned);

    if let Err(e) = db.insert_tree(tree_owned).await {
        tracing::error!(err=%e, %block_num, "failed to insert tree into db");
    }
}

async fn insert_mev_results<DB: DBWriter + LibmdbxReader>(
    database: &'static DB,
    block_details: MevBlock,
    mev_details: Vec<Bundle>,
    analysis: BlockAnalysis,
) {
    debug!(
        target: "brontes::results",
        "block details\n {}",
        block_details.to_string()
    );

    let block_number = block_details.block_number;
    output_mev_and_update_searcher_info(database, &mev_details).await;

    // Attempt to save the MEV block details
    if let Err(e) = database
        .save_mev_blocks(block_details.block_number, block_details, mev_details)
        .await
    {
        tracing::error!(
            "Failed to insert classified data into libmdbx: {:?} at block: {}",
            e,
            block_number
        );
    }
    if let Err(e) = database.write_block_analysis(analysis).await {
        tracing::error!(
            "Failed to insert block analysis data into db: {:?} at block: {}",
            e,
            block_number
        );
    }
}
async fn output_mev_and_update_searcher_info<DB: DBWriter + LibmdbxReader>(
    database: &DB,
    mev_details: &Vec<Bundle>,
) {
    for mev in mev_details {
        debug!(
            target: "brontes::results",
            "mev details\n {}",
            mev.to_string()
        );

        if mev.header.mev_type == MevType::Unknown || mev.header.mev_type == MevType::SearcherTx {
            continue
        }

        let (eoa_info, contract_info) = database
            .try_fetch_searcher_info(mev.header.eoa, mev.header.mev_contract)
            .expect("Failed to fetch searcher info from the database");

        let mut eoa_info = eoa_info.unwrap_or_default();
        let mut contract_info = contract_info.unwrap_or_default();

        eoa_info.update_with_bundle(&mev.header);
        contract_info.update_with_bundle(&mev.header);

        if let Err(e) = database
            .write_searcher_info(
                mev.header.eoa,
                mev.header.mev_contract,
                eoa_info,
                Some(contract_info),
            )
            .await
        {
            tracing::error!("Failed to update searcher info in the database: {:?}", e);
        }
    }
}