天天看點

【Linux基礎系列之】同步機制介紹

  當多核CPU同時執行一段代碼的時候,就容易發生搶占,這段代碼可以叫做臨界區,其他核心控制路徑能夠進入臨界區前,進入臨界區前的核心控制路徑必須全部執行完這段代碼,為了避免這種共享資料發生競争,就需要采用同步技術,本文就簡單介紹linux核心當中的一些同步原語;

(一) per-cpu變量

  最簡單的同步技術就是把核心變量申明為per-cpu變量,這個變量隻會在本地CPU操作時調用,就不用考慮其他CPU搶占的情況;

  将一個共享memory變成Per-CPU memory本質上是一個耗費更多memory來解決performance的方法。當一個在多個CPU之間共享的變量變成每個CPU都有屬于自己的一個私有的變量的時候,我們就不必考慮來自多個CPU上的并發,僅僅考慮本CPU上的并發就可以了;

注意:

  1. per-cpu變量為來自不同的CPU的并發通路提供保護,但對來自異步函數(中斷函數和可延遲函數)的通路不提供保護;
  2. 核心搶占可能使CPU變量産生競争條件,核心控制路徑應該在禁用搶占的情況下通路per-cpu變量;

per-cpu API:

聲明和定義Per-CPU變量的API 描述
DECLARE_PER_CPU(type, name) DEFINE_PER_CPU(type, name) 普通的、沒有特殊要求的per cpu變量定義接口函數
DECLARE_PER_CPU_FIRST(type, name) DEFINE_PER_CPU_FIRST(type, name) 通過該API定義的per cpu變量位于整個per cpu相關section的最前面
DECLARE_PER_CPU_SHARED_ALIGNED(type, name) DEFINE_PER_CPU_SHARED_ALIGNED(type, name) 通過該API定義的per cpu變量在SMP的情況下會對齊到L1 cache line ,對于UP,不需要對齊到cachine line
DECLARE_PER_CPU_ALIGNED(type, name) DEFINE_PER_CPU_ALIGNED(type, name) 無論SMP或者UP,都是需要對齊到L1 cache line
DECLARE_PER_CPU_PAGE_ALIGNED(type, name) DEFINE_PER_CPU_PAGE_ALIGNED(type, name) 為定義page aligned per cpu變量而設定的API接口
DECLARE_PER_CPU_READ_MOSTLY(type, name) DEFINE_PER_CPU_READ_MOSTLY(type, name) 通過該API定義的per cpu變量是read mostly的

靜态定義的per cpu變量不能象普通變量那樣進行通路,需要使用特定的接口函數,具體如下:

#define this_cpu_ptr(ptr)                       \
 ({                                  \
     __verify_pcpu_ptr(ptr);                     \
     SHIFT_PERCPU_PTR(ptr, my_cpu_offset);               \                                                                 

//SHIFT_PERCPU_PTR: 原始的per cpu變量的位址,經過shift轉成實際的percpu 副本的位址;  


 })

 /*              
261  * Must be an lvalue. Since @var must be a simple identifier,
262  * we force a syntax error here if it isn't.
263  */             
 #define get_cpu_var(var)                        \
 (*({                                    \
     preempt_disable();                      \
     this_cpu_ptr(&var);                     \
 }))             
                 
 /*              
271  * The weird & is necessary because sparse considers (void)(var) to be
272  * a direct dereference of percpu variable (var).
273  */             
 #define put_cpu_var(var)                        \
 do {                                    \
     (void)&(var);                           \
     preempt_enable();                       \
 } while ()
           

  上面這兩個接口函數已經内嵌了鎖的機制(preempt disable),使用者可以直接調用該接口進行本CPU上該變量副本的通路。如果使用者确認目前的執行環境已經是preempt disable(或者是更厲害的鎖,例如關閉了CPU中斷),那麼可以使用lock-free版本的Per-CPU變量的API:__get_cpu_var :

