1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! Utilities for running groups of operations on dedicated thread pools.
//! This gives us more precise control over how our `par_iter` work is
//! allocated across threads.
use std::sync::OnceLock;

/// Sets up the download, pricing and inspect thread pools, sizing each one
/// relative to `max_tasks`.
///
/// NOTE: the combined thread count exceeds 100% of `max_tasks` due to the
/// call operation flow; CPU usage is still expected to stay near the given
/// value.
pub fn init_thread_pools(max_tasks: usize) {
    // Truncated share of `max_tasks`, plus one so the result is always >= 1.
    let share_of = |fraction: f64| (max_tasks as f64 * fraction) as usize + 1;

    // download pool gets half of the budget
    init_download_thread_pool(share_of(0.50));
    // pricing: expensive ops, up to 200 ms
    init_pricing_thread_pool(share_of(0.70));
    // inspectors: runtime ~ 50ms, they get the full budget
    init_inspect_threadpool(max_tasks);
}

/// Runs a block or an expression on one of the dedicated thread pools.
///
/// To use
/// ```ignore
/// execute_on!(target = ?, { code });
/// execute_on!(target = ?,  fn_call );
/// ```
/// where ? can be,
/// - download
/// - pricing
/// - inspect
/// - async_inspect (the code is moved into the closure and the expansion is
///   a future that has to be awaited)
///
/// Every path in the expansion is written through `$crate`, so the macro
/// resolves regardless of how the caller imported it (by `use`, by full path,
/// or under a renamed dependency) and can also be used inside the defining
/// crate itself.
///
/// NOTE(review): this relies on the `execute_on_*` pool functions being
/// reachable at the root of the defining crate, which the `download` arm
/// already required.
#[macro_export]
macro_rules! execute_on {
    // `target = <pool>, { ... }`: forward the block to the per-pool arm.
    (target=$t:tt, $block:block) => {
        $crate::execute_on!($t, $block)
    };
    // `target = <pool>, <tokens>`: wrap the tokens in a block, then forward.
    (target=$t:tt, $($block:tt)+) => {
        $crate::execute_on!($t, { $($block)+ })
    };
    (download, $block:block) => {
        $crate::execute_on_download_thread_pool(|| $block)
    };
    (pricing, $block:block) => {
        $crate::execute_on_pricing_thread_pool(|| $block)
    };
    (inspect, $block:block) => {
        $crate::execute_on_inspect_thread_pool(|| $block)
    };
    (async_inspect, $block:block) => {
        $crate::execute_on_inspect_thread_pool_async(move || $block)
    };
}

/// ThreadPool for download operations
static RAYON_DOWNLOAD_THREADPOOL: OnceLock<rayon::ThreadPool> = OnceLock::new();

/// Builds a rayon pool with `threads` workers named `DB-DOWNLOAD: <index>`
/// and stores it in [`RAYON_DOWNLOAD_THREADPOOL`].
///
/// Panics if the pool cannot be built. If the static is already populated the
/// freshly built pool is discarded and the existing one is kept.
fn init_download_thread_pool(threads: usize) {
    let builder = rayon::ThreadPoolBuilder::new()
        .thread_name(|worker| format!("DB-DOWNLOAD: {}", worker))
        .num_threads(threads);
    let pool = builder.build().unwrap();

    // first initialisation wins; a rejected pool is dropped right here
    drop(RAYON_DOWNLOAD_THREADPOOL.set(pool));
}
/// Runs `op` inside the download thread pool and returns its result.
///
/// # Panics
/// Panics with "threadpool not initialized" if the download pool has not
/// been set up yet.
pub fn execute_on_download_thread_pool<OP, R>(op: OP) -> R
where
    OP: FnOnce() -> R + Send,
    R: Send,
{
    let pool = RAYON_DOWNLOAD_THREADPOOL
        .get()
        .expect("threadpool not initialized");
    pool.install(op)
}

/// ThreadPool for pricing operations
static RAYON_PRICING_THREADPOOL: OnceLock<rayon::ThreadPool> = OnceLock::new();

/// Builds a rayon pool with `threads` workers named `Pricing: <index>` and
/// stores it in [`RAYON_PRICING_THREADPOOL`].
///
/// Panics if the pool cannot be built. If the static is already populated the
/// freshly built pool is discarded and the existing one is kept.
fn init_pricing_thread_pool(threads: usize) {
    let builder = rayon::ThreadPoolBuilder::new()
        .thread_name(|worker| format!("Pricing: {}", worker))
        .num_threads(threads);
    let pool = builder.build().unwrap();

    // first initialisation wins; a rejected pool is dropped right here
    drop(RAYON_PRICING_THREADPOOL.set(pool));
}

/// Runs `op` inside the pricing thread pool and returns its result.
///
/// # Panics
/// Panics with "threadpool not initialized" if the pricing pool has not been
/// set up yet.
pub fn execute_on_pricing_thread_pool<OP, R>(op: OP) -> R
where
    OP: FnOnce() -> R + Send,
    R: Send,
{
    let pool = RAYON_PRICING_THREADPOOL
        .get()
        .expect("threadpool not initialized");
    pool.install(op)
}

/// ThreadPool for inspect operations
static RAYON_INSPECT_THREADPOOL: OnceLock<rayon::ThreadPool> = OnceLock::new();

/// Builds a rayon pool with `threads` workers named `Inspect: <index>`, each
/// with a 32 MiB stack, and stores it in [`RAYON_INSPECT_THREADPOOL`].
///
/// Panics if the pool cannot be built. If the static is already populated the
/// freshly built pool is discarded and the existing one is kept.
fn init_inspect_threadpool(threads: usize) {
    let builder = rayon::ThreadPoolBuilder::new()
        .stack_size(1024 * 1024 * 32)
        .thread_name(|worker| format!("Inspect: {}", worker))
        .num_threads(threads);
    let pool = builder.build().unwrap();

    // first initialisation wins; a rejected pool is dropped right here
    drop(RAYON_INSPECT_THREADPOOL.set(pool));
}

/// Runs `op` inside the inspect thread pool and returns its result.
///
/// # Panics
/// Panics with "threadpool not initialized" if the inspect pool has not been
/// set up yet.
pub fn execute_on_inspect_thread_pool<OP, R>(op: OP) -> R
where
    OP: FnOnce() -> R + Send,
    R: Send,
{
    let pool = RAYON_INSPECT_THREADPOOL
        .get()
        .expect("threadpool not initialized");
    pool.install(op)
}

/// Spawns `op` onto the inspect thread pool and resolves to its result,
/// which is handed back over a tokio oneshot channel.
///
/// # Panics
/// Panics with "threadpool not initialized" if the inspect pool has not been
/// set up yet, and panics if the sending half is dropped before a result was
/// sent.
pub async fn execute_on_inspect_thread_pool_async<OP, R>(op: OP) -> R
where
    OP: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let (result_tx, result_rx) = tokio::sync::oneshot::channel();
    let pool = RAYON_INSPECT_THREADPOOL
        .get()
        .expect("threadpool not initialized");

    pool.spawn(move || {
        // a send error only means the receiver is gone; nothing left to do
        let _ = result_tx.send(op());
    });

    result_rx.await.unwrap()
}