1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
use std::sync::Arc;

use arrow::{
    array::{
        Array, ArrayRef, Float64Array, Float64Builder, StringArray, StringBuilder, StructArray,
        UInt64Builder,
    },
    datatypes::{DataType, Field, Schema},
    error::ArrowError,
    record_batch::RecordBatch,
};
use brontes_types::mev::MevBlock;

use super::utils::{
    build_float64_array, build_record_batch, build_string_array, build_uint64_array,
    u128_to_binary_array,
};

pub fn mev_block_to_record_batch(mev_blocks: Vec<MevBlock>) -> Result<RecordBatch, ArrowError> {
    let block_hash_array = build_string_array(
        mev_blocks
            .iter()
            .map(|mb| mb.block_hash.to_string())
            .collect(),
    );
    let block_number_array =
        build_uint64_array(mev_blocks.iter().map(|mb| mb.block_number).collect());
    let eth_price_array = build_float64_array(mev_blocks.iter().map(|mb| mb.eth_price).collect());

    let gas_used_array =
        u128_to_binary_array(mev_blocks.iter().map(|mb| mb.total_gas_used).collect());
    let priority_fee_array =
        u128_to_binary_array(mev_blocks.iter().map(|mb| mb.total_priority_fee).collect());
    let total_bribe_array =
        u128_to_binary_array(mev_blocks.iter().map(|mb| mb.total_bribe).collect());
    let total_mev_priority_fee_paid_array = u128_to_binary_array(
        mev_blocks
            .iter()
            .map(|mb| mb.total_mev_priority_fee_paid)
            .collect(),
    );

    let builder_address_array = build_string_array(
        mev_blocks
            .iter()
            .map(|mb| mb.builder_address.to_string())
            .collect(),
    );
    let builder_eth_profits_array =
        build_float64_array(mev_blocks.iter().map(|mb| mb.builder_eth_profit).collect());
    let builder_usd_profits_array =
        build_float64_array(mev_blocks.iter().map(|mb| mb.builder_profit_usd).collect());
    let builder_mev_profits_array = build_float64_array(
        mev_blocks
            .iter()
            .map(|mb| mb.builder_mev_profit_usd)
            .collect(),
    );
    let total_mev_profit_usds_array = build_float64_array(
        mev_blocks
            .iter()
            .map(|mb| mb.total_mev_profit_usd)
            .collect(),
    );

    let proposer_mev_reward_array = u128_to_binary_array(
        mev_blocks
            .iter()
            .map(|mb| mb.proposer_mev_reward.unwrap_or_default())
            .collect(),
    );

    let mev_count_array = get_mev_count_array(&mev_blocks);
    let (proposer_fee_recipient_array, proposer_profit_usd_array) =
        get_proposer_arrays(&mev_blocks);

    let schema = build_schema(&mev_count_array);

    build_record_batch(
        schema,
        vec![
            Arc::new(block_hash_array),
            Arc::new(block_number_array),
            Arc::new(mev_count_array),
            Arc::new(eth_price_array),
            Arc::new(gas_used_array),
            Arc::new(priority_fee_array),
            Arc::new(total_bribe_array),
            Arc::new(total_mev_priority_fee_paid_array),
            Arc::new(builder_address_array),
            Arc::new(builder_eth_profits_array),
            Arc::new(builder_usd_profits_array),
            Arc::new(builder_mev_profits_array),
            Arc::new(proposer_fee_recipient_array),
            Arc::new(proposer_mev_reward_array),
            Arc::new(proposer_profit_usd_array),
            Arc::new(total_mev_profit_usds_array),
        ],
    )
}

fn build_schema(mev_count_array: &StructArray) -> Schema {
    Schema::new(vec![
        Field::new("block_hash", DataType::Utf8, false),
        Field::new("block_number", DataType::UInt64, false),
        Field::new("mev_count", mev_count_array.data_type().clone(), false),
        Field::new("eth_price", DataType::Float64, false),
        Field::new("total_gas_used", DataType::Binary, false),
        Field::new("total_priority_fee", DataType::Binary, false),
        Field::new("total_bribe", DataType::Binary, false),
        Field::new("total_mev_priority_fee_paid", DataType::Binary, false),
        Field::new("builder_address", DataType::Utf8, false),
        Field::new("builder_eth_profit", DataType::Float64, false),
        Field::new("builder_profit_usd", DataType::Float64, false),
        Field::new("builder_mev_profit_usd", DataType::Float64, false),
        Field::new("proposer_fee_recipient", DataType::Utf8, true),
        Field::new("proposer_mev_reward", DataType::Binary, true),
        Field::new("proposer_profit_usd", DataType::Float64, true),
        Field::new("total_mev_profit_usd", DataType::Float64, false),
    ])
}