#define __get_cpu_var(var)  (*this_cpu_ptr(&(var)))
           

  隻有Per-CPU變量的原始變量還是不夠的,必須為每一個CPU建立一個副本,怎麼建?直接靜态定義一個NR_CPUS的數組?NR_CPUS定義了系統支援的最大的processor的個數,并不是實際中系統processor的數目,這樣的定義非常浪費記憶體對于NUMA系統,每個CPU上的Per-CPU變量的副本應該位于它通路最快的那段memory上,也就是說Per-CPU變量的各個CPU副本可能是散布在整個記憶體位址空間的,而這些空間之間是有空洞的。

(二)原子操作

  那些有多個核心控制路徑進行read-modify-write的變量,核心提供了一個特殊的類型atomic_t來避免竟态,申明的變量就叫原子變量,這樣的行為我們可以叫做原子操作;

定義:atomic_t val_name = ATOMIC_INIT(val);

typedef struct {
    int counter;
} atomic_t; 
           

原子操作API

接口函數 描述
static inline void atomic_add(int i, atomic_t *v) 給一個原子變量v增加i
static inline int atomic_add_return(int i, atomic_t *v) 同上,隻不過将變量v的最新值傳回
static inline void atomic_sub(int i, atomic_t *v) 給一個原子變量v減去i
static inline int atomic_cmpxchg(atomic_t *ptr, int old, int new) 比較old和原子變量ptr中的值,如果相等,那麼就把new值賦給原子變量。傳回舊的原子變量ptr中的值
atomic_read 擷取原子變量的值
atomic_set 設定原子變量的值
atomic_inc(v) 原子變量的值加1
atomic_dec(v) 原子變量的值減去1
atomic_sub_and_test(i, v) 給一個原子變量v減去i,并判斷變量v的最新值是否等于0
atomic_add_negative(i,v) 給一個原子變量v增加i,并判斷變量v的最新值是否是負數
static inline int atomic_add_unless(atomic_t *v, int a, int u) 隻要原子變量v不等于u,那麼就執行原子變量v加a的操作,如果v不等于u,傳回非0值,否則傳回0值

代碼分析:

TODO
           

(三) memory barrier

  編譯器可以在将c翻譯成彙編的時候進行優化(例如記憶體通路指令的重新排序),讓産出的彙編指令在CPU上運作的時候更快;然而,這種優化産出的結果未必符合程式員原始的邏輯;

  程式員需通過内嵌在c代碼中的memory barrier來指導編譯器的優化行為(這種memory barrier又叫做優化屏障,Optimization barrier),讓編譯器産出即高效,又邏輯正确的代碼;

  Memory barrier 包括兩類:

1.編譯器 barrier
2.CPU Memory barrier
           

  Memory barrier 能夠讓 CPU 或編譯器在記憶體通路上有序。一個 Memory barrier 之前的記憶體通路操作必定先于其之後的完成。

Linux 核心提供函數 barrier() 用于讓編譯器保證其之前的記憶體通路先于其之後的完成;

memory barrier相關的API清單:

接口名稱 描述
barrier() 優化屏障,阻止編譯器為了進行性能優化而進行的memory access smp_wmb()reorder;
mb() 記憶體屏障(包括讀和寫),用于SMP和UP;
rmb() 讀記憶體屏障,用于SMP和UP;
wmb() 寫記憶體屏障,用于SMP和UP;
smp_mb() 用于SMP場合的記憶體屏障,對于UP不存在memory order的問題(對彙編指令),是以,在UP上就是一個優化屏障,確定彙編和c代碼的memory order是一緻的;
smp_rmb() 用于SMP場合的讀記憶體屏障;
smp_wmb() 用于SMP場合的寫記憶體屏障;
13 /* Optimization barrier */
 14 /* The "volatile" is due to gcc bugs */
 15 #define barrier() __asm__ __volatile__("": : :"memory")   

 53 #define smp_mb()    barrier()
 54 #define smp_rmb()   barrier()
 55 #define smp_wmb()   barrier()
           

  這裡的memory就是告知gcc,在彙編代碼中,我修改了memory中的内容,嵌入式彙編之前的c代碼塊和嵌入式彙編之後的c代碼塊看到的memory是不一樣的,對memory的通路不能依賴于嵌入式彙編之前的c代碼塊中寄存器的内容,需要重新加載;

  而之是以會有memory barrier這個“邪惡的東西”是由于CPU的速度要快于(數量級上的差異)memory以及他們之間的互連器件;

  對于memory需要更深入的了解可以檢視:https://www.kernel.org/doc/Documentation/memory-barriers.txt

