cyqlone develop
Fast, parallel and vectorized solver for linear systems with optimal control structure.
Loading...
Searching...
No Matches
parallel.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file
4/// Parallel execution context and synchronization primitives.
5/// @ingroup topic-parallelization
6
7#include <cyqlone/barrier.hpp>
8#include <cyqlone/config.hpp>
9#include <batmat/config.hpp>
10#if BATMAT_WITH_OPENMP
11#include <batmat/openmp.h>
12#else
13#include <batmat/thread-pool.hpp>
14#endif
15#include <guanaqo/trace.hpp>
16#include <cstdint>
17#include <functional>
18#include <type_traits>
19#include <utility>
20
22
23struct SharedContext;
24template <class SC = SharedContext>
25struct Context;
26
27/// Abstraction for a parallel execution context: a set of threads that can synchronize and
28/// communicate with each other using barriers.
29/// @see Context
30/// @ingroup topic-parallelization
32#if GUANAQO_WITH_TRACING
33 struct completion_type {
34 void operator()() const noexcept { GUANAQO_TRACE_INSTANT("barrier-complete", 0); }
35 };
36#else
38#endif
40 const index_t num_thr;
41 barrier_type barrier{static_cast<uint32_t>(num_thr), {}};
42#if !BATMAT_WITH_OPENMP
43 batmat::thread_pool thread_pool{static_cast<size_t>(num_thr)};
44#endif
45 /// Execute the given function in parallel on all threads, blocking until completion.
46 /// The function will be called with a @ref Context that contains the thread index, and that can
47 /// be used to synchronize and communicate between threads.
48 template <class F>
49 void run(F &&);
50
51 /// Configure the barrier spin count used in parallel synchronization before falling back to a
52 /// futex wait.
53 uint32_t set_barrier_spin_count(uint32_t spin_count) {
54 static_assert(std::is_same_v<decltype(barrier.spin_count), decltype(spin_count)>);
55 return std::exchange(barrier.spin_count, spin_count);
56 }
57};
58
59/// Thread context for parallel execution. Each thread has a unique thread index, and can
60/// synchronize and communicate with other threads in the same shared context.
61/// @see SharedContext
62/// @ingroup topic-parallelization
63template <class SC>
64struct Context {
66#if GUANAQO_WITH_TRACING && !GUANAQO_WITH_PERFETTO
67 struct arrival_token {
68 using token_t = typename shared_context_type::barrier_type::arrival_token;
69 token_t token;
71 };
72#else
73 using arrival_token = typename shared_context_type::barrier_type::arrival_token;
74#endif
75
77 const index_t index, num_thr = shared.num_thr;
78
79 friend constexpr bool operator==(const Context &a, const Context &b) {
80 return &a.shared == &b.shared && a.index == b.index;
81 }
82
83 /// Check if this thread is the master thread (thread index 0).
84 /// Useful for determining which thread should perform operations like printing to the console,
85 /// which should be done by a single thread and does not require synchronization.
86 [[nodiscard]] bool is_master() const { return index == 0; }
87
 88 /// Arrive at the barrier and obtain a token that can be used to wait for completion of the
 89 /// current barrier phase.
 90 /// @note Token must be awaited before any other call to arrive.
 91 [[nodiscard]] arrival_token arrive() {
 92#if GUANAQO_WITH_TRACING && !GUANAQO_WITH_PERFETTO
    // Tracing build: open a trace span for the arrival and bundle it with the
    // barrier token so the span stays alive until the matching wait().
 93 auto trace = guanaqo::get_trace_logger().trace("barrier-arrive", index);
 94 return {shared.barrier.arrive(static_cast<uint32_t>(index)), std::move(trace)};
 95#else
    // Non-tracing build: the token is the barrier's own arrival token.
 96 return shared.barrier.arrive(static_cast<uint32_t>(index));
 97#endif
 98 }
 99 /// Await a token returned by @ref arrive(), waiting for the barrier phase to complete.
100 void wait(arrival_token &&token) {
101#if GUANAQO_WITH_TRACING && !GUANAQO_WITH_PERFETTO
    // Tracing build: move the trace span out of the token first; it is
    // destroyed at the end of this scope, after the wait returns, so the
    // recorded span covers the full wait.
102 auto trace = std::move(token.trace);
103 shared.barrier.wait(std::move(token.token));
104#else
105 shared.barrier.wait(std::move(token));
106#endif
107 }
108
109 /// Arrive at the barrier and wait for the barrier phase to complete. This is a convenience
110 /// wrapper around @ref arrive() and @ref wait() for the common case where the thread does not
111 /// have other work to do while waiting.
// NOTE(review): the function's signature line is missing from this rendered
// page (Doxygen anchor line 112); the member index confirms it is
// `void arrive_and_wait()` — verify against the actual header.
113#if !GUANAQO_WITH_PERFETTO
    // Trace the combined arrive+wait as a single span (no-op unless tracing).
114 GUANAQO_TRACE("barrier-arrive-and-wait", index);
115#endif
116 shared.barrier.arrive_and_wait(static_cast<uint32_t>(index));
117 }
118 /// Debug version of @ref arrive_and_wait() that performs a sanity check to ensure that all
119 /// threads are arriving at the same line of code. The @p line parameter should be the same
120 /// for all threads arriving at the same barrier. It is only verified in debug builds, and is
121 /// equivalent to @ref arrive_and_wait() in release builds.
122 void arrive_and_wait(int line) {
123#if !GUANAQO_WITH_PERFETTO
    // Trace the combined arrive+wait as a single span (no-op unless tracing).
124 GUANAQO_TRACE("barrier-arrive-and-wait", index);
125#endif
    // The line number is forwarded to the barrier, which checks it for
    // mismatched arrivals (debug builds only, per the contract above).
126 shared.barrier.arrive_and_wait(static_cast<uint32_t>(index), line);
127 }
128
129 /// Broadcast a value @p x from the thread with index @p src to all threads.
130 template <class T>
131 T broadcast(T x, index_t src = 0) {
132 return shared.barrier.broadcast(static_cast<uint32_t>(index), std::move(x),
133 static_cast<uint32_t>(src));
134 }
135
136 /// Call a function @p f with the given @p args on a single thread and broadcast the return
137 /// value to all threads.
138 template <class F, class... Args>
139 auto call_broadcast(F &&f, Args &&...args) -> std::invoke_result_t<F, Args...> {
140 using T = std::invoke_result_t<F, Args...>;
141 // TODO: implement with a relaxed atomic that gets reset during the completion handler,
142 // so only the first thread that arrives will execute the function, rather than always
143 // executing on the master thread.
144 if (is_master())
145 return broadcast(std::invoke(std::forward<F>(f), std::forward<Args>(args)...), 0);
146 else
147 return broadcast(T{}, 0);
148 }
149
150 /// Perform a reduction of @p x across all threads using the given binary function @p func.
151 /// Returns a token that can be used to wait for the reduction to complete and obtain the
152 /// reduced value.
153 template <class T, class F>
154 [[nodiscard]] auto arrive_reduce(T x, F func) {
155 return shared.barrier.arrive_reduce(static_cast<uint32_t>(index), std::move(x),
156 std::move(func));
157 }
158
159 /// Wait for the reduction initiated by @ref arrive_reduce() to complete and obtain the reduced
160 /// value.
161 template <class T>
162 T wait_reduce(shared_context_type::barrier_type::template arrival_token_typed<T> &&token) {
163 return shared.barrier.wait_reduce(std::move(token));
164 }
165
166 /// Perform a reduction of @p x across all threads using the given binary function @p func, and
167 /// wait for the result.
168 template <class T, class F>
169 T reduce(T x, F func) {
170 return shared.barrier.reduce(static_cast<uint32_t>(index), std::move(x), std::move(func));
171 }
172
/// Reduction with `std::plus`, i.e., summation across all threads.
/// @see reduce(T,F)
template <class T>
T reduce(T x) {
    std::plus<> add;
    return reduce(std::move(x), add);
}
179
180 /// Wait for all threads to reach this point, then run the given function on a single thread
181 /// before releasing all threads again. Changes by all threads are visible during the call to
182 /// @p f and changes made by @p f are visible to all threads after this function returns.
183 template <class F>
184 void run_single_sync(F &&f) {
185 shared.barrier.arrive_and_wait_with_completion(static_cast<uint32_t>(index),
186 std::forward<F>(f));
187 }
188};
189
190template <class F>
// NOTE(review): the definition's signature line is missing from this rendered
// page (Doxygen anchor line 191); the member index confirms it is
// `void SharedContext::run(F &&f)` — verify against the actual header.
192#if !BATMAT_WITH_OPENMP
    // Thread-pool backend: run f on num_thr pool threads, blocking until all
    // complete; each invocation gets a Context with its own thread index.
