天天看點

核心工作隊列【轉】

workqueue作為核心的重要基礎元件,在核心中被廣泛的使用,通過工作隊列,可以很友善的把我們要執行的某個任務(即函數+上下文參數)交代給核心,由核心替我們執行。本文主要是介紹工作隊列的使用,及其内部實作的邏輯。

    因為核心在系統初始化的時候,已為我們建立好了預設的工作隊列,是以我們在使用的時候,可以不需要再建立工作隊列,隻需要建立工作,并将工作添加到工作隊列上即可。當然,我們也可以建立自己的工作隊列,而不使用核心建立好的工作隊列。

    簡單的了解,工作隊列是由核心線程+連結清單+等待隊列來實作的,即由一個核心線程不斷的從連結清單中讀取工作,然後執行工作的工作函數!

    一、工作的表示: struct work_struct

1 typedef void (*work_func_t)(struct work_struct *work);
2 struct work_struct {
3     atomic_long_t data; /* 核心内部使用 */
4     struct list_head entry; /* 用于連結到工作隊列中*/
5     work_func_t func; /* 工作函數*/
6 #ifdef CONFIG_LOCKDEP
7     struct lockdep_map lockdep_map;
8 #endif
9      

        從這個結構體可以看出,一個工作,就是一個待執行的工作函數,那如果我們要給工作函數傳遞參數,怎麼解決呢?

    仔細觀察工作函數的格式:參數是work_struct,是以在實際使用的時候,經常會在建立我們自己工作的時候,将此結構體嵌套在内部。然後在work_func函數内部通過container_of來得到我們自定義的工作,這樣子就完成參數的傳遞了。

二、工作隊列的常用接口: 在linux/kernel/workqueue.h

  1. 初始化工作:
1 #define __DELAYED_WORK_INITIALIZER(n, f) {          \
 2     .work = __WORK_INITIALIZER((n).work, (f)),      \
 3     .timer = TIMER_INITIALIZER(NULL, 0, 0),         \
 4     }
 5 
 6 #define DECLARE_WORK(n, f)                  \
 7     struct work_struct n = __WORK_INITIALIZER(n, f)
 8     
 9 // 也可以使用INIT_WORK宏:
10 #define INIT_WORK(_work, _func)                     \
11     do {                                \
12         (_work)->data = (atomic_long_t) WORK_DATA_INIT();   \
13         INIT_LIST_HEAD(&(_work)->entry);            \
14         PREPARE_WORK((_work), (_func));             \
15     } while (0)      

    主要是完成func成員的指派。

    2. 工作入隊: 添加到核心工作隊列

