天天看点

linux wait_queue/work_queue一. 等待队列二. Work_queue工作队列

Linux-wait_queue/work_queue

  


  
首先,我们得明白,linux中的所有的进程都由task_struct这个结构管理。在生成进程的时候将会分配一个task_struct结构,之后将通过这个结构对进程进行管理。task_struct结构存在于平坦地址空间内,任何时候Linux内核都可以参照所有进程的所有管理情报。内核堆栈也同样位于平坦地址空间内。(平坦 的意思是"独立的连续区间")

  
下面是tesk_struct的主要成员:

  
struct task_struct {

  
struct files_struct* files; //文件描述符

  
struct signal_struct* sig; //信号控制signal handler

  
struct mm_struct* mm; //内存管理模块

  
long stat //进程状态

  
struct list_head runlist; //用于联结RUN队列

  
long priority; //基本优先权

  
long counter; //变动优先权

  
char comm[]; //命令名

  
struct thread_struct tss; //上下文保存领域

  
...

  
};

  
我们现在只需了解它里面的state就可以,state有下面几种状态:

  
状态 							说明

  
TASK_RUNNING  				执行可能状态

  
TASK_INTERRUPTIBLE 		等待状态。可接受信号

  
TASK_UNINTERRUPTIBLE 	等待状态。不能接受信号

  
TASK_ZOMBIE 				僵尸状态。exit后的状态

  
TASK_STOPPED 				延缓状态

  
TASK_STACE					追踪状态 

  
 

  


          

一. 等待队列

等待队列的使用分为如下两部分: (1)为使当前进程在一个等待队列中睡眠,需要调用wait_event函数(或者某个等价函数),进程加入睡眠,将控制权释放给调度器。 (2)在内核的另一处,必须调用wake_up函数(或某个等价函数)唤醒等待队列中的睡眠进程。

一.1. 重要结构体:

1、等待队列  struct __wait_queue {          unsigned int flags;  #define WQ_FLAG_EXCLUSIVE       0x01          void *private;          wait_queue_func_t func;           struct list_head task_list; }; typedef struct __wait_queue wait_queue_t; 2、等待队列头 struct __wait_queue_head {          spinlock_t lock;          struct list_head task_list; }; typedef struct __wait_queue_head wait_queue_head_t;

一.2. 定义、初始化等待队列头和等待队列

1、struct wait_queue_head_t wq; /* global variable */ DECLARE_WAIT_QUEUE_HEAD (wq); 2、/*静态声明并初始化一个wait_queue_t  等待队列*/ #define DEFINE_WAIT_FUNC(name, function)                                \          wait_queue_t name = {                                           \                  .private        = current,                              \                  .func           = function,                             \                  .task_list      = LIST_HEAD_INIT((name).task_list),     \          } #define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function) /*动态分配一个wait_queue_t 实例*/  97 static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)  98 {  99         q->flags = 0; 100         q->private = p; 101         q->func = default_wake_function; 102 }

一.3. 进程睡眠

包括两个层次,第一个层次是将一个等待队列(wait_queue_t)(通常是某个进程)加入到等待队列(wait_queue_head_t)链表中。第二个层次是将当前进程进入睡眠。 1、进程添加到某个等待队列。 Kernel/wait.c extern void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait); extern void add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t *wait); extern void remove_wait_queue(wait_queue_head_t *q, wait_queue_t *wait); 其中add_wait_queue将等待队列加入到等待队列链表尾部(默认)中,并设置flag标志~WQ_FLAG_EXCLUSIVE。  22 void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)  23 {  24         unsigned long flags;  25   26         wait->flags &= ~WQ_FLAG_EXCLUSIVE;  27         spin_lock_irqsave(&q->lock, flags);  28         __add_wait_queue(q, wait); /*list_add(&new->task_list, &head->task_list);*/  29         spin_unlock_irqrestore(&q->lock, flags);  30 } 将等待队列加入到等待队列链表另外一种方法是prepare_to_wait。除了add_wait_queue所需要的参数外,还需要进程的状态。 void prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state); void prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state); void finish_wait(wait_queue_head_t *q, wait_queue_t *wait); void abort_exclusive_wait(wait_queue_head_t *q, wait_queue_t *wait,  unsigned int mode, void *key);  68 prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)  69 {  70         unsigned long flags;  71   72         wait->flags &= ~WQ_FLAG_EXCLUSIVE;  73         spin_lock_irqsave(&q->lock, flags);  74         if (list_empty(&wait->task_list))  75                 __add_wait_queue(q, wait);  76         set_current_state(state);  77         spin_unlock_irqrestore(&q->lock, flags);  78 }