(四) 自旋鎖spin_lock

  如果共享資料被中斷上下文和程序上下文通路,該如何保護呢?如果隻有程序上下文的通路,那麼可以考慮使用semaphore或者mutex的鎖機制,但是現在中斷上下文也參和進來,那些可以導緻睡眠的lock就不能使用了,這時候,可以考慮使用spin lock;

特點:

  1. spin lock是一種死等的鎖機制。目前的執行thread會不斷的重新嘗試直到擷取鎖進入臨界區。
  2. 隻允許一個thread進入。semaphore可以允許多個thread進入,spin lock不行,一次隻能有一個thread擷取鎖并進入臨界區,其他的thread都是在門口不斷的嘗試。
  3. 執行時間短。由于spin lock死等這種特性,是以它使用在那些代碼不是非常複雜的臨界區(當然也不能太簡單,否則使用原子操作或者其他适用簡單場景的同步機制就OK了),如果臨界區執行時間太長,那麼不斷在臨界區門口“死等”的那些thread是多麼的浪費CPU;
  4. 可以在中斷上下文執行。由于不睡眠,是以spin lock可以在中斷上下文中适用。

  當兩個程序執行在同一個cpu,執行在程序上下文的時候,其中一個程序擷取spin_lock的時候禁止本cpu上的搶占;如果兩個程序在不同的cpu上的時候,某個程序會等待已經擷取鎖的程序釋放鎖;

接口API的類型 spinlock中的定義 raw_spinlock的定義
定義spin lock并初始化 : DEFINE_SPINLOCK DEFINE_RAW_SPINLOCK
動态初始化spin lock : spin_lock_init raw_spin_lock_init
擷取指定的spin lock : spin_lock raw_spin_lock
擷取指定的spin lock同時disable本CPU中斷 : spin_lock_irq raw_spin_lock_irq
儲存本CPU目前的irq狀态,disable本CPU中斷并擷取指定的spin lock : spin_lock_irqsave raw_spin_lock_irqsave
擷取指定的spin lock同時disable本CPU的bottom half : spin_lock_bh raw_spin_lock_bh
釋放指定的spin lock : spin_unlock raw_spin_unlock
釋放指定的spin lock同時enable本CPU中斷 : spin_unlock_irq raw_spin_unock_irq
釋放指定的spin lock同時恢複本CPU的中斷狀态: spin_unlock_irqstore raw_spin_unlock_irqstore
釋放指定的spin lock同時enable本CPU的bottom half: spin_unlock_bh raw_spin_unlock_bh
嘗試去擷取spin lock,如果失敗,不會spin,而是傳回非零值 : spin_trylock raw_spin_trylock
判斷spin lock是否是locked,如果其他的thread已經擷取了該lock,那麼傳回非零值,否則傳回0 : spin_is_locked raw_spin_is_locked

這裡為什麼有raw_spin_lock:

  要解答這個問題,我們要回到2004年,MontaVista Software, Inc的開發人員在郵件清單中提出來一個Real-Time Linux Kernel的模型,旨在提升Linux的實時性,之後Ingo Molnar很快在他的一個項目中實作了這個模型,并最終産生了一個Real-Time preemption的patch。

  該模型允許在臨界區中被搶占,而且申請臨界區的操作可以導緻程序休眠等待,這将導緻自旋鎖的機制被修改,由原來的整數原子操作變更為信号量操作。當時核心中已經有大約10000處使用了自旋鎖的代碼,直接修改spin_lock将會導緻這個patch過于龐大,于是,他們決定隻修改哪些真正不允許搶占和休眠的地方,而這些地方隻有100多處,這些地方改為使用raw_spin_lock,但是,因為原來的核心中已經有raw_spin_lock這一名字空間,用于代表體系相關的原子操作的實作,于是linus本人建議:

把原來的raw_spin_lock改為arch_spin_lock;
    把原來的spin_lock改為raw_spin_lock;
    實作一個新的spin_lock;
           

對于2.6.33和之後的版本,我的了解是:

