天天看點

Linux核心自旋鎖spinlock_t機制【轉】

spinlock用在什麼場景?

自旋鎖用在臨界區代碼非常少的情況。

spinlock在使用時有什麼注意事項?

  • 臨界區代碼應該盡可能精簡
  • 不允許睡眠(會出現死鎖)
  • Need to have interrupts disabled when locked by ordinary threads, if shared by an interrupt handler。(會出現死鎖)

spinlock是怎麼實作的?

看一下源代碼:

typedef struct raw_spinlock {
arch_spinlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAK
unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
unsigned int magic, owner_cpu;
void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
} raw_spinlock_t;

typedef struct spinlock {
union {
struct raw_spinlock rlock;

#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
struct {
u8 __padding[LOCK_PADSIZE];
struct lockdep_map dep_map;
        };
#endif
    };
} spinlock_t;
      

如果忽略CONFIG_DEBUG_LOCK_ALLOC話,spinlock主要包含一個arch_spinlock_t的結構,從名字可以看出,這個結構是跟體系結構有關的。

加鎖流程

加鎖的相關源碼如下:

#define raw_spin_lock(lock) _raw_spin_lock(lock)

static inline void spin_lock(spinlock_t *lock)
{
raw_spin_lock(&lock->rlock);
}
      

_raw_spin_lock完成實際的加鎖動作。

根據CPU體系結構,spinlock分為SMP版本和UP版本,這裡以SMP版本為例來分析。SMP版本中,_raw_spin_lock為聲明為:

void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)        __acquires(lock);
      

再看_raw_spin_lock的實作,SMP版本中,看_raw_spin_lock最終調用了__raw_spin_lock,__raw_spin_lock的源代碼如下:

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
// 禁止搶占
preempt_disable();
// for debug
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
// real work done here
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
      

LOCK_CONTENDED是一個通用的加鎖流程。do_raw_spin_trylock和do_raw_spin_lock的實作依賴于具體的體系結構,以x86為例,do_raw_spin_trylock最終調用的是:

do_raw_spin_trylock的源代碼:

static inline int do_raw_spin_trylock(raw_spinlock_t *lock)
{
// 體系結構相關
return arch_spin_trylock(&(lock)->raw_lock);
}
      

以x86為例,arch_spin_trylock最終調用__ticket_spin_trylock函數。其源代碼如下:

// 定義在arch/x86/include/asm/spinlock_types.h
typedef struct arch_spinlock {
union {
__ticketpair_t head_tail;
struct __raw_tickets {
__ticket_t head, tail; // 注意,x86使用的是小端模式,存在高位址空間的是tail
        } tickets;
    };
} arch_spinlock_t;

// 定義在arch/x86/include/asm中
static __always_inline int __ticket_spin_trylock(arch_spinlock_t *lock)
{
arch_spinlock_t old, new;
// 擷取舊的ticket資訊
old.tickets = ACCESS_ONCE(lock->tickets);
// head和tail不一緻,說明鎖正被占用,加鎖不成功
if (old.tickets.head != old.tickets.tail)
return 0;

new.head_tail = old.head_tail + (1 << TICKET_SHIFT); // 将tail + 1

/* cmpxchg is a full barrier, so nothing can move before it */
return cmpxchg(&lock->head_tail, old.head_tail, new.head_tail) == old.head_tail;
}
      

從上述代碼中可知,__ticket_spin_trylock的核心功能,就是判斷自旋鎖是否被占用,如果沒被占用,嘗試原子性地更新lock中的head_tail的值,将tail+1,傳回是否加鎖成功。

不考慮CONFIG_DEBUG_SPINLOCK宏的話, do_raw_spin_lock的源代碼如下:

static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{
__acquire(lock);
arch_spin_lock(&lock->raw_lock);
}
      

arch_spin_lock的源代碼:

static __always_inline void arch_spin_lock(arch_spinlock_t *lock)
{
__ticket_spin_lock(lock);
}
      

__ticket_spin_lock的源代碼:

static __always_inline void __ticket_spin_lock(arch_spinlock_t *lock)
{
register struct __raw_tickets inc = { .tail = 1 };

// 原子性地把ticket中的tail+1,傳回的inc是+1之前的原始值
inc = xadd(&lock->tickets, inc);

for (;;) {
// 循環直到head和tail相等
if (inc.head == inc.tail)
break;
cpu_relax();
// 讀取新的head值
inc.head = ACCESS_ONCE(lock->tickets.head);
    }
barrier();      /* make sure nothing creeps before the lock is taken */
}
      

ticket分成兩個部分,一部分叫tail,相當于一個隊列的隊尾,一個部分叫head,相當于一個隊列的隊頭。初始化的時候,tail和head都是0,表示無人占用鎖。

__ticket_spin_lock就是原子性地把tail+1,并且把+1之前的值記錄下來,然後不斷地和head進行比較。由于是原子性的操作,是以不同的鎖競争者拿到的tail值是不一樣的。如果tail值和head一樣了,說明這時候沒人占用鎖了,下一個拿到鎖的就是自己了。

舉例來說,假設線程A和線程B競争同一個自旋鎖:

  1. 初始化tail=0, head=0,線程A将tail+1, 并傳回tail的舊值0,将0和head值比較,相等,于是這時候線程A就拿到了鎖。
  2. 線程A這時候也來拿鎖,将tail值+1,變成2,傳回tail的舊值1,将其和head值0比較,不相等,繼續循環。
  3. 線程A用完鎖了,将head值+1。
  4. 線程B讀取head值,并将其和tail值比較,發現相等,獲得鎖。

解鎖流程

對于SMP架構來說,spin_unlock最終調用的是__raw_spin_unlock,其源代碼如下:

static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
spin_release(&lock->dep_map, 1, _RET_IP_);
// 主要的解鎖工作  
do_raw_spin_unlock(lock);
// 啟用搶占
preempt_enable();
}

static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
{
arch_spin_unlock(&lock->raw_lock);
__release(lock);
}
      

arch_spin_unlock在x86體系結構下的實作代碼如下:

static __always_inline void arch_spin_unlock(arch_spinlock_t *lock)
{
__ticket_spin_unlock(lock);
}

static __always_inline void __ticket_spin_unlock(arch_spinlock_t *lock)
{
// 将tickers的head值加1
__add(&lock->tickets.head, 1, UNLOCK_LOCK_PREFIX);
}

      

考慮中斷處理函數

如果自旋鎖可能在中斷處理進行中使用,那麼在擷取自旋鎖之前,必須禁止本地中斷。則,持有鎖的核心代碼會被中斷處理程式打斷,接着試圖去争用這個已經被持有的自旋鎖。這樣的結果是,中斷處理函數自旋,等待該鎖重新可用,但是鎖的持有者在該中斷處理程式執行完畢之前不可能運作,這就成為了雙重請求死鎖。注意,需要關閉的隻是目前處理器上的中斷。因為中斷發生在不同的處理器上,即使中斷處理程式在同一鎖上自旋,也不會妨礙鎖的持有者(在不同處理器上)最終釋放。

是以要使用spin_lock_irqsave() / spin_unlock_irqrestore()這個版本的加鎖、解鎖函數。

函數spin_lock_irqsave():儲存中斷的目前狀态,禁止本地中斷,然後擷取指定的鎖。

函數spin_unlock_reqrestore():對指定的鎖解鎖,讓中斷恢複到加鎖前的狀态。是以即使中斷最初是被禁止的,代碼也不會錯誤地激活它們。

spinlock的幾種變種

  1. rwlock_t 讀寫鎖
  2. seqlock_t 順序鎖

【作者】張昺華

【微信公衆号】 張昺華