2、让当前进程进入睡眠

wait_event(wq, condition) ; wait_event_timeout(wq, condition, timeout); wait_event_interruptible(wq, condition); wait_event_interruptible_timeout(wq, condition, timeout); 192 #define __wait_event(wq, condition)                                     \ 193 do {                                                                \ 194         DEFINE_WAIT(__wait);  /*定义wait_queue_t 等待队列*/           \ 196         for (;;) {                                                      \ 197                 prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE); \ 198                 if (condition)                                          \ 199                         break;                                          \ 200                 schedule();                                             \ 201         }                                                             \ 202         finish_wait(&wq, &__wait);                                      \ 203 } while (0) 在用DEFINE_WAIT建立等待队列成员之后,这个宏产生一个无限循环。使用prepare_to_wait使进程在等待队列上睡眠。每次进程被唤醒,内核都会检查指定的条件是否满足,如果条件满足则退出无限循环。否则将控制权交给调度器,进程再次睡眠。 The @condition is checked each time the waitqueue @wq is woken up. 217 #define wait_event(wq, condition)                                       \ 218 do {                                                                    \ 219         if (condition)                                                  \ 220                 break;                                                  \ 221         __wait_event(wq, condition);                                    \ 222 } while (0) l wait_event_timeout()等待满足指定的条件,但如果等待时间超过了指定的超时限制则停止; l wait_event_interruptible()使用的进程状态是TASK_INTERRUPTIBLE,也让睡眠进程可以通过接受信号而被唤醒; l wait_event_interruptible_timeout()则是睡眠进程既能被中断唤醒,也能限制超时。 此外内核还定义了若干废弃的函数(sleep_on、sleep_on_timeout,------),这些不应该在新的代码中继续使用。

一.4. 唤醒进程

169 #define wake_up(x)       __wake_up(x, TASK_NORMAL, 1, NULL) 170 #define wake_up_nr(x, nr)   __wake_up(x, TASK_NORMAL, nr, NULL) 171 #define wake_up_all(x)    __wake_up(x, TASK_NORMAL, 0, NULL) 172 #define wake_up_locked(x)   __wake_up_locked((x), TASK_NORMAL, 1) 173 #define wake_up_all_locked(x)    __wake_up_locked((x), TASK_NORMAL, 0) #define wake_up_interruptible(x)  __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL) #define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL) #define wake_up_interruptible_all(x)  __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL) #define wake_up_interruptible_sync(x) __wake_up_sync((x), TASK_INTERRUPTIBLE, 1) Kernel/core.c 3423 /** 3424  * __wake_up - wake up threads blocked on a waitqueue. 3425  * @q: the waitqueue 3426  * @mode: which threads 3427  * @nr_exclusive: how many wake-one or wake-many threads to wake up 将要唤醒的设置了WQ_FLAG_EXCLUSIVE标志的进程数目 3428  * @key: is directly passed to the wakeup function 3429  * 3430  * It may be assumed that this function implies a write memory barrier before 3431  * changing the task state if and only if any tasks are woken up. 3432  */ 3433 void __wake_up(wait_queue_head_t *q, unsigned int mode, 3434                         int nr_exclusive, void *key) 3435 { 3436         unsigned long flags; 3437  3438         spin_lock_irqsave(&q->lock, flags); 3439         __wake_up_common(q, mode, nr_exclusive, 0, key); 3440         spin_unlock_irqrestore(&q->lock, flags); 3441 } 3401  * The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just 3402  * wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve 3403  * number) then we wake all the non-exclusive tasks and one exclusive task. 3404  * 3405  * There are circumstances in which we can try to wake a task which has already 3406  * started to run but is not in state TASK_RUNNING. try_to_wake_up() returns 3407  * zero in this (rare) case, and we handle it by continuing to scan the queue. 3408  */ 3409 static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, 3410                         int nr_exclusive, int wake_flags, void *key) 3411 { 3412         wait_queue_t *curr, *next; 3413  3414         list_for_each_entry_safe(curr, next, &q->task_list, task_list) { 3415                 unsigned flags = curr->flags; 3416  3417                 if (curr->func(curr, mode, wake_flags, key) && 3418                    (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) 3419                     break; 3420         } 3421 } 这里会反复扫描指定工作队列链表,直至没有更多进程需要唤醒,或已经唤醒的独占进程数目达到了nr_exclusive,该限制主要用于避免所谓的惊群效应。最常用的wake_up函数将nr_exclusive设置为1,确保值唤醒一个独占访问进程。