盡可能使用spin_lock;
絕對不允許被搶占和休眠的地方,使用raw_spin_lock,否則使用spin_lock;
如果你的臨界區足夠小,使用raw_spin_lock;    
           

  總結就是:spinlock,在rt linux(配置了PREEMPT_RT)的時候可能會被搶占(實際底層可能是使用支援PI(優先級翻轉)的mutext)。而raw_spinlock,即便是配置了PREEMPT_RT也要頑強的spin;arch_spinlock是architecture相關的實作 ;

typedef struct spinlock {
      union {              
          struct raw_spinlock rlock;
                           
  #ifdef CONFIG_DEBUG_LOCK_ALLOC
  # define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
          struct {         
              u8 __padding[LOCK_PADSIZE];
              struct lockdep_map dep_map;
          };               
  #endif                   
      };                   
  } spinlock_t;


  typedef struct raw_spinlock {
      arch_spinlock_t raw_lock;
  #ifdef CONFIG_GENERIC_LOCKBREAK
      unsigned int break_lock;
  #endif
  #ifdef CONFIG_DEBUG_SPINLOCK
      unsigned int magic, owner_cpu;
      void *owner;
  #endif
  #ifdef CONFIG_DEBUG_LOCK_ALLOC
      struct lockdep_map dep_map;
  #endif
  } raw_spinlock_t;
           

  首先定義一個spinlock_t的資料類型,其本質上是一個整數值(對該數值的操作需要保證原子性),該數值表示spin lock是否可用。初始化的時候被設定為1。當thread想要持有鎖的時候調用spin_lock函數,該函數将spin lock那個整數值減去1,然後進行判斷,如果等于0,表示可以擷取spin lock,如果是負數,則說明其他thread的持有該鎖,本thread需要spin。

  補充spin_lock,spin_lock_irq,spin_lock_irqsave的差別:

  1. 普通的spin_lock是preempt_disable();而spin_lock_irq和spin_lock_irqsave是preempt_disable()同時local_irq_disable();在任何情況下使用spin_lock_irq都是安全的。因為它既禁止本地中斷,又禁止核心搶占;
  2. spin_lock比spin_lock_irq速度快,但是它并不是任何情況下都是安全的。
  3. spin_lock_irqsave保留目前中斷狀态,進入退出臨界區中斷狀态不變;

  舉個例子:程序A中調用了spin_lock(&lock)然後進入臨界區,此時來了一個中斷(interrupt),該中斷也運作在和程序A相同的CPU上,并且在該中斷處理程式中恰巧也會spin_lock(&lock)試圖擷取同一個鎖。由于是在同一個CPU上被中斷,程序A會被設定為TASK_INTERRUPT狀态,中斷處理程式無法獲得鎖,會不停的忙等,由于程序A被設定為中斷狀态,schedule()程序排程就無法再排程程序A運作,這樣就導緻了死鎖!

  但是如果該中斷處理程式運作在不同的CPU上就不會觸發死鎖。 因為在不同的CPU上出現中斷不會導緻程序A的狀态被設為TASK_INTERRUPT,隻是換出。當中斷處理程式忙等被換出後,程序A還是有機會獲得CPU,執行并退出臨界區。是以在使用spin_lock時要明确知道該鎖不會在中斷處理程式中使用;

(五) 讀/寫spin_lock

  spin lock嚴格的限制隻有一個thread可以進入臨界區,但是實際中,有些對共享資源的通路可以嚴格區分讀和寫的,這時候,其實多個讀的thread進入臨界區是OK的,使用spin lock則限制一個讀thread進入,進而導緻性能的下降。

讀寫鎖工作原理:

(1)假設臨界區内沒有任何的thread,這時候任何read thread或者write thread可以進入,但是隻能是其一。

(2)假設臨界區内有一個read thread,這時候新來的read thread可以任意進入,但是write thread不可以進入

(3)假設臨界區内有一個write thread,這時候任何的read thread或者write thread都不可以進入

(4)假設臨界區内有一個或者多個read thread,write thread當然不可以進入臨界區,但是該write thread也無法阻止後續read thread的進入,他要一直等到臨界區一個read thread也沒有的時候,才可以進入,多麼可憐的write thread。

讀/寫spin_lock API:

