鎖是作業系統中實作程序同步的重要機制。
基本概念
臨界區(Critical Section)是指對共享資料進行通路與操作的代碼區域。所謂共享資料,就是可能有多個代碼執行流并發地執行,并在執行中可能會同時通路的資料。
同步(Synchronization)是指讓兩個或多個程序/線程能夠按照程式員期望的方式來協調執行的順序。比如,讓A程序必須完成某個操作後,B程序才能執行。互斥(Mutual Exclusion)則是指讓多個線程不能夠同時通路某些資料,必須要一個程序通路完後,另一個程序才能通路。
當多個程序/線程并發地執行并且通路一塊資料,并且程序/線程的執行結果依賴于它們的執行順序,我們就稱這種情況為競争狀态(Race Condition)。
Xv6作業系統要求在核心臨界區操作時中斷必須關閉。如果此時中斷開啟,那麼可能會出現以下死鎖情況:A程序在核心态運作并拿下了p鎖時,觸發中斷進入中斷處理程式,中斷處理程式也在核心态中請求p鎖,由于鎖在A程序手裡,且隻有A程序執行時才能釋放p鎖,是以中斷處理程式必須傳回,p鎖才能被釋放。那麼此時中斷處理程式會永遠拿不到鎖,陷入無限循環,進入死鎖。
Xv6中實作了自旋鎖(Spinlock)用于核心臨界區通路的同步和互斥。自旋鎖最大的特征是當程序拿不到鎖時會進入無限循環,直到拿到鎖退出循環。顯然,自旋鎖看上去效率很低,我們很容易想到更加高效的基于等待隊列的方法,讓等待程序陷入阻塞而不是無限循環。然而,Xv6允許同時運作多個CPU核,多核CPU上的等待隊列實作相當複雜,是以使用自旋鎖是相對比較簡單且能正确執行的實作方案。
Xv6的Spinlock
Xv6中鎖的定義如下
// Mutual exclusion lock.
struct spinlock {
uint locked; // Is the lock held?
// For debugging:
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
uint pcs[10]; // The call stack (an array of program counters)
// that locked the lock.
};
核心的變量隻有一個
locked
,當
locked
為1時代表鎖已被占用,反之未被占用,初始值為0。
在調用鎖之前,必須對鎖進行初始化。
void initlock(struct spinlock *lk, char *name) {
lk->name = name;
lk->locked = 0;
lk->cpu = 0;
}
最困難的地方是如何對
locked
變量進行原子操作占用鎖和釋放鎖。這兩步具體被實作為
acquire()
和
release()
函數。(注意v7版本和v11版本的實作略有不同,本文使用的是v11版本)
acquire()
函數
acquire()
// Acquire the lock.
// Loops (spins) until the lock is acquired.
// Holding a lock for a long time may cause
// other CPUs to waste time spinning to acquire it.
void acquire(struct spinlock *lk) {
pushcli(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");
// The xchg is atomic.
while(xchg(&lk->locked, 1) != 0)
;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen after the lock is acquired.
__sync_synchronize();
// Record info about lock acquisition for debugging.
lk->cpu = mycpu();
getcallerpcs(&lk, lk->pcs);
}
acquire()
函數首先禁止了中斷,并且使用專門的
pushcli()
函數,這個函數保證了如果有兩個
acquire()
禁止了中斷,那麼也必須調用兩次
release()
中的
popcli()
後中斷才會被允許。然後,
acquire()
函數采用
xchg
指令來實作在設定
locked
為1的同時獲得其原來的值的操作。這裡的C代碼中封裝了一個
xchg()
函數,在
xchg()
函數中采用GCC的内聯彙編特性,實作如下
static inline uint xchg(volatile uint *addr, uint newval) {
uint result;
// The + in "+m" denotes a read-modify-write operand.
asm volatile("lock; xchgl %0, %1" :
"+m" (*addr), "=a" (result) :
"1" (newval) :
"cc");
return result;
}
其中,
volatile
标志用于避免gcc對其進行一些優化;第一個冒号後的
"+m" (*addr), "=a" (result)
是這個彙編指令的兩個輸出值;
newval
是這個彙編指令的輸入值。假設
newval
位于
eax
寄存器中,
addr
位于
rax
寄存器中,那麼gcc會翻譯得到如下彙編指令
lock; xchgl (%rdx), %eax
由于
xchg
函數是
inline
的,它會被直接嵌入調用
xchg
函數的代碼中,使用的寄存器可能會有所不同。
下面我們來分析一下上面的指令的語義。·
lock
是一個指令字首,它保證了這條指令對總線和緩存的獨占權,也就是這條指令的執行過程中不會有其他CPU或同CPU内的指令通路緩存和記憶體。由于現代CPU一般是多發射流水線+亂序執行的,是以一般情況下并不能保證這一點。
xchgl
指令是一條古老的x86指令,作用是交換兩個寄存器或者記憶體位址裡的4位元組值,兩個值不能都是記憶體位址,他不會設定條件碼。
那麼,仔細思考一下就能發現,以上一條
xchg
指令就同時做到了交換locked和1的值,并且在之後通過檢查
eax
寄存器就能知道locked的值是否為0。并且,以上操作是原子的,這就保證了有且隻有一個程序能夠拿到locked的0值并且進入臨界區。
最後,
acquire()
函數使用
__sync_synchronize
為了避免編譯器對這段代碼進行指令順序調整的話和避免CPU在這塊代碼采用亂序執行的優化。
release()
函數
release()
// Release the lock.
void release(struct spinlock *lk) {
if(!holding(lk))
panic("release");
lk->pcs[0] = 0;
lk->cpu = 0;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other cores before the lock is released.
// Both the C compiler and the hardware may re-order loads and
// stores; __sync_synchronize() tells them both not to.
__sync_synchronize();
// Release the lock, equivalent to lk->locked = 0.
// This code can't use a C assignment, since it might
// not be atomic. A real OS would use C atomics here.
asm volatile("movl $0, %0" : "+m" (lk->locked) : );
popcli();
}
release
函數為了保證設定locked為0的操作的原子性,同樣使用了内聯彙編。最後,使用popcli()來允許中斷(或者彈出一個cli,但因為其他鎖未釋放使得中斷依然被禁止)。
在Xv6中實作信号量
struct semaphore {
int value;
struct spinlock lock;
struct proc *queue[NPROC];
int end;
int start;
};
void sem_init(struct semaphore *s, int value) {
s->value = value;
initlock(&s->lock, "semaphore_lock");
end = start = 0;
}
void sem_wait(struct semaphore *s) {
acquire(&s->lock);
s->value--;
if (s->value < 0) {
s->queue[s->end] = myproc();
s->end = (s->end + 1) % NPROC;
sleep(myproc(), &s->lock)
}
release(&s->lock);
}
void sem_signal(struct semaphore *s) {
acquire(&s->lock);
s->value++;
if (s->value <= 0) {
wakeup(s->queue[s->start]);
s->queue[s->start] = 0;
s->start = (s->start + 1) % NPROC;
}
release(&s->lock);
}
上面的代碼使用Xv6提供的接口實作了信号量,格式和命名與POSIX标準類似。這個信号量的實作采用等待隊列的方式。當一個程序因信号量陷入阻塞時,會将自己放進等待隊列并睡眠(18-22行)。當一個程序釋放信号量時,會從等待隊列中取出一個程序繼續執行(29-33行)。
轉載于:https://www.cnblogs.com/hehao98/p/10678493.html