1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
//! Module that interacts with MDBX.

use std::{ops::Deref, path::Path};

use brontes_libmdbx::{
    DatabaseFlags, Environment, EnvironmentFlags, Geometry, MaxReadTransactionDuration, Mode,
    PageSize, SyncMode,
};
use reth_db::{
    database_metrics::{DatabaseMetadata, DatabaseMetadataValue},
    models::client_version::ClientVersion,
    tables::{TableType, Tables},
    DatabaseError,
};
use reth_interfaces::db::LogLevel;
/// Number of bytes in one gibibyte (1024 * 1024 * 1024).
const GIGABYTE: usize = 1024 * 1024 * 1024;

/// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`), but we limit it to
/// slightly below that.
const DEFAULT_MAX_READERS: u64 = 32_000;

/// Space that a read-only transaction can occupy until the warning is emitted.
/// See [brontes_libmdbx::EnvironmentBuilder::set_handle_slow_readers] for more
/// information.
#[cfg(not(windows))]
const MAX_SAFE_READER_SPACE: usize = 10 * GIGABYTE;

/// Kind of access requested when opening an MDBX environment: read-only or
/// read-write.
///
/// The enum is a fieldless marker, so it is `Copy` and comparable; callers can
/// keep using a value after handing it to a function that takes it by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DatabaseEnvKind {
    /// Read-only MDBX environment.
    RO,
    /// Read-write MDBX environment.
    RW,
}

impl DatabaseEnvKind {
    /// Returns `true` if the environment is read-write, `false` if it is
    /// read-only.
    pub const fn is_rw(&self) -> bool {
        matches!(self, Self::RW)
    }
}

/// Arguments for database initialization.
#[derive(Clone, Debug, Default)]
pub struct DatabaseArguments {
    /// Client version that accesses the database.
    client_version:                ClientVersion,
    /// Database log level. If [None], the default value is used.
    log_level:                     Option<LogLevel>,
    /// Maximum duration of a read transaction. If [None], the default value is
    /// used.
    max_read_transaction_duration: Option<MaxReadTransactionDuration>,
    /// Open environment in exclusive/monopolistic mode. If [None], the default
    /// value is used.
    ///
    /// This can be used as a replacement for `MDB_NOLOCK`, which isn't
    /// supported by MDBX. In this way, you can get the minimal overhead,
    /// but with the correct multi-process and multi-thread locking.
    ///
    /// If `true` = open environment in exclusive/monopolistic mode or return
    /// `MDBX_BUSY` if environment already used by other process. The main
    /// feature of the exclusive mode is the ability to open the environment
    /// placed on a network share.
    ///
    /// If `false` = open environment in cooperative mode, i.e. for
    /// multi-process access/interaction/cooperation. The main requirements
    /// of the cooperative mode are:
    /// - Data files MUST be placed in the LOCAL file system, but NOT on a
    ///   network share.
    /// - Environment MUST be opened only by LOCAL processes, but NOT over a
    ///   network.
    /// - OS kernel (i.e. file system and memory mapping implementation) and all
    ///   processes that open the given environment MUST be running in the
    ///   physically single RAM with cache-coherency. The only exception for
    ///   cache-consistency requirement is Linux on MIPS architecture, but this
    ///   case has not been tested for a long time.
    ///
    /// This flag only takes effect when the environment is opened and can't
    /// be changed afterwards.
    exclusive:                     Option<bool>,
}

impl DatabaseArguments {
    /// Builds database arguments for the given client version, leaving every
    /// optional setting unset.
    pub fn new(client_version: ClientVersion) -> Self {
        let (log_level, max_read_transaction_duration, exclusive) = (None, None, None);

        Self { client_version, log_level, max_read_transaction_duration, exclusive }
    }

    /// Replaces the log level and returns the updated arguments.
    pub fn with_log_level(self, log_level: Option<LogLevel>) -> Self {
        Self { log_level, ..self }
    }

    /// Replaces the maximum duration of a read transaction and returns the
    /// updated arguments.
    pub fn with_max_read_transaction_duration(
        self,
        max_read_transaction_duration: Option<MaxReadTransactionDuration>,
    ) -> Self {
        Self { max_read_transaction_duration, ..self }
    }

    /// Replaces the mdbx exclusive flag and returns the updated arguments.
    pub fn with_exclusive(self, exclusive: Option<bool>) -> Self {
        Self { exclusive, ..self }
    }

    /// Returns the client version stored in these arguments.
    pub fn client_version(&self) -> &ClientVersion {
        let Self { client_version, .. } = self;
        client_version
    }
}

/// Wrapper for the libmdbx environment: [Environment]
///
/// Dereferences to the wrapped [Environment] through its [Deref]
/// implementation.
#[derive(Debug)]
pub struct DatabaseEnv {
    /// The wrapped libmdbx environment.
    inner: Environment,
}

impl DatabaseMetadata for DatabaseEnv {
    /// Builds the metadata value from the environment's freelist query.
    fn metadata(&self) -> DatabaseMetadataValue {
        // A failed freelist query is reported as `None` rather than as an error.
        let freelist = self.freelist().ok();

        DatabaseMetadataValue::new(freelist)
    }
}

impl DatabaseEnv {
    /// Opens the database at the specified path with the given `EnvKind`.

