天天看點

面試官:談一談java中基于AQS的并發鎖原理

面試官:談一談java中基于AQS的并發鎖原理

我:java中的AQS是指AbstractQueuedSynchronizer類,java中并發鎖控制邏輯都是基于這個類來實作的。

面試官:能說一下你用過的基于AQS的并發類有哪些嗎?

我:首先是基于AQS在内部類實作了公平鎖和非公平鎖,具體有3個類:ReentrantLock、ReentrantReadWriteLock、Semaphore,UML類圖如下:

面試官:談一談java中基于AQS的并發鎖原理
面試官:談一談java中基于AQS的并發鎖原理
面試官:談一談java中基于AQS的并發鎖原理
還有2個Latch類基于AQS實作了并發控制,他們是CountDownLatch和LimitLatch,UML類圖如下:
面試官:談一談java中基于AQS的并發鎖原理
面試官:談一談java中基于AQS的并發鎖原理

面試官:談一下AQS是怎麼擷取鎖的?

我:首先,AbstractQueuedSynchronizer是一個基于FIFO的隊列實作的并發控制,隊列中的元素通過操作共享資源state來擷取和釋放鎖,state是一個volatile修飾的int類型變量。我以ReentrantLock中獨占鎖為例,如果有一個線程來擷取鎖,這時如果隊列中沒有元素,那就把這個線程加入隊列,同時線程申請的數量加入到state變量。如果隊列中已經有元素,這個線程入隊尾,之後線程中斷等待隊列前一個元素釋放鎖後喚醒。

下面的流程是一個擷取鎖的流程,如果下面的流程傳回false,則把目前線程加入到等待隊列。

面試官:談一談java中基于AQS的并發鎖原理

面試官:讀過這部分的源代碼嗎,能不能講一下?

我:看一下流程中的源碼

final void lock() {
    acquire(1);//擷取1個資源
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//擷取鎖失敗,入隊
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() && //是否有等待時間更長的元素
            compareAndSetState(0, acquires)) {//自旋鎖
            setExclusiveOwnerThread(current);//設定目前線程為獨占線程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
//目前隊列中是否有等待時間更長的元素
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
下面的代碼是加入到等待隊列的過程
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {//前置節點是head并且擷取成功
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {//注意:如果前置節點不通知,繼續往前查找,找到一個可以通知的節點作為目前節點的node節點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {//把前置節點的狀态設定為通知狀态-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //把目前節點加入到隊尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { //隊列為空,初始化隊列,head=tail=node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//隊列非空,放到隊尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}      

 這兒要注意:隊列上的元素是Node對象,node中定義了waitStatus變量,有4個狀态,

static final int CANCELLED =  1;//目前線程已經取消鎖等待
static final int SIGNAL    = -1;//下一個節點需要被喚醒
static final int CONDITION = -2;//目前線程正在等待condition,這個狀态需要跟condition配合使用
static final int PROPAGATE = -3;//釋放鎖的時候使用,這個狀态隻給頭節點,并且要不斷傳播下去      

面試官:再說一下釋放鎖的過程?

我:鎖的釋放過程比較簡單,還是以ReentrantLock為例。首先嘗試釋放鎖(state變量中減去1),把目前鎖的擁有者置空,通知隊列中下一個節點。整個流程入下:

面試官:談一談java中基于AQS的并發鎖原理

面試官:這部分的源代碼能不能講一下?

我:以ReentrantLock為例,源碼入下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {//嘗試釋放鎖
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//通知連結清單中下個等待節點
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())//鎖的目前擁有者不是目前線程,抛出異常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);//設定目前鎖擁有者為空
    }
    setState(c);
    return free;
}
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}
private void unparkSuccessor(Node node) {
	//把目前節點的等待狀态置為0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
	//如果隊列中下一個節點為空或者下一個節點等待狀态是取消狀态(1),則從隊尾開始查找,知道找到一個非空并且等待狀态小于0的節點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)//為什麼從隊尾開始查找,這樣周遊整個隊列啊?但是如果目前節點的下一個是null,那就始終找不到下下個節點了,必須從隊尾找。
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)//喚醒等待線程
        LockSupport.unpark(s.thread);
}      

面試官:上面你講的是獨占鎖,那AQS中的共享鎖怎麼用的?

我:首先共享鎖是指多個線程可以同時使用這個鎖,AQS中的使用是隻要不超過共享鎖允許的總數,都可以擷取到。在擷取讀鎖時,首先嘗試擷取共享鎖,如果擷取失敗,入隊後等待擷取。以ReentrantReadWriteLock為例,擷取共享鎖流程入下:

面試官:談一談java中基于AQS的并發鎖原理

