JVM中有這樣一段注釋:
// The base-class, PlatformEvent, is platform-specific while the ParkEvent is
// platform-independent. PlatformEvent provides park(), unpark(), etc., and
// is abstract -- that is, a PlatformEvent should never be instantiated except
// as part of a ParkEvent.
// Equivalently we could have defined a platform-independent base-class that
// exported Allocate(), Release(), etc. The platform-specific class would extend
// that base-class, adding park(), unpark(), etc.
//
// A word of caution: The JVM uses 2 very similar constructs:
// 1. ParkEvent are used for Java-level "monitor" synchronization.
// 2. Parkers are used by JSR166-JUC park-unpark.
//
// We'll want to eventually merge these redundant facilities and use ParkEvent.
其中是說ParkEvent用于Java語言級别的關鍵字synchronized。
Parkers用于Java類庫中的并發資料集合,該集合是由JSR166發展來的。
這裡說這兩個東西功能類似,将來會統一使用ParkEvent。
那麼它們究竟有什麼差別呢?
我們先看看這兩個類的大概接口樣子:
(ParkEvent)
class ParkEvent : public os::PlatformEvent {
private:
ParkEvent * FreeNext ;
// Current association
Thread * AssociatedWith ;
intptr_t RawThreadIdentity ; // LWPID etc
volatile int Incarnation ;
class PlatformEvent : public CHeapObj<mtInternal> {
// Use caution with reset() and fired() -- they may require MEMBARs
void reset() { _Event = 0 ; }
int fired() { return _Event; }
void park () ;
void unpark () ;
int TryPark () ;
int park (jlong millis) ; // relative timed-wait only
(Parkers)
class Parker : public os::PlatformParker {
public:
// For simplicity of interface with Java, all forms of park (indefinite,
// relative, and absolute) are multiplexed into one call.
void park(bool isAbsolute, jlong time);
void unpark();
// Lifecycle operators
static Parker * Allocate (JavaThread * t) ;
static void Release (Parker * e) ;
private:
static Parker * volatile FreeList ;
static volatile int ListLock ;
可以看到它們提供一緻的相同接口,park和unpark。進而支撐Java中并發控制的功能。
它們究竟有什麼不同呢?我們首先來執行2段類似的代碼。
阻塞線程擷取鎖的順序完全相反
首先是使用synchronized提供的鎖機制,我們随便用一個Object lock = new Object()作為鎖關聯的對象,代碼如下,它的功能是讓10個線程進入阻塞狀态,然後釋放鎖,觀察随後線程擷取鎖的順序:
package com.psly.testLocks;
public class TestLockSynchronized {
private static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
int N = 10;
Thread[] threads = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
public void run() {
synchronized(lock){
System.out.println(Thread.currentThread().getName() + " get synch lock!");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}
synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
}
for(int i = 0; i < N; ++i)
threads[i].join();
}
}
我們用一個0.2seconds的時間,進而讓先建立的線程能夠先進入阻塞狀态,輸出為:
Thread-9 get synch lock!
Thread-8 get synch lock!
Thread-7 get synch lock!
Thread-6 get synch lock!
Thread-5 get synch lock!
Thread-4 get synch lock!
Thread-3 get synch lock!
Thread-2 get synch lock!
Thread-1 get synch lock!
Thread-0 get synch lock!
這有點奇怪,先嘗試擷取鎖的線程竟然後獲得鎖!
先不管這個, 我們把這個例子改為JSR166的Lock重做一遍:
package com.psly.testLocks;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestLockSynchronized {
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
int N = 10;
Thread[] threads = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName() + " get JSR166 lock!");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
lock.unlock();
}
});
}
lock.lock();
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
lock.unlock();
for(int i = 0; i < N; ++i)
threads[i].join();
}
}
輸出為:
Thread-0 get JSR166 lock!
Thread-1 get JSR166 lock!
Thread-2 get JSR166 lock!
Thread-3 get JSR166 lock!
Thread-4 get JSR166 lock!
Thread-5 get JSR166 lock!
Thread-6 get JSR166 lock!
Thread-7 get JSR166 lock!
Thread-8 get JSR166 lock!
Thread-9 get JSR166 lock!
這個輸出比較符合了我們的預期,畢竟先嘗試擷取鎖的的确先擷取了鎖。
為什麼這兩種實作有這樣的差異呢,我們來看下他們分别的阻塞隊列實作,首先是JAVA的:
public void lock() {
sync.lock();
}
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
我們這裡重點檢視addWaiter(Node node);可以看出來,線程構造的阻塞節點是通過tail字段加入進隊列的,并且作為next節點。這是個先進先出雙向隊列。
是以當鎖被釋放時,阻塞線程擷取鎖的順序與進阻塞隊列是一緻的。
我們接着看下synchronized的實作,這裡涉及到JVM中系統程式設計的源碼,這裡也隻貼出跟進入阻塞隊列相關的代碼:
class ObjectMonitor;
class ObjectSynchronizer : AllStatic {
static void fast_enter (Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS);
static void slow_enter (Handle obj, BasicLock* lock, TRAPS);
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
slow_enter (obj, lock, THREAD) ;
}
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
................
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;
·········
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()
EnterI (THREAD) ;
if (!ExitSuspendEquivalent(jt)) break ;
//
// We have acquired the contended monitor, but while we were
// waiting another thread suspended us. We don't want to enter
// the monitor while suspended because that would surprise the
// thread that suspended us.
//
_recursions = 0 ;
······
}
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;
// Try the lock - TATAS
if (TryLock (Self) > 0) {
......
}
if (TrySpin (Self) > 0) {
......
}
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
// Push "Self" onto the front of the _cxq.
// Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
// Note that spinning tends to reduce the rate at which threads
// enqueue and dequeue on EntryList|cxq.
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
這裡的重點是,ObjectWaiter node(Self) ;
ObjectWaiter node(Self) ;
........
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
_cxq,我們采用比較并交換的原子指令,修改了_cxq,修改之前将_cxq的舊值填入node的next字段,這樣一來我們就在_cxq上構造了個stack,也就是先進後出的隊列。于是下次當我們索取_cxq時候自然就取得了最後填入的值。這解釋了我們上面的執行示例,阻塞線程擷取鎖的順序與進隊列完全相反。
我們接着看下再複雜點的例子,依次啟動10個線程,依次擷取鎖,獲得鎖的同時列印自身資訊,然後主動調用wait語義的方法陷入阻塞狀态。等到這10個線程都阻塞之後主線程擷取鎖,接着再啟動10個無等待線程,這是個線程唯一做的事情就是依次擷取鎖,他們會按照我們上面所說的方式進入阻塞隊列。接着主線程依次發送4次notify語義的信号(注意時間間隔),然後釋放鎖。我們感興趣的是這幾個收到通知的線程,他們相對已經在阻塞隊列中的線程,誰會先擷取鎖?他們的排列又是怎麼樣的呢?
我們先執行JSR166的版本,代碼如下:
package com.psly.testLocks;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestLockImplWithWaits {
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
int N = 10;
Thread[] threads = new Thread[N];
Thread[] threadsForWaits = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName() + " nowait get lock");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
lock.unlock();
}
});
}
for(int i = 0; i < N; ++i){
threadsForWaits[i] = new Thread(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
lock.lock(); //synchronized(lock){
System.out.println(Thread.currentThread().getName() + " wait first get lock");
try {
condition.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " wait second get lock");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
lock.unlock();
}
});
}
for(int i = 0; i < N; ++i){
threadsForWaits[i].start();
Thread.sleep(200);
}
lock.lock(); //synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
for(int i = 0; i < 4 ; ++i){
condition.signal();
}
Thread.sleep(200);
lock.unlock();
for(int i = 0; i < N; ++i)
threads[i].join();
for(int i = 0; i < N; ++i)
threadsForWaits[i].join();
}
}
Thread-10到Thread-19為主動調用wait阻塞的線程,Thread-0到Thread-9為隻擷取鎖的線程。
輸出為:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-0 nowait get lock
Thread-1 nowait get lock
Thread-2 nowait get lock
Thread-3 nowait get lock
Thread-4 nowait get lock
Thread-5 nowait get lock
Thread-6 nowait get lock
Thread-7 nowait get lock
Thread-8 nowait get lock
Thread-9 nowait get lock
Thread-10 wait second get lock
Thread-11 wait second get lock
Thread-12 wait second get lock
Thread-13 wait second get lock
可以看到JSR166的實作依然滿足先進先出,即使Thread-10到Thread-13是先擷取鎖之後陷入wait的。我們接着看下這是如何做到的,
注意JSR166的實作是在JAVA層面完成的。
主要是三個調用:wait,notify,unlock。
await:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
這裡的addConditionWaiter嘗試添加等待隊列的節點。acquireQueued用于将來被喚醒之後的再次嘗試擷取鎖。
我們來看addConditionWaiter,
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
是将新節點作為lastWaiter的next節點,并且本身成為lastWaiter節點。那麼這裡說明這構造的是一個先進先出的隊列。(這裡是在已經擷取鎖的情況下,是以不需同步)
我們接着看 signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
以上已經拿到了等待隊列第一個節點,接着enq讓他轉移(transfer)到鎖的阻塞隊列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這樣一來我們就完成了将等待線程從處于wait的狀态,轉移到了未獲得鎖處于阻塞的狀态。
最後看下當主線程釋放鎖時的操作:
unlock
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
可以看到,調用unlock的線程喚醒了阻塞隊列head中的第一個線程。 (OVER)
因為阻塞隊列跟等待隊列都是先進先出,這樣子能夠得到一個比較好的行為。
進而導緻了我們之前的輸出,看上去比較符合預期。
最後我們來看看采用synchronized,這個示例下的輸出是什麼,代碼如下:
package com.psly.testLocks;
public class TestLockImplWithWaits {
private static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
int N = 10;
Thread[] threads = new Thread[N];
Thread[] threadsForWaits = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
@Override
public void run() {
synchronized(lock){
System.out.println(Thread.currentThread().getName() + " nowait get lock");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}
for(int i = 0; i < N; ++i){
threadsForWaits[i] = new Thread(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
synchronized(lock){
System.out.println(Thread.currentThread().getName() + " wait first get lock");
try {
lock.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " wait second get lock");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}
for(int i = 0; i < N; ++i){
threadsForWaits[i].start();
Thread.sleep(200);
}
synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
for(int i = 0; i < 4 ; ++i){
lock.notify();
}
Thread.sleep(200);
}
for(int i = 0; i < N; ++i)
threads[i].join();
for(int i = 0; i < N; ++i)
threadsForWaits[i].join();
}
}
輸出入下:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
這個結果很奇怪,最奇怪在于居然是連續輸出Thread-10,Thread-13,Thread-12,Thread-11:
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
我們調整一下發送notify的數量,給出所有等待線程數量的調用N。
for(int i = 0; i < N ; ++i){
lock.notify();
}
輸出為:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
依然是Thread-10莫名其妙出現在最前,後面緊接着Thread-19到Thread-11倒序。
我們再嘗試換下調用方式,采用notifyAll();
synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
lock.notifyAll();
Thread.sleep(200);
}
輸出為:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-10 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
這下子又變了,Thread-10變為最後,完全逆序來擷取鎖了。
我們嘗試進入JVM去看下這一切是怎麼回事。與之前的過程類似,首先我們來看看等待之後線程節點如何組織的:
經研究,應該是下面這片代碼:
WAIT:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;
TEVENT (Wait) ;
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag
// Enter the waiting queue, which is a circular doubly linked list in this case
// but it could be a priority queue or any data structure.
// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only
// by the the owner of the monitor *except* in the case where park()
// returns because of a timeout of interrupt. Contention is exceptionally rare
// so we use a simple spin-lock instead of a heavier-weight blocking lock.
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev == NULL, "node already in list");
assert(node->_next == NULL, "node already in list");
// put node at end of queue (circular doubly linked list)
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
如果_WaitSet為空,則設定它,并且前驅和後繼都是它。
如果隻有_WaitSet一個,則将節點增加到它的後面。
不為空的情況下統一添加到next節點。
是以這裡是個先進先出的隊列。
notify
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER();
if (_WaitSet == NULL) {
TEVENT (Empty-Notify) ;
return ;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
ObjectWaiter * iterator = DequeueWaiter() ;
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev != NULL, "node already removed from list");
assert(node->_next != NULL, "node already removed from list");
// when the waiter has woken up because of interrupt,
// timeout or other spurious wake-up, dequeue the
// waiter from waiting list
ObjectWaiter* next = node->_next;
if (next == node) {
assert(node->_prev == node, "invariant check");
_WaitSet = NULL;
} else {
ObjectWaiter* prev = node->_prev;
assert(prev->_next == node, "invariant check");
assert(next->_prev == node, "invariant check");
next->_prev = prev;
prev->_next = next;
if (_WaitSet == node) {
_WaitSet = next;
}
}
node->_next = NULL;
node->_prev = NULL;
}
static int Knob_MoveNotifyee = 2 ; // notify() - disposition of notifyee
if (Policy == 2) { // prepend to cxq
// prepend to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
這裡的DequeueWaiter調用DequeueSpecificWaiter,效果是隊列出一個元素,_WaitSet.next成為_WaitSet。這裡有_EntryList、_cxq兩個資料結構。
接着我們走Policy==2分支,注意這裡并不是全部放入cxq(盡管注釋如此),判斷是_EntryList==NULL的時候,直接将我們的節點放入它。否則,将我們的節點添加到_cxq這個stack前面。想象一個,假如第一個節點進來,發現_EntryList為空,_EntryList設定為它自己。從第二個節點開始,所有節點都是進stack,這樣的話是不是取出時,第二個往後的節點都颠倒了呢。假如我們取節點的方式是先驅_EntryList,然後再取stack中的元素。則就會發生示例中Thread-10提前的亂序情況。
但是注意,之前的notifyAll并沒有産生這種效果。是以我們來看下notifyAll的代碼:
if (Policy == 2) { // prepend to cxq
// prepend to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
} else
果然如此!notifyAll的邏輯跟notify大部分一樣,除了它将所有節點都加入cxq。是以我們才會觀察到notifyAll調用之後的節點擷取鎖順序是逆序。
unlock
我們接着看看unlock的時候,是不是如我們猜測的那樣先取_EntryList的元素,再來看cxq。
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
// if displaced header is null, the previous enter is recursive enter, no-op
.........
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
...................
for (;;) {
assert (THREAD == _owner, "invariant") ;
if (Knob_ExitPolicy == 0) {
.........
} else {
..........
} else {
TEVENT (Inflated exit - complex egress) ;
}
}
guarantee (_owner == THREAD, "invariant") ;
ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;
w = _EntryList ;
if (w != NULL) {
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
w = _cxq ;
if (w == NULL) continue ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
if (QMode == 1) {
..............
} else {
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
if (_succ != NULL) continue;
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}
static int Knob_QMode = 0 ; // EntryList-cxq policy - queue discipline
static int Knob_ExitPolicy = 0 ;
這裡是先取_EntryList,假如有就調用ExitEpilog并傳回,否則采用原子操作取_cxq,然後将這個值再次給_EntryList,并調用ExitEpilog。
總之這裡最終都是将資料給_EntryList,隻不過假如_EntryList原本就有值,那麼我們會先使用它,之後再使用_cxq。
我們看下ExitEpilog完成了什麼事:
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
assert (_owner == Self, "invariant") ;
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;
Wakee = NULL ;
// Drop the lock
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ; // ST _owner vs LD in unpark()
if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}
Trigger->unpark() ;
// Maintain stats and report events to JVMTI
if (ObjectMonitor::_sync_Parks != NULL) {
ObjectMonitor::_sync_Parks->inc() ;
}
}
果然這裡最後調用了unpark,進而喚醒了相應的那個線程。這裡的_EntryList的值會如何變化?我們最後看下,當等待線程從wait中醒過來會做什麼:
// Note: a subset of changes to ObjectMonitor::wait()
// will need to be replicated in complete_exit above
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
..........
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
.............
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
............
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN ;
}
..............
assert (_owner != Self, "invariant") ;
ObjectWaiter::TStates v = node.TState ;
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
進入了enter(Self),
void ATTR ObjectMonitor::enter(TRAPS) {
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()
EnterI (THREAD) ;
進入EnterI(THREAD),
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
DeferredInitialize () ;
if (TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
// The Spin failed -- Enqueue and park the thread ...
assert (_succ != Self , "invariant") ;
assert (_owner != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
// Try to assume the role of responsible thread for the monitor.
// CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;
for (;;) {
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}
if (TryLock(Self) > 0) break ;
TEVENT (Inflated enter - Futile wakeup) ;
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
++ nWakeups ;
if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
Self->_ParkEvent->reset() ;
OrderAccess::fence() ;
}
if (_succ == Self) _succ = NULL ;
// Invariant: after clearing _succ a thread *must* retry _owner before parking.
OrderAccess::fence() ;
}
assert (_owner == Self , "invariant") ;
assert (object() != NULL , "invariant") ;
// I'd like to write:
// guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
// but as we're at a safepoint that's not safe.
UnlinkAfterAcquire (Self, &node) ;
if (_succ == Self) _succ = NULL ;
assert (_succ != Self, "invariant") ;
if (_Responsible == Self) {
_Responsible = NULL ;
OrderAccess::fence(); // Dekker pivot-point
.........
這裡的重點是UnlinkAfterAcquire,
void ObjectMonitor::UnlinkAfterAcquire (Thread * Self, ObjectWaiter * SelfNode)
{
assert (_owner == Self, "invariant") ;
assert (SelfNode->_thread == Self, "invariant") ;
if (SelfNode->TState == ObjectWaiter::TS_ENTER) {
// Normal case: remove Self from the DLL EntryList .
// This is a constant-time operation.
ObjectWaiter * nxt = SelfNode->_next ;
ObjectWaiter * prv = SelfNode->_prev ;
if (nxt != NULL) nxt->_prev = prv ;
if (prv != NULL) prv->_next = nxt ;
if (SelfNode == _EntryList ) _EntryList = nxt ;
assert (nxt == NULL || nxt->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (prv == NULL || prv->TState == ObjectWaiter::TS_ENTER, "invariant") ;
TEVENT (Unlink from EntryList) ;
} else {
它會将_EntryList的值做更新,進而讓鎖的擷取繼續下去,保證不會出錯。
到這裡為止,我們終于大緻走完了一遍synchronized鎖與lock鎖分别在JVM和JUC中的實作。
那麼有個問題,linux中pthread鎖的實作,行為模式又是怎麼樣的呢?
我們嘗試将使用pthread來執行測試這兩個例子:
鎖擷取代碼:
編譯生成執行檔案testLock:gcc -pthread testLock.c -o testLock
執行:./testLock
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#define N 10
void* runTask(void* pm){
pthread_mutex_lock(&mutex);
printf("%d get lock\n", (int)pm);
usleep(100000);
pthread_mutex_unlock(&mutex);
return 0;
}
int main(){
// int N = 10;
pthread_t threads[N];
int i = 0;
pthread_mutex_lock(&mutex);
for(i = 0; i < N; ++i){
pthread_create(&threads[i], 0, runTask, (void*)i);
usleep(100000);
}
pthread_mutex_unlock(&mutex);
for(i = 0; i < N; ++i){
pthread_join(threads[i], NULL);
}
return 0;
}
輸出:
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
鎖擷取+阻塞等待代碼:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#define N 10
void* runTask(void* pm){
pthread_mutex_lock(&mutex);
printf("%d get lock\n", (int)pm);
usleep(100000);
pthread_mutex_unlock(&mutex);
return 0;
}
void* runTaskWithWait(void* pm){
pthread_mutex_lock(&mutex);
printf("%d wait first get lock\n", (int)pm);
pthread_cond_wait(&cond, &mutex);
printf("%d wait second get lock\n", (int)pm);
usleep(300000);
pthread_mutex_unlock(&mutex);
}
int main(){
// int N = 10;
pthread_t threads[N];
pthread_t threadsForWaits[N];
int i = 0;
for(; i < N; ++i){
pthread_create(&threadsForWaits[i], 0, runTaskWithWait, (void*)i);
usleep(100000);
}
pthread_mutex_lock(&mutex);
for(i = 0; i < N; ++i){
pthread_create(&threads[i], 0, runTask, (void*)i);
usleep(100000);
}
//pthread_cond_broadcast(&cond);
for(i = 0; i < N; ++i)
pthread_cond_signal(&cond);
usleep(100000);
pthread_mutex_unlock(&mutex);
for(i = 0; i < N; ++i){
pthread_join(threads[i], NULL);
}
for(i = 0; i < N; ++i){
pthread_join(threadsForWaits[i], NULL);
}
return 0;
}
輸出:
0 wait first get lock
1 wait first get lock
2 wait first get lock
3 wait first get lock
4 wait first get lock
5 wait first get lock
6 wait first get lock
7 wait first get lock
8 wait first get lock
9 wait first get lock
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
1 wait second get lock
2 wait second get lock
3 wait second get lock
0 wait second get lock
4 wait second get lock
5 wait second get lock
6 wait second get lock
7 wait second get lock
8 wait second get lock
9 wait second get lock
隻能說等待線程轉移到阻塞線程之後的排列,看起來是沒啥規律 ([email protected][email protected]=)