接口API描述 rw spinlock API
定義rw spin lock并初始化 DEFINE_RWLOCK
動态初始化rw spin lock rwlock_init
擷取指定的rw spin lock read_lock write_lock
擷取指定的rw spin lock同時disable本CPU中斷 read_lock_irq write_lock_irq
儲存本CPU目前的irq狀态,disable本CPU中斷并擷取指定的rw spin lock read_lock_irqsave write_lock_irqsave
擷取指定的rw spin lock同時disable本CPU的bottom half read_lock_bh write_lock_bh
釋放指定的spin lock read_unlock write_unlock
釋放指定的rw spin lock同時enable本CPU中斷 read_unlock_irq write_unlock_irq
釋放指定的rw spin lock同時恢複本CPU的中斷狀态 read_unlock_irqrestore write_unlock_irqrestore
擷取指定的rw spin lock同時enable本CPU的bottom half read_unlock_bh write_unlock_bh
嘗試去擷取rw spin lock,如果失敗,不會spin,而是傳回非零值 read_trylock write_trylock

(六) RCU(Read-Copy Update)

  RCU(Read-Copy Update)是Linux核心比較成熟的新型讀寫鎖,具有較高的讀寫并發性能,常常用在需要互斥的性能關鍵路徑。

  RCU允許多個讀者和寫者并發執行,而且RCU是不用鎖的,它不使用被所有CPU共享的鎖或者計數器,在這一點上與讀/寫自旋鎖和順序鎖,由于高速緩存行竊用和失效而有很高的的開銷,RCU具有更大的優勢;

  RCU的核心理念是讀者通路的同時,寫者可以更新通路對象的副本,但寫者需要等待所有讀者完成通路之後,才能删除老對象。這個過程實作的關鍵和難點就在于如何判斷所有的讀者已經完成通路。通常把寫者開始更新,到所有讀者完成通路這段時間叫做寬限期(Grace Period)。

RCU的使用場景比較受限,主要适用于下面的場景:

(1)RCU隻能保護動态配置設定的資料結構,并且必須是通過指針通路該資料結構

(2)受RCU保護的臨界區内不能sleep(SRCU不是本文的内容)

(3)讀寫不對稱,對writer的性能沒有特别要求,但是reader性能要求極高。

(4)reader端對新舊資料不敏感。

對于reader,RCU的操作包括:

(1)rcu_read_lock,用來辨別RCU read side臨界區的開始。

(2)rcu_dereference,該接口用來擷取RCU protected pointer。reader要通路RCU保護的共享資料,當然要擷取RCU protected pointer,然後通過該指針進行dereference的操作。

(3)rcu_read_unlock,用來辨別reader離開RCU read side臨界區

對于writer,RCU的操作包括:

(1)rcu_assign_pointer。該接口被writer用來進行removal的操作,在witer完成新版本資料配置設定和更新之後,調用這個接口可以讓RCU protected pointer指向RCU protected data。

(2)synchronize_rcu。writer端的操作可以是同步的,也就是說,完成更新操作之後,可以調用該接口函數等待所有在舊版本資料上的reader線程離開臨界區,一旦從該函數傳回,說明舊的共享資料沒有任何引用了,可以直接進行reclaimation的操作。

(3)call_rcu。當然,某些情況下(例如在softirq context中),writer無法阻塞,這時候可以調用call_rcu接口函數,該函數僅僅是注冊了callback就直接傳回了,在适當的時機會調用callback函數,完成reclaimation的操作。這樣的場景其實是分開removal和reclaimation的操作在兩個不同的線程中:updater和reclaimer。

//讀鎖;

  static inline void __rcu_read_lock(void)
  {              
      preempt_disable();
  }   
      
  static inline void __rcu_read_unlock(void)
  {   
      preempt_enable();
  } 

//讀者在讀完RCU保護的資料結構之前,是不能睡眠的;
           

  call_rcu:核心每經過一個時鐘tick就周期性的檢測CPU是否經過了一個靜止狀态;如果所有cpu都經過了靜止狀态;本地tasklet就執行連結清單當中所有的回調函數;