從上面這個流程可以看到,如果擷取不到鎖,就會進入fullTryAcquireShared,這個方式是在死循環中不斷嘗試擷取到鎖,直到成功。

面試官:這部分的源代碼能介紹一下嗎?

我:源碼入下

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)//先嘗試擷取,擷取失敗,則進入隊列等待擷取
        doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)//其他線程已經擷取到排它鎖
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() && //是否需要阻塞,在公平鎖中,如果隊列中已經有元素了那就傳回true;在非公平鎖中,隊列中第二個元素不是共享節點,傳回true
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {//cas失敗
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {//下面的方法前面都講過了
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//共享模式入隊
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {//擷取成功
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&  //失敗後等待前置節點通知
                parkAndCheckInterrupt()) //睡眠後等待喚醒
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);//不再擷取
    }
}
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//自己成功頭結點進而喚醒
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();//喚醒下一個節點
        }
    }      

面試官:好的,共享鎖的釋放流程是怎樣的?

我:跟共享鎖的擷取流程一樣,先嘗試釋放(state變量中減去1),成功後喚醒隊列中下一個等待線程

這部分代碼如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {//目前線程是第一個擷取到鎖的線程
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;//釋放一個count數減1
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))//CAS修改state值
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//目前節點waiteStatus置為0
                    continue;            // loop to recheck cases
                unparkSuccessor(h);//喚醒下一個節點
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//頭節點waiteStatus置為-3
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}      

面試官:前面你提到了公平鎖和非公平鎖,這2個有什麼差別呢?

我:一般情況下,公平鎖是指目前線程在釋放鎖的時候,會通知一個等待隊列中等待時間最長的線程,而非公平鎖,目前線程釋放鎖的時候,會随機通知一個線程。非公平鎖代碼如下:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}      

在ReentrantLock中,公平鎖和非公平鎖的不同就是非公平鎖會直接嘗試compareAndSetState(0, 1),失敗後才走擷取鎖流程。而公平鎖直接走擷取鎖流程。

在ReentrantReadWriteLock中,公平鎖隻要隊列中有其他線程占用鎖,讀寫鎖就需要阻塞,而非公平鎖中寫鎖不阻塞,讀鎖隻有隊列中第一個排隊等待線程使用獨占鎖時才阻塞,代碼如下

static final class NonfairSync extends Sync {
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();//隊列中第一個等待線程以獨占方式等待鎖
    }
}
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}
static final class FairSync extends Sync {
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();//隊列中有元素正在使用鎖或者第一個等待的線程不是目前線程
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}      

面試官:好的,再談一談CountDownLatch的使用和原理?

我:上面的原理了解了之後,CountDownLatch的使用就非常簡單了。示例代碼如下:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);//将state置為3
        for (int i = 0; i < 3; i++){
            LocalThreadPool.run(new WorkerThread(latch));
        }
        System.out.println("latch test begin");
        latch.await();//擷取到共享鎖
        System.out.println("latch test end");
    }

    static class WorkerThread implements Runnable {
        private CountDownLatch latch;
        public WorkerThread(CountDownLatch latch) {
            this.latch = latch;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " finished");
            latch.countDown();//将state值減1,最後一個線程将state減為0後,釋放上面await方法擷取到的共享鎖。
        }
    }
}      

上面輸出結果如下:

latch test begin
pool-1-thread-3 finished
pool-1-thread-2 finished
pool-1-thread-1 finished
latch test end      

初始化的時候,批量将state值設定為線程數量,然後通過await擷取共享鎖。每個線程執行完成後調用釋放鎖的方法将state減1,最後一個線程将state減為0後傳回true,這時CountDownLatch就會釋放掉共享鎖。看下面源代碼

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//state減為0才會執行這個邏輯
        doReleaseShared();//釋放await()方法擷取到的共享鎖
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;//減為0時傳回true
    }
}      

面試官:那CountDownLatch在await方法中加入參數是怎麼實作等待逾時的呢?

我:這個就是在一個死循環中不斷擷取鎖,直到逾時,這個是AQS本身就支援的邏輯,代碼如下:

private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;//設定申請鎖失敗時間
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {//逾時,取消擷取鎖
                cancelAcquire(node);
                return false;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}      

面試官:那CountDownLatch設定等待逾時時間,有什麼好處呢?

我:這個主要作用是設定主線程等待時間,以免長期阻塞主線程。從上面源代碼看出這個并不影響任務線程的執行,不過如果等待任務執行線程執行完成後再做一些日志或者通知,就會失敗,因為逾時後直接就會調用這些日志或通知,不一定真的所有任務都完成了。

