
Linux driver programming -- a brief analysis of workqueues

          The kernel by default creates one worker process per cpu, events/0 (events/1, ... on SMP), and hangs a dedicated list for it off the global workqueue. Work items are hung on that list, and events/0 takes them off and executes them. Certainly, besides this default worker process, the user can create a new worker too, and the kernel will then create a new workqueue for that worker, to manage its work.
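Where does this default queue come from? The kernel creates it itself at boot. In kernels of this generation (2.6.2x/3x) the code looks roughly like the condensed sketch below (keventd_wq is the real symbol, used again by schedule_work() further down):

	static struct workqueue_struct *keventd_wq __read_mostly;

	void __init init_workqueues(void)
	{
		/* ... cpu hotplug notifier setup elided ... */
		keventd_wq = create_workqueue("events");	/* workers show up as events/0, events/1, ... */
		BUG_ON(!keventd_wq);
	}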

Workqueue-related functions

//Key structures
	struct work_struct;		//entity carrying the information of one work item to be processed
	struct workqueue_struct;		//head of a workqueue; the kernel maintains a global list of workqueues, the user only needs to hang the work to be processed onto one
	struct cpu_workqueue_struct;	//the per-cpu work, bound to one cpu
	struct delayed_work;		//for work that runs after a delay
	//Initialization
	INIT_WORK(_work,_func);
	INIT_DELAYED_WORK(_work,_func);
	DECLARE_WORK(n,f);
	DECLARE_DELAYED_WORK(n,f);	
	//Common functions
	schedule_work(a);			//add a work item to the default workqueue
	schedule_delayed_work(struct delayed_work * dwork,unsigned long delay);
	cancel_delayed_work(struct delayed_work * work);	//cancel a delayed work item
	flush_scheduled_work(void);			//flush the default workqueue; recommended after a cancel
	create_workqueue(name);				//create a new workqueue and its corresponding worker process(es)
	create_singlethread_workqueue(name);		
	destroy_workqueue(struct workqueue_struct * wq);	//destroy a workqueue
	queue_work(struct workqueue_struct * wq,struct work_struct * work);	//schedule work on a "specified" workqueue
	queue_delayed_work(struct workqueue_struct * wq,struct delayed_work * dwork,unsigned long delay);
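Before diving into the structures, here is a minimal usage sketch of the default queue (my_handler, my_work, my_dwork and my_irq are invented names, purely for illustration):

	#include <linux/kernel.h>
	#include <linux/interrupt.h>
	#include <linux/workqueue.h>

	static void my_handler(struct work_struct *work)
	{
		/* runs later, in process context, in one of the events/N workers */
		pr_info("deferred work executed\n");
	}

	static DECLARE_WORK(my_work, my_handler);
	static DECLARE_DELAYED_WORK(my_dwork, my_handler);

	static irqreturn_t my_irq(int irq, void *dev)
	{
		schedule_work(&my_work);		/* defer the heavy lifting out of irq context */
		schedule_delayed_work(&my_dwork, HZ);	/* run another instance roughly 1s later */
		return IRQ_HANDLED;
	}

	static void my_teardown(void)
	{
		cancel_delayed_work(&my_dwork);		/* cancel first, ... */
		flush_scheduled_work();			/* ... then flush, as recommended above */
	}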
           

Next, an analysis of the related structures:

/** Records the information of one work item */
	struct work_struct {
		atomic_long_t data;	//low bits are status flags (see the masks below); the remaining bits hold a pointer to the owning cpu_workqueue_struct
#define WORK_STRUCT_PENDING 0		/* T if work item pending execution */
#define WORK_STRUCT_FLAG_MASK (3UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
		struct list_head entry;  //hook used to hang this item on a cpu workqueue's worklist; the containing work_struct is later recovered via container_of(obj,type,memb)
		work_func_t func;	       //the user's handler function
#ifdef CONFIG_LOCKDEP
		struct lockdep_map lockdep_map;
#endif
         };
	struct workqueue_struct;		//structure of one workqueue; for the default queue each cpu has its own events worker, and every workqueue is also kept on the kernel's global list of workqueues
	/*
	 * The externally visible workqueue abstraction is an array of
	 * per-CPU workqueues:
	 */
	struct workqueue_struct {
		struct cpu_workqueue_struct *cpu_wq;	//per-cpu data: each cpu gets a cpu_workqueue_struct recording the list of work that cpu has to process
		struct list_head list;			//hook used to hang this wq on the kernel's global workqueues list; found again via container_of()
		const char *name;				//name of this workqueue
		int singlethread;					
		int freezeable;		/* Freeze threads during suspend */
		int rt;
#ifdef CONFIG_LOCKDEP
		struct lockdep_map lockdep_map;
#endif
	};
	/*
	 * The per-CPU workqueue (if single thread, we always use the first
	 * possible cpu).
	 */
	struct cpu_workqueue_struct {
	
		spinlock_t lock;
	
		struct list_head worklist;          //the work this cpu still has to process
		wait_queue_head_t more_work;	//when the cpu has nothing to do, the worker sleeps on this waitqueue; wake_up() wakes it when work arrives
		struct work_struct *current_work;
	
		struct workqueue_struct *wq;
		struct task_struct *thread;
	} ____cacheline_aligned;
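What the worker thread (events/N) does with these fields: the sketch below is condensed from worker_thread()/run_workqueue() of the same kernel generation (freezer and lockdep details elided), not a verbatim listing:

	static int worker_thread(void *__cwq)
	{
		struct cpu_workqueue_struct *cwq = __cwq;
		DEFINE_WAIT(wait);

		for (;;) {
			prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
			if (!kthread_should_stop() && list_empty(&cwq->worklist))
				schedule();			/* nothing to do: sleep on more_work */
			finish_wait(&cwq->more_work, &wait);

			if (kthread_should_stop())
				break;

			/* run_workqueue(cwq), inlined and simplified: */
			spin_lock_irq(&cwq->lock);
			while (!list_empty(&cwq->worklist)) {
				struct work_struct *work =
					list_entry(cwq->worklist.next,
						   struct work_struct, entry);	/* container_of via entry */
				work_func_t f = work->func;

				cwq->current_work = work;
				list_del_init(cwq->worklist.next);
				spin_unlock_irq(&cwq->lock);

				/* the real code also clears WORK_STRUCT_PENDING here */
				f(work);			/* run the user handler with the lock dropped */

				spin_lock_irq(&cwq->lock);
				cwq->current_work = NULL;
			}
			spin_unlock_irq(&cwq->lock);
		}
		return 0;
	}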
           

Analysis of a few important functions:

        schedule_work() is used to add work to the default workqueue (events). Its counterpart is queue_work(), which adds work to a specified workqueue, for example one you created yourself; the one-line contrast below shows the difference.
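(my_wq and my_work are hypothetical names:)

	schedule_work(&my_work);		/* goes to the default queue, keventd_wq ("events") */
	queue_work(my_wq, &my_work);		/* goes to a workqueue you created yourself */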

schedule_work(struct work_struct * work);
	/**
	 * schedule_work - put work task in global workqueue
	 * @work: job to be done
	 *
	 * This puts a job in the kernel-global workqueue.
	 */
	int schedule_work(struct work_struct *work)
	{
		return queue_work(keventd_wq, work);	//add the new work to the default workqueue (keventd_wq)
	}
	/**
	 * queue_work - queue work on a workqueue
	 * @wq: workqueue to use
	 * @work: work to queue
	 *
	 * Returns 0 if @work was already on a queue, non-zero otherwise.
	 *
	 * We queue the work to the CPU on which it was submitted, but if the CPU dies
	 * it can be processed by another CPU.
	 */
	int queue_work(struct workqueue_struct *wq, struct work_struct *work)
	{
		int ret;
	
		ret = queue_work_on(get_cpu(), wq, work);//hang work on this wq's queue for the cpu returned by get_cpu(); that cpu's worker thread (events/N) will then execute it
		put_cpu();					//re-enable preemption
	
		return ret;
	}
	/**
	 * queue_work_on - queue work on specific cpu
	 * @cpu: CPU number to execute work on
	 * @wq: workqueue to use
	 * @work: work to queue
	 *
	 * Returns 0 if @work was already on a queue, non-zero otherwise.
	 *
	 * We queue the work to a specific CPU, the caller must ensure it
	 * can't go away.
	 */
	int
	queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
	{
		int ret = 0;
	
		if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
			BUG_ON(!list_empty(&work->entry));
			__queue_work(wq_per_cpu(wq, cpu), work);
			ret = 1;
		}
		return ret;
	}
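	wq_per_cpu() is what picks the per-cpu cwq out of the wq; in this kernel generation it looks roughly like this:

	static struct cpu_workqueue_struct *
	wq_per_cpu(struct workqueue_struct *wq, int cpu)
	{
		if (unlikely(wq->singlethread))
			cpu = singlethread_cpu;		/* single-threaded queues always use one fixed cpu */
		return per_cpu_ptr(wq->cpu_wq, cpu);	/* index the per-cpu array allocated in __create_workqueue_key() */
	}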
	
	#define put_cpu()		preempt_enable()
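	For reference, get_cpu() is the other half of that pair: it disables preemption before reading the cpu id, which guarantees the work really is queued on the submitting cpu's wq:

	#define get_cpu()		({ preempt_disable(); smp_processor_id(); })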

	static void __queue_work(struct cpu_workqueue_struct *cwq,
				 struct work_struct *work)
	{
		unsigned long flags;
	
		spin_lock_irqsave(&cwq->lock, flags);
		insert_work(cwq, work, &cwq->worklist);
		spin_unlock_irqrestore(&cwq->lock, flags);
	}

	static void insert_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work, struct list_head *head)
	{
		trace_workqueue_insertion(cwq->thread, work);	//tracepoint: record that this work is being queued on cwq->thread

		set_wq_data(work, cwq);			//stash the cwq pointer plus flag bits into work->data (see the sketch below)
		/*
		 * Ensure that we get the right work->data if we see the
		 * result of list_add() below, see try_to_grab_pending().
		 */
		smp_wmb();			//write barrier: publish work->data before the list_add_tail() becomes visible
		list_add_tail(&work->entry, head);	//hang the new work on this cpu's work list
		wake_up(&cwq->more_work);		//wake the worker sleeping on more_work
	}
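	set_wq_data() is what gives work->data its double meaning (flag bits in the low bits, the cwq pointer in the rest); in this kernel generation it is roughly:

	static inline void set_wq_data(struct work_struct *work,
					struct cpu_workqueue_struct *cwq)
	{
		unsigned long new;

		BUG_ON(!work_pending(work));

		new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
		new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);	/* preserve the existing flag bits */
		atomic_long_set(&work->data, new);
	}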

	
           

create_workqueue()

        Now let's look at how a worker is actually created:

#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)

#define __create_workqueue(name, singlethread, freezeable, rt)	\
	__create_workqueue_key((name), (singlethread), (freezeable), (rt), \
			       NULL, NULL)

//Create a new wq and hang it on the kernel's global workqueues list (via its list field);
//initialize the wq and set up its per-cpu cwq(s) with worker threads;
//the actual work items will later be hung on those cwqs
struct workqueue_struct *__create_workqueue_key(const char *name,
						int singlethread,
						int freezeable,
						int rt,
						struct lock_class_key *key,
						const char *lock_name)
{
	struct workqueue_struct *wq;
	struct cpu_workqueue_struct *cwq;
	int err = 0, cpu;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
	wq->singlethread = singlethread;
	wq->freezeable = freezeable;
	wq->rt = rt;
	INIT_LIST_HEAD(&wq->list);

	if (singlethread) {	//single-threaded: create only one cwq and one worker, bound to singlethread_cpu
		cwq = init_cpu_workqueue(wq, singlethread_cpu);	//associate the wq with that cpu
		err = create_workqueue_thread(cwq, singlethread_cpu);
		start_workqueue_thread(cwq, -1);
	} else {
		cpu_maps_update_begin();
		/*
		 * We must place this wq on list even if the code below fails.
		 * cpu_down(cpu) can remove cpu from cpu_populated_map before
		 * destroy_workqueue() takes the lock, in that case we leak
		 * cwq[cpu]->thread.
		 */
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);		//hang this wq on the kernel's global workqueues list; found again later via container_of()
		spin_unlock(&workqueue_lock);
		/*
		 * We must initialize cwqs for each possible cpu even if we
		 * are going to call destroy_workqueue() finally. Otherwise
		 * cpu_up() can hit the uninitialized cwq once we drop the
		 * lock.
		 */
		for_each_possible_cpu(cpu) {
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
			start_workqueue_thread(cwq, cpu);
		}
		cpu_maps_update_done();
	}

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}
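Tying it all together, a minimal driver-side sketch of the custom-queue lifecycle (my_wq, my_work, my_handler are invented names):

	#include <linux/kernel.h>
	#include <linux/init.h>
	#include <linux/module.h>
	#include <linux/workqueue.h>

	static struct workqueue_struct *my_wq;

	static void my_handler(struct work_struct *work)
	{
		pr_info("running in my_wq's own worker thread\n");
	}
	static DECLARE_WORK(my_work, my_handler);

	static int __init my_init(void)
	{
		my_wq = create_workqueue("my_wq");	/* workers appear as my_wq/0, my_wq/1, ... */
		if (!my_wq)
			return -ENOMEM;
		queue_work(my_wq, &my_work);		/* queue work on the "specified" queue */
		return 0;
	}

	static void __exit my_exit(void)
	{
		flush_workqueue(my_wq);			/* wait for anything still pending */
		destroy_workqueue(my_wq);
	}

	module_init(my_init);
	module_exit(my_exit);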

           