(七) 信号量

  核心信号量類似于自旋鎖,因為當鎖關閉的時侯,不允許核心控制路徑繼續運作,當一個核心任務試圖擷取核心信号量所保護的臨界區資源時,相應的程序被挂起到一個等待隊列中,然後讓其睡眠,目前cpu就可以去執行其他的代碼,資源被釋放的時候,程序在再次變為可運作的;是以隻有可以睡眠的函數才可以擷取核心信号量,中斷處理程式和可延遲函數都不能使用核心信号量;

struct semaphore {
      raw_spinlock_t      lock;
      unsigned int        count;
      struct list_head    wait_list;
  }; 
           

  信号量不同于自旋鎖,它不會禁止核心搶占,是以持有信号量的代碼可以被搶占;同時自旋鎖在一個時刻隻允許一個任務持有它,信号量同時允許的持有數量可以在申明信号量時指定;

信号量初始化:

static inline void sema_init(struct semaphore *sem, int val)
  {  
      static struct lock_class_key __key;
      *sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val);
      lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, );
  }
 
           

擷取信号量:

int down_interruptible(struct semaphore *sem)
  {   
      unsigned long flags;
      int result = ;
      
      raw_spin_lock_irqsave(&sem->lock, flags);
      if (likely(sem->count > ))
          sem->count--;
      else
          result = __down_interruptible(sem);
      raw_spin_unlock_irqrestore(&sem->lock, flags);                                    
      
      return result;
  }   
  EXPORT_SYMBOL(down_interruptible);

 static noinline int __sched __down_interruptible(struct semaphore *sem)
 {                
     return __down_common(sem, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
 }  

 static inline int __sched __down_common(struct semaphore *sem, long state,
                                 long timeout)
 {   
     struct task_struct *task = current;
     struct semaphore_waiter waiter;
     //先添加到wait清單;
     list_add_tail(&waiter.list, &sem->wait_list);
     waiter.task = task;
     waiter.up = false;
     
     for (;;) {
         if (signal_pending_state(state, task))
             goto interrupted;
         if (unlikely(timeout <= ))
             goto timed_out;
         __set_task_state(task, state);//設定為可interruptible;
         raw_spin_unlock_irq(&sem->lock);
         timeout = schedule_timeout(timeout); 
         raw_spin_lock_irq(&sem->lock);
         if (waiter.up)
             return ;
     }
     
  timed_out:
     list_del(&waiter.list);        
     return -ETIME;//timeout之後就停止等待了;
     
  interrupted:
     list_del(&waiter.list);
     return -EINTR;
 }
           

常用接口說明:

extern void down(struct semaphore *sem); //該函數用于獲得信号量sem,他會導緻睡眠,睡眠狀态不可喚醒,是以不能在中斷上下文(包括IRQ上下文和softirq上下文)使用該函數。該函數将把sem的值減1,如果信号量sem的值非負,就直接傳回,否則調用者将被挂起,直到别的任務釋放該信号量才能繼續運作。 

  extern int __must_check down_interruptible(struct semaphore *sem);//該函數功能和down類似,不同之處為,down不會被信号(signal)打斷,但down_interruptible能被信号打斷,是以該函數有傳回值來區分是正常傳回還是被信号中斷,如果傳回0,表示獲得信号量正常傳回,如果被信号打斷,傳回-EINTR。 

  extern int __must_check down_trylock(struct semaphore *sem);該函數試着獲得信号量sem,如果能夠即時獲得,他就獲得該信号量并傳回,否則,表示不能獲得信号量sem,傳回值為非值。是以,他不會導緻調用者睡眠,能在中斷上下文使用。

  extern void up(struct semaphore *sem);//釋放信号量;并喚醒等待該資源程序隊列的第一個程序
           

釋放信号量:

void up(struct semaphore *sem)
 {  
     unsigned long flags;
    
     raw_spin_lock_irqsave(&sem->lock, flags);
     if (likely(list_empty(&sem->wait_list)))
         sem->count++;
     else
         __up(sem);
     raw_spin_unlock_irqrestore(&sem->lock, flags);
 }                                                                                                                                           
 EXPORT_SYMBOL(up);


 static noinline void __sched __up(struct semaphore *sem)
 {                          
     struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
                         struct semaphore_waiter, list);
     list_del(&waiter->list);
     waiter->up = true;       
     wake_up_process(waiter->task);
 }
           

  首先獲得sem所在的wait_list為頭部的連結清單的第一個有效節點,然後從連結清單中将其删除,然後喚醒該節點上睡眠的程序。

  由此可見,對于sem上的每次down_interruptible調用,都會在sem的wait_list連結清單尾部加入一新的節點。對于sem上的每次up調用,都會删除掉wait_list連結清單中的第一個有效節點,并喚醒睡眠在該節點上的程序。