比如下面的代碼:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        for (int i = 0; i < 3; i++){
            LocalThreadPool.run(new WorkerThread(latch));
        }
        System.out.println("latch test begin");
        latch.await(1, TimeUnit.SECONDS);
        System.out.println("latch test end");
    }

    static class WorkerThread implements Runnable {
        private CountDownLatch latch;
        public WorkerThread(CountDownLatch latch) {
            this.latch = latch;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " finished");
            latch.countDown();
        }
    }
}      

輸出結果是

latch test begin
latch test end
pool-1-thread-2 finished
pool-1-thread-1 finished
pool-1-thread-3 finished      

面試官:再聊一聊Semaphore的使用?

我:Semaphore的使用也是基于AQS的,它更像一個限流器,初始化是給state指派,每次執行任務時Semaphore擷取共享鎖并且将state值減1。如果state值小于0則入隊等待。任務執行完成後,Semaphore釋放鎖,首先state值加1,如果state小于0,則通知隊列中下一個等待線程。我們可以用Semaphore實作一個資料庫連接配接池,代碼如下:

class ConnectionPool {
    final List<Connection> pool = new ArrayList<>(10);//這兒沒有考慮線程安全
    final Semaphore sem = new Semaphore(10);//将state設定為10
    //構造函數初始化連接配接池
    ConnectionPool(int size) {
        for (int i = 0; i < size; i++) {
            pool.add(new Connection() {
                //省略實作代碼
            });
        }
    }
    //執行任務過程
    void doExecution() throws InterruptedException {
        Connection connection = null;
        sem.acquire();//擷取可中斷共享鎖,如果state小于1,則進入鎖等待隊列
        try {
            connection = pool.remove(0);//從連接配接池取連接配接
            connection.commit();//執行事務
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            pool.add(connection);//用完後歸還連接配接池
            sem.release();//釋放鎖,将state值加1,如果state小于0,去等待隊列喚醒下一個等待線程
        }
    }
}      

面試官:再聊一聊Condition的使用?

我:Condition類的作用是可以實作wait-notify機制,比synchronized靈活,可以在一個Lock上建立多個Condition,線程選擇注冊不同的Condition進項排程。假設我們現在有一個需求,2個線程輪流列印出1到10的數字,這個時候我們可以用信号量來做,代碼如下:

public class ConditionTest {
    private ReentrantLock lock = new ReentrantLock();
    public Condition conditionA = lock.newCondition();
    public Condition conditionB = lock.newCondition();
    private int count = 0;
    public String printThreadA() throws InterruptedException {
        while (count < 10){
            lock.lock();
            System.out.println(Thread.currentThread().getName() + ":" +  (++count));
            conditionB.signal();//列印後喚醒B線程列印
            conditionA.await();//列印後等待B線程喚醒
            lock.unlock();
        }
        return null;
    }

    public String printThreadB() throws InterruptedException {
        while (count < 10){
            lock.lock();
            System.out.println(Thread.currentThread().getName() + ":" + (++count));
            conditionA.signal();//列印後喚醒A線程列印
            conditionB.await();//列印後等待A線程喚醒
            lock.unlock();
        }
        return null;
    }

    public void printTenNumber(){
        LocalThreadPool.call(() -> printThreadA());
        LocalThreadPool.call(() -> printThreadB());
    }

    public static void main(String[] args){
        ConditionTest conditionTest = new ConditionTest();
        conditionTest.printTenNumber();
    }
}      

上面signal()方法具體實作為将目前線程從condition等待隊列轉入鎖等待隊列隊尾等待前一個節點喚醒,await()方法的具體實作為在condition等待隊列隊尾新加一個等待者,釋放鎖并且喚醒下一個等待線程。是以,condition的await和signal本質上是目前線程在condition等待隊列和鎖等待隊列直接的轉移。

代碼實作如下:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
        //waitStatus置為初始化
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
		//放入鎖等待隊列隊尾并且把前置節點置為SIGNAL,以通知目前線程
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();//等待隊列隊尾新加一個等待者
    int savedState = fullyRelease(node);//釋放鎖并且喚醒下一個等待線程
    //删除部分代碼
}      

面試官:最後一個問題,如果我們自己實作一個AQS的類,需要實作哪些方法呢?

我:AQS類中以下5個方法沒有實作,都是throw new UnsupportedOperationException(),這些方法留給自定義子類來實作。

protected boolean tryAcquire(int arg):獨占方式擷取鎖
protected boolean tryRelease(int arg):獨占方式釋放鎖
protected int tryAcquireShared(int arg):共享方式擷取鎖
protected boolean tryReleaseShared(int arg):共享方式釋放鎖
isHeldExclusively():判斷目前線程是否正在獨占鎖      

面試官:恭喜你,通過了。