一.5. Example

考虑两个读写进程,共享一缓冲区buffer,一个进程向里写入数据,另外一个进程读出数据。当写满时,必须等待读进程将数据部分独处才行。 首先定义全局等待队列: struct wait_queue_head_t wq; /* global variable */ DECLARE_WAIT_QUEUE_HEAD (wq); 1、写进程中: while ( is_full ){ interruptible_sleep_on( &wq ); } write_to_buffer();                    /*旧内核*/ wait_event_interruptible(wq, !is_full); /*新内核*/ write_to_buffer();   2、读进程中: if ( !is_empty ) { read_from_buffer(); wake_up_interruptible( &wq );   /*旧内核*/ } read_from_buffer(); wake_up_interruptible(wq);          /*新内核*/      

二. Work_queue工作队列

前文介绍了中断底半部的tasklet机制,那么对于工作队列机制,与其区别在哪里呢?   工作队列类似 taskets,允许内核代码请求在将来某个时间调用一个函数,不同在于: (1)tasklet 在软件中断上下文中运行,所以 tasklet 代码必须是原子的。而工作队列函数在一个特殊内核进程上下文运行,有更多的灵活性,且能够休眠。 (2)tasklet 只能在最初被提交的处理器上运行,这只是工作队列默认工作方式。 (3)内核代码可以请求工作队列函数被延后一个给定的时间间隔。 (4)tasklet 执行的很快, 短时期, 并且在原子态, 而工作队列函数可能是长周期且不需要是原子的,两个机制有它适合的情形。

二.1. 自己创建工作队列(非共享)

