VS2019 C++20 latch
- 01 latch
- 02 An implementation of latch
01 latch
<latch>
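Latch: a single-use thread barrier.
A latch is a downward counter of type ptrdiff_t that can be used to synchronize threads. The counter's value is initialized when the latch is created.
Threads may block on the latch until the counter is decremented to zero. The counter cannot be increased or reset, which makes a latch a single-use barrier.
Concurrent invocations of the member functions of latch, except for the destructor, do not introduce data races.
Unlike std::barrier, a participating thread may decrement a std::latch more than once. [1]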
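Method | Purpose |
---|---|
operator= [deleted] | latch is not assignable |
count_down | decrements the counter without blocking |
try_wait | tests whether the internal counter equals zero |
wait | blocks until the counter reaches zero |
arrive_and_wait | decrements the counter and blocks until it reaches zero |
max | [static] the maximum counter value supported by the implementation |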
Header synopsis for std::latch [2]:
namespace std {
  class latch {
  public:
    static constexpr ptrdiff_t max() noexcept;
    constexpr explicit latch(ptrdiff_t expected);
    ~latch();
    latch(const latch&) = delete;
    latch& operator=(const latch&) = delete;
    void count_down(ptrdiff_t update = 1);
    bool try_wait() const noexcept;
    void wait() const;
    void arrive_and_wait(ptrdiff_t update = 1);
  private:
    ptrdiff_t counter; // exposition only
  };
}
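The synopsis only lists declarations, so here is a rough sketch of what the members do, written with a mutex and a condition variable so that it builds without C++20. The class sketch::simple_latch and the function simple_latch_demo are hypothetical names for this illustration. This is not how any standard library implements std::latch.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

namespace sketch {

// Hypothetical, simplified latch; not the standard library's implementation.
class simple_latch {
public:
    explicit simple_latch(std::ptrdiff_t expected) : counter_{expected} {
        assert(expected >= 0);
    }
    simple_latch(const simple_latch&) = delete;
    simple_latch& operator=(const simple_latch&) = delete;

    // Decrements the counter without blocking; wakes all waiters at zero.
    void count_down(std::ptrdiff_t update = 1) {
        std::lock_guard<std::mutex> lock{mutex_};
        assert(update >= 0 && update <= counter_);
        counter_ -= update;
        if (counter_ == 0) zero_.notify_all();
    }

    // Returns true if the counter is currently zero; never blocks.
    bool try_wait() const {
        std::lock_guard<std::mutex> lock{mutex_};
        return counter_ == 0;
    }

    // Blocks until the counter reaches zero.
    void wait() const {
        std::unique_lock<std::mutex> lock{mutex_};
        zero_.wait(lock, [this] { return counter_ == 0; });
    }

    // Decrements the counter, then blocks until it reaches zero.
    void arrive_and_wait(std::ptrdiff_t update = 1) {
        count_down(update);
        wait();
    }

private:
    mutable std::mutex mutex_;
    mutable std::condition_variable zero_;
    std::ptrdiff_t counter_;
};

}  // namespace sketch

// Hypothetical demo: two participants, one of which also waits.
// Returns true if the latch is open once both have arrived.
bool simple_latch_demo() {
    sketch::simple_latch gate{2};
    assert(!gate.try_wait());          // counter is still 2
    std::thread helper{[&] { gate.count_down(); }};
    gate.arrive_and_wait();            // our own decrement, then block for the helper
    helper.join();
    return gate.try_wait();
}
```

There is deliberately no member that raises or resets the counter, which is what makes the object single-use.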
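02 An implementation of latch
I found an implementation of latch on GitHub, with versions for Linux, macOS, and Windows:
https://github.com/luncliff/latch
The Windows implementation is reproduced here. If your project needs a latch right now, this open-source version is a reasonable way to get started.
On Windows, this latch relies mainly on four APIs: WakeByAddressAll, WaitOnAddress, InterlockedAdd, and InterlockedAdd64.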
latch.h
#pragma once
#include <cstddef>

namespace std {
/**
 * @defgroup thread.coord
 * Concepts related to thread coordination, and defines the coordination types `latch` and `barrier`.
 * These types facilitate concurrent computation performed by a number of threads.
 */

/**
 * @brief Allows any number of threads to block until an expected number of threads arrive at the latch
 * @ingroup thread.coord
 *
 * A `latch` is a thread coordination mechanism that allows any number of threads
 * to block until an expected number of threads arrive at the `latch`
 * (via the `count_down` function).
 *
 * The expected count is set when the `latch` is created.
 * An individual `latch` is a single-use object;
 * once the expected count has been reached, the `latch` cannot be reused.
 *
 * @see N4835, 1571~1572p
 */
class latch {
  public:
    /**
     * @brief upper limit on the value of `expected` for constructor of `latch`
     * @return ptrdiff_t The maximum value of counter that the implementation supports
     * @see http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p1865r1.html
     * @see /proc/sys/kernel/threads-max
     */
    static constexpr ptrdiff_t max() noexcept {
        return 32;
    }

  public:
    /**
     * @brief Initialize `counter` with `expected`
     * @param expected
     * @pre `expected >= 0` is true
     */
    constexpr explicit latch(ptrdiff_t expected) noexcept : counter{expected} {
    }
    /**
     * Concurrent invocations of the member functions of `latch` other than its destructor,
     * do not introduce data races
     */
    ~latch() = default;

    latch(const latch&) = delete;
    latch& operator=(const latch&) = delete;

    /**
     * **Synchronization**:
     * Strongly happens before the returns from all calls that are unblocked.
     *
     * **Error Conditions**:
     * Any of the error conditions allowed for mutex types (32.5.3.2)
     *
     * @param update
     * @pre `update >= 0` is true, and `update <= counter` is true
     * @post Atomically decreases `counter` by `update`.
     *       If `counter` is equal to zero, unblocks all threads blocked on `*this`
     * @throw system_error
     */
    void count_down(ptrdiff_t update = 1) noexcept(false);
    /**
     * @return true `counter` equals zero
     * @return false Very low probability of failure from system call
     */
    bool try_wait() const noexcept;
    /**
     * If `counter` equals zero, returns immediately.
     * Otherwise, blocks on `*this` until a call to `count_down` that decrements `counter` to zero
     *
     * @throw system_error
     */
    void wait() const noexcept(false);
    /**
     * @param update input for `count_down`
     * @see count_down
     * @see wait
     */
    void arrive_and_wait(ptrdiff_t update = 1) noexcept(false);

  private:
    /**
     * @brief A latch maintains an internal counter
     *
     * A latch maintains an internal counter that is initialized when the latch is created
     * Threads can block on the latch object, waiting for counter to be decremented to zero.
     */
    ptrdiff_t counter;
};
} // namespace std
latch_windows.cpp
#include "latch.h"
#include <atomic>
#include <system_error>
#include <type_traits>
// clang-format off
#include <Windows.h>
#include <synchapi.h>
// clang-format on
namespace std {
static_assert(is_copy_assignable_v<latch> == false);
static_assert(is_copy_constructible_v<latch> == false);
void latch::arrive_and_wait(ptrdiff_t update) noexcept(false) {
    this->count_down(update);
    this->wait();
}
void latch::count_down(ptrdiff_t update) noexcept(false) {
    static_assert(is_same_v<ptrdiff_t, LONG64> || is_same_v<ptrdiff_t, LONG>);
    if (counter < update)
        throw system_error{EINVAL, system_category(),
                           "update is greater than counter"};
    // `counter` is a plain ptrdiff_t, so `counter -= update` would race with
    // concurrent callers; always decrement through the Interlocked API,
    // which returns the resulting value
    ptrdiff_t remaining = 0;
    if constexpr (is_same_v<ptrdiff_t, LONG>) {
        remaining = InterlockedAdd(reinterpret_cast<LONG*>(&counter),
                                   static_cast<LONG>(-update));
    } else if constexpr (is_same_v<ptrdiff_t, LONG64>) {
        remaining = InterlockedAdd64(reinterpret_cast<LONG64*>(&counter),
                                     static_cast<LONG64>(-update));
    }
    // counter reached zero: wake every thread blocked in WaitOnAddress
    if (remaining == 0)
        WakeByAddressAll(&counter);
}
bool latch::try_wait() const noexcept {
    // if counter equals zero, returns immediately
    if (counter == 0)
        return true;
    // blocks on `*this` until a call to count_down that decrements counter to zero
    ptrdiff_t captured = counter;
    if (WaitOnAddress(const_cast<ptrdiff_t*>(&counter), &captured,
                      sizeof(ptrdiff_t), INFINITE))
        return counter == 0;
    // caller can check `GetLastError` for this case
    return false;
}
void latch::wait() const noexcept(false) {
    while (try_wait() == false) {
        // case: error from WaitOnAddress
        if (const auto ec = GetLastError())
            throw system_error{static_cast<int>(ec), system_category(),
                               "WaitOnAddress"};
        // case: counter != 0. retry
        // ...
    }
}
} // namespace std
- [1] std::latch
- [2] Standard library header <latch>