天天看點

深入了解glibc的互斥鎖

作者:程式員小x

互斥鎖是多線程同步時常用的手段,使用互斥鎖可以保護對共享資源的操作。共享資源也被稱為臨界區,當一個線程對一個臨界區加鎖後,其他線程就不能進入該臨界區,直到持有臨界區鎖的線程釋放該鎖。

本文以glibc中mutex的實作為例,講解其背後的實作原理。

glibc mutex類型

glibc的互斥鎖的類型名稱為pthread_mutex_t,其結構可以用下面的結構體表示:

typedef struct {
    int __lock;
    int __count;
    int __owner;
    int __nusers;
    int __kind;
    // other ignore
} pthread_mutex_t;           

其中:

  • __lock表示目前mutex的狀态,0表示沒有被加鎖,1表示mutex已經被加鎖,2表示mutex被某個線程持有并且有另外的線程在等待它的釋放。
  • __count表示mutex被加鎖的次數,對于不可重入鎖,該值為0或者1,對于可重入鎖,count可以大于1。
  • __owner用來記錄持有目前mutex的線程id。
  • __nusers用于記錄多少個線程持有該互斥鎖,一般來說該值隻能是0或者1,但是對于讀寫鎖,多個讀線程可以共同持有鎖,是以nusers通常用于讀寫鎖的場景下。
  • __kind表示鎖的類型。

pthread_mutex_t鎖可以是如下的類型:

  • PTHREAD_MUTEX_TIMED_NP: 普通鎖,當一個線程加鎖以後,其餘請求鎖的線程将形成一個等待隊列,并在解鎖後按優先級獲得鎖。這種鎖政策保證了資源配置設定的公平性。當鎖unlock時,會喚醒等待隊列中的一個線程。
  • PTHREAD_MUTEX_RECURSIVE_NP: 可重入鎖,如果線程沒有獲得該mutex的情況下,争用該鎖,那麼與PTHREAD_MUTEX_TIMED_NP一樣。如果一個線程已經擷取鎖,其可以再次擷取鎖,并通過多次unlock解鎖。
  • PTHREAD_MUTEX_ERRORCHECK_NP: 檢錯鎖,如果同一個線程請求同一個鎖,則傳回EDEADLK,而不是死鎖,其他點和PTHREAD_MUTEX_TIMED_NP相同。
  • PTHREAD_MUTEX_ADAPTIVE_NP: 自适應鎖,此鎖在多核處理器下首先進行自旋擷取鎖,如果自旋次數超過配置的最大次數,則也會陷入核心态挂起。

mutex的加鎖過程

本文使用的源碼是glibc-2.34版本,http://mirror.keystealth.org/gnu/libc/glibc-2.34.tar.gz。

本文主要側重于講解互斥鎖從使用者态到核心态的加鎖過程,而不同類型鎖的實作細節,本文不重點讨論。後續将在其他文章中做探讨。

下面就以最簡單的類型PTHREAD_MUTEX_TIMED_NP來跟蹤加鎖過程,從___pthread_mutex_lock開始看起,其定義在pthread_mutex_lock.c中。

如下所示,PTHREAD_MUTEX_TIMED_NP的鎖會調用lll_mutex_lock_optimized方法進行加鎖,如下所示:

if (__builtin_expect (type & ~(PTHREAD_MUTEX_KIND_MASK_NP
				 | PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))
    return __pthread_mutex_lock_full (mutex);

  if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
    {
      FORCE_ELISION (mutex, goto elision);
    simple:
      /* Normal mutex.  */
      LLL_MUTEX_LOCK_OPTIMIZED (mutex);
      assert (mutex->__data.__owner == 0);
    }           

lll_mutex_lock_optimized也定義在pthread_mutex_lock.c檔案中,從注釋了解到,這是為單線程進行的優化,如果是單線程,則直接将mutex的__lock的值修改為1(因為不存在競争),如果不是單線程,則調用lll_lock方法。

#ifndef LLL_MUTEX_LOCK
/* lll_lock with single-thread optimization.  */
static inline void
lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{
  /* The single-threaded optimization is only valid for private
     mutexes.  For process-shared mutexes, the mutex could be in a
     shared mapping, so synchronization with another process is needed
     even without any threads.  If the lock is already marked as
     acquired, POSIX requires that pthread_mutex_lock deadlocks for
     normal mutexes, so skip the optimization in that case as
     well.  */
  int private = PTHREAD_MUTEX_PSHARED (mutex);
  if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)
    mutex->__data.__lock = 1;
  else
    lll_lock (mutex->__data.__lock, private);
}