1 int schedule_work(struct      

    3. 工作撤銷: 從核心工作隊列中删除

1 int cancel_work_sync(struct      

    4. 建立工作隊列:

1 #ifdef CONFIG_LOCKDEP
 2 #define __create_workqueue(name, singlethread, freezeable, rt)  \
 3 ({                              \
 4     static struct lock_class_key __key;         \
 5     const char *__lock_name;                \
 6                                 \
 7     if (__builtin_constant_p(name))             \
 8         __lock_name = (name);               \
 9     else                            \
10         __lock_name = #name;                \
11                                 \
12     __create_workqueue_key((name), (singlethread),      \
13                    (freezeable), (rt), &__key,  \
14                    __lock_name);            \
15 })
16 #else
17 #define __create_workqueue(name, singlethread, freezeable, rt)  \
18     __create_workqueue_key((name), (singlethread), (freezeable), (rt), \
19                    NULL, NULL)
20 #endif
21 
22 #define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
23 #define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
24 #define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
25 #define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)
26     建立工作隊列,最終調用的都是__create_workqueue_key()函數來完成,此函數傳回的是struct workqueue_struct *,用于表示一個工作隊列。
27 
28     5. 銷毀工作隊列:
29 
30 void destroy_workqueue(struct workqueue_struct *wq);
31  
32 
33     在介紹了上面的接口後,看一個簡單的使用例子,這個例子使用的是核心已建立好的工作隊列,要使用自己建立的工作隊列,也是很簡單的,看了後面的實作源碼分析就清楚了。
34 
35 #include <linux/module.h>
36 #include <linux/delay.h>
37 #include <linux/workqueue.h>
38 
39 #define ENTER() printk(KERN_DEBUG "%s() Enter", __func__)
40 #define EXIT() printk(KERN_DEBUG "%s() Exit", __func__)
41 #define ERR(fmt, args...) printk(KERN_ERR "%s()-%d: " fmt "\n", __func__, __LINE__, ##args)
42 #define DBG(fmt, args...) printk(KERN_DEBUG "%s()-%d: " fmt "\n", __func__, __LINE__, ##args)
43 
44 struct test_work {
45     struct work_struct w;
46     unsigned long data;
47 };
48 
49 static struct test_work my_work;
50 
51 static void my_work_func(struct work_struct *work)
52 {
53     struct test_work *p_work;
54     ENTER();
55     p_work = container_of(work, struct test_work, w);
56     while (p_work->data) {
57         DBG("data: %lu", p_work->data--);
58         msleep_interruptible(1000);
59     }
60 
61     EXIT();
62 }
63 
64 static int __init wq_demo_init(void)
65 {
66     INIT_WORK(&my_work.w, my_work_func);
67     my_work.data = 30;
68 
69     msleep_interruptible(1000);
70     DBG("schedule work begin:");
71     if (schedule_work(&my_work.w) == 0) {
72         ERR("schedule work fail");
73         return -1;
74     }
75 
76     DBG("success");
77     return 0;
78 }
79 
80 static void __exit wq_demo_exit(void)
81 {
82     ENTER();
83     while (my_work.data) {
84         DBG("waiting exit");
85         msleep_interruptible(2000);
86     }
87     EXIT();
88 }
89 
90 MODULE_LICENSE("GPL");
91 module_init(wq_demo_init);
92      

    下面就分析workqueue元件的源碼實作,先從work_queue子產品的初始化開始,然後再分析工作的注冊過程,最後是工作如何被執行的。

三、workqueu的初始化:在kernel/workqueue.c

1 static DEFINE_SPINLOCK(workqueue_lock);  //全局的自旋鎖,用于保證對全局連結清單workqueues的原子操作
 2 static LIST_HEAD(workqueues); // 全局連結清單,用于連結所有的工作隊列
 3 static struct workqueue_struct *keventd_wq;
 4 
 5 void __init init_workqueues(void)
 6 {
 7     alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
 8 
 9     cpumask_copy(cpu_populated_map, cpu_online_mask);
10     singlethread_cpu = cpumask_first(cpu_possible_mask);
11     cpu_singlethread_map = cpumask_of(singlethread_cpu);
12     hotcpu_notifier(workqueue_cpu_callback, 0);
13     keventd_wq = create_workqueue("events");                                                                                              
14     BUG_ON(!keventd_wq);
15      

    調用create_workqueue()函數建立了一個工作隊列,名字為events。那就再看看工作隊列是如何建立的:

    上面在介紹建立工作隊列的接口時,有看到最終調用的都是__create_workqueue_key()函數的,在介紹這個函數之前,先看看struct workqueue_struct結構體的定義:在kernel/workqueue.c

1 struct workqueue_struct {                                                                                                                 
 2     struct cpu_workqueue_struct *cpu_wq;
 3     struct list_head list; // 用于連結到全局連結清單workqueues
 4     const char *name; //工作隊列的名字,即核心線程的名字
 5     int singlethread;
 6     int freezeable;     /* Freeze threads during suspend */
 7     int rt;
 8 #ifdef CONFIG_LOCKDEP
 9     struct lockdep_map lockdep_map;
10 #endif
11 };
12 
13 struct cpu_workqueue_struct {
14 
15     spinlock_t lock; // 用于保證worklist連結清單的原子操作
16 
17     struct list_head worklist;  //用于儲存添加的工作
18     wait_queue_head_t more_work; // 等待隊列,當worklist為空時,則會将核心線程挂起,存放與此連結清單中
19     struct work_struct *current_work;  //儲存核心線程目前正在執行的工作
20 
21     struct workqueue_struct *wq;
22     struct task_struct *thread; // 核心線程
23      

    下面看一下建立函數的内部實作,這裡要注意我們傳遞的參數:singlethread = 0, freezeable =0, rt = 0

1 struct workqueue_struct *__create_workqueue_key(const char *name,
 2                         int singlethread,
 3                         int freezeable,
 4                         int rt,
 5                         struct lock_class_key *key,
 6                         const char *lock_name)
 7 {
 8     struct workqueue_struct *wq;
 9     struct cpu_workqueue_struct *cwq;
10     int err = 0, cpu;
11 
12     wq = kzalloc(sizeof(*wq), GFP_KERNEL);
13     if (!wq)
14         return NULL;
15 
16     wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
17     if (!wq->cpu_wq) {
18         kfree(wq);
19         return NULL;
20     }
21 
22     wq->name = name;
23     lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
24     wq->singlethread = singlethread;
25     wq->freezeable = freezeable;
26     wq->rt = rt;
27     INIT_LIST_HEAD(&wq->list);
28 
29     if (singlethread) {
30         cwq = init_cpu_workqueue(wq, singlethread_cpu);
31         err = create_workqueue_thread(cwq, singlethread_cpu); 
32         start_workqueue_thread(cwq, -1);
33     } else {
34         cpu_maps_update_begin();
35         /*
36          * We must place this wq on list even if the code below fails.
37          * cpu_down(cpu) can remove cpu from cpu_populated_map before
38          * destroy_workqueue() takes the lock, in that case we leak
39          * cwq[cpu]->thread.
40          */
41         spin_lock(&workqueue_lock);
42         list_add(&wq->list, &workqueues);
43         spin_unlock(&workqueue_lock);
44         /*
45          * We must initialize cwqs for each possible cpu even if we
46          * are going to call destroy_workqueue() finally. Otherwise
47          * cpu_up() can hit the uninitialized cwq once we drop the
48          * lock.
49          */
50         for_each_possible_cpu(cpu) {
51             cwq = init_cpu_workqueue(wq, cpu);
52             if (err || !cpu_online(cpu))
53                 continue;
54             err = create_workqueue_thread(cwq, cpu); /*建立核心線程*/
55             start_workqueue_thread(cwq, cpu); /*啟動核心線程*/
56         }
57         cpu_maps_update_done();
58     }
59 
60     if (err) {
61         destroy_workqueue(wq);
62         wq = NULL;
63     }
64     return wq;
65      

        主要的代碼邏輯是建立一個struct workqueue_struct類型的對象,然後将此工作隊列加入到workqueues連結清單中,最後是調用create_workqueue_thread()建立一個核心線程,并啟動此線程。

    我們知道核心線程最主要的是它的線程函數,那麼工作隊列的線程函數時什麼呢?

1 static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
 2 {
 3     struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
 4     struct workqueue_struct *wq = cwq->wq;
 5     const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
 6     struct task_struct *p;
 7     
 8     p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
 9     /*
10      * Nobody can add the work_struct to this cwq,
11      *  if (caller is __create_workqueue)
12      *      nobody should see this wq
13      *  else // caller is CPU_UP_PREPARE
14      *      cpu is not on cpu_online_map
15      * so we can abort safely.
16      */
17     if (IS_ERR(p))
18         return PTR_ERR(p);
19     if (cwq->wq->rt)
20         sched_setscheduler_nocheck(p, SCHED_FIFO, ¶m);
21     cwq->thread = p;
22     
23     trace_workqueue_creation(cwq->thread, cpu);
24     
25     return 0;
26 }
27 
28 static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
29 {
30     struct task_struct *p = cwq->thread;
31 
32     if (p != NULL) {
33         if (cpu >= 0)
34             kthread_bind(p, cpu);
35         wake_up_process(p);
36     }
37      

        主要就是調用kthread_create()建立核心線程,線程函數為worker_thread,參數為cwq。start_workqueue_thread()函數就是調用wake_up_process()把核心線程加入到run queue中。

    下面就分析下線程函數worker_thread到底是怎麼我們添加的工作的?

1 static int worker_thread(void *__cwq)
 2 {
 3     struct cpu_workqueue_struct *cwq = __cwq;
 4     DEFINE_WAIT(wait);
 5 
 6     if (cwq->wq->freezeable)
 7         set_freezable();
 8 
 9     for (;;) {
10         prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
11         if (!freezing(current) &&
12             !kthread_should_stop() &&
13             list_empty(&cwq->worklist))
14             schedule(); //若worklist連結清單為空,則進行排程
15         finish_wait(&cwq->more_work, &wait);
16 
17         try_to_freeze();
18 
19         if (kthread_should_stop())
20             break;
21 
22         run_workqueue(cwq);//執行隊列中的工作
23     }
24 
25     return 0;
26      

        前面已經有文章分析了核心線程和等待隊列waitqueue,了解這個的話,就很容易看懂這段代碼,就是判斷worklist隊列是否為空,如果為空,則将目前核心線程挂起,否則就調用run_workqueue()去執行已添加注冊的工作:

1 static void run_workqueue(struct cpu_workqueue_struct *cwq)
 2 {
 3     spin_lock_irq(&cwq->lock);
 4     while (!list_empty(&cwq->worklist)) {
 5         struct work_struct *work = list_entry(cwq->worklist.next,
 6                         struct work_struct, entry);
 7         work_func_t f = work->func;
 8 #ifdef CONFIG_LOCKDEP
 9         /*
10          * It is permissible to free the struct work_struct
11          * from inside the function that is called from it,
12          * this we need to take into account for lockdep too.
13          * To avoid bogus "held lock freed" warnings as well
14          * as problems when looking into work->lockdep_map,
15          * make a copy and use that here.
16          */
17         struct lockdep_map lockdep_map = work->lockdep_map;
18 #endif
19         trace_workqueue_execution(cwq->thread, work);
20         cwq->current_work = work; //儲存目前工作到current_work
21         list_del_init(cwq->worklist.next); // 将此工作從連結清單中移除
22         spin_unlock_irq(&cwq->lock);
23 
24         BUG_ON(get_wq_data(work) != cwq);
25         work_clear_pending(work);
26         lock_map_acquire(&cwq->wq->lockdep_map);
27         lock_map_acquire(&lockdep_map);
28         f(work); //執行工作函數
29         lock_map_release(&lockdep_map);
30         lock_map_release(&cwq->wq->lockdep_map);
31 
32         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
33             printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
34                     "%s/0x%08x/%d\n",
35                     current->comm, preempt_count(),
36                         task_pid_nr(current));
37             printk(KERN_ERR "    last function: ");
38             print_symbol("%s\n", (unsigned long)f);
39             debug_show_held_locks(current);
40             dump_stack();
41         }
42 
43         spin_lock_irq(&cwq->lock);
44         cwq->current_work = NULL;
45     }
46     spin_unlock_irq(&cwq->lock);
47      

    上面的注釋已經說明清楚代碼的邏輯了,這裡就不在解釋了。

四、添加工作到核心工作隊列中:

    上面提到了,當工作隊列中的worklist連結清單為空,及沒有需要執行的工作,怎會将工作隊列所在的核心線程挂起,那麼什麼時候會将其喚醒呢?肯定就是當有工作添加到連結清單的時候,即調用schedule_work()的時候:

1 int schedule_work(struct work_struct *work)
2 {
3     return queue_work(keventd_wq, work); // 将工作添加到核心提我們建立好的工作隊列中
4      

    前面在初始化的時候,就将核心建立的工作隊列儲存在keventd_wq變量中。

1 /**
 2  * queue_work - queue work on a workqueue
 3  * @wq: workqueue to use
 4  * @work: work to queue
 5  *
 6  * Returns 0 if @work was already on a queue, non-zero otherwise.
 7  *
 8  * We queue the work to the CPU on which it was submitted, but if the CPU dies
 9  * it can be processed by another CPU.
10  */
11 int queue_work(struct workqueue_struct *wq, struct work_struct *work)                                                                     
12 {
13     int ret;
14 
15     ret = queue_work_on(get_cpu(), wq, work);
16     put_cpu();
17 
18     return ret;
19 }
20 
21 int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
22 {
23     int ret = 0;
24 
25     if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
26         BUG_ON(!list_empty(&work->entry));
27         __queue_work(wq_per_cpu(wq, cpu), work);
28         ret = 1;
29     }
30     return ret;
31 }
32 
33 static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)
34 {
35     unsigned long flags;
36 
37     spin_lock_irqsave(&cwq->lock, flags); //加鎖,保證insert_work原子操作
38     insert_work(cwq, work, &cwq->worklist);
39     spin_unlock_irqrestore(&cwq->lock, flags); //解鎖
40 }
41 
42 static void insert_work(struct cpu_workqueue_struct *cwq, struct work_struct *work, struct list_head *head)
43 {
44     trace_workqueue_insertion(cwq->thread, work);
45     
46     set_wq_data(work, cwq);
47     /*
48      * Ensure that we get the right work->data if we see the
49      * result of list_add() below, see try_to_grab_pending().
50      */
51     smp_wmb();
52     list_add_tail(&work->entry, head); //加入到worklist連結清單
53     wake_up(&cwq->more_work); //喚醒在more_work等待連結清單上的任務,即工作隊列線程
54      

五、使用自定義工作隊列:

    通過上面的分析,建立 工作隊列最基本的接口時create_workqueue()。當我們要把工作放入到自定義的工作隊列時,使用如下接口:

1 int queue_work(struct workqueue_struct *wq, struct      

    在上面的分析中,其實已經使用了此接口,隻不過我們調用schedule_work()的時候,wq參數為核心已建立好的工作隊列keventd_wq。