天天看點

C++20 latch

vs2019 C++20 latch

    • 01 latch
    • 02 latch的一個實作

01 latch

<latch>

闩:單次使用的線程屏障。

latch 是 ptrdiff_t 類型的向下計數器,它能用于同步線程。在建立時初始化計數器的值。

線程可能在 latch 上阻塞直至計數器減少到零。沒有可能增加或重置計數器,這使得 latch 為單次使用的屏障。

同時調用 latch 的成員函數,除了析構函數,不引入資料競争。

與 std::barrier 不同,參與的線程可以多次減少 std::latch 的計數器。[1]

方法 作用
operator= [已刪除] latch 不可指派
count_down 以不阻塞的方式減少計數器
try_wait 測試内部計數器是否等于零
wait 阻塞直至計數器抵達零
arrive_and_wait 減少計數器并阻塞直至它抵達零
max [靜态]實作所支援的計數器最大值

latch的參考頭檔案 std::latch [2]

// Reference synopsis of the std::latch class as listed for the <latch> header.
// Declarations only; no member is defined here.
namespace std {
  class latch {
  public:
    // Maximum counter value supported by the implementation.
    static constexpr ptrdiff_t max() noexcept;
 
    // The counter value is fixed at construction from `expected`.
    constexpr explicit latch(ptrdiff_t expected);
    ~latch();
 
    // A latch is neither copy-constructible nor copy-assignable.
    latch(const latch&) = delete;
    latch& operator=(const latch&) = delete;
 
    // Decrements the counter without blocking.
    void count_down(ptrdiff_t update = 1);
    // Tests whether the internal counter equals zero.
    bool try_wait() const noexcept;
    // Blocks until the counter reaches zero.
    void wait() const;
    // Decrements the counter and blocks until it reaches zero.
    void arrive_and_wait(ptrdiff_t update = 1);
 
  private:
    ptrdiff_t counter;  // exposition only
  };
}
           

02 latch的一個實作

github上面找到一個latch的實作(linux,macos,windows上的實作)。

https://github.com/luncliff/latch

這裡貼出windows上面的實作。如果開發有需求,不如先把這個開源的實作用起來。

這個latch在windows上面的實作重點使用了 WakeByAddressAll、WaitOnAddress、InterlockedAdd、InterlockedAdd64 四個api。

latch.h

#pragma once
#include <cstddef>

namespace std {
/**
 * @defgroup thread.coord
 * Concepts related to thread coordination, and defines the coordination types `latch` and `barrier`.
 * These types facilitate concurrent computation performed by a number of threads.
 */

/**
 * @brief Allows any number of threads to block until an expected number of threads arrive at the latch
 * @ingroup thread.coord
 *
 * A `latch` is a thread coordination mechanism that allows any number of threads
 * to block until an expected number of threads arrive at the `latch`
 * (via the `count_down` function).
 *
 * The expected count is set when the `latch` is created.
 * An individual `latch` is a single-use object;
 * once the expected count has been reached, the `latch` cannot be reused.
 *
 * @see N4835, 1571~1572p
 */
class latch {
  public:
    latch(const latch&) = delete;
    latch& operator=(const latch&) = delete;

    /**
     * @brief   Create a latch whose `counter` starts at `expected`
     * @param   expected    initial value stored into `counter`
     * @pre     `expected >= 0` is true (the constructor itself performs no check)
     */
    constexpr explicit latch(ptrdiff_t expected) noexcept : counter(expected) {}

    /**
     * The destructor is the one member excluded from the guarantee that
     * concurrent invocations of `latch` member functions do not introduce data races
     */
    ~latch() = default;

    /**
     * @brief   Upper limit on the `expected` argument of the constructor
     * @return  ptrdiff_t   The maximum counter value this implementation supports (fixed at 32)
     * @see http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p1865r1.html
     * @see /proc/sys/kernel/threads-max
     */
    static constexpr ptrdiff_t max() noexcept { return 32; }

    /**
     * @brief   Decrease `counter` by `update` and release waiters once it hits zero
     *
     * **Synchronization**:
     * Strongly happens before the returns from all calls that are unblocked.
     *
     * **Error Conditions**:
     * Any of the error conditions allowed for mutex types (32.5.3.2)
     *
     * @param   update  amount to subtract from `counter`
     * @pre     `update >= 0` is true, and `update <= counter` is true
     * @post    `counter` has been decreased by `update`.
     *          If `counter` is equal to zero, all threads blocked on `*this` are unblocked
     * @throw   system_error
     */
    void count_down(ptrdiff_t update = 1) noexcept(false);

    /**
     * @brief   Report whether `counter` has reached zero
     * @return true     `counter` equals zero
     * @return false    `counter` was not observed to be zero, or
     *                  (with very low probability) a system call failed
     */
    bool try_wait() const noexcept;

    /**
     * @brief   Block the caller until `counter` is zero
     *
     * Returns immediately when `counter` already equals zero.
     * Otherwise the caller blocks on `*this` until some call to `count_down`
     * brings `counter` down to zero.
     *
     * @throw   system_error
     */
    void wait() const noexcept(false);

    /**
     * @brief   Arrive at the latch, then wait for everyone else
     * @param   update  forwarded to `count_down`
     * @see count_down
     * @see wait
     */
    void arrive_and_wait(ptrdiff_t update = 1) noexcept(false);

  private:
    /**
     * @brief The internal down-counter of the latch
     *
     * It is initialized once, when the latch is created.
     * Threads can block on the latch object until this value has been decremented to zero.
     *
     * NOTE(review): this is a plain `ptrdiff_t`, not an atomic type; any
     * synchronization has to be provided by the out-of-line member definitions.
     */
    ptrdiff_t counter;
};
} // namespace std
           

latch_windows.cpp

#include "latch.h"

#include <atomic>
#include <system_error>
#include <type_traits>
// clang-format off

#include <Windows.h>
#include <synchapi.h>
// clang-format on

namespace std {

static_assert(is_copy_assignable_v<latch> == false);
static_assert(is_copy_constructible_v<latch> == false);

void latch::arrive_and_wait(ptrdiff_t update) noexcept(false) {
    this->count_down(update);
    this->wait();
}

void latch::count_down(ptrdiff_t update) noexcept(false) {
    static_assert(is_same_v<ptrdiff_t, LONG64> || is_same_v<ptrdiff_t, LONG>);
    if (counter < update)
        throw system_error{EINVAL, system_category(),
                           "update is greater than counter"};
    // if not lock-free, rely on InterLocked operation
    if constexpr (atomic<ptrdiff_t>::is_always_lock_free) {
        counter -= update;
    } else if constexpr (is_same_v<ptrdiff_t, LONG>) {
        InterlockedAdd(reinterpret_cast<LONG*>(&counter),
                       static_cast<LONG>(-update));
    } else if constexpr (is_same_v<ptrdiff_t, LONG64>) {
        InterlockedAdd64(reinterpret_cast<LONG64*>(&counter),
                         static_cast<LONG64>(-update));
    }
    // counter reached zero
    if (counter == 0)
        WakeByAddressAll(&counter);
}

bool latch::try_wait() const noexcept {
    // if counter equals zero, returns immediately
    if (counter == 0)
        return true;
    // blocks on `*this` until a call to count_down that decrements counter to zero
    ptrdiff_t captured = counter;
    if (WaitOnAddress(const_cast<ptrdiff_t*>(&counter), &captured,
                      sizeof(ptrdiff_t), INFINITE))
        return counter == 0;
    // caller can check `GetLastError` for this case
    return false;
}

void latch::wait() const noexcept(false) {
    while (try_wait() == false) {
        // case: error from WaitOnAddress
        if (const auto ec = GetLastError())
            throw system_error{static_cast<int>(ec), system_category(),
                               "WaitOnAddress"};
        // case: counter != 0. retry
        // ...
    }
}

} // namespace std
           
  1. std::latch ↩︎
  2. 標準庫頭檔案 <latch> ↩︎

繼續閱讀