(八) 互斥量

  前面說過信号量可以設定進入臨界去的task的數量,而互斥量mutex就是允許使用計數為1的信号量;

struct mutex {
      /* 1: unlocked, 0: locked, negative: locked, possible waiters */
      atomic_t        count;
      spinlock_t      wait_lock;
      struct list_head    wait_list;
  #if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
      struct task_struct  *owner;
  #endif                                                                                                                                      
  #ifdef CONFIG_MUTEX_SPIN_ON_OWNER
      struct optimistic_spin_queue osq; /* Spinner MCS lock */
  #endif
  #ifdef CONFIG_DEBUG_MUTEXES
      const char      *name;
      void            *magic;
  #endif
  #ifdef CONFIG_DEBUG_LOCK_ALLOC
      struct lockdep_map  dep_map;
  #endif
  };
           

常用接口如下:

void init_MUTEX (struct semaphore *sem);

  該函數用于初始化一個互斥鎖,即他把信号量sem的值設定為1。

void init_MUTEX_LOCKED (struct semaphore *sem);

  該函數也用于初始化一個互斥鎖,但他把信号量sem的值設定為0,即一開始就處在已鎖狀态。

DECLARE_MUTEX(name)

  該宏聲明一個信号量name并初始化他的值為1,即聲明一個互斥鎖。

DECLARE_MUTEX_LOCKED(name)

  該宏聲明一個互斥鎖name,但把他的初始值設定為0,即鎖在建立時就處在已鎖狀态。是以對于這種鎖,一般是先釋放後獲得

int fastcall __sched mutex_lock_interruptible(struct mutex *lock);和mutex_lock()一樣,也是擷取互斥鎖。在獲得了互斥鎖或進入睡眠直到獲得互斥鎖之後會傳回0。如果在等待擷取鎖的時候進入睡眠狀态收到一個信号(被信号打斷睡眠),則傳回_EINIR。

int fastcall __sched mutex_trylock(struct mutex *lock);試圖擷取互斥鎖,如果成功擷取則傳回1,否則傳回0,不等待。

void fastcall mutex_unlock(struct mutex *lock);//釋放被目前程序擷取的互斥鎖。該函數不能用在中斷上下文中,而且不允許去釋放一個沒有上鎖的互斥鎖。

互斥量和信号量:它們的标準使用方式都有簡單的規範:除非mutex的某個限制妨礙你使用,負責相比信号量要優先使用mutex;

(九) 完成變量

如果核心中一個任務需要發出信号通知另一個任務發生了某個特定的事件,可以用完成變量來實作這種同步:

include/linux/completion.h:

申明和定義:

struct completion {
      unsigned int done;
      wait_queue_head_t wait;
  };     

  #define DECLARE_COMPLETION(work) \
      struct completion work = COMPLETION_INITIALIZER(work)
           

完成變量API:

方法 描述
wait_for_completion 等待指定的完成變量,接受信号
complete 發送完成并喚醒信号給等待的任務

wait_for_completion() -> do_wait_for_common()

do_wait_for_common(struct completion *x,
             long (*action)(long), long timeout, int state)
  {  
      if (!x->done) {
          DECLARE_WAITQUEUE(wait, current);
     
          __add_wait_queue_tail_exclusive(&x->wait, &wait);
          do {
              if (signal_pending_state(state, current)) {
                  timeout = -ERESTARTSYS;
                  break;
              }
              __set_current_state(state);
              spin_unlock_irq(&x->wait.lock);
              timeout = action(timeout);
              spin_lock_irq(&x->wait.lock);
          } while (!x->done && timeout);
          __remove_wait_queue(&x->wait, &wait);
          if (!x->done)
              return timeout;
      }
      x->done--;
      return timeout ?: ;
  } 
           

繼續閱讀