Barretenberg
The ZK-SNARK library at the core of Aztec
parallel_for_atomic_pool.cpp
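Source listing for the "atomic pool" variant of Barretenberg's parallel_for. A process-lifetime thread pool is built once; for each job, every thread (the caller included) repeatedly claims the next iteration index with an atomic fetch-add and applies the supplied function to it, so faster threads naturally pick up more iterations. The whole file is compiled out when NO_MULTITHREADING is defined.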
#ifndef NO_MULTITHREADING
#include "log.hpp"
#include "thread.hpp"
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace {

class ThreadPool {
  public:
    ThreadPool(size_t num_threads);
    ThreadPool(const ThreadPool& other) = delete;
    ThreadPool(ThreadPool&& other) = delete;
    ~ThreadPool();

    ThreadPool& operator=(const ThreadPool& other) = delete;
    ThreadPool& operator=(ThreadPool&& other) = delete;

    void start_tasks(size_t num_iterations, const std::function<void(size_t)>& func)
    {
        {
            std::unique_lock<std::mutex> lock(tasks_mutex);
            task_ = func;
            num_iterations_ = num_iterations;
            iteration_ = 0;
            iterations_completed_ = 0;
        }
        condition.notify_all();

        do_iterations();

        // Spin until the workers have finished the iterations they claimed.
        while (iterations_completed_ != num_iterations) {
        }
    }

  private:
    std::vector<std::thread> workers;
    std::mutex tasks_mutex;
    std::function<void(size_t)> task_;
    size_t num_iterations_ = 0;
    std::atomic<size_t> iteration_;
    std::atomic<size_t> iterations_completed_;
    std::condition_variable condition; // signalled when new work arrives or the pool stops
    bool stop = false;

    void worker_loop(size_t thread_index);

    void do_iterations()
    {
        size_t iteration = 0;
        // Claim the next iteration index with an atomic fetch-add; whichever
        // thread gets there first runs that index.
        while ((iteration = iteration_.fetch_add(1, std::memory_order_seq_cst)) < num_iterations_) {
            // info("main thread processing iteration ", iteration);
            task_(iteration);
            iterations_completed_++;
        }
    }
};
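// Despite the name, this pool keeps no queue of tasks. start_tasks publishes a
// single callable plus an iteration count, and do_iterations then hands out
// indices through iteration_.fetch_add, so the main thread and every worker
// drain the same atomic counter until it passes num_iterations_.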

ThreadPool::ThreadPool(size_t num_threads)
    : iteration_(0)
    , iterations_completed_(0)
{
    workers.reserve(num_threads);
    for (size_t i = 0; i < num_threads; ++i) {
        workers.emplace_back(&ThreadPool::worker_loop, this, i);
    }
}

ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(tasks_mutex);
        stop = true;
    }
    condition.notify_all();
    for (auto& worker : workers) {
        worker.join();
    }
}

void ThreadPool::worker_loop(size_t /*unused*/)
{
    // info("created worker ", worker_num);
    while (true) {
        {
            std::unique_lock<std::mutex> lock(tasks_mutex);
            condition.wait(lock, [this] { return (iteration_ < num_iterations_) || stop; });

            if (stop) {
                break;
            }
        }
        do_iterations();
    }
    // info("worker exit ", worker_num);
}
} // namespace

namespace bb {
void parallel_for_atomic_pool(size_t num_iterations, const std::function<void(size_t)>& func)
{
    // One pool for the life of the process; the calling thread also runs
    // iterations, so spawn one fewer worker than there are CPUs.
    static ThreadPool pool(get_num_cpus() - 1);

    // info("starting job with iterations: ", num_iterations);
    pool.start_tasks(num_iterations, func);
    // info("done");
}
} // namespace bb
#endif
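A minimal usage sketch (an illustration, not part of this file): each iteration writes to a distinct index, so the lambda needs no synchronization of its own. The forward declaration stands in for the library's real threading header, whose include path is not shown in this listing; the program must be linked against the library that provides the definition above.

#include <cstddef>
#include <functional>
#include <vector>

namespace bb {
// Forward declaration of the entry point defined in this file, so the sketch
// is self-contained; real code would include Barretenberg's threading header.
void parallel_for_atomic_pool(std::size_t num_iterations, const std::function<void(std::size_t)>& func);
} // namespace bb

int main()
{
    std::vector<std::size_t> squares(1024);
    // Each call touches only squares[i]; indices never overlap, so the task
    // body needs no locking.
    bb::parallel_for_atomic_pool(squares.size(), [&](std::size_t i) { squares[i] = i * i; });
    return 0;
}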