# define LLL_MUTEX_LOCK(mutex)						\
  lll_lock ((mutex)->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex))           

lll_lock定義在lowlevellock.h檔案中,又會調用到**__lll_lock方法,由于存在競争,是以在__lll_lock方法中使用了CAS方法**嘗試對mutex的__lock值進行修改。

CAS是compare-and-swap的含義,其是原子變量的實作的基礎,其僞代碼如下所示,即當記憶體mem出的值如果等于old_value,則将其替換為new_value,這個過程是原子的,底層由CMPXCHG指令保證。

bool CAS(T* mem, T new_value, T old_value) {
    if (*mem == old_value) {
        *mem = new_value;
        return true;
    } else {
        return false;
    }
}           

__lll_lock中的atomic_compare_and_exchange_bool_acq就是上述所說的CAS方法,如果futex = 0,則嘗試将其修改為1,表示加鎖成功, 如果futex >= 1,則會調用**__lll_lock_wait_private或者__lll_lock_wait**。注意這裡的futex其實就是mutex結構體中的__lock。

#define __lll_lock(futex, private)                                      \
  ((void)                                                               \
   ({                                                                   \
     int *__futex = (futex);                                            \
     if (__glibc_unlikely                                               \
         (atomic_compare_and_exchange_bool_acq (__futex, 1, 0)))        \
       {                                                                \
         if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
           __lll_lock_wait_private (__futex);                           \
         else                                                           \
           __lll_lock_wait (__futex, private);                          \
       }                                                                \
   }))
#define lll_lock(futex, private)	\
  __lll_lock (&(futex), private)           

__lll_lock_wait_private和__lll_lock_wait是類似的,這裡首先會調用atomic_exchange_acquire将futex的舊值和2進行交換,傳回值是futex的舊值。

是以如果其傳回值不為0,代表目前鎖還是加鎖狀态,可能需要進入核心态等待(調用futex_wait)。如果其傳回0,則代表,目前鎖已經被釋放,加鎖成功,退出循環。

注意futex值修改為2的目的是為了提高pthread_mutex_unlock的效率。在pthread_mutex_unlock()中,會調用atomic_exchange_rel()無條件的把mutex->__lock的值更新為0,并且檢查mutex->__lock的原始值,如果原始值為0或者1,表示沒有競争發生,自然也就沒有必要調用futex系統調用,浪費時間。隻有檢查到mutex->__lock的值大于1的時候,才需要調用futex系統調用,喚醒等待該鎖上的線程。

void
__lll_lock_wait_private (int *futex)
{
  if (atomic_load_relaxed (futex) == 2)
    goto futex;

  while (atomic_exchange_acquire (futex, 2) != 0)
    {
    futex:
      LIBC_PROBE (lll_lock_wait_private, 1, futex);
      futex_wait ((unsigned int *) futex, 2, LLL_PRIVATE); /* Wait if *futex == 2.  */
    }
}
libc_hidden_def (__lll_lock_wait_private)

void
__lll_lock_wait (int *futex, int private)
{
  if (atomic_load_relaxed (futex) == 2)
    goto futex;

  while (atomic_exchange_acquire (futex, 2) != 0)
    {
    futex:
      LIBC_PROBE (lll_lock_wait, 1, futex);
      futex_wait ((unsigned int *) futex, 2, private); /* Wait if *futex == 2.  */
    }
}           

__lll_lock_wait_private和__lll_lock_wait調用了futex_wait,該函數相對簡單,其内部将會調用lll_futex_timed_wait方法。