    pub fn open(
        path: &Path,
        kind: DatabaseEnvKind,
        args: DatabaseArguments,
    ) -> Result<DatabaseEnv, DatabaseError> {
        let mut inner_env = Environment::builder();

        let mode = match kind {
            DatabaseEnvKind::RO => Mode::ReadOnly,
            DatabaseEnvKind::RW => {
                inner_env.write_map();
                Mode::ReadWrite { sync_mode: SyncMode::SafeNoSync }
            }
        };

        // Note: We set max dbs to 256 here to allow for custom tables. This needs to be
        // set on environment creation.
        debug_assert!(Tables::ALL.len() <= 256, "number of tables exceed max dbs");
        inner_env.set_max_dbs(256);
        inner_env.set_geometry(Geometry {
            // Maximum database size of 4 TB
            size:             Some(0..(4000 * GIGABYTE)),
            // We grow the database in increments of a gigabyte
            growth_step:      Some(GIGABYTE as isize),
            shrink_threshold: Some(GIGABYTE as isize),
            page_size:        Some(PageSize::Set(default_page_size())),
        });
        #[cfg(not(windows))]
        {
            fn is_current_process(id: u32) -> bool {
                #[cfg(unix)]
                {
                    id == std::os::unix::process::parent_id() || id == std::process::id()
                }

                #[cfg(not(unix))]
                {
                    id == std::process::id()
                }
            }
            inner_env.set_handle_slow_readers(
                |process_id: u32,
                 thread_id: u32,
                 read_txn_id: u64,
                 gap: usize,
                 space: usize,
                 retry: isize| {
                    if space > MAX_SAFE_READER_SPACE {
                        let message = if is_current_process(process_id) {
                            "Current process has a long-lived database transaction that grows the \
                             database file."
                        } else {
                            "External process has a long-lived database transaction that grows the \
                             database file. Use shorter-lived read transactions or shut down the \
                             node."
                        };
                        tracing::warn!(
                            target: "brontes::db::mdbx",
                            ?process_id,
                            ?thread_id,
                            ?read_txn_id,
                            ?gap,
                            ?space,
                            ?retry,
                            message
                        )
                    }

                    brontes_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
                },
            );
        }
        inner_env.set_flags(EnvironmentFlags {
            mode,
            no_rdahead: true,
            coalesce: true,
            exclusive: args.exclusive.unwrap_or_default(),
            ..Default::default()
        });
        // Configure more readers
        inner_env.set_max_readers(DEFAULT_MAX_READERS);

        inner_env.set_rp_augment_limit(256 * 1024);

        if let Some(log_level) = args.log_level {
            // Levels higher than [LogLevel::Notice] require libmdbx built with `MDBX_DEBUG`
            // option.
            let is_log_level_available = if cfg!(debug_assertions) {
                true
            } else {
                matches!(
                    log_level,
                    LogLevel::Fatal | LogLevel::Error | LogLevel::Warn | LogLevel::Notice
                )
            };
            if is_log_level_available {
                inner_env.set_log_level(match log_level {
                    LogLevel::Fatal => 0,
                    LogLevel::Error => 1,
                    LogLevel::Warn => 2,
                    LogLevel::Notice => 3,
                    LogLevel::Verbose => 4,
                    LogLevel::Debug => 5,
                    LogLevel::Trace => 6,
                    LogLevel::Extra => 7,
                });
            } else {
                return Err(DatabaseError::LogLevelUnavailable(log_level))
            }
        }

        if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
            inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
        }

        let env = DatabaseEnv {
            inner: inner_env
                .open(path)
                .map_err(|e| DatabaseError::Open(e.into()))?,
        };

        Ok(env)
    }

    /// Creates all the defined tables, if necessary.
    pub fn create_tables(&self) -> Result<(), DatabaseError> {
        let tx = self
            .inner
            .begin_rw_txn()
            .map_err(|e| DatabaseError::InitTx(e.into()))?;

        for table in Tables::ALL {
            let flags = match table.table_type() {
                TableType::Table => DatabaseFlags::default(),
                TableType::DupSort => DatabaseFlags::DUP_SORT,
            };

            tx.create_db(Some(table.name()), flags)
                .map_err(|e| DatabaseError::CreateTable(e.into()))?;
        }

        tx.commit().map_err(|e| DatabaseError::Commit(e.into()))?;

        Ok(())
    }
}

impl Deref for DatabaseEnv {
    type Target = Environment;

    /// Borrows the wrapped libmdbx [Environment].
    fn deref(&self) -> &Environment {
        let Self { inner } = self;
        inner
    }
}

/// Returns the default page size for this OS: the OS page size, bounded below
/// by 4096 bytes and above by the libmdbx maximum page size.
pub(crate) fn default_page_size() -> usize {
    // Going below this may lead to errors because of the potential size of the
    // data.
    const MIN_PAGE_SIZE: usize = 4096;

    // source: https://gitflic.ru/project/erthink/libmdbx/blob?file=mdbx.h#line-num-821
    const LIBMDBX_MAX_PAGE_SIZE: usize = 0x10000;

    page_size::get()
        .max(MIN_PAGE_SIZE)
        .min(LIBMDBX_MAX_PAGE_SIZE)
}