Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
parallel_for_mutex_pool.cpp
Go to the documentation of this file.
2#ifndef NO_MULTITHREADING
3#include "log.hpp"
4#include "thread.hpp"
5#include <atomic>
6#include <condition_variable>
7#include <functional>
8#include <mutex>
9#include <queue>
10#include <thread>
11#include <vector>
12
14
15namespace {
16
17class ThreadPool {
18 public:
19 ThreadPool(size_t num_threads);
20 ThreadPool(const ThreadPool& other) = delete;
21 ThreadPool(ThreadPool&& other) = delete;
22 ~ThreadPool();
23
24 ThreadPool& operator=(const ThreadPool& other) = delete;
25 ThreadPool& operator=(ThreadPool&& other) = delete;
26
27 void start_tasks(size_t num_iterations, const std::function<void(size_t)>& func)
28 {
29 {
30 std::unique_lock<std::mutex> lock(tasks_mutex);
31 task_ = func;
32 num_iterations_ = num_iterations;
33 iteration_ = 0;
34 complete_ = 0;
35 }
36 condition.notify_all();
37
38 do_iterations();
39
40 {
41 std::unique_lock<std::mutex> lock(tasks_mutex);
42 complete_condition_.wait(lock, [this] { return complete_ == num_iterations_; });
43 }
44 }
45
46 private:
47 std::vector<std::thread> workers;
48 std::mutex tasks_mutex;
49 std::function<void(size_t)> task_;
50 size_t num_iterations_ = 0;
51 size_t iteration_ = 0;
52 size_t complete_ = 0;
54 std::condition_variable complete_condition_;
55 bool stop = false;
56
57 BB_NO_PROFILE void worker_loop(size_t thread_index);
58
59 void do_iterations()
60 {
61 while (true) {
62 size_t iteration = 0;
63 {
64 std::unique_lock<std::mutex> lock(tasks_mutex);
65 if (iteration_ == num_iterations_) {
66 return;
67 }
68 iteration = iteration_++;
69 }
70 task_(iteration);
71 {
72 std::unique_lock<std::mutex> lock(tasks_mutex);
73 if (++complete_ == num_iterations_) {
74 complete_condition_.notify_one();
75 return;
76 }
77 }
78 }
79 }
80};
81
82ThreadPool::ThreadPool(size_t num_threads)
83{
84 workers.reserve(num_threads);
85 for (size_t i = 0; i < num_threads; ++i) {
86 workers.emplace_back(&ThreadPool::worker_loop, this, i);
87 }
88}
89
90ThreadPool::~ThreadPool()
91{
92 {
93 std::unique_lock<std::mutex> lock(tasks_mutex);
94 stop = true;
95 }
96 condition.notify_all();
97 for (auto& worker : workers) {
98 worker.join();
99 }
100}
101
102void ThreadPool::worker_loop(size_t /*unused*/)
103{
104 // info("created worker ", worker_num);
105 while (true) {
106 {
107 std::unique_lock<std::mutex> lock(tasks_mutex);
108 condition.wait(lock, [this] { return (iteration_ < num_iterations_) || stop; });
109
110 if (stop) {
111 break;
112 }
113 }
114 do_iterations();
115 }
116 // info("worker exit ", worker_num);
117}
118} // namespace
119
120namespace bb {
125void parallel_for_mutex_pool(size_t num_iterations, const std::function<void(size_t)>& func)
126{
127 static ThreadPool pool(get_num_cpus() - 1);
128 // Note that if this is used safely, we don't need the std::atomic_bool (can use bool), but if we are catching the
129 // mess up case of nesting parallel_for this should be atomic
130 static std::atomic_bool nested = false;
131 // Check if we are already in a nested parallel_for_mutex_pool call
132 bool expected = false;
133 if (!nested.compare_exchange_strong(expected, true)) {
134 throw_or_abort("Error: Nested parallel_for_mutex_pool calls are not allowed.");
135 }
136 // info("starting job with iterations: ", num_iterations);
137 pool.start_tasks(num_iterations, func);
138 // info("done");
139 nested = false;
140}
141} // namespace bb
142#endif
#define BB_NO_PROFILE
Entry point for Barretenberg command-line interface.
void parallel_for_mutex_pool(size_t num_iterations, const std::function< void(size_t)> &func)
size_t get_num_cpus()
Definition thread.hpp:12
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
void throw_or_abort(std::string const &err)