static __always_inline int
futex_wait (unsigned int *futex_word, unsigned int expected, int private)
{
  int err = lll_futex_timed_wait (futex_word, expected, NULL, private);
  switch (err)
    {
    case 0:
    case -EAGAIN:
    case -EINTR:
      return -err;

    case -ETIMEDOUT: /* Cannot have happened as we provided no timeout.  */
    case -EFAULT: /* Must have been caused by a glibc or application bug.  */
    case -EINVAL: /* Either due to wrong alignment or due to the timeout not
		     being normalized.  Must have been caused by a glibc or
		     application bug.  */
    case -ENOSYS: /* Must have been caused by a glibc bug.  */
    /* No other errors are documented at this time.  */
    default:
      futex_fatal_error ();
    }
}           

lll_futex_timed_wait方法其實是對sys_futex系統調用的封裝,其最終将調用sys_futex方法。

# define lll_futex_timed_wait(futexp, val, timeout, private)     \
  lll_futex_syscall (4, futexp,                                 \
		     __lll_private_flag (FUTEX_WAIT, private),  \
		     val, timeout)

# define lll_futex_syscall(nargs, futexp, op, ...)                      \
  ({                                                                    \
    long int __ret = INTERNAL_SYSCALL (futex, nargs, futexp, op, 	\
				       __VA_ARGS__);                    \
    (__glibc_unlikely (INTERNAL_SYSCALL_ERROR_P (__ret))         	\
     ? -INTERNAL_SYSCALL_ERRNO (__ret) : 0);                     	\
  })

#define __NR_futex 202
#undef INTERNAL_SYSCALL
#define INTERNAL_SYSCALL(name, nr, args...)				\
	internal_syscall##nr (SYS_ify (name), args)

#undef SYS_ify
#define SYS_ify(syscall_name)	__NR_##syscall_name


#undef internal_syscall4
#define internal_syscall4(number, arg1, arg2, arg3, arg4)		\
({									\
    unsigned long int resultvar;					\
    TYPEFY (arg4, __arg4) = ARGIFY (arg4);			 	\
    TYPEFY (arg3, __arg3) = ARGIFY (arg3);			 	\
    TYPEFY (arg2, __arg2) = ARGIFY (arg2);			 	\
    TYPEFY (arg1, __arg1) = ARGIFY (arg1);			 	\
    register TYPEFY (arg4, _a4) asm ("r10") = __arg4;			\
    register TYPEFY (arg3, _a3) asm ("rdx") = __arg3;			\
    register TYPEFY (arg2, _a2) asm ("rsi") = __arg2;			\
    register TYPEFY (arg1, _a1) asm ("rdi") = __arg1;			\
    asm volatile (							\
    "syscall\n\t"							\
    : "=a" (resultvar)							\
    : "0" (number), "r" (_a1), "r" (_a2), "r" (_a3), "r" (_a4)		\
    : "memory", REGISTERS_CLOBBERED_BY_SYSCALL);			\
    (long int) resultvar;						\
})           

sys_futex的函數原型如下所示:

int sys_futex (int *uaddr, int op, int val, const struct timespec *timeout);           

其作用是原子性的檢查uaddr中計數器的值是否為val,如果是則讓程序休眠,直到FUTEX_WAKE或者逾時(time-out)。也就是把程序挂到uaddr相對應的等待隊列上去。

這裡實際上就是檢查mutex的**__lock是否等于2**。

  • 如果不等于2,意味着,鎖可能已經被釋放,不需要将線程添加到sleep隊列,sys_futex直接傳回,重新嘗試加鎖。
  • 如果等于2,則意味着使用者态到核心段的這段時間内,鎖的值沒有發生變化,于是将線程添加到sleep隊列,等待其他線程釋放鎖。

glibc的mutex的加鎖是使用者态的原子操作和核心态sys_futex共同作用的結果,上述過程可以用下面這張流程圖來概括:

深入了解glibc的互斥鎖

glic-mutex