use alloy_primitives::Address;
use brontes_types::{pair::Pair, FastHashMap, FastHashSet, ToFloatNearest};
use itertools::Itertools;
use malachite::{num::basic::traits::Zero, Rational};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use tracing::{error_span, instrument};
use super::{
state_tracker::StateTracker,
subgraph::{BadEdge, PairSubGraph, VerificationOutcome},
};
use crate::{types::PairWithFirstPoolHop, AllPairGraph, PoolPairInfoDirection, SubGraphEdge};
/// Holds the subgraphs that are currently awaiting verification together with
/// the per-pair bookkeeping gathered while verifying them.
///
/// Both maps are keyed by the [`PairWithFirstPoolHop`] a subgraph was created
/// for.
#[derive(Debug, Clone)]
pub struct SubgraphVerifier {
/// Subgraphs that were created and are still waiting to be verified. Entries
/// are taken out while a verification round runs and only put back when the
/// round asks for a requery.
pending_subgraphs: FastHashMap<PairWithFirstPoolHop, Subgraph>,
/// Bad edges recorded for a pair across verification rounds; dropped once the
/// pair passes, is aborted, or its subgraph becomes disjoint.
subgraph_verification_state: FastHashMap<PairWithFirstPoolHop, SubgraphVerificationState>,
}
impl Drop for SubgraphVerifier {
    /// Emits a debug event (target `brontes::mem`) reporting how many entries
    /// each internal map still holds when the verifier is dropped.
    fn drop(&mut self) {
        let verification_state_rem = self.subgraph_verification_state.len();
        let pending_subgraph_count = self.pending_subgraphs.len();
        tracing::debug!(
            target: "brontes::mem",
            verification_state_rem,
            pending_subgraph_count,
            "amount of remaining state in verifier"
        );
    }
}
impl Default for SubgraphVerifier {
/// Equivalent to [`SubgraphVerifier::new`].
fn default() -> Self {
Self::new()
}
}
impl SubgraphVerifier {
/// Creates a verifier with no pending subgraphs and no verification state.
pub fn new() -> Self {
    let pending_subgraphs = FastHashMap::default();
    let subgraph_verification_state = FastHashMap::default();
    Self { pending_subgraphs, subgraph_verification_state }
}
/// Returns what `extends_to()` reports for the pending subgraph stored under
/// `pair`, or `None` when no pending subgraph exists for that key.
pub fn get_subgraph_extends(&self, pair: PairWithFirstPoolHop) -> Option<Pair> {
    let pending = self.pending_subgraphs.get(&pair)?;
    pending.subgraph.extends_to()
}
/// Returns `true` when a pending subgraph is stored under `pair`.
pub fn has_go_through(&self, pair: PairWithFirstPoolHop) -> bool {
self.pending_subgraphs.contains_key(&pair)
}
/// Logs, at debug level, the complete pair of every pending subgraph whose
/// block equals `block`.
pub fn print_rem(&self, block: u64) {
    for pending in self.pending_subgraphs.values() {
        if pending.block != block {
            continue
        }
        tracing::debug!(pair=?pending.subgraph.complete_pair(), "pending");
    }
}
/// Collects the keys of all pending subgraphs whose block equals `block`.
///
/// The order of the returned keys follows the map's iteration order.
pub fn get_rem_for_block(&self, block: u64) -> Vec<PairWithFirstPoolHop> {
    self.pending_subgraphs
        .iter()
        .filter_map(|(key, pending)| (pending.block == block).then_some(*key))
        .collect()
}
/// Returns `true` when no pending subgraph belongs to `block`.
///
/// Stops at the first pending subgraph found for the block instead of
/// counting all of them.
pub fn is_done_block(&self, block: u64) -> bool {
    !self
        .pending_subgraphs
        .values()
        .any(|pending| pending.block == block)
}
/// Returns `true` when a pending subgraph exists for `pair` and it was
/// registered for exactly `block`; `false` otherwise.
pub fn is_verifying_with_block(&self, pair: PairWithFirstPoolHop, block: u64) -> bool {
    matches!(
        self.pending_subgraphs.get(&pair),
        Some(pending) if pending.block == block
    )
}
/// Removes a failed pool from the pending subgraph of `pair`.
///
/// Returns `true` when there is no pending subgraph for `pair`, or when
/// removing the pool left the subgraph disjoint (in which case both the
/// pending subgraph and its verification state are dropped). Returns `false`
/// when the subgraph is still usable after the removal.
pub fn pool_dep_failure(
    &mut self,
    pair: &PairWithFirstPoolHop,
    pool_addr: Address,
    pool_pair: Pair,
) -> bool {
    tracing::debug!(%pair, "dep failure");
    let disjoint = match self.pending_subgraphs.get_mut(pair) {
        Some(pending) => {
            pending.subgraph.remove_bad_node(pool_pair, pool_addr);
            pending.subgraph.is_disjoint()
        }
        None => return true,
    };
    if disjoint {
        self.subgraph_verification_state.remove(pair);
        self.pending_subgraphs.remove(pair);
    }
    disjoint
}
/// Registers a new pending subgraph for `pair` built from `path`.
///
/// Returns whatever `state_tracker.missing_state` reports for `path` at
/// `block`. If a pending subgraph already exists for `pair`, nothing is
/// created, the state tracker is not consulted, and an empty vector is
/// returned.
pub fn create_new_subgraph(
    &mut self,
    pair: PairWithFirstPoolHop,
    extends_to: Option<Pair>,
    block: u64,
    path: Vec<SubGraphEdge>,
    state_tracker: &mut StateTracker,
) -> Vec<PoolPairInfoDirection> {
    if self.pending_subgraphs.contains_key(&pair) {
        return Vec::new()
    }

    let query_state = state_tracker.missing_state(block, &path);

    let complete_pair = pair.get_pair();
    let goes_through = pair.get_goes_through();
    // The subgraph ends at the start of the extension when one is given,
    // otherwise at the second token of the complete pair.
    let end_token = match extends_to {
        Some(extension) => extension.0,
        None => complete_pair.1,
    };
    let extend_pair = Pair(complete_pair.0, end_token);

    let pending = Subgraph {
        subgraph: PairSubGraph::init(
            extend_pair,
            complete_pair,
            goes_through,
            extends_to,
            path,
            block,
        ),
        block,
        frayed_end_extensions: FastHashMap::default(),
        id: 0,
        in_rundown: false,
        iters: 0,
    };

    let replaced = self.pending_subgraphs.insert(pair, pending);
    if replaced.is_some() {
        tracing::error!(?pair, ?block, "duplicate subgraph");
    }

    query_state
}
/// Switches the pending subgraph of `pair` into rundown mode and returns the
/// pairs recorded in its verification state, ordered by ascending liquidity.
///
/// Returns `None` when there is no pending subgraph for `pair`, or when no
/// verification state has been recorded for it. In the latter case the
/// subgraph has still been flagged as `in_rundown`.
#[instrument(skip(self), level = "trace")]
pub fn verify_subgraph_on_new_path_failure(
    &mut self,
    pair: PairWithFirstPoolHop,
) -> Option<Vec<Pair>> {
    let Some(pending) = self.pending_subgraphs.get_mut(&pair) else {
        tracing::debug!(?pair, "missing pending subgraph");
        return None
    };
    pending.in_rundown = true;

    let Some(state) = self.subgraph_verification_state.get(&pair) else {
        tracing::debug!(?pair, "missing state");
        return None
    };
    Some(state.sorted_ignore_nodes_by_liquidity())
}
/// Records bad edges that still carry liquidity in the verification state of
/// `pair`.
///
/// A pool pair from `removals` is only considered when the number of bad
/// edges reported for it equals `all_graph.edge_count` for that pair. Of
/// those, edges whose liquidity is zero are skipped. Each remaining edge is
/// stored under both addresses of its pair.
///
/// The verification state for `pair` is created lazily, i.e. only if at least
/// one edge is actually stored.
fn store_edges_with_liq(
    &mut self,
    pair: PairWithFirstPoolHop,
    removals: &FastHashMap<Pair, FastHashSet<BadEdge>>,
    all_graph: &AllPairGraph,
) {
    for (pool_pair, bad_edges) in removals.iter() {
        if all_graph.edge_count(pool_pair.0, pool_pair.1) != bad_edges.len() {
            continue
        }

        // Filter by reference so only the edges that are actually stored get
        // cloned, instead of cloning the whole set up front.
        for edge in bad_edges
            .iter()
            .filter(|edge| edge.liquidity != Rational::ZERO)
        {
            let state = self.subgraph_verification_state.entry(pair).or_default();
            state.add_edge_with_liq(edge.pair.0, edge.clone());
            state.add_edge_with_liq(edge.pair.1, edge.clone());
        }
    }
}
/// Attaches a frayed-end extension to the pending subgraph of `pair`.
///
/// On success returns the missing state reported by `state_tracker` for the
/// extension edges, the id assigned to the extension, and a flag that is
/// always `true`. Returns `None` when no pending subgraph exists for `pair`.
pub fn add_frayed_end_extension(
    &mut self,
    pair: PairWithFirstPoolHop,
    block: u64,
    state_tracker: &mut StateTracker,
    frayed_end_extensions: Vec<SubGraphEdge>,
) -> Option<(Vec<PoolPairInfoDirection>, u64, bool)> {
    // The state tracker is consulted before the lookup, so it is queried even
    // when the pair turns out to be unknown.
    let missing = state_tracker.missing_state(block, &frayed_end_extensions);

    let Some(pending) = self.pending_subgraphs.get_mut(&pair) else {
        tracing::trace!("frayed ext no pair in pending_subgraphs");
        return None
    };
    let extension_id = pending.add_extension(frayed_end_extensions);

    Some((missing, extension_id, true))
}
/// Runs one verification round for the given pairs and reports one result per
/// pair that had a pending subgraph.
///
/// For every verified subgraph the bad edges with liquidity are recorded
/// first, then the outcome is mapped as follows:
/// * `should_abandon`: the verification state is dropped and
///   [`VerificationResults::Abort`] is returned.
/// * `should_requery`: the subgraph is put back into the pending set and
///   [`VerificationResults::Failed`] is returned.
/// * otherwise the subgraph passed and is handed to `passed_verification`.
///
/// `prune_state` in the results contains the outcome's removals minus every
/// pair currently listed as "to ignore" in the verification state.
pub fn verify_subgraph(
    &mut self,
    pair: Vec<(u64, Option<u64>, PairWithFirstPoolHop, Rational, Address)>,
    all_graph: &AllPairGraph,
    state_tracker: &mut StateTracker,
) -> Vec<VerificationResults> {
    error_span!("Subgraph Verifier").in_scope(|| {
        let candidates = self.get_subgraphs(pair);
        let outcomes = self.verify_par(candidates, state_tracker);

        let mut results = Vec::with_capacity(outcomes.len());
        for (pair, block, outcome, subgraph) in outcomes {
            self.store_edges_with_liq(pair, &outcome.removals, all_graph);

            // Pairs that should be ignored on the next graph search.
            let ignores = self
                .subgraph_verification_state
                .entry(pair)
                .or_default()
                .get_nodes_to_ignore();

            // Removals that are not part of the ignore set.
            let removals: FastHashMap<_, _> = outcome
                .removals
                .clone()
                .into_iter()
                .filter(|(k, _)| !ignores.contains(k))
                .collect();

            let verdict = if outcome.should_abandon {
                self.subgraph_verification_state.remove(&pair);
                tracing::trace!(?pair, "aborting");
                VerificationResults::Abort(pair, block)
            } else if outcome.should_requery {
                let extends = subgraph.subgraph.extends_to();
                self.pending_subgraphs.insert(pair, subgraph);
                tracing::trace!(?pair, "requerying");
                VerificationResults::Failed(VerificationFailed {
                    pair,
                    extends,
                    block,
                    prune_state: removals,
                    ignore_state: ignores,
                    frayed_ends: outcome.frayed_ends,
                })
            } else {
                self.passed_verification(pair, block, subgraph, removals, state_tracker)
            };
            results.push(verdict);
        }
        results
    })
}
/// Takes the pending subgraph of every requested pair out of the pending set
/// and bumps its iteration counter.
///
/// Pairs without a pending subgraph are logged at debug level and skipped.
/// The boolean in each returned tuple is the subgraph's `in_rundown` flag;
/// the `Option<u64>` of the input tuples is not used.
fn get_subgraphs(
    &mut self,
    pair: Vec<(u64, Option<u64>, PairWithFirstPoolHop, Rational, Address)>,
) -> Vec<(PairWithFirstPoolHop, u64, bool, Subgraph, Rational, Address)> {
    let mut found = Vec::new();
    for (block, _frayed, pair, price, quote) in pair {
        let Some(mut pending) = self.pending_subgraphs.remove(&pair) else {
            tracing::debug!(?pair, "not found in pending subgraphs");
            continue
        };
        pending.iters += 1;
        found.push((pair, block, pending.in_rundown, pending, price, quote));
    }
    found
}
/// Verifies all given subgraphs in parallel.
///
/// For each entry the verification state for its block is fetched from
/// `state_tracker`. Subgraphs flagged for rundown go through
/// `rundown_subgraph_check`, all others through `verify_subgraph`. The
/// subgraph is handed back next to its outcome.
fn verify_par(
    &self,
    pairs: Vec<(PairWithFirstPoolHop, u64, bool, Subgraph, Rational, Address)>,
    state_tracker: &mut StateTracker,
) -> Vec<(PairWithFirstPoolHop, u64, VerificationOutcome, Subgraph)> {
    pairs
        .into_par_iter()
        .map(|(pair, block, rundown, mut candidate, price, quote)| {
            let edge_state = state_tracker.state_for_verification(block);
            let graph = &mut candidate.subgraph;
            let outcome = if rundown {
                graph.rundown_subgraph_check(quote, price, &edge_state)
            } else {
                graph.verify_subgraph(quote, price, edge_state)
            };
            (pair, block, outcome, candidate)
        })
        .collect()
}
fn passed_verification(
&mut self,
pair: PairWithFirstPoolHop,
block: u64,
subgraph: Subgraph,
removals: FastHashMap<Pair, FastHashSet<BadEdge>>,
state_tracker: &mut StateTracker,
) -> VerificationResults {
self.subgraph_verification_state.remove(&pair);
let subgraph = subgraph.subgraph;
subgraph.get_all_pools().flatten().for_each(|pool| {
state_tracker.mark_state_as_finalized(block, pool.pool_addr);
});
VerificationResults::Passed(VerificationPass {
pair,
block,
subgraph,
prune_state: removals,
})
}
}
/// A pair subgraph that is waiting for verification, plus the bookkeeping the
/// verifier keeps alongside it.
#[derive(Debug, Clone)]
pub struct Subgraph {
/// The graph that is being verified.
pub subgraph: PairSubGraph,
/// Extension edges added via [`Subgraph::add_extension`], keyed by the id
/// that call returned.
pub frayed_end_extensions: FastHashMap<u64, Vec<SubGraphEdge>>,
/// Id that will be assigned to the next extension; starts at 0.
pub id: u64,
/// When `true`, verification uses the rundown check instead of the regular
/// subgraph verification.
pub in_rundown: bool,
/// Number of times this subgraph has been pulled for a verification round.
pub iters: usize,
/// Block this subgraph was created for.
pub block: u64,
}
impl Subgraph {
    /// Stores `edges` as a new frayed-end extension and returns the id it was
    /// stored under. Ids are handed out sequentially, starting from the
    /// current value of `self.id`.
    pub fn add_extension(&mut self, edges: Vec<SubGraphEdge>) -> u64 {
        let assigned = self.id;
        self.id = assigned + 1;
        self.frayed_end_extensions.insert(assigned, edges);
        assigned
    }
}
/// Result payload for a subgraph that passed verification.
#[derive(Debug)]
pub struct VerificationPass {
/// Key the subgraph was registered under.
pub pair: PairWithFirstPoolHop,
/// Block the verification ran for.
pub block: u64,
/// The verified graph.
pub subgraph: PairSubGraph,
/// Bad edges reported by the verification, grouped by pool pair, with the
/// pairs from the ignore set filtered out.
pub prune_state: FastHashMap<Pair, FastHashSet<BadEdge>>,
}
/// Result payload for a subgraph whose verification asked for a requery.
#[derive(Debug)]
pub struct VerificationFailed {
/// Key the subgraph is registered under.
pub pair: PairWithFirstPoolHop,
/// Value of `extends_to()` of the subgraph at the time of the failure.
pub extends: Option<Pair>,
/// Block the verification ran for.
pub block: u64,
/// Bad edges reported by the verification, grouped by pool pair, with the
/// pairs from `ignore_state` filtered out.
pub prune_state: FastHashMap<Pair, FastHashSet<BadEdge>>,
/// Ordered pairs of all edges currently recorded in the verification state.
pub ignore_state: FastHashSet<Pair>,
/// Frayed ends reported by the verification outcome.
pub frayed_ends: Vec<Address>,
}
/// Outcome of verifying a single subgraph.
// NOTE(review): the lint is presumably silenced because the struct-carrying
// variants are much larger than `Abort` — confirm before boxing them.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum VerificationResults {
/// The subgraph passed verification.
Passed(VerificationPass),
/// The subgraph needs to be requeried; it stays in the pending set.
Failed(VerificationFailed),
/// Verification was abandoned for this pair and block.
Abort(PairWithFirstPoolHop, u64),
}
/// Bad edges collected for one pair across verification rounds.
#[derive(Debug, Default, Clone)]
pub struct SubgraphVerificationState {
/// Recorded bad edges, grouped by token address.
edges: EdgesWithLiq,
/// Pairs whose edges must not be recorded again; consulted by
/// `add_edge_with_liq`.
// NOTE(review): nothing in the visible code inserts into this map, and the
// name looks like a typo of "removed_recursing" — confirm against the rest of
// the crate.
removed_recusing: FastHashMap<Pair, Address>,
}
impl SubgraphVerificationState {
/// Returns the pairs of all recorded edges, ordered by ascending liquidity.
///
/// Duplicates are removed per `(pair, liquidity)` combination, so a pair can
/// show up more than once when it was recorded with different liquidities.
fn sorted_ignore_nodes_by_liquidity(&self) -> Vec<Pair> {
    let mut by_liquidity = self
        .edges
        .0
        .values()
        .flatten()
        .map(|edge| (edge.pair, edge.liquidity.clone()))
        .unique()
        .collect_vec();
    // Stable sort: entries with equal liquidity keep their relative order.
    by_liquidity.sort_by(|a, b| a.1.cmp(&b.1));
    by_liquidity.into_iter().map(|(pair, _)| pair).collect()
}
/// Returns the pool address and liquidity (as `f64`) of the recorded edge
/// with the highest liquidity for `pair`.
///
/// When several distinct edges share the highest liquidity, the one
/// encountered last wins.
///
/// # Panics
///
/// Panics when no edge has been recorded for `pair`.
#[allow(unused)]
fn highest_liq_for_pair(&self, pair: Pair) -> (Address, f64) {
    self.edges
        .0
        .values()
        .flatten()
        // Filter first so only matching edges have their liquidity cloned.
        .filter(|edge| edge.pair == pair)
        .map(|edge| (edge.pool_address, edge.liquidity.clone()))
        .unique()
        // A single pass replaces sorting the whole list just to take its
        // last element; `max_by` also keeps the last of equal maxima.
        .max_by(|a, b| a.1.cmp(&b.1))
        .map(|(addr, liq)| (addr, liq.to_float()))
        .expect("no bad edge recorded for the requested pair")
}
/// Records `bad_edge` under `addr`, unless the edge's pair is listed in
/// `removed_recusing`, in which case nothing is stored.
fn add_edge_with_liq(&mut self, addr: Address, bad_edge: BadEdge) {
    if self.removed_recusing.contains_key(&bad_edge.pair) {
        return
    }
    self.edges.0.entry(addr).or_default().insert(bad_edge);
}
fn get_nodes_to_ignore(&self) -> FastHashSet<Pair> {
self.edges
.0
.values()
.flatten()
.map(|node| node.pair.ordered())
.collect::<FastHashSet<_>>()
}
}
/// Bad edges grouped by token address. Each edge is stored under both
/// addresses of its pair by the verifier.
#[derive(Debug, Default, Clone)]
pub struct EdgesWithLiq(FastHashMap<Address, FastHashSet<BadEdge>>);