1、创建工作队列 工作队列有 struct workqueue_struct 类型,在 <linux/workqueue.h> 中定义。一个工作队列必须明确的在使用前创建,宏为: struct workqueue_struct *create_workqueue(const char *name); struct workqueue_struct *create_singlethread_workqueue(const char *name);  每个工作队列有一个或多个专用的进程("内核线程"), 这些进程运行提交给这个队列的函数。 若使用 create_workqueue, 就得到一个工作队列,它在系统的每个处理器上有一个专用的线程。在很多情况下,过多线程对系统性能有影响,如果单个线程就足够则使用 create_singlethread_workqueue 来创建工作队列。 2、创建工作 /*需要填充work_struct或delayed_work结构,可以在编译时完成, 宏如下: */ struct work_struct {      atomic_long_t data; #define WORK_STRUCT_PENDING 0        /* T if work item pending execution */ #define WORK_STRUCT_FLAG_MASK (3UL) #define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)     struct list_head entry;      work_func_t func; }; struct delayed_work {     struct work_struct work;     struct timer_list timer; }; DECLARE_WORK(n, f)     /*n 是声明的work_struct结构名称, f是要从工作队列被调用的函数*/ DECLARE_DELAYED_WORK(n, f) /*n是声明的delayed_work结构名称, f是要从工作队列被调用的函数*/ /*若在运行时需要建立 work_struct 或 delayed_work结构, 使用下面 2 个宏定义:*/ INIT_WORK(struct work_struct *work, void (*function)(void *));  PREPARE_WORK(struct work_struct *work, void (*function)(void *));  INIT_DELAYED_WORK(struct delayed_work *work, void (*function)(void *));  PREPARE_DELAYED_WORK(struct delayed_work *work, void (*function)(void *));  /* INIT_* 做更加全面的初始化结构的工作,在第一次建立结构时使用. PREPARE_* 做几乎同样的工作, 但是它不初始化用来连接 work_struct或delayed_work 结构到工作队列的指针。如果这个结构已经被提交给一个工作队列, 且只需要修改该结构,则使用 PREPARE_* 而不是 INIT_* */ 3、提交工作给工作队列 /*有 2 个函数来提交工作给一个工作队列:*/ int queue_work(struct workqueue_struct *queue, struct work_struct *work); int queue_delayed_work(struct workqueue_struct *queue, struct delayed_work *work, unsigned long delay); /*添加work到指定的workqueue。如果使用 queue_delay_work,  这些函数若返回 1 则工作被成功加入到队列; 若为0,则意味着这个 work 已经在队列中等待,不能再次加入*/ 工作队列wq的执行时间待定(在调度器选择该守护进程时执行),delayed_work引入timer_list定时器,确保排队的工作项将在提交后指定的一段时间内执行,即确保在延期工作执行之前,至少会经过delay指定的一段时间。 在将来的某个时间, 这个工作函数将被传入给定的 data 值来调用。这个函数将在工作线程的上下文运行, 因此它可以睡眠 (你应当知道这个睡眠可能影响提交给同一个工作队列的其他任务) 工作函数不能访问用户空间,因为它在一个内核线程中运行, 完全没有对应的用户空间来访问。 4、取消工作项 取消一个挂起的工作队列入口项可以调用: int cancel_delayed_work(struct delayed_work *work);  void cancel_work_sync(struct work_struct *work) 如果这个入口在它开始执行前被取消,则返回非零。内核保证给定入口的执行不会在调用 cancel_delay_work 后被初始化. 如果 cancel_delay_work 返回 0, 但是, 这个入口可能已经运行在一个不同的处理器, 并且可能仍然在调用 cancel_delayed_work 后在运行. 要绝对确保工作函数没有在 cancel_delayed_work 返回 0 后在任何地方运行, 你必须跟随这个调用来调用: void flush_workqueue(struct workqueue_struct *queue); 在 flush_workqueue 返回后, 没有在这个调用前提交的函数在系统中任何地方运行。而cancel_work_sync会取消相应的work,但是如果这个work已经在运行那么cancel_work_sync会阻塞,直到work完成并取消相应的work。 当用完一个工作队列,可以去掉它,使用: void destroy_workqueue(struct workqueue_struct *queue);  

二.2. 共享队列

内核创建了一个标准的工作队列,称为events。内核各个部分,凡是没有必要创建独立的工作队列者,均可使用该队列。 在许多情况下, 设备驱动不需要它自己的工作队列。如果你只偶尔提交任务给队列, 简单地使用内核提供的共享的默认的队列可能更有效。若使用共享队列,就必须明白将和其他人共享它,这意味着不应当长时间独占队列(不能长时间睡眠), 并且可能要更长时间才能获得处理器。 使用步骤: (1) 建立 work_struct 或 delayed_work static struct work_struct jiq_work; static struct delayed_work jiq_work_delay;     /* this line is in jiq_init() */ INIT_WORK(&jiq_work, jiq_print_wq); INIT_DELAYED_WORK(&jiq_work_delay, jiq_print_wq);   (2)提交工作 int schedule_work(&jiq_work);/*对于work_struct结构*/ int schedule_delayed_work(&jiq_work_delay, delay);/*对于delayed_work结构*/ /*返回值的定义和 queue_work 一样*/  若需取消一个已提交给工作队列入口项, 可以使用 cancel_delayed_work和cancel_work_sync, 但刷新共享队列需要一个特殊的函数: void flush_scheduled_work(void);   因为不知道谁可能使用这个队列,因此不可能知道 flush_schduled_work 返回需要多长时间。

继续阅读