Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
lmdb_store_wrapper.cpp
Go to the documentation of this file.
5#include "napi.h"
6#include <algorithm>
7#include <bits/chrono.h>
8#include <chrono>
9#include <cstdint>
10#include <iterator>
11#include <memory>
12#include <optional>
13#include <ratio>
14#include <stdexcept>
15#include <utility>
16
17using namespace bb::nodejs;
18using namespace bb::nodejs::lmdb_store;
19
// Initial value of the constructor's map size before any caller-supplied value is applied
// (1024 * 1024 = 1,048,576). NOTE(review): the unit is not visible in this file - confirm against LMDBStore.
20const uint64_t DEFAULT_MAP_SIZE = 1024UL * 1024;
// NOTE(review): presumably the default for the constructor's "number of readers" argument - its use is not
// visible in this listing.
21const uint64_t DEFAULT_MAX_READERS = 16;
// NOTE(review): presumably the default number of entries read per cursor page - its use is not visible in
// this listing.
22const uint64_t DEFAULT_CURSOR_PAGE_SIZE = 10;
23
// Builds the wrapper from the JavaScript constructor arguments:
//   [0] data directory (must be a string), [1] map size, [2] number of readers.
// Throws Napi::TypeError when an argument has the wrong type.
// NOTE(review): several statements of this constructor are missing from this listing (the type checks
// around the numeric arguments and everything after the argument parsing), so only the visible parts are
// described here.
24LMDBStoreWrapper::LMDBStoreWrapper(const Napi::CallbackInfo& info)
25 : ObjectWrap(info)
26{
27 Napi::Env env = info.Env();
28
// Argument 0: the data directory. It is mandatory and has to be a JavaScript string.
29 size_t data_dir_index = 0;
30 std::string data_dir;
31 if (info.Length() > data_dir_index && info[data_dir_index].IsString()) {
32 data_dir = info[data_dir_index].As<Napi::String>();
33 } else {
34 throw Napi::TypeError::New(env, "Directory needs to be a string");
35 }
36
// Argument 1: the map size, starting from DEFAULT_MAP_SIZE.
// NOTE(review): the value is read with Uint32Value() although map_size is a uint64_t, so only 32 bits of
// the JavaScript number are kept - confirm this is intended.
37 size_t map_size_index = 1;
38 uint64_t map_size = DEFAULT_MAP_SIZE;
39 if (info.Length() > map_size_index) {
41 map_size = info[map_size_index].As<Napi::Number>().Uint32Value();
42 } else {
43 throw Napi::TypeError::New(env, "Map size must be a number or an object");
44 }
45 }
46
// Argument 2: the number of readers. The visible error branch only fires when the argument is not undefined.
47 size_t max_readers_index = 2;
49 if (info.Length() > max_readers_index) {
51 max_readers = info[max_readers_index].As<Napi::Number>().Uint32Value();
52 } else if (!info[max_readers_index].IsUndefined()) {
53 throw Napi::TypeError::New(env, "The number of readers must be a number");
54 }
55 }
56
58
60
63
69
71
73
74 // The close operation requires exclusive execution, no other operations can be run concurrently with it
76
78}
79
// Per the class documentation this is the only instance method exposed to JavaScript: it takes a msgpack
// Message and returns a Promise.
// NOTE(review): the body of this method is not visible in this listing.
80Napi::Value LMDBStoreWrapper::call(const Napi::CallbackInfo& info)
81{
83}
84
// Defines and returns the JavaScript class for this wrapper; the class is named "Store" on the JS side.
// NOTE(review): the list of members passed to DefineClass is not visible in this listing.
85Napi::Function LMDBStoreWrapper::get_class(Napi::Env env)
86{
87 return DefineClass(env,
88 "Store",
89 {
91 });
92}
93
94// Simply verify that the store is still valid and that close has not been called
// Returns normally while `_store` is set; otherwise throws std::runtime_error. Handlers call this before
// dereferencing `_store`.
96{
97 if (_store) {
98 return;
99 }
100 throw std::runtime_error(format("LMDB store unavailable, was close already called?"));
101}
102
// Opens the database named by `req.db` on the underlying store and responds with { true }.
// Throws std::runtime_error (via verify_store) if the store has been closed.
104{
105 verify_store();
// The flag handed to the store is the negation of `uniqueKeys`, which counts as true when the request
// leaves it unset. NOTE(review): presumably the flag means "allow duplicate keys" - confirm against
// LMDBStore::open_database.
106 _store->open_database(req.db, !req.uniqueKeys.value_or(true));
107 return { true };
108}
109
// Fetches the values stored against `req.keys` in the database `req.db` and returns them.
// Throws std::runtime_error (via verify_store) if the store has been closed.
// NOTE(review): the declaration of `vals` is not visible in this listing.
111{
112 verify_store();
// The keys are copied into a local vector before being handed to the store
114 lmdblib::KeysVector keys = req.keys;
115 _store->get(keys, vals, req.db);
116 return { vals };
117}
118
120{
121 verify_store();
123 for (const auto& entry : req.entries) {
124 key_set.insert(entry.first);
125 }
126
127 lmdblib::KeysVector keys(key_set.begin(), key_set.end());
129 _store->get(keys, vals, req.db);
130
131 std::vector<bool> exists;
132
133 for (const auto& entry : req.entries) {
134 const auto& key = entry.first;
135 const auto& requested_values = entry.second;
136
137 const auto& key_it = std::find(keys.begin(), keys.end(), key);
138 if (key_it == keys.end()) {
139 // this shouldn't happen. It means we missed a key when we created the key_set
140 exists.push_back(false);
141 continue;
142 }
143
144 // should be fine to convert this to an index in the array?
145 const auto& values = vals[static_cast<size_t>(key_it - keys.begin())];
146
147 if (!values.has_value()) {
148 exists.push_back(false);
149 continue;
150 }
151
152 // client just wanted to know if the key exists
153 if (!requested_values.has_value()) {
154 exists.push_back(true);
155 continue;
156 }
157
158 exists.push_back(std::all_of(requested_values->begin(), requested_values->end(), [&](const auto& val) {
159 return std::find(values->begin(), values->end(), val) != values->begin();
160 }));
161 }
162
163 return { exists };
164}
165
// Opens a cursor on the database `req.db`, positions it at `req.key` and reads the first page of entries.
// The response is one of:
//   { nullopt, {} }          - no starting position could be found
//   { nullopt, first_page }  - the cursor was exhausted by the first page, or the client asked for one page
//   { cursor_id, first_page} - the cursor was stored in `_cursors` so that it can be advanced later
// Throws std::runtime_error (via verify_store) if the store has been closed.
// NOTE(review): the declarations of `page_size` and `entries`, and whatever guards the `_cursors` update,
// are not visible in this listing.
167{
168 verify_store();
169 bool reverse = req.reverse.value_or(false);
171 bool one_page = req.onePage.value_or(false);
172 lmdblib::Key key = req.key;
173
174 auto tx = _store->create_shared_read_transaction();
175 lmdblib::LMDBCursor::SharedPtr cursor = _store->create_cursor(tx, req.db);
176 bool start_ok = cursor->set_at_key(key);
177
178 if (!start_ok) {
179 // we couldn't find exactly the requested key. Find the next biggest one.
180 start_ok = cursor->set_at_key_gte(key);
181 // if we found a key that's greater _and_ we want to go in reverse order
182 // then we're actually outside the requested bounds, we need to go back one position
183 if (start_ok && reverse) {
185 // read_prev returns `true` if there's nothing more to read
186 // turn this into a "not ok" because there's nothing in the db for this cursor to read
187 start_ok = !cursor->read_prev(1, entries);
188 } else if (!start_ok && reverse) {
189 // we couldn't find a key greater than our starting point _and_ we want to go in reverse..
190 // then we start at the end of the database (the client requested to start at a key greater than anything in
191 // the DB)
192 start_ok = cursor->set_at_end();
193 }
194
195 // in case we're iterating in ascending order and we can't find the exact key or one that's greater than it
196 // then that means there's nothing in the DB for the cursor to read
197 }
198
199 // we couldn't find a starting position
200 if (!start_ok) {
201 return { std::nullopt, {} };
202 }
203
204 auto [done, first_page] = _advance_cursor(*cursor, reverse, page_size);
205 // cursor finished after reading a single page or client only wanted the first page
206 if (done || one_page) {
207 return { std::nullopt, first_page };
208 }
209
// keep the cursor (and its direction) around under its id so later requests can continue from here
210 auto cursor_id = cursor->id();
211 {
213 _cursors[cursor_id] = { cursor, reverse };
214 }
215
216 return { cursor_id, first_page };
217}
218
// Forgets the cursor registered under the id `req.cursor` and responds with { true }.
// `_cursors` is a std::unordered_map, so erasing an id that is not registered is a no-op.
// NOTE(review): one line inside the inner scope is not visible in this listing.
220{
221 {
223 _cursors.erase(req.cursor);
224 }
225 return { true };
226}
227
// Reads the next page of entries from the cursor registered under the id `req.cursor`, in the direction
// stored with that cursor, and returns the entries together with the `done` flag.
// `_cursors.at` throws std::out_of_range when no cursor is registered under that id.
// NOTE(review): the declarations of `data` and `page_size`, and one line inside the inner scope, are not
// visible in this listing.
229{
231
232 {
234 data = _cursors.at(req.cursor);
235 }
236
238 auto [done, entries] = _advance_cursor(*data.cursor, data.reverse, page_size);
239 return { entries, done };
240}
241
// Advances the cursor registered under the id `req.cursor` towards `req.endKey`, in the direction stored
// with that cursor, and returns the resulting count together with the `done` flag.
// `_cursors.at` throws std::out_of_range when no cursor is registered under that id.
// NOTE(review): the declaration of `data` and one line inside the inner scope are not visible in this
// listing.
243{
245
246 {
248 data = _cursors.at(req.cursor);
249 }
250
251 auto [done, count] = _advance_cursor_count(*data.cursor, data.reverse, req.endKey);
252 return { count, done };
253}
254
// Applies a set of write batches to the store in a single `put` call and returns how long that call took,
// in nanoseconds.
// Throws std::runtime_error (via verify_store) if the store has been closed.
// NOTE(review): the declaration of `batches` is not visible in this listing.
256{
257 verify_store();
259 batches.reserve(req.batches.size());
260
// Convert every requested batch into the store's PutData: entries to add, entries to remove, and the
// element the batch is keyed by (presumably the database name - TODO confirm against PutData).
261 for (const auto& data : req.batches) {
262 lmdblib::LMDBStore::PutData batch{ data.second.addEntries, data.second.removeEntries, data.first };
263 batches.push_back(batch);
264 }
265
// Only the put itself is timed, not the conversion above
266 auto start = std::chrono::high_resolution_clock::now();
267 _store->put(batches);
268 auto end = std::chrono::high_resolution_clock::now();
269 std::chrono::duration<uint64_t, std::nano> duration_ns = end - start;
270
271 return { duration_ns.count() };
272}
273
// Asks the store to fill in `stats` and returns it together with the map size and the physical file size
// reported by the same call.
// Throws std::runtime_error (via verify_store) if the store has been closed.
// NOTE(review): the declaration of `stats` is not visible in this listing.
275{
276 verify_store();
278 auto [map_size, physical_file_size] = _store->get_stats(stats);
279 return { stats, map_size, physical_file_size };
280}
281
// Shuts the store down: drops every registered cursor, then releases the store handle, and responds with
// { true }. Once `_store` has been reset, verify_store() throws for any handler that calls it.
// NOTE(review): the statement that goes with the first comment below is not visible in this listing.
283{
284 // prevent this store from receiving further messages
286
287 {
288 // close all of the open read cursors
289 std::lock_guard cursors(_cursor_mutex);
290 _cursors.clear();
291 }
292
293 // and finally close the database handle
294 _store.reset(nullptr);
295
296 return { true };
297}
298
// Copies the store to `req.dstPath` and responds with { true }. The `compact` option is forwarded to the
// store and counts as false when the request leaves it unset.
// Throws std::runtime_error (via verify_store) if the store has been closed.
300{
301 verify_store();
302 _store->copy_store(req.dstPath, req.compact.value_or(false));
303
304 return { true };
305}
306
// Reads up to `page_size` keys from `cursor` - backwards with read_prev when `reverse` is set, forwards with
// read_next otherwise - and returns the cursor's `done` flag paired with the entries that were read.
// NOTE(review): the first line of the signature and the declaration of `entries` are not visible in this
// listing.
308 bool reverse,
309 uint64_t page_size)
310{
312 bool done = reverse ? cursor.read_prev(page_size, entries) : cursor.read_next(page_size, entries);
313 return std::make_pair(done, entries);
314}
315
// Moves `cursor` towards `end_key` - backwards with count_until_prev when `reverse` is set, forwards with
// count_until_next otherwise - and returns the cursor's `done` flag paired with the count it reported.
// NOTE(review): the first line of the signature is not visible in this listing.
317 bool reverse,
318 const lmdblib::Key& end_key)
319{
// the count starts at zero and is filled in by the cursor through the reference parameter
320 uint64_t count = 0;
321 bool done = reverse ? cursor.count_until_prev(end_key, count) : cursor.count_until_next(end_key, count);
322 return std::make_pair(done, count);
323}
bool count_until_next(const Key &key, uint64_t &count) const
bool read_next(uint64_t numKeysToRead, KeyDupValuesVector &keyValuePairs) const
bool read_prev(uint64_t numKeysToRead, KeyDupValuesVector &keyValuePairs) const
std::shared_ptr< LMDBCursor > SharedPtr
bool count_until_prev(const Key &key, uint64_t &count) const
void register_handler(uint32_t msgType, T *self, R(T::*handler)() const, bool unique=false)
Napi::Promise process_message(const Napi::CallbackInfo &info)
StartCursorResponse start_cursor(const StartCursorRequest &req)
GetResponse get(const GetRequest &req)
static Napi::Function get_class(Napi::Env env)
BoolResponse close_cursor(const CloseCursorRequest &req)
BoolResponse open_database(const OpenDatabaseRequest &req)
bb::nodejs::AsyncMessageProcessor _msg_processor
HasResponse has(const HasRequest &req)
BatchResponse batch(const BatchRequest &req)
BoolResponse copy_store(const CopyStoreRequest &req)
std::unordered_map< uint64_t, CursorData > _cursors
static std::pair< bool, uint64_t > _advance_cursor_count(const lmdblib::LMDBCursor &cursor, bool reverse, const lmdblib::Key &end_key)
AdvanceCursorResponse advance_cursor(const AdvanceCursorRequest &req)
AdvanceCursorCountResponse advance_cursor_count(const AdvanceCursorCountRequest &req)
Napi::Value call(const Napi::CallbackInfo &)
The only instance method exposed to JavaScript. Takes a msgpack Message and returns a Promise.
static std::pair< bool, lmdblib::KeyDupValuesVector > _advance_cursor(const lmdblib::LMDBCursor &cursor, bool reverse, uint64_t page_size)
std::unique_ptr< lmdblib::LMDBStore > _store
std::string format(Args... args)
Definition log.hpp:20
void info(Args... args)
Definition log.hpp:70
const std::vector< FF > data
const uint64_t DEFAULT_MAP_SIZE
const uint64_t DEFAULT_MAX_READERS
const uint64_t DEFAULT_CURSOR_PAGE_SIZE
std::vector< Key > KeysVector
Definition types.hpp:13
std::vector< uint8_t > Key
Definition types.hpp:11
std::vector< KeyValuesPair > KeyDupValuesVector
Definition types.hpp:18
std::vector< OptionalValues > OptionalValuesVector
Definition types.hpp:17
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13