193 thread_pool.sync_run_n(num_thr, [this, &f](index_t i, index_t) {
194 Context<SharedContext> ctx{.shared = *this, .index = i};
195 f(ctx);
196 });
197#else
    // OpenMP backend: one loop iteration per thread (BATMAT_OMP expands to an
    // OpenMP pragma), each constructing its own per-thread Context.
198 BATMAT_OMP(parallel for num_threads(num_thr))
199 for (index_t i = 0; i < num_thr; ++i) {
200 Context<SharedContext> ctx{.shared = *this, .index = i};
201 f(ctx);
202 }
203#endif
204}
205
206} // namespace cyqlone::parallel
Barrier synchronization primitive.
Fairly vanilla combining tree barrier.
Definition barrier.hpp:46
TraceLogger & get_trace_logger()
#define GUANAQO_TRACE(name, instance,...)
#define GUANAQO_TRACE_INSTANT(name, instance)
#define BATMAT_OMP(X)
No-op completion function for the TreeBarrier.
Definition barrier.hpp:29
Thread context for parallel execution.
Definition parallel.hpp:64
T broadcast(T x, index_t src=0)
Broadcast a value x from the thread with index src to all threads.
Definition parallel.hpp:131
T reduce(T x)
Reduction with std::plus, i.e., summation across all threads.
Definition parallel.hpp:176
auto call_broadcast(F &&f, Args &&...args) -> std::invoke_result_t< F, Args... >
Call a function f with the given args on a single thread and broadcast the return value to all thread...
Definition parallel.hpp:139
void run_single_sync(F &&f)
Wait for all threads to reach this point, then run the given function on a single thread before relea...
Definition parallel.hpp:184
void arrive_and_wait()
Arrive at the barrier and wait for the barrier phase to complete.
Definition parallel.hpp:112
bool is_master() const
Check if this thread is the master thread (thread index 0).
Definition parallel.hpp:86
typename shared_context_type::barrier_type::arrival_token arrival_token
Definition parallel.hpp:73
auto arrive_reduce(T x, F func)
Perform a reduction of x across all threads using the given binary function func.
Definition parallel.hpp:154
T wait_reduce(shared_context_type::barrier_type::template arrival_token_typed< T > &&token)
Wait for the reduction initiated by arrive_reduce() to complete and obtain the reduced value.
Definition parallel.hpp:162
void arrive_and_wait(int line)
Debug version of arrive_and_wait() that performs a sanity check to ensure that all threads are arrivi...
Definition parallel.hpp:122
T reduce(T x, F func)
Perform a reduction of x across all threads using the given binary function func, and wait for the re...
Definition parallel.hpp:169
void wait(arrival_token &&token)
Await a token returned by arrive(), waiting for the barrier phase to complete.
Definition parallel.hpp:100
friend constexpr bool operator==(const Context &a, const Context &b)
Definition parallel.hpp:79
arrival_token arrive()
Arrive at the barrier and obtain a token that can be used to wait for completion of the current barri...
Definition parallel.hpp:91
shared_context_type & shared
Definition parallel.hpp:76
Abstraction for a parallel execution context: a set of threads that can synchronize and communicate w...
Definition parallel.hpp:31
void run(F &&)
Execute the given function in parallel on all threads, blocking until completion.
Definition parallel.hpp:191
uint32_t set_barrier_spin_count(uint32_t spin_count)
Configure the barrier spin count used in parallel synchronization before falling back to a futex wait...
Definition parallel.hpp:53
TreeBarrier< completion_type, uint16_t > barrier_type
Definition parallel.hpp:39
ScopedLog trace(const char *name, int64_t instance, int64_t flop_count=-1)