天天看点

弄懂线程取消的一个例子

今天结束了,仍然在看 《Programming with posix threads》这本书,关于线程的取消。本来想总结一下线程推迟取消 异步取消 以及清除这些概念,发现有个人总结的特别的好,链接拉过来给大家看一下。

https://www.cnblogs.com/lijunamneg/archive/2013/01/25/2877211.html

在这里想写一下弄懂这章其中一个例子的过程,温习一下思考过程。

这主要是个承包转包线程的例子,算是工作组模型吧。

程序执行时创建承包线程,在承包线程里面又创建了很多的子线程,子线程叫转包线程,不过是一种叫法,无需多想。看过链接里文章会知道,在取消线程的时候可以指定取消时要执行的清除函数。具体细节如附程序所示。

特别的,是在清除函数中执行了对部分承包线程的取消工作。

我产生了些困惑,在这部分取消工作当中。为什么下标是从team->join_i 开始的呢?

一开始我这么想的,thread_routine 创建转包线程完成一次对每个线程执行pthread_join. 在接到取消信号的时正好执行对某个的pthread_join的取消点,剩余没有join的线程就被清理掉了,但是,就是已经join的线程就不需要清理吗。我之所以会这么问,当时并不清楚join函数执行是阻塞的 。查一下pthread_join的文档就应该知道,这个函数是阻塞调用的,一旦返回就说明程序已经结束了,也就没有了清理的必要。

每次我试图告诉你发现某个接口的秘密,就会突然发现,文档已经写得很完美了,尤其对于这种比较成熟的库而言。比如pthread_join的文档告诉你函数返回保证了什么,join同一个线程两次对不对,不能join的线程是怎么样的。什么样的线程是joinable的,调用joinable的线程被取消被joinable的线程会怎么样。都告诉你了, 所以还是遇到问题首选看文档啊:

http://man7.org/linux/man-pages/man3/pthread_join.3.html

同样的, pthread_cancel 文档也会告诉你很多

http://man7.org/linux/man-pages/man3/pthread_cancel.3.html

其实这个例子中创建的线程是个无限循环不太好,直接运行代码结果一定从第0个线程开始取消的,因为每个线程都在被取消之前永远不终止,所以承包线程一直阻塞在第一个pthread_join函数调用上。

我们来改一下程序吧,增加一个函数,有区别的创建转包线程。

void *worker_routine1 (void *arg)
{
    int counter;

    for (counter = ; ; counter++;counter < )
        if ((counter % ) == )
            pthread_testcancel ();
}
           

在创建前3个线程的时候用worker_routine1() , 后面的线程用work_routine(). 这样在取消的时候前20个线程已经终止了。所以clean up 的时候只需要清理还没有被join的函数就可以了。结果是从下标3开始取消的。

运行结果

弄懂线程取消的一个例子

附原始代码(修改之前的)

/*
 * cancel_subcontract.c
 *
 * Demonstrate how a thread can handle cancellation and in turn
 * cancel a set of worker ("subcontractor") threads.
 *
 * Special notes: On a Solaris 2.5 uniprocessor, this test will
 * hang unless an LWP is created for each worker thread by
 * calling thr_setconcurrency(), because threads are not
 * timesliced.
 */
#include <pthread.h>
#include "errors.h"

#define THREADS 5

/*
 * Structure that defines the threads in a "team".
 */
typedef struct team_tag {
   int          join_i;                 /* join index */
   pthread_t    workers[THREADS];       /* thread identifiers */
} team_t;

/*
 * Start routine for worker threads. They loop waiting for a
 * cancellation request.
 */
void *worker_routine (void *arg)
{
    int counter;

    for (counter = ; ; counter++)
        if ((counter % ) == )
            pthread_testcancel ();
}

/*
 * Cancellation cleanup handler for the contractor thread. It
 * will cancel and detach each worker in the team.
 */
void cleanup (void *arg)
{
    team_t *team = (team_t *)arg;
    int count, status;

    for (count = team->join_i; count < THREADS; count++) {
        status = pthread_cancel (team->workers[count]);
        if (status != )
            err_abort (status, "Cancel worker");

        status = pthread_detach (team->workers[count]);
        if (status != )
            err_abort (status, "Detach worker");
        printf ("Cleanup: cancelled %d\n", count);
    }
}

/*
 * Thread start routine for the contractor. It creates a team of
 * worker threads, and then joins with them. When cancelled, the
 * cleanup handler will cancel and detach the remaining threads.
 */
void *thread_routine (void *arg)
{
    team_t team;                        /* team info */
    int count;
    void *result;                       /* Return status */
    int status;

    for (count = ; count < THREADS; count++) {
        status = pthread_create (
            &team.workers[count], NULL, worker_routine, NULL);
        if (status != )
            err_abort (status, "Create worker");
    }
    pthread_cleanup_push (cleanup, (void*)&team);

    for (team.join_i = ; team.join_i < THREADS; team.join_i++) {
        status = pthread_join (team.workers[team.join_i], &result);
        if (status != )
            err_abort (status, "Join worker");
    }

    pthread_cleanup_pop ();
    return NULL;
}

int main (int argc, char *argv[])
{
    pthread_t thread_id;
    int status;

#ifdef sun
    /*
     * On Solaris 2.5, threads are not timesliced. To ensure
     * that our threads can run concurrently, we need to
     * increase the concurrency level to at least 2 plus THREADS
     * (the number of workers).
     */
    DPRINTF (("Setting concurrency level to %d\n", THREADS+));
    thr_setconcurrency (THREADS+);
#endif
    status = pthread_create (&thread_id, NULL, thread_routine, NULL);
    if (status != )
        err_abort (status, "Create team");
    sleep ();
    printf ("Cancelling...\n");
    status = pthread_cancel (thread_id);
    if (status != )
        err_abort (status, "Cancel team");
    status = pthread_join (thread_id, NULL);
    if (status != )
        err_abort (status, "Join team");
}
           

继续阅读