fn get_mev_count_array(mev_blocks: &Vec<MevBlock>) -> StructArray {
    let mut mev_count_builder = UInt64Builder::new();
    let mut sandwich_count_builder = UInt64Builder::new();
    let mut liquidation_count_builder = UInt64Builder::new();
    let mut atomic_backrun_count_builder = UInt64Builder::new();
    let mut cex_dex_count_builder = UInt64Builder::new();
    let mut jit_count_builder = UInt64Builder::new();
    let mut jit_sandwich_count_builder = UInt64Builder::new();
    let mut searcher_tx_count_builder = UInt64Builder::new();

    for block in mev_blocks {
        mev_count_builder.append_value(block.mev_count.bundle_count);
        sandwich_count_builder.append_option(block.mev_count.sandwich_count);
        liquidation_count_builder.append_option(block.mev_count.liquidation_count);
        atomic_backrun_count_builder.append_option(block.mev_count.atomic_backrun_count);
        cex_dex_count_builder.append_option(block.mev_count.cex_dex_trade_count);
        jit_count_builder.append_option(block.mev_count.jit_count);
        jit_sandwich_count_builder.append_option(block.mev_count.jit_sandwich_count);
        searcher_tx_count_builder.append_option(block.mev_count.searcher_tx_count);
    }

    let mev_count_array = mev_count_builder.finish();
    let sandwich_count_array = sandwich_count_builder.finish();
    let liquidation_count_array = liquidation_count_builder.finish();
    let atomic_backrun_count_array = atomic_backrun_count_builder.finish();
    let cex_dex_count_array = cex_dex_count_builder.finish();
    let jit_count_array = jit_count_builder.finish();
    let jit_sandwich_count_array = jit_sandwich_count_builder.finish();
    let searcher_tx_count_array = searcher_tx_count_builder.finish();

    let fields = vec![
        Field::new("mev_count", DataType::UInt64, false),
        Field::new("sandwich_count", DataType::UInt64, true),
        Field::new("liquidation_count", DataType::UInt64, true),
        Field::new("atomic_backrun_count", DataType::UInt64, true),
        Field::new("cex_dex_count", DataType::UInt64, true),
        Field::new("jit_count", DataType::UInt64, true),
        Field::new("jit_sandwich_count", DataType::UInt64, true),
        Field::new("searcher_tx_count", DataType::UInt64, true),
    ];

    let arrays = vec![
        Arc::new(mev_count_array) as ArrayRef,
        Arc::new(sandwich_count_array) as ArrayRef,
        Arc::new(liquidation_count_array) as ArrayRef,
        Arc::new(atomic_backrun_count_array) as ArrayRef,
        Arc::new(cex_dex_count_array) as ArrayRef,
        Arc::new(jit_count_array) as ArrayRef,
        Arc::new(jit_sandwich_count_array) as ArrayRef,
        Arc::new(searcher_tx_count_array) as ArrayRef,
    ];

    StructArray::try_new(fields.into(), arrays, None).expect("Failed to init struct arrays")
}

fn get_proposer_arrays(mev_blocks: &Vec<MevBlock>) -> (StringArray, Float64Array) {
    let fee_recipient_data_capacity = mev_blocks[0].builder_address.len() * mev_blocks.len();
    let mut proposer_fee_recipient_builder =
        StringBuilder::with_capacity(mev_blocks.len(), fee_recipient_data_capacity);
    let mut proposer_profit_usd_builder = Float64Builder::with_capacity(mev_blocks.len());

    for block in mev_blocks {
        proposer_fee_recipient_builder.append_option(
            block
                .proposer_fee_recipient
                .as_ref()
                .map(|addr| addr.to_string()),
        );
        proposer_profit_usd_builder.append_option(block.proposer_profit_usd);
    }

    (proposer_fee_recipient_builder.finish(), proposer_profit_usd_builder.finish())
}