cyqlone develop
Fast, parallel and vectorized solver for linear systems with optimal control structure.
Loading...
Searching...
No Matches
barrier.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file
4/// Barrier synchronization primitive.
5/// @ingroup topic-parallelization
6
7#include <cyqlone/config.hpp>
8#include <batmat/assume.hpp>
9#include <array>
10#include <atomic>
11#include <cstdint>
12#include <cstring>
13#include <functional>
14#include <memory>
15#include <type_traits>
16
17#ifndef CYQLONE_SANITY_CHECKS_BARRIER
18#ifndef NDEBUG
19#define CYQLONE_SANITY_CHECKS_BARRIER 1
20#else
21#define CYQLONE_SANITY_CHECKS_BARRIER 0
22#endif
23#endif
24
25namespace cyqlone {
26
27/// No-op completion function for the @ref TreeBarrier.
28/// @ingroup topic-parallelization
30 void operator()() const noexcept {} ///< Does nothing.
31};
32
33/**
34 * Fairly vanilla combining tree barrier. It is inspired by GCC 15.2's __tree_barrier, with some
35 * important API differences:
36 * - Every thread has a unique thread ID in [0, expected-1]. This eliminates the need for hashing
37 * the pthread thread IDs and for the inner search loop to find free slots in the tree.
38 * - Wait tries to spin for a given number of iterations before falling back to a futex-based
39 * atomic wait.
40 * - The barrier phase is exposed to the user.
41 * - Custom completion functions can be provided at arrival time.
42 * - Reductions and broadcasts on small values are supported.
43 * @ingroup topic-parallelization
44 */
45template <typename CompletionFn = EmptyCompletion, class PhaseType = uint32_t>
47 public:
48 enum class BarrierPhase : PhaseType {};
60
61 template <class T>
63
64 private:
65 static constexpr size_t cache_line_size = 64; ///< @todo increase to 128 on newer architectures
66 /// Storage for small values used in reductions and broadcasts.
67 /// @todo use placement new and launder, drop trivial copyability requirement
68 struct Storage {
69 alignas(cache_line_size) std::array<std::byte, cache_line_size> payload;
70 template <class T>
71 void store(T t) noexcept {
72 static_assert(sizeof(T) <= sizeof(payload));
73 static_assert(std::is_trivially_copyable_v<T>);
74 std::memcpy(payload.data(), &t, sizeof(T));
75 }
76 template <class T>
77 T load() const noexcept {
78 static_assert(sizeof(T) <= sizeof(payload));
79 static_assert(std::is_trivially_copyable_v<T>);
80 T t;
81 std::memcpy(&t, payload.data(), sizeof(T));
82 return t;
83 }
84 template <class T>
85 static constexpr bool is_compatible =
86 sizeof(T) <= sizeof(payload) && std::is_trivially_copyable_v<T>;
87 };
88 /// Atomic counters for each level of the combining tree. Aligned to avoid false sharing.
89 /// @todo figure out why the libstdc++ implementation does not reuse tickets across levels
90 /// (as is done with the storage in the reduction variant below).
91 struct alignas(cache_line_size) State {
92 using atomic_byte = std::atomic<unsigned char>;
93 using atomic_word = std::atomic<uint32_t>;
// True when only the word-sized atomic is guaranteed lock-free; in that case the
// tickets must be words despite the larger footprint.
94 static constexpr bool only_word_lock_free =
95 atomic_word::is_always_lock_free && !atomic_byte::is_always_lock_free;
96 // Make the ticket size as small as possible while still being lock-free.
97 // TODO: this may be overkill and/or even unnecessary if we reuse tickets.
98 using ticket_t = std::conditional_t<only_word_lock_free, atomic_word, atomic_byte>;
// One full cache line of tickets per tree node, indexed by level. Value-initialized
// to zero; the arrival code only ever increments them.
99 static constexpr size_t num_levels = cache_line_size / sizeof(ticket_t);
100 std::array<ticket_t, num_levels> tickets{};
101 };
102 using ticket_value_type = typename State::ticket_t::value_type;
103
104 uint32_t expected; ///< Number of participating threads.
105 std::unique_ptr<State[]> state; ///< Combining tree state.
106 std::unique_ptr<Storage[]> storage; ///< Used for reductions.
107 Storage broadcast_storage; ///< Used for broadcasts (including after reductions).
108 [[no_unique_address]] CompletionFn completion; ///< Called when last thread arrives.
109 alignas(cache_line_size) std::atomic<BarrierPhase> phase{};
110
111#if CYQLONE_SANITY_CHECKS_BARRIER
// Debug-only bookkeeping. The topmost ticket slots of each State cache line are not used
// by the arrival tree (max() reserves 4 slots when sanity checks are enabled), so they are
// repurposed as per-thread counters: two slots for arrival-phase counters and two for
// source-line records. Threads 2k and 2k+1 share state[k]; the low bit picks the slot.
112 State::ticket_t &get_local_phase(uint32_t thread_id) noexcept {
113 return state[thread_id >> 1].tickets[State::num_levels - 1 - (thread_id & 1)];
114 }
// Per-thread record of the source line passed to arrive(thread_id, line); stored in the
// two ticket slots just below the phase counters.
115 State::ticket_t &get_local_line(uint32_t thread_id) noexcept {
116 return state[thread_id >> 1].tickets[State::num_levels - 3 - (thread_id & 1)];
117 }
// Each arrival increments the thread's phase counter exactly once, so before this phase's
// arrival it must equal the current phase (truncated to the ticket width). A mismatch
// means the thread arrived twice without an intervening phase change.
118 void sanity_check_arrival(uint32_t thread_id, BarrierPhase cur_phase) noexcept {
119 if (get_local_phase(thread_id).fetch_add(1, std::memory_order_relaxed) !=
120 static_cast<ticket_value_type>(cur_phase))
121 BATMAT_ASSERT(!"This thread has already arrived in this phase");
122 }
123#endif
124
125 /// Combining tree arrival. The last thread arriving at a certain ticket (counter) moves on to
126 /// the next level of the tree. When reaching the root, it returns true. The number of tickets
127 /// halves at each level, with at most two threads per ticket.
128 bool arrive_impl(BarrierPhase old_phase, uint32_t thread_id) {
129 static constexpr auto acq_rel = std::memory_order_acq_rel;
// Tickets are cumulative: they are zero-initialized once and (in the visible code) only
// ever incremented. A node fed by two threads gains 2 per phase, so the second arrival in
// phase p observes 2*p + 1; a node fed by a single (odd last) thread gains 1 per phase
// and observes p. Truncation to the ticket width is consistent on both sides.
130 const auto first_of_one = static_cast<ticket_value_type>(old_phase),
131 second_of_two = static_cast<ticket_value_type>(2 * first_of_one + 1);
132
133 uint32_t level_size = expected; // Total sum in this level of the tree
134 for (size_t level = 0;; ++level) {
// Only one thread remains: it has reached the root and is the phase's last arrival.
135 if (level_size <= 1)
136 return true;
137 BATMAT_ASSUME(level < 32);
// Adjacent thread IDs merge: the pair (2k, 2k+1) meets at node k of this level.
138 thread_id >>= 1;
139 auto &ticket = state[thread_id].tickets[level];
140 const uint32_t end_node = (level_size + 1) >> 1; // Two threads per node
// The final node of an odd-sized level has a single contributor.
141 const bool last_odd = thread_id + 1 == end_node && (level_size & 1) == 1;
142 const auto target = last_odd ? first_of_one : second_of_two;
// acq_rel: the winner acquires everything published by the loser's release.
143 const auto old_value = ticket.fetch_add(1, acq_rel);
144 if (old_value != target)
145 return false;
146 level_size = end_node;
147 }
148 }
149
150 /// Fused implementation of the combining tree arrival and a reduction operation. The last
151 /// thread arriving at a certain ticket (counter) moves on to the next level of the tree. When
152 /// it does so, it reads the value written by the other thread that arrived at the same ticket,
153 /// applies the reduction function, and writes the result to be used in the next level. When
154 /// reaching the root, it stores the final value and returns true. Note that the left and right
155 /// arguments to the reduction function are determined by the thread IDs, regardless of the
156 /// order in which threads arrive. In other words, for a given number of threads, the order of
157 /// the reduction operations is fully deterministic.
158 template <class T, class F>
159 bool arrive_impl(BarrierPhase old_phase, uint32_t thread_id, T value, F reduce) {
160 static constexpr auto acq_rel = std::memory_order_acq_rel;
// Same cumulative-ticket scheme as the plain arrive_impl above: a two-contributor node
// expects 2*p + 1 from the second arrival in phase p, a single-contributor node expects p.
161 const auto first_of_one = static_cast<ticket_value_type>(old_phase),
162 second_of_two = static_cast<ticket_value_type>(2 * first_of_one + 1);
163
164 // Diagram of the storage used at each level (for expected = 4):
165 // l=0 l=1 l=2
166 // storage[0] t=0 1 3 10
167 // storage[1] t=1 2
168 // storage[2] t=2 3 7
169 // storage[3] t=3 4
170
171 uint32_t level_size = expected; // Total sum in this level of the tree
172 for (size_t level = 0;; ++level) {
// Root reached: publish the fully reduced value for wait_reduce() to pick up.
173 if (level_size <= 1) {
174 broadcast_storage.store(value);
175 return true;
176 }
177 BATMAT_ASSUME(level < 32);
// A node at this level writes its partial result to the slot whose low `level` bits are
// zeroed out of the original thread ID; `offset` toggles the partner's slot (bit `level`).
178 auto offset = size_t{1} << level;
179 auto write = thread_id << level;
180 storage[write].store(value);
181 thread_id >>= 1;
182 auto &ticket = state[thread_id].tickets[level];
183 const uint32_t end_node = (level_size + 1) >> 1; // Two threads per node
184 const bool last_odd = thread_id + 1 == end_node && (level_size & 1) == 1;
185 const auto target = last_odd ? first_of_one : second_of_two;
// acq_rel: the winner's load of the partner's storage is ordered after its release.
186 const auto old_value = ticket.fetch_add(1, acq_rel);
187 if (old_value != target)
188 return false;
// Combine left child (bit cleared) and right child (bit set); a last_odd node forwards
// its own value unchanged. The operand order depends only on thread IDs, never on
// arrival order, which is what makes the reduction deterministic.
189 if (!last_odd)
190 value = reduce(storage[write & ~offset].template load<T>(),
191 storage[write | +offset].template load<T>());
192 level_size = end_node;
193 }
194 }
195
196 /// Generic implementation of arrive with custom completion function. The arrival function
197 /// should return true when the thread is the last to arrive at the root of the tree.
198 /// Returns a token that can be used to wait for the barrier to complete.
199 /// The custom completion function is called by the last thread arriving at the root, before
200 /// advancing the barrier phase and notifying all waiting threads.
201 template <class A, class C>
202 [[nodiscard]] arrival_token arrive_with_completion(uint32_t thread_id, A arrival,
203 C &&custom_completion) {
204 BATMAT_ASSUME(thread_id < expected);
// NOTE(review): the relaxed load appears sufficient because the arrival tree's acq_rel
// RMWs provide the cross-thread ordering — confirm against the memory-model argument.
205 const auto cur_phase = phase.load(std::memory_order_relaxed);
206#if CYQLONE_SANITY_CHECKS_BARRIER
207 sanity_check_arrival(thread_id, cur_phase);
208#endif
// True only for the single thread that reached the root: it runs the completion, then
// publishes the next phase. The release store pairs with the acquire loads in wait().
209 if (arrival(cur_phase, thread_id)) {
210 std::invoke(std::forward<C>(custom_completion));
211 auto next_phase = static_cast<BarrierPhase>(static_cast<PhaseType>(cur_phase) + 1);
212 phase.store(next_phase, std::memory_order_release);
213 phase.notify_all();
214 }
// Every arriver (including the last) receives a token for the phase it arrived in.
215 return arrival_token{cur_phase};
216 }
217
218 public:
219 /// Maximum number of threads supported by this barrier implementation.
220 static constexpr uint32_t max() {
// When sanity checks are on, the top 4 ticket slots of each cache line are reserved for
// get_local_phase (2 slots) and get_local_line (2 slots), so 4 fewer tree levels fit.
221#if CYQLONE_SANITY_CHECKS_BARRIER // Leave space for local phases for sanity checks
222 constexpr static uint32_t num_levels = State::num_levels - 4;
223#else
224 constexpr static uint32_t num_levels = State::num_levels;
225#endif
// A tree with num_levels levels merges 2^num_levels threads; clamp to the uint32_t range.
226 return num_levels > 31 ? 0xFFFFFFFF : uint32_t{1} << num_levels;
227 }
228
229 /// Create a barrier with @p expected participating threads and a completion function that is
230 /// called by the last thread that arrives at each phase.
// Assumes 1 <= expected <= max() — TODO confirm; no validation is visible here.
231 TreeBarrier(uint32_t expected, CompletionFn completion)
232 : expected(expected), completion(std::move(completion)),
233 phase(static_cast<BarrierPhase>(0)) {
// Level 0 of the tree has one node per pair of threads: ceil(expected / 2) States, each
// holding the tickets for every level handled at that node. One Storage slot per thread
// is needed for the reduction variant of arrive_impl.
235 const size_t leaf_count = (expected + 1) >> 1;
236 state = std::make_unique<State[]>(leaf_count);
237 storage = std::make_unique<Storage[]>(expected);
238 }
239 TreeBarrier(const TreeBarrier &) = delete;
243
244 /// Arrive at the barrier with a custom completion function that is called by the last thread
245 /// that arrives, before advancing the barrier phase and notifying all waiting threads.
246 /// The completion function of the barrier is not called in this case.
247 /// Each thread should use a unique thread ID in [0, expected-1].
248 template <class C>
249 [[nodiscard]] arrival_token arrive_with_completion(uint32_t thread_id, C &&custom_completion) {
250 auto arrival = [this](BarrierPhase cur_phase, uint32_t thread_id) {
251 return arrive_impl(cur_phase, thread_id);
252 };
253 return arrive_with_completion(thread_id, arrival, std::forward<C>(custom_completion));
254 }
255
256 /// Arrive at the barrier. The barrier's completion function is called by the last thread
257 /// that arrives, before advancing the barrier phase and notifying all waiting threads.
258 /// Each thread should use a unique thread ID in [0, expected-1].
259 [[nodiscard]] arrival_token arrive(uint32_t thread_id) {
260 return arrive_with_completion(thread_id, completion);
261 }
262
263 /// Arrive at the barrier, recording the given line number for sanity checking to make sure
264 /// that all threads arrive from the same line or statement in the source code.
265 /// This is useful for debugging purposes to detect mismatched barrier calls, but should not
266 /// really be used otherwise. If @ref CYQLONE_SANITY_CHECKS_BARRIER is disabled, the line
267 /// number is ignored and this function is equivalent to @ref arrive(uint32_t).
268 /// Each thread should use a unique thread ID in [0, expected-1].
269 [[nodiscard]] arrival_token arrive(uint32_t thread_id, [[maybe_unused]] int line) {
270#if CYQLONE_SANITY_CHECKS_BARRIER
// NOTE(review): the line number is truncated to ticket_value_type (possibly unsigned
// char), so distinct lines that collide modulo its width would pass the check below —
// the sanity check is best-effort, not exact.
271 get_local_line(thread_id).store(static_cast<ticket_value_type>(line),
272 std::memory_order_relaxed);
// The last arriver verifies that every thread recorded the same source line, then runs
// the barrier's regular completion function.
273 return arrive_with_completion(thread_id, [&] {
274 for (uint32_t i = 0; i < expected; ++i)
275 BATMAT_ASSERT(get_local_line(i).load(std::memory_order_relaxed) ==
276 static_cast<ticket_value_type>(line));
277 completion();
278 });
279#else
280 return arrive(thread_id);
281#endif
282 }
283
284 /// Query the current barrier phase. May wrap around on overflow, but all threads will see the
285 /// same phase values in the same order.
286 [[nodiscard]] BarrierPhase current_phase() const {
287 return phase.load(std::memory_order_relaxed);
288 }
289
290 /// Number of spin iterations before falling back to futex-based wait.
/// Public and mutable so callers can tune it; wait() reads it on every call.
291 uint32_t spin_count = 1000; // approx. 2-3 cycles/iteration (Haswell, according to llvm-mca)
292
293 /// Check if @ref wait() may block. If it returns false, the caller can call @ref wait()
294 /// and it will return immediately without spinning or sleeping. This is useful if the caller
295 /// has other non-critical work to do while waiting for other threads.
296 /// Users should still call @ref wait() before arriving again.
297 /// @note This function does not impose any memory ordering, so even when it returns false,
298 /// changes made before the arrival of other threads may not be visible yet. In contrast,
299 /// @ref wait() does ensure proper synchronization.
300 bool wait_may_block(const arrival_token &token) const noexcept {
301 return phase.load(std::memory_order_relaxed) == token.get();
302 }
303
304 /// Wait for the barrier to complete after an arrival, using the given token. Separating the
305 /// arrival and wait phases allows for overlapping computation with waiting, hiding the
306 /// synchronization latency.
307 /// Waiting on the same token multiple times is not allowed.
308 void wait(arrival_token &&token) const {
309 const auto old_phase = token.get();
310 // barring overflow, we have that current_phase >= old_phase
311 if (phase.load(std::memory_order_acquire) != old_phase) [[likely]]
312 return;
313 // Spin before calling wait
314 for (auto spin = this->spin_count; spin-- > 0;)
315 if (phase.load(std::memory_order_acquire) != old_phase) [[unlikely]]
316 return;
317 phase.wait(old_phase, std::memory_order_acquire);
318 }
319
320 /// Convenience function to arrive and wait in a single call.
321 void arrive_and_wait(uint32_t thread_id) { wait(arrive(thread_id)); }
322 /// Convenience function to arrive and wait in a single call (with optional sanity check).
323 void arrive_and_wait(uint32_t thread_id, int line) { wait(arrive(thread_id, line)); }
324 /// Convenience function to arrive and wait in a single call (with custom completion).
325 template <class C>
326 requires std::is_void_v<std::invoke_result_t<C &&>>
327 void arrive_and_wait_with_completion(uint32_t thread_id, C &&custom_completion) {
328 wait(arrive_with_completion(thread_id, std::forward<C>(custom_completion)));
329 }
330 /// Convenience function to arrive and wait in a single call (with custom completion).
331 /// Broadcasts the return value of the custom completion function to all threads.
332 template <class C>
333 requires(!std::is_void_v<std::invoke_result_t<C &&>> &&
334 !std::is_reference_v<std::invoke_result_t<C &&>> &&
335 Storage::template is_compatible<std::invoke_result_t<C &&>>)
336 [[nodiscard]] auto arrive_and_wait_with_completion(uint32_t thread_id, C &&custom_completion) {
337 using ret_t = std::invoke_result_t<C &&>;
339 [this, c{std::forward<C>(custom_completion)}] mutable {
340 broadcast_storage.store(std::invoke(std::forward<C>(c)));
341 }));
342 return broadcast_storage.template load<ret_t>();
343 }
344
345 /// Combining tree reduction across all threads. Deterministic application order for a given
346 /// number of threads.
347 template <class T, class F>
348 [[nodiscard]] arrival_token_typed<T> arrive_reduce(uint32_t thread_id, T x, F reduce) {
349 auto arrival = [this, &reduce, &x](BarrierPhase cur_phase, uint32_t thread_id) {
350 return arrive_impl(cur_phase, thread_id, std::move(x), std::move(reduce));
351 };
352 return arrival_token_typed<T>{arrive_with_completion(thread_id, arrival, [] {})};
353 }
354
355 /// Wait for the result of an @ref arrive_reduce call and obtain the reduced value.
356 template <class T>
357 [[nodiscard]] T wait_reduce(arrival_token_typed<T> &&token) {
358 wait(std::move(token));
359 return broadcast_storage.template load<T>();
360 }
361
/// Arrive, reduce across all threads, and wait for the combined result.
/// The application order is deterministic for a given number of threads.
template <class T, class F>
[[nodiscard]] T reduce(uint32_t thread_id, T x, F reduce) {
    auto token = arrive_reduce(thread_id, std::move(x), std::move(reduce));
    return wait_reduce(std::move(token));
}
368
369 /// Broadcast a value from the source thread to all other threads. All threads must call this
370 /// function with the same source thread ID.
371 template <class T>
372 [[nodiscard]] T broadcast(uint32_t thread_id, T &&x, uint32_t src = 0) {
373 if (thread_id == src)
374 storage[thread_id].store(std::forward<T>(x));
375 // TODO: in debug mode, we could have the other threads write some unused bit in the storage
376 // to detect if all calls used the same src.
377 auto custom_completion = [this, src] {
378 broadcast_storage.store(storage[src].template load<T>());
379 };
380 wait(arrive_with_completion(thread_id, custom_completion));
381 return broadcast_storage.template load<T>();
382 }
383};
384
385} // namespace cyqlone
#define BATMAT_ASSUME(x)
#define BATMAT_ASSERT(x)
BarrierPhase get() const noexcept
Definition barrier.hpp:58
arrival_token & operator=(arrival_token &&phase)=default
arrival_token(BarrierPhase phase)
Definition barrier.hpp:53
arrival_token & operator=(const arrival_token &phase)=delete
arrival_token(arrival_token &&phase)=default
arrival_token(const arrival_token &phase)=delete
static constexpr uint32_t max()
Maximum number of threads supported by this barrier implementation.
Definition barrier.hpp:220
TreeBarrier(uint32_t expected, CompletionFn completion)
Create a barrier with expected participating threads and a completion function that is called by the ...
Definition barrier.hpp:231
void arrive_and_wait_with_completion(uint32_t thread_id, C &&custom_completion)
Convenience function to arrive and wait in a single call (with custom completion).
Definition barrier.hpp:327
bool wait_may_block(const arrival_token &token) const noexcept
Check if wait() may block.
Definition barrier.hpp:300
State::ticket_t & get_local_line(uint32_t thread_id) noexcept
Definition barrier.hpp:115
bool arrive_impl(BarrierPhase old_phase, uint32_t thread_id, T value, F reduce)
Fused implementation of the combining tree arrival and a reduction operation.
Definition barrier.hpp:159
T wait_reduce(arrival_token_typed< T > &&token)
Wait for the result of an arrive_reduce call and obtain the reduced value.
Definition barrier.hpp:357
BarrierPhase current_phase() const
Query the current barrier phase.
Definition barrier.hpp:286
void arrive_and_wait(uint32_t thread_id, int line)
Convenience function to arrive and wait in a single call (with optional sanity check).
Definition barrier.hpp:323
TreeBarrier(const TreeBarrier &)=delete
arrival_token_typed< T > arrive_reduce(uint32_t thread_id, T x, F reduce)
Combining tree reduction across all threads.
Definition barrier.hpp:348
arrival_token arrive(uint32_t thread_id)
Arrive at the barrier.
Definition barrier.hpp:259
TreeBarrier(TreeBarrier &&)=default
arrival_token arrive_with_completion(uint32_t thread_id, C &&custom_completion)
Arrive at the barrier with a custom completion function that is called by the last thread that arrive...
Definition barrier.hpp:249
void wait(arrival_token &&token) const
Wait for the barrier to complete after an arrival, using the given token.
Definition barrier.hpp:308
auto arrive_and_wait_with_completion(uint32_t thread_id, C &&custom_completion)
Convenience function to arrive and wait in a single call (with custom completion).
Definition barrier.hpp:336
T reduce(uint32_t thread_id, T x, F reduce)
Definition barrier.hpp:365
arrival_token arrive_with_completion(uint32_t thread_id, A arrival, C &&custom_completion)
Generic implementation of arrive with custom completion function.
Definition barrier.hpp:202
void sanity_check_arrival(uint32_t thread_id, BarrierPhase cur_phase) noexcept
Definition barrier.hpp:118
arrival_token arrive(uint32_t thread_id, int line)
Arrive at the barrier, recording the given line number for sanity checking to make sure that all thre...
Definition barrier.hpp:269
TreeBarrier & operator=(TreeBarrier &&)=default
void arrive_and_wait(uint32_t thread_id)
Convenience function to arrive and wait in a single call.
Definition barrier.hpp:321
TreeBarrier & operator=(const TreeBarrier &)=delete
State::ticket_t & get_local_phase(uint32_t thread_id) noexcept
Definition barrier.hpp:112
typename State::ticket_t::value_type ticket_value_type
Definition barrier.hpp:102
T broadcast(uint32_t thread_id, T &&x, uint32_t src=0)
Broadcast a value from the source thread to all other threads.
Definition barrier.hpp:372
bool arrive_impl(BarrierPhase old_phase, uint32_t thread_id)
Combining tree arrival.
Definition barrier.hpp:128
No-op completion function for the TreeBarrier.
Definition barrier.hpp:29
void operator()() const noexcept
Does nothing.
Definition barrier.hpp:30
Atomic counters for each level of the combining tree.
Definition barrier.hpp:91
std::atomic< uint32_t > atomic_word
Definition barrier.hpp:93
std::array< ticket_t, num_levels > tickets
Definition barrier.hpp:100
std::atomic< unsigned char > atomic_byte
Definition barrier.hpp:92
static constexpr size_t num_levels
Definition barrier.hpp:99
std::conditional_t< only_word_lock_free, atomic_word, atomic_byte > ticket_t
Definition barrier.hpp:98
static constexpr bool only_word_lock_free
Definition barrier.hpp:94
Storage for small values used in reductions and broadcasts.
Definition barrier.hpp:68
static constexpr bool is_compatible
Definition barrier.hpp:85
void store(T t) noexcept
Definition barrier.hpp:71
T load() const noexcept
Definition barrier.hpp:77
std::array< std::byte, cache_line_size > payload
Definition barrier.hpp:69