JVM: behavior analysis of the lock implementations (synchronized & JSR166) and the related source code

There is a comment like this in the HotSpot JVM source:

// The base-class, PlatformEvent, is platform-specific while the ParkEvent is
// platform-independent.  PlatformEvent provides park(), unpark(), etc., and
// is abstract -- that is, a PlatformEvent should never be instantiated except
// as part of a ParkEvent.
// Equivalently we could have defined a platform-independent base-class that
// exported Allocate(), Release(), etc.  The platform-specific class would extend
// that base-class, adding park(), unpark(), etc.
//
// A word of caution: The JVM uses 2 very similar constructs:
// 1. ParkEvent are used for Java-level "monitor" synchronization.
// 2. Parkers are used by JSR166-JUC park-unpark.
//
// We'll want to eventually merge these redundant facilities and use ParkEvent.
           

The comment says that ParkEvent is used for Java-level "monitor" synchronization, i.e. the synchronized keyword.

Parker is used by the concurrency classes in the Java class library, which grew out of JSR166 (java.util.concurrent).

It also notes that the two facilities are largely redundant and will eventually be merged, keeping ParkEvent.

So what exactly is the difference between them?

Let's first look at the rough shape of the two classes' interfaces:

(ParkEvent)

class ParkEvent : public os::PlatformEvent {
  private:
    ParkEvent * FreeNext ;

    // Current association
    Thread * AssociatedWith ;
    intptr_t RawThreadIdentity ;        // LWPID etc
    volatile int Incarnation ;

class PlatformEvent : public CHeapObj<mtInternal> {
    // Use caution with reset() and fired() -- they may require MEMBARs
    void reset() { _Event = 0 ; }
    int  fired() { return _Event; }
    void park () ;
    void unpark () ;
    int  TryPark () ;
    int  park (jlong millis) ; // relative timed-wait only
           

(Parker)

class Parker : public os::PlatformParker {
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;
           

As we can see, the two expose essentially the same interface, park and unpark, which is what ultimately supports concurrency control in Java.
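At the Java level, the Parker side is reached through LockSupport.park/unpark, which the j.u.c. locks use internally. Here is a minimal sketch of that pairing; the class name and the sleep are my own choices, the point is only to show the permit handshake:

import java.util.concurrent.locks.LockSupport;

// One thread parks itself on its Parker; the main thread unparks it,
// handing it the single permit the Parker maintains.
public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " parking");
            LockSupport.park();      // blocks until a permit is available
            System.out.println(Thread.currentThread().getName() + " unparked");
        });
        t.start();
        Thread.sleep(100);           // give t time to reach park()
        LockSupport.unpark(t);       // make the permit available, waking t
        t.join();
    }
}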

So how do they actually behave differently? Let's start by running two similar pieces of code.

Blocked threads acquire the lock in completely opposite orders

First, the lock provided by synchronized. We use an arbitrary Object lock = new Object() as the lock object. The code below puts 10 threads into the blocked state, then releases the lock and observes the order in which the threads subsequently acquire it:

package com.psly.testLocks;

public class TestLockSynchronized {

	private static Object lock = new Object();
	public static void main(String[] args) throws InterruptedException {
		// TODO Auto-generated method stub
		int N = 10;
		Thread[] threads = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				public void run() {
					synchronized(lock){
						System.out.println(Thread.currentThread().getName() + " get synch lock!");
						try {
							Thread.sleep(200);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
				}
				
			});
		}
		synchronized(lock){
			for(int i = 0; i < N; ++i){
				threads[i].start();
				Thread.sleep(200);
			}
		}
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
	}
}
           

A 0.2-second interval is used so that threads created earlier enter the blocked state earlier. The output is:

Thread-9 get synch lock!
Thread-8 get synch lock!
Thread-7 get synch lock!
Thread-6 get synch lock!
Thread-5 get synch lock!
Thread-4 get synch lock!
Thread-3 get synch lock!
Thread-2 get synch lock!
Thread-1 get synch lock!
Thread-0 get synch lock! 
           

This is a bit strange: the threads that tried to acquire the lock first ended up acquiring it last!

Setting that aside for now, let's redo the example with a JSR166 Lock:

package com.psly.testLocks;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLockSynchronized {

	private static Lock lock = new ReentrantLock();
	public static void main(String[] args) throws InterruptedException {
		// TODO Auto-generated method stub
		int N = 10;
		Thread[] threads = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				public void run() {
					lock.lock();
						System.out.println(Thread.currentThread().getName() + " get JSR166 lock!");
						try {
							Thread.sleep(200);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					lock.unlock();
				}
				
			});
		}
		lock.lock();
			for(int i = 0; i < N; ++i){
				threads[i].start();
				Thread.sleep(200);
			}
		lock.unlock();
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
	}
}
           

The output is:

Thread-0 get JSR166 lock!
Thread-1 get JSR166 lock!
Thread-2 get JSR166 lock!
Thread-3 get JSR166 lock!
Thread-4 get JSR166 lock!
Thread-5 get JSR166 lock!
Thread-6 get JSR166 lock!
Thread-7 get JSR166 lock!
Thread-8 get JSR166 lock!
Thread-9 get JSR166 lock!
           

This output matches our expectation: the threads that tried to acquire the lock first really did acquire it first.

Why do the two implementations behave so differently? Let's look at how each maintains its queue of blocked threads, starting with the Java-level (j.u.c.) implementation:

public void lock() {
        sync.lock();
    }

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
           

The key method here is addWaiter(Node mode). As we can see, the node built for the blocking thread is appended to the queue through the tail field and becomes the next node of the previous tail, so this is a FIFO doubly linked queue.

Therefore, when the lock is released, blocked threads acquire it in the same order in which they joined the queue.

Next, the synchronized implementation. This takes us into the JVM's C++ source; only the code related to entering the blocking queue is shown here:

class ObjectMonitor;

class ObjectSynchronizer : AllStatic {
  static void fast_enter  (Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS);
  static void slow_enter  (Handle obj, BasicLock* lock, TRAPS);
           
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
 if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
 }

 slow_enter (obj, lock, THREAD) ;
}
           
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  ................
  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
           
void ATTR ObjectMonitor::enter(TRAPS) {
  // The following code is ordered to check the most common cases first
  // and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
  Thread * const Self = THREAD ;
  void * cur ;
  ·········
    for (;;) {
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition()
      // or java_suspend_self()

      EnterI (THREAD) ;

      if (!ExitSuspendEquivalent(jt)) break ;

      //
      // We have acquired the contended monitor, but while we were
      // waiting another thread suspended us. We don't want to enter
      // the monitor while suspended because that would surprise the
      // thread that suspended us.
      //
          _recursions = 0 ;
     ······
}

void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    assert (Self->is_Java_thread(), "invariant") ;
    assert (((JavaThread *) Self)->thread_state() == _thread_blocked   , "invariant") ;

    // Try the lock - TATAS
    if (TryLock (Self) > 0) {
			......
    }

    if (TrySpin (Self) > 0) {
       ......
    }

    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;

    // Push "Self" onto the front of the _cxq.
    // Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
    // Note that spinning tends to reduce the rate at which threads
    // enqueue and dequeue on EntryList|cxq.
    ObjectWaiter * nxt ;
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }
           

The key line here is ObjectWaiter node(Self) ;

ObjectWaiter node(Self) ;
........
 for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }
           

Look at _cxq: a compare-and-swap atomic instruction is used to update it, and before the update the old value of _cxq is stored into the node's _next field. This effectively builds a stack on top of _cxq, a last-in-first-out list, so the next time _cxq is read, the most recently pushed node comes out first. That explains the run above: blocked threads acquire the lock in exactly the reverse of the order in which they entered the queue.
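A small Java sketch of this push discipline makes the LIFO effect easy to see; the class and field names are mine, the real logic is the C++ above:

import java.util.concurrent.atomic.AtomicReference;

// Mimics the _cxq push in EnterI: the new waiter's next points at the old
// head, and a CAS swings the head to the new waiter, so the list is a stack.
public class CxqPushDemo {
    static final class Waiter {
        final String name;
        Waiter next;
        Waiter(String name) { this.name = name; }
    }

    static final AtomicReference<Waiter> cxq = new AtomicReference<>();

    static void push(Waiter w) {
        for (;;) {
            Waiter head = cxq.get();
            w.next = head;                         // node._next = nxt = _cxq
            if (cxq.compareAndSet(head, w)) break; // Atomic::cmpxchg_ptr(&node, &_cxq, nxt)
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) push(new Waiter("Thread-" + i));
        // Walking from the head prints Thread-9 down to Thread-0: the reverse
        // of arrival order, matching the synchronized output above.
        for (Waiter w = cxq.get(); w != null; w = w.next) System.out.println(w.name);
    }
}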

Now for a slightly more involved example. Ten threads are started one after another; each acquires the lock, prints a message, and then blocks itself by calling a wait-style method. Once all ten are waiting, the main thread acquires the lock and starts another ten threads whose only job is to acquire the lock; as described above, they enter the blocking queue one by one. The main thread then issues four notify-style signals (note the time intervals) and finally releases the lock. The interesting question concerns the notified threads: relative to the threads already sitting in the blocking queue, who acquires the lock first, and in what order do the notified threads themselves line up?

Let's run the JSR166 version first. The code is:

package com.psly.testLocks;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLockImplWithWaits {

	private static Lock lock = new ReentrantLock();
	private static Condition condition = lock.newCondition();
	public static void main(String[] args) throws InterruptedException {
		int N = 10;
		Thread[] threads = new Thread[N];
		Thread[] threadsForWaits = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					lock.lock();		
					System.out.println(Thread.currentThread().getName() + " nowait get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					lock.unlock();
				}
				
			});
		}
		for(int i = 0; i < N; ++i){
			threadsForWaits[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					// TODO Auto-generated method stub
					lock.lock();		//synchronized(lock){
					System.out.println(Thread.currentThread().getName() + " wait first get lock");
					try {
						condition.await();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
					System.out.println(Thread.currentThread().getName() + " wait second get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					lock.unlock();
				}
				
			});
		}
		
		for(int i = 0; i < N; ++i){
			threadsForWaits[i].start();
			Thread.sleep(200);
		}
		lock.lock();	//synchronized(lock){
		for(int i = 0; i < N; ++i){
			threads[i].start();
				Thread.sleep(200);
		}
		for(int i = 0; i < 4 ; ++i){
				condition.signal();
		}
		Thread.sleep(200);
		lock.unlock();
			
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
		
		for(int i = 0; i < N; ++i)
			threadsForWaits[i].join();
		
	}

}
           

Thread-10 through Thread-19 are the threads that block themselves by waiting (await); Thread-0 through Thread-9 are the threads that only acquire the lock.

The output is:

Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-0 nowait get lock
Thread-1 nowait get lock
Thread-2 nowait get lock
Thread-3 nowait get lock
Thread-4 nowait get lock
Thread-5 nowait get lock
Thread-6 nowait get lock
Thread-7 nowait get lock
Thread-8 nowait get lock
Thread-9 nowait get lock
Thread-10 wait second get lock
Thread-11 wait second get lock
Thread-12 wait second get lock
Thread-13 wait second get lock
           

We can see that the JSR166 implementation still behaves FIFO, even though Thread-10 through Thread-13 first acquired the lock and then went into the wait. Let's see how this is achieved.

Note that the JSR166 implementation lives entirely at the Java level.

Three calls matter here: await, signal, and unlock.

await:

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
           

Here addConditionWaiter adds the thread's node to the condition (wait) queue, and acquireQueued re-acquires the lock once the thread is eventually woken up.

Let's look at addConditionWaiter:

/**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
           

The new node is linked in as the next node of lastWaiter and then becomes lastWaiter itself, so this builds a FIFO queue. (This runs while the lock is already held, so no extra synchronization is needed.)

Next, signal:

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
           
private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
           
final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }  
           

At this point the first node of the condition queue has been dequeued; enq then transfers it onto the lock's blocking queue:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
           

With that, the waiting thread has been moved from its waiting (await) state into the state of a thread blocked on the lock, queued behind any threads already waiting for it.
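A rough simulation of the whole experiment under these rules; the modelling is mine, the thread names are the ones from the run above:

import java.util.ArrayDeque;
import java.util.Queue;

// Thread-0..9 are already queued on the lock in FIFO order; each signal()
// moves one condition waiter (also FIFO: Thread-10..13) to the tail of the
// same queue; unlock() then wakes the queue front to back, reproducing the
// output order observed earlier.
public class SignalTransferSim {
    public static void main(String[] args) {
        Queue<String> conditionQueue = new ArrayDeque<>();
        Queue<String> syncQueue = new ArrayDeque<>();
        for (int i = 10; i <= 19; i++) conditionQueue.add("Thread-" + i);                  // await()
        for (int i = 0; i <= 9; i++) syncQueue.add("Thread-" + i + " nowait");             // lock()
        for (int i = 0; i < 4; i++) syncQueue.add(conditionQueue.poll() + " wait second"); // signal()
        while (!syncQueue.isEmpty()) System.out.println(syncQueue.poll() + " get lock");   // unlock()
    }
}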

Finally, let's look at what happens when the main thread releases the lock:

unlock

public void unlock() {
        sync.release(1);
    }
           
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
           

As we can see, the thread calling unlock wakes the first thread after the head of the blocking queue.

Because both the blocking queue and the condition queue are FIFO, the overall behavior is nicely ordered, which is why the earlier output looks as expected. (Strictly speaking, the default ReentrantLock is non-fair, so a newly arriving thread may barge ahead of queued ones; the queue itself, however, is always FIFO, which is what this experiment shows.)

Finally, let's see what the same scenario outputs with synchronized. The code is:

package com.psly.testLocks;

public class TestLockImplWithWaits {

	private static Object lock = new Object();
	public static void main(String[] args) throws InterruptedException {
		int N = 10;
		Thread[] threads = new Thread[N];
		Thread[] threadsForWaits = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					synchronized(lock){		
					System.out.println(Thread.currentThread().getName() + " nowait get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					}
				}
				
			});
		}
		for(int i = 0; i < N; ++i){
			threadsForWaits[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					// TODO Auto-generated method stub
					synchronized(lock){
					System.out.println(Thread.currentThread().getName() + " wait first get lock");
					try {
						lock.wait();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
					System.out.println(Thread.currentThread().getName() + " wait second get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					}
				}
				
			});
		}
		
		for(int i = 0; i < N; ++i){
			threadsForWaits[i].start();
			Thread.sleep(200);
		}
		synchronized(lock){
		for(int i = 0; i < N; ++i){
			threads[i].start();
				Thread.sleep(200);
		}
		for(int i = 0; i < 4 ; ++i){
			lock.notify();
		}
		Thread.sleep(200);
		}
			
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
		
		for(int i = 0; i < N; ++i)
			threadsForWaits[i].join();
		
	}

}
           

The output is:

Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
           

This result is very strange; the strangest part is the consecutive output of Thread-10, Thread-13, Thread-12, Thread-11:

Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
           

Let's adjust the number of notify calls and issue N of them, one per waiting thread:

for(int i = 0; i < N ; ++i){
			lock.notify();
		}
           

The output is:

Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
           

Thread-10 still inexplicably appears first, followed by Thread-19 down to Thread-11 in reverse order.

Let's change the call once more and use notifyAll():

synchronized(lock){
		for(int i = 0; i < N; ++i){
			threads[i].start();
				Thread.sleep(200);
		}
		lock.notifyAll();
		Thread.sleep(200);
		}
           

The output is:

Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-10 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
           

Now it changes again: Thread-10 moves to the very end, and the waiters acquire the lock in completely reverse order.

Let's go into the JVM to see what is going on. As before, we start with how the nodes of waiting threads are organized.

After some digging, the relevant code is the following:

WAIT:

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   Thread * const Self = THREAD ;


   TEVENT (Wait) ;

   assert (Self->_Stalled == 0, "invariant") ;
   Self->_Stalled = intptr_t(this) ;
   jt->set_current_waiting_monitor(this);

   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag

   // Enter the waiting queue, which is a circular doubly linked list in this case
   // but it could be a priority queue or any data structure.
   // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
   // by the the owner of the monitor *except* in the case where park()
   // returns because of a timeout of interrupt.  Contention is exceptionally rare
   // so we use a simple spin-lock instead of a heavier-weight blocking lock.

   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;
           
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}
           

If _WaitSet is NULL, the new node becomes _WaitSet, with both its prev and next pointing to itself.

If _WaitSet already has nodes, the new node is appended behind the tail (head->_prev), just before the head.

So in the non-empty case nodes are always appended at the end of the circular list and dequeued from the head.

In other words, this is a FIFO queue.
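The same structure can be sketched in Java (my own modelling of the C++ above), which makes the FIFO discipline explicit: AddWaiter appends just before the head, DequeueWaiter always removes the head:

// Models the circular doubly linked _WaitSet.
public class WaitSetDemo {
    static final class Waiter {
        final String name;
        Waiter prev, next;
        Waiter(String name) { this.name = name; }
    }

    private Waiter waitSet;                 // head of the circular list

    void addWaiter(Waiter node) {
        if (waitSet == null) {
            waitSet = node;
            node.prev = node;
            node.next = node;
        } else {
            Waiter head = waitSet;
            Waiter tail = head.prev;
            tail.next = node;               // append behind the current tail
            head.prev = node;
            node.next = head;
            node.prev = tail;
        }
    }

    Waiter dequeueWaiter() {                // remove and return the head
        Waiter w = waitSet;
        if (w != null) {
            if (w.next == w) {
                waitSet = null;             // it was the only node
            } else {
                w.prev.next = w.next;
                w.next.prev = w.prev;
                waitSet = w.next;
            }
            w.prev = w.next = null;
        }
        return w;
    }

    public static void main(String[] args) {
        WaitSetDemo ws = new WaitSetDemo();
        for (int i = 10; i <= 19; i++) ws.addWaiter(new Waiter("Thread-" + i));
        // Prints Thread-10 .. Thread-19: arrival order, i.e. FIFO.
        Waiter w;
        while ((w = ws.dequeueWaiter()) != null) System.out.println(w.name);
    }
}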

notify

void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
     TEVENT (Empty-Notify) ;
     return ;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;

  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  ObjectWaiter * iterator = DequeueWaiter() ;
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter
  ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}

inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev != NULL, "node already removed from list");
  assert(node->_next != NULL, "node already removed from list");
  // when the waiter has woken up because of interrupt,
  // timeout or other spurious wake-up, dequeue the
  // waiter from waiting list
  ObjectWaiter* next = node->_next;
  if (next == node) {
    assert(node->_prev == node, "invariant check");
    _WaitSet = NULL;
  } else {
    ObjectWaiter* prev = node->_prev;
    assert(prev->_next == node, "invariant check");
    assert(next->_prev == node, "invariant check");
    next->_prev = prev;
    prev->_next = next;
    if (_WaitSet == node) {
      _WaitSet = next;
    }
  }
  node->_next = NULL;
  node->_prev = NULL;
}
static int Knob_MoveNotifyee       = 2 ;       // notify() - disposition of notifyee
     if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Front = _cxq ;
                iterator->_next = Front ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                    break ;
                }
            }
         }
     } else
     if (Policy == 3) {      // append to cxq
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
            ObjectWaiter * Tail ;
            Tail = _cxq ;
            if (Tail == NULL) {
           

Here DequeueWaiter calls DequeueSpecificWaiter; the effect is to pop one element off the queue, making _WaitSet->_next the new _WaitSet. Note that two further data structures are involved here: _EntryList and _cxq.

We then take the Policy == 2 branch. Note that, despite the comment, not everything is prepended to _cxq: when _EntryList == NULL, the dequeued node is placed directly into _EntryList; otherwise it is prepended to the _cxq stack. Picture it: the first notified node finds _EntryList empty and becomes _EntryList itself; from the second node onwards, every node is pushed onto the stack, so when they come out again their order is reversed. If, on dequeue, _EntryList is consumed first and the stack elements only afterwards, we get exactly the out-of-order pattern from the example, with Thread-10 jumping to the front.
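A tiny simulation of that dispatch makes the observed order concrete. This is my own modelling of the Policy == 2 branch plus the EntryList-then-cxq drain discussed below, not JVM code:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// notify() runs four times with Policy == 2: the first dequeued waiter
// becomes _EntryList, the rest are pushed onto the _cxq stack. On unlock,
// _EntryList is served first and _cxq is drained afterwards, giving the
// order Thread-10, Thread-13, Thread-12, Thread-11 seen in the output above.
public class NotifyPolicy2Sim {
    public static void main(String[] args) {
        String entryList = null;
        Deque<String> cxq = new ArrayDeque<>();           // push() prepends
        for (int i = 10; i <= 13; i++) {                  // FIFO WaitSet order
            String waiter = "Thread-" + i;
            if (entryList == null) entryList = waiter;    // first notify
            else cxq.push(waiter);                        // later notifies
        }
        List<String> wakeOrder = new ArrayList<>();
        wakeOrder.add(entryList);                         // exit(): EntryList first
        while (!cxq.isEmpty()) wakeOrder.add(cxq.pop());  // then drain the stack
        System.out.println(wakeOrder);  // [Thread-10, Thread-13, Thread-12, Thread-11]
    }
}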

Note, however, that the earlier notifyAll run did not show this effect, so let's look at the notifyAll code:

if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         iterator->TState = ObjectWaiter::TS_CXQ ;
         for (;;) {
             ObjectWaiter * Front = _cxq ;
             iterator->_next = Front ;
             if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                 break ;
             }
         }
     } else
           

Sure enough: the notifyAll logic is mostly the same as notify, except that it pushes all nodes onto _cxq. That is why after a notifyAll the waiters acquire the lock in completely reverse order.

unlock

Next, let's check whether unlock really does what we guessed: take entries from _EntryList first, and only then look at _cxq.

void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
  fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
  assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
  // if displaced header is null, the previous enter is recursive enter, no-op
  .........

  ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
           
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
       ...................
       for (;;) {
      assert (THREAD == _owner, "invariant") ;


      if (Knob_ExitPolicy == 0) {

        .........
      } else {
			..........
         } else {
            TEVENT (Inflated exit - complex egress) ;
         }
      }

      guarantee (_owner == THREAD, "invariant") ;

      ObjectWaiter * w = NULL ;
      int QMode = Knob_QMode ;

  

      w = _EntryList  ;
      if (w != NULL) {

          assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }

      w = _cxq ;
      if (w == NULL) continue ;

      for (;;) {
          assert (w != NULL, "Invariant") ;
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }
 

      if (QMode == 1) {
               ..............
      } else {
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             p->_prev = q ;
             q = p ;
         }
      }

      if (_succ != NULL) continue;

      w = _EntryList  ;
      if (w != NULL) {
          guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
   }
           
static int Knob_QMode              = 0 ;       // EntryList-cxq policy - queue discipline
static int Knob_ExitPolicy         = 0 ;
           

_EntryList is examined first; if it is non-empty, ExitEpilog is called and we return. Otherwise _cxq is detached with an atomic operation, the detached list is handed to _EntryList, and ExitEpilog is called on that.

In short, everything ends up flowing through _EntryList; it is just that if _EntryList already has entries they are consumed first, and only afterwards is _cxq used.
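A sketch of that drain (again my own modelling, assuming the default QMode == 0 path shown above) illustrates why the plain-lock threads wake up in reverse arrival order:

import java.util.ArrayDeque;
import java.util.Deque;

// The whole _cxq is detached and becomes _EntryList as-is. Because EnterI
// filled _cxq by prepending, the resulting wake-up order is the reverse of
// arrival order, which is why Thread-9 .. Thread-0 acquire the lock in
// reverse in the synchronized outputs above.
public class ExitDrainSim {
    public static void main(String[] args) {
        Deque<String> cxq = new ArrayDeque<>();
        for (int i = 0; i <= 9; i++) cxq.push("Thread-" + i);  // EnterI prepends
        // exit(): _EntryList = w (the detached cxq); ExitEpilog wakes its head,
        // and each exiting thread then wakes the next one behind it.
        for (String waiter : cxq) System.out.println(waiter + " nowait get lock");
    }
}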

Let's see what ExitEpilog does:

void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
   assert (_owner == Self, "invariant") ;

   // Exit protocol:
   // 1. ST _succ = wakee
   // 2. membar #loadstore|#storestore;
   // 2. ST _owner = NULL
   // 3. unpark(wakee)

   _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
   ParkEvent * Trigger = Wakee->_event ;

   Wakee  = NULL ;

   // Drop the lock
   OrderAccess::release_store_ptr (&_owner, NULL) ;
   OrderAccess::fence() ;                               // ST _owner vs LD in unpark()

   if (SafepointSynchronize::do_call_back()) {
      TEVENT (unpark before SAFEPOINT) ;
   }

   Trigger->unpark() ;

   // Maintain stats and report events to JVMTI
   if (ObjectMonitor::_sync_Parks != NULL) {
      ObjectMonitor::_sync_Parks->inc() ;
   }
}
           

Sure enough, it ends by calling unpark, which wakes the corresponding thread. But how does _EntryList change after that? Finally, let's see what a waiting thread does once it wakes up from wait:

// Note: a subset of changes to ObjectMonitor::wait()
// will need to be replicated in complete_exit above
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
..........
   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;

.............
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;
............
   if (node.TState == ObjectWaiter::TS_WAIT) {
         Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
         if (node.TState == ObjectWaiter::TS_WAIT) {
            DequeueSpecificWaiter (&node) ;       // unlink from WaitSet
            assert(node._notified == 0, "invariant");
            node.TState = ObjectWaiter::TS_RUN ;
         }
..............
     assert (_owner != Self, "invariant") ;
     ObjectWaiter::TStates v = node.TState ;
     if (v == ObjectWaiter::TS_RUN) {
         enter (Self) ;
     } else {
           

It then calls enter(Self),

void ATTR ObjectMonitor::enter(TRAPS) {
    for (;;) {
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition()
      // or java_suspend_self()

      EnterI (THREAD) ;
           

which in turn goes into EnterI(THREAD),

void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    assert (Self->is_Java_thread(), "invariant") ;
    assert (((JavaThread *) Self)->thread_state() == _thread_blocked   , "invariant") ;

    if (TryLock (Self) > 0) {
        assert (_succ != Self              , "invariant") ;
        assert (_owner == Self             , "invariant") ;
        assert (_Responsible != Self       , "invariant") ;
        return ;
    }

    DeferredInitialize () ;


    if (TrySpin (Self) > 0) {
        assert (_owner == Self        , "invariant") ;
        assert (_succ != Self         , "invariant") ;
        assert (_Responsible != Self  , "invariant") ;
        return ;
    }

    // The Spin failed -- Enqueue and park the thread ...
    assert (_succ  != Self            , "invariant") ;
    assert (_owner != Self            , "invariant") ;
    assert (_Responsible != Self      , "invariant") ;


    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;

    ObjectWaiter * nxt ;
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }

    if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
        // Try to assume the role of responsible thread for the monitor.
        // CONSIDER:  ST vs CAS vs { if (Responsible==null) Responsible=Self }
        Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
    }

    TEVENT (Inflated enter - Contention) ;
    int nWakeups = 0 ;
    int RecheckInterval = 1 ;

    for (;;) {

        if (TryLock (Self) > 0) break ;
        assert (_owner != Self, "invariant") ;

        if ((SyncFlags & 2) && _Responsible == NULL) {
           Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
        }

        if (_Responsible == Self || (SyncFlags & 1)) {
            TEVENT (Inflated enter - park TIMED) ;
            Self->_ParkEvent->park ((jlong) RecheckInterval) ;
            RecheckInterval *= 8 ;
            if (RecheckInterval > 1000) RecheckInterval = 1000 ;
        } else {
            TEVENT (Inflated enter - park UNTIMED) ;
            Self->_ParkEvent->park() ;
        }

        if (TryLock(Self) > 0) break ;

        TEVENT (Inflated enter - Futile wakeup) ;
        if (ObjectMonitor::_sync_FutileWakeups != NULL) {
           ObjectMonitor::_sync_FutileWakeups->inc() ;
        }
        ++ nWakeups ;

        if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
           Self->_ParkEvent->reset() ;
           OrderAccess::fence() ;
        }
        if (_succ == Self) _succ = NULL ;

        // Invariant: after clearing _succ a thread *must* retry _owner before parking.
        OrderAccess::fence() ;
    }

    assert (_owner == Self      , "invariant") ;
    assert (object() != NULL    , "invariant") ;
    // I'd like to write:
    //   guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
    // but as we're at a safepoint that's not safe.

    UnlinkAfterAcquire (Self, &node) ;
    if (_succ == Self) _succ = NULL ;

    assert (_succ != Self, "invariant") ;
    if (_Responsible == Self) {
        _Responsible = NULL ;
        OrderAccess::fence(); // Dekker pivot-point
	.........
           

The key call here is UnlinkAfterAcquire,

void ObjectMonitor::UnlinkAfterAcquire (Thread * Self, ObjectWaiter * SelfNode)
{
    assert (_owner == Self, "invariant") ;
    assert (SelfNode->_thread == Self, "invariant") ;

    if (SelfNode->TState == ObjectWaiter::TS_ENTER) {
        // Normal case: remove Self from the DLL EntryList .
        // This is a constant-time operation.
        ObjectWaiter * nxt = SelfNode->_next ;
        ObjectWaiter * prv = SelfNode->_prev ;
        if (nxt != NULL) nxt->_prev = prv ;
        if (prv != NULL) prv->_next = nxt ;
        if (SelfNode == _EntryList ) _EntryList = nxt ;
        assert (nxt == NULL || nxt->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (prv == NULL || prv->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        TEVENT (Unlink from EntryList) ;
    } else {
           

which updates _EntryList so that lock acquisition can continue correctly afterwards.

At this point we have roughly walked through how the synchronized lock is implemented in the JVM and how the Lock is implemented in j.u.c.

One remaining question: how does a pthread lock on Linux behave?

Let's run the same two experiments using pthreads.

Lock-acquisition code:

Compile the executable testLock: gcc -pthread testLock.c -o testLock

Run it: ./testLock

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#define N 10
void* runTask(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d get lock\n", (int)pm);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    return 0;
}

int main(){
//  int N = 10;
    pthread_t threads[N];
    int i = 0;
    pthread_mutex_lock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_create(&threads[i], 0, runTask, (void*)(long)i);
        usleep(100000);
    }
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_join(threads[i], NULL);
    }
    return 0;

}
           

Output:

0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
           

Lock acquisition plus blocking-wait code:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#define N 10
void* runTask(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d get lock\n", (int)pm);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    return 0;
}

void* runTaskWithWait(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d wait first get lock\n", (int)pm);
    pthread_cond_wait(&cond, &mutex);
    printf("%d wait second get lock\n", (int)pm);
    usleep(300000);
    pthread_mutex_unlock(&mutex);
    return 0;
}
int main(){
//  int N = 10;
    pthread_t threads[N];
    pthread_t threadsForWaits[N];
    int i = 0;
    for(; i < N; ++i){
        pthread_create(&threadsForWaits[i], 0, runTaskWithWait, (void*)(long)i);
        usleep(100000);
    }

    pthread_mutex_lock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_create(&threads[i], 0, runTask, (void*)(long)i);
        usleep(100000);
    }
    //pthread_cond_broadcast(&cond);
    for(i = 0; i < N; ++i)
        pthread_cond_signal(&cond);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_join(threads[i], NULL);
    }
    for(i = 0; i < N; ++i){
        pthread_join(threadsForWaits[i], NULL);
    }
    return 0;

} 
           

Output:

0 wait first get lock
1 wait first get lock
2 wait first get lock
3 wait first get lock
4 wait first get lock
5 wait first get lock
6 wait first get lock
7 wait first get lock
8 wait first get lock
9 wait first get lock
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
1 wait second get lock
2 wait second get lock
3 wait second get lock
0 wait second get lock
4 wait second get lock
5 wait second get lock
6 wait second get lock
7 wait second get lock
8 wait second get lock
9 wait second get lock
           

All we can say is that once the waiting threads have been transferred to the set of blocked threads, the order in which they subsequently acquire the lock shows no obvious pattern.