天天看点

多线程并发 wait wait_for wait_until

在使用 wait、wait_for 等带 Predicate 参数的重载时,需注意以下事项:

1.设置了 Predicate,只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程;

2.在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞。

wait (带条件参数)的典型用例

#include <iostream>                // std::cout
#include <pthread.h>
#include <sched.h>
#include <system_error>
#include <thread>                // std::thread, std::this_thread::yield
#include <mutex>                 // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable
#include <chrono>               //std::this_thread::sleep_for(120ms);
// Mutex locked by thread1() while waiting and by signal() while updating g_val.
std::mutex mtx;
// Condition variable thread1() waits on; signal() calls notify_all() on it.
std::condition_variable cv;
using namespace std::chrono_literals;
// Shared flag: starts at 0, set to 1 by signal(); thread1() waits until it equals 1.
uint32_t g_val = 0;
// Waiter thread: blocks on cv until g_val becomes 1, logging before and after the wait.
void thread1()
{
    std::unique_lock<std::mutex> guard(mtx);
    std::cerr << "wait begin...\n";

    // The predicate logs the current g_val every time it is evaluated and
    // lets the wait finish only once g_val == 1.
    const auto value_is_set = []() -> bool {
        std::cerr << g_val << "   Notifying...\n";
        return g_val == 1;
    };
    cv.wait(guard, value_is_set);

    std::cerr << "wait end...\n";
}
// Notifier thread: sleeps 120 ms, logs, then sets g_val to 1 under mtx and
// wakes every thread waiting on cv.
void signal()
{
    using std::chrono::milliseconds;

    std::this_thread::sleep_for(milliseconds(120));
    std::cerr << "Notifying...\n";

    const std::lock_guard<std::mutex> guard(mtx);
    g_val = 1;
    cv.notify_all();
}
// Starts one waiter and two notifier threads, then joins them in creation order.
int main()
{
    std::thread waiter(thread1);
    std::thread notifier_a(signal);
    std::thread notifier_b(signal);

    for (std::thread* worker : {&waiter, &notifier_a, &notifier_b})
    {
        worker->join();
    }
    return 0;
}
           
#include <iostream>                // std::cout
#include <thread>                // std::thread, std::this_thread::yield
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable
 
// Mutex held by the consumer and the producer while they update cargo.
std::mutex mtx;
// Condition variable the consumer waits on; the producer calls notify_one() on it.
std::condition_variable cv;
 
// Single-slot buffer: 0 means empty, any other value is an item waiting to be consumed.
int cargo = 0;
// Reports whether the cargo slot currently holds an item (any non-zero value).
bool shipment_available()
{
    const bool slot_is_empty = (cargo == 0);
    return !slot_is_empty;
}
 
// Consumer thread: takes n items from the cargo slot, one per iteration.
void consume(int n)
{
    int taken = 0;
    while (taken < n)
    {
        std::unique_lock<std::mutex> guard(mtx);
        // Block until the slot is non-empty.
        cv.wait(guard, [] { return shipment_available(); });
        std::cout << "consume: i:" << taken << ";cargo:" << cargo << '\n';
        cargo = 0;  // mark the slot as empty again
        ++taken;
    }
}
// Producer/consumer demo: the main thread produces 10 items, one at a time,
// for a single consumer thread that drains the one-slot cargo buffer.
int main()
{
    std::thread consumer_thread(consume, 10); // consumer takes 10 items

    // The main thread is the producer.
    for (int i = 0; i < 10; ++i)
    {
        std::unique_lock<std::mutex> lck(mtx);
        // cargo is shared with the consumer, so it must only be inspected while
        // holding mtx. While the slot is still full, drop the lock so the
        // consumer can take the item, yield, and re-check under the lock.
        while (shipment_available())
        {
            lck.unlock();
            std::this_thread::yield();
            lck.lock();
        }
        cargo = i + 1;
        std::cout << "notify _one   i=" << i << "\n";
        cv.notify_one();
    }
    consumer_thread.join();
    return 0;
}
           

wait_for wait_until

#include <iostream>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <time.h>
using namespace std::chrono_literals;
 
// Condition variable the waiter functions block on; signals() calls notify_all() on it.
std::condition_variable cv;
// Mutex the waiter functions lock before waiting on cv.
std::mutex cv_m;
// Flag tested by the wait predicates; signals() sets it to 1 and main() resets it to 0.
std::atomic<int> i{0};
 
// Demonstrates the predicate-less wait_for overload with a fixed 200 ms timeout.
// idx is only used to label the log output.
// Without a predicate, any wake-up (a notification or a spurious one) before
// the timeout is reported as "finished waiting"; the value of i is not checked.
void waits_for0(int idx)
{
    std::unique_lock<std::mutex> lk(cv_m);
    const std::cv_status status = cv.wait_for(lk, std::chrono::milliseconds(200));
    if (status != std::cv_status::timeout)
        std::cerr << "continue " << idx << " finished waiting. i == " << i << '\n';
    else
        std::cerr << "return err " << idx << " timed out. i == " << i << '\n';
}

// Demonstrates the predicate overload of wait_for: waits up to idx * 200 ms
// for i to become 1. idx also labels the log output.
// The call returns the predicate's final value, so false means the timeout
// expired while i was still not 1.
void waits_for(int idx)
{
    std::unique_lock<std::mutex> lk(cv_m);
    const bool flag_set = cv.wait_for(lk, idx * 200ms, [] { return i == 1; });
    if (flag_set)
        std::cerr << "continue " << idx << " finished waiting. i == " << i << '\n';
    else
        std::cerr << "return err " << idx << " timed out. i == " << i << '\n';
}

// Demonstrates the predicate overload of wait_until: waits for i to become 1
// until an absolute deadline idx * 200 ms from now. idx also labels the log output.
// The call returns the predicate's final value, so false means the deadline
// passed while i was still not 1.
void waits_until(int idx)
{
    std::unique_lock<std::mutex> lk(cv_m);
    // The deadline is a relative timeout, so build it on the monotonic clock:
    // a wall-clock adjustment must not shorten or lengthen the wait.
    const auto deadline = std::chrono::steady_clock::now() + idx * 200ms;
    if (cv.wait_until(lk, deadline, [] { return i == 1; }))
        std::cerr << "continue " << idx << " finished waiting. i == " << i << '\n';
    else
        std::cerr << "return err " << idx << " timed out. i == " << i << '\n';
}

 
void signals(int x)
{
    std::this_thread::sleep_for(x*150ms);
    i = 1;
    std::cerr << "Notifying... " << x << "\n";
    cv.notify_all();
  
}
/*
* 菜单选项(在 main 中输入对应数字):
* 21.waits_for0(无 Predicate 的 wait_for)收到通知后返回
* 22.waits_for0(无 Predicate 的 wait_for)超时返回
* 31.waits_for条件触发
* 32.waits_for超时触发
* 33.waits_until条件触发
* 34.waits_until超时触发
*/
// 与 std::condition_variable::wait() 类似,不过 wait_for 可以指定一个时间段,在当前线程收到通知或者指定的时间 rel_time 超时之前,该线程都会处于阻塞状态。而一旦超时或者收到了其他线程的通知,wait_for 返回,剩下的处理步骤和 wait() 类似。

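// 另外,wait_for 的重载版本(predicate (2))的最后一个参数 pred 是 wait_for 的判断条件(谓词):只有当 pred 为 false 时调用 wait_for() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞;若超时,则返回超时时刻 pred 的值。因此相当于如下代码:
//     return wait_until(lock, std::chrono::steady_clock::now() + rel_time, std::move(pred));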
// Runs one demo scenario: starts `waiter` with index 1 and a signals() thread
// with the given delay factor, then joins both.
static void run_scenario(void (*waiter)(int), int signal_delay_factor)
{
    std::thread t1(waiter, 1);
    std::thread t2(signals, signal_delay_factor);
    t1.join();
    t2.join();
}

// Interactive driver: reads a scenario number from stdin and runs the matching
// waiter/notifier pair. With a delay factor of 1 the notifier fires after
// 150 ms, inside the waiters' 200 ms timeout; with 5 it fires after 750 ms,
// after the timeout.
//   21 / 22 : waits_for0  (wait_for without predicate)  notified / timed out
//   31 / 32 : waits_for   (wait_for with predicate)     notified / timed out
//   33 / 34 : waits_until (wait_until with predicate)   notified / timed out
// Any other number runs nothing. The loop ends when input can no longer be read.
int main()
{
    for (;;)
    {
        int value = 0;
        std::cout << "Please, enter an integer (I'll be printing dots): \n";
        if (!(std::cin >> value))
        {
            // EOF or non-numeric input leaves the stream in a failed state;
            // without this check the loop would re-prompt forever.
            break;
        }
        std::cout << "You entered: " << value << '\n';

        switch (value)
        {
        case 21: run_scenario(waits_for0, 1);  break;
        case 22: run_scenario(waits_for0, 5);  break;
        case 31: run_scenario(waits_for, 1);   break;
        case 32: run_scenario(waits_for, 5);   break;
        case 33: run_scenario(waits_until, 1); break;
        case 34: run_scenario(waits_until, 5); break;
        default: break;
        }

        std::this_thread::sleep_for(1000ms);
        i = 0;  // reset the flag for the next scenario
    }
    return 0;
}
           

继续阅读