天天看點

關于并發程式設計與線程安全的思考與實踐

作者:京東健康 張娜

一、并發程式設計的意義與挑戰

并發程式設計的意義是充分的利用處理器的每一個核,以達到最高的處理性能,可以讓程式運作的更快。而處理器也為了提高計算速率,作出了一系列優化,比如:

1、硬體更新:為平衡CPU 内高速存儲器和記憶體之間數量級的速率差,提升整體性能,引入了多級高速緩存的傳統硬體記憶體架構來解決,帶來的問題是,資料同時存在于高速緩存和主記憶體中,需要解決緩存一緻性問題。

2、處理器優化:主要包含,編譯器重排序、指令級重排序、記憶體系統重排序。通過單線程語義、指令級并行重疊執行、緩存區加載存儲3種級别的重排序,減少執行指令,進而提高整體運作速度。帶來的問題是,多線程環境裡,編譯器和CPU指令無法識别多個線程之間存在的資料依賴性,影響程式執行結果。

并發程式設計的好處是巨大的,然而要編寫一個線程安全并且執行高效的代碼,需要管理可變共享狀态的操作通路,考慮記憶體一緻性、處理器優化、指令重排序問題。比如我們使用多線程對同一個對象的值進行操作時會出現值被更改、值不同步的情況,得到的結果和理論值可能會天差地别,此時該對象就不是線程安全的。而當多個線程通路某個資料時,不管運作時環境采用何種排程方式或者這些線程如何交替執行,這個計算邏輯始終都表現出正确的行為,那麼稱這個對象是線程安全的。是以如何在并發程式設計中保證線程安全是一個容易忽略的問題,也是一個不小的挑戰。

是以,為什麼會有線程安全的問題,首先要明白兩個關鍵問題:

1、線程之間是如何通信的,即線程之間以何種機制來交換資訊。

2、線程之間是如何同步的,即程式如何控制不同線程間的發生順序。

二、Java并發程式設計

Java并發采用了共享記憶體模型,Java線程之間的通信總是隐式進行的,整個通信過程對程式員完全透明。

2.1 Java記憶體模型

為了平衡程式員對記憶體可見性盡可能高(對編譯器和處理的限制就多)和提高計算性能(盡可能少限制編譯器處理器)之間的關系,JAVA定義了Java記憶體模型(Java Memory Model,JMM),約定隻要不改變程式執行結果,編譯器和處理器怎麼優化都行。是以,JMM主要解決的問題是,通過制定線程間通信規範,提供記憶體可見性保證。

JMM結構如下圖所示:

關于并發程式設計與線程安全的思考與實踐

以此看來,線程内建立的局部變量、方法定義參數等隻線上程内使用不會有并發問題,對于共享變量,JMM規定了一個線程如何和何時可以看到由其他線程修改過後的共享變量的值,以及在必須時如何同步的通路共享變量。

為控制工作記憶體和主記憶體的互動,定義了以下規範:

•所有的變量都存儲在主記憶體(Main Memory)中。

•每個線程都有一個私有的本地記憶體(Local Memory),本地記憶體中存儲了該線程以讀/寫共享變量的拷貝副本。

•線程對變量的所有操作都必須在本地記憶體中進行,而不能直接讀寫主記憶體。

•不同的線程之間無法直接通路對方本地記憶體中的變量。

具體實作上定義了八種操作:

1.lock:作用于主記憶體,把變量辨別為線程獨占狀态。

2.unlock:作用于主記憶體,解除獨占狀态。

3.read:作用主記憶體,把一個變量的值從主記憶體傳輸到線程的工作記憶體。

4.load:作用于工作記憶體,把read操作傳過來的變量值放入工作記憶體的變量副本中。

5.use:作用工作記憶體,把工作記憶體當中的一個變量值傳給執行引擎。

6.assign:作用工作記憶體,把一個從執行引擎接收到的值指派給工作記憶體的變量。

7.store:作用于工作記憶體的變量,把工作記憶體的一個變量的值傳送到主記憶體中。

8.write:作用于主記憶體的變量,把store操作傳來的變量的值放入主記憶體的變量中。

這些操作都滿足以下原則:

•不允許read和load、store和write操作之一單獨出現。

•對一個變量執行unlock操作之前,必須先把此變量同步到主記憶體中(執行store和write操作)。

2.2 Java中的并發關鍵字

Java基于以上規則提供了volatile、synchronized等關鍵字來保證線程安全,基本原理是從限制處理器優化和使用記憶體屏障兩方面解決并發問題。如果是變量級别,使用volatile聲明任何類型變量,同基本資料類型變量、引用類型變量一樣具備原子性;如果應用場景需要一個更大範圍的原子性保證,需要使用同步塊技術。Java記憶體模型提供了lock和unlock操作來滿足這種需求。虛拟機提供了位元組碼指令monitorenter和monitorexist來隐式地使用這兩個操作,這兩個位元組碼指令反映到Java代碼中就是同步塊-synchronized關鍵字。

這兩個字的作用:volatile僅保證對單個volatile變量的讀/寫具有原子性,而鎖的互斥執行的特性可以確定整個臨界區代碼的執行具有原子性。在功能上,鎖比volatile更強大,在可伸縮性和執行性能上,volatile更有優勢。

2.3 Java中的并發容器與工具類

2.3.1 CopyOnWriteArrayList

CopyOnWriteArrayList在操作元素時會加可重入鎖,一次來保證寫操作是線程安全的,但是每次添加删除元素就需要複制一份新數組,對空間有較大的浪費。

public E get(int index) {
        return get(getArray(), index);
    }

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }           

2.3.2 Collections.synchronizedList(new ArrayList<>());

這種方式是在 List的操作外包加了一層synchronize同步控制。需要注意的是在周遊List是還得再手動做整體的同步控制。

public void add(int index, E element) {
        // SynchronizedList 就是在 List的操作外包加了一層synchronize同步控制
        synchronized (mutex) {list.add(index, element);}
    }
    public E remove(int index) {
        synchronized (mutex) {return list.remove(index);}
    }           

2.3.3 ConcurrentLinkedQueue

通過循環CAS操作非阻塞的給隊列添加節點,

public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p是尾節點,CAS 将p的next指向newNode.
                if (p.casNext(null, newNode)) {
                    if (p != t) 
                        //tail指向真正尾節點
                        casTail(t, newNode);
                    return true;
                }
            }
            else if (p == q)
                // 說明p節點和p的next節點都等于空,表示這個隊列剛初始化,正準備添加節點,是以傳回head節點
                p = (t != (t = tail)) ? t : head;
            else
                // 向後查找尾節點
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }           

三、線上案例

3.1 問題發現

在網際網路醫院醫生端,醫生打開問診IM聊天頁,需要加載幾十個功能按鈕。在2022年12月抗疫期間,QPS全天都很高,高峰時是平日的12倍,偶現報警提示按鈕顯示不全,問題出現機率大概在百萬分之一。

3.2 排查問題的詳細過程

醫生問診IM頁面的加載屬于業務黃金流程,上面的每一個按鈕就是一個業務線的入口,是以處在核心邏輯的上的報警均使用自定義報警,該類報警不設定收斂,無論何種異常包括按鈕個數異常就會立即報警。

1. 根據報警資訊,開始排查,卻發現以下問題:

(1)沒有異常日志:順着異常日志的logId排查,過程中竟然沒有異常日志,按鈕莫名其妙的變少了。

(2)不能複現:在預發環境,使用相同入參,接口正常傳回,無法複現。

2. 代碼分析,縮小異常範圍:

醫生問診IM按鈕處理分組進行:

// 多個線程結果集合
    List<DoctorDiagImButtonInfoDTO> multiButtonList = new ArrayList<>();           
// 多線程并行處理
    Future<List<DoctorDiagImButtonInfoDTO>> multiButtonFuture = joyThreadPoolTaskExecutor.submit(() -> {
        List<DoctorDiagImButtonInfoDTO> multiButtonListTemp = new ArrayList<>();
        buttonTypes.forEach(buttonType -> {
            multiButtonListTemp.add(appButtonInfoMap.get(buttonType));
        });
        multiButtonList.addAll(multiButtonListTemp);
        return multiButtonListTemp;
    });           

3. 增加日志線上觀察

由于并發場景容易引發子線程失敗的情況,對各子線程分支增加必要節點日志上線後觀察:

(1)發生異常的請求處理過程中,所有子線程正常處理完成

(2)按鈕缺少個數随機等于子線程中處理的按鈕個數

(3)初步判斷是ArrayList并發addAll操作異常

4. 模拟複現

使用ArrayList源碼模拟複現問題:

(1)ArrayList源碼分析:

public boolean addAll(Collection<? extends E> c) {
         Object[] a = c.toArray();
         int numNew = a.length;
         ensureCapacityInternal(size + numNew); // Increments modCount
 
         //以目前size為起點,向數組中追加本次新增對象
         System.arraycopy(a, 0, elementData, size, numNew);
 
         //更新全局變量size的值,和上一步是非原子操作,引發并發問題的根源
         size += numNew;
         return numNew != 0;
     }
 
     private void ensureCapacityInternal(int minCapacity) {
         if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
             minCapacity = Math.max(DEFAULT_CAPACITY, minCapacity);
         }
 
         ensureExplicitCapacity(minCapacity);
     }
 
     private void ensureExplicitCapacity(int minCapacity) {
         modCount++;
 
         // overflow-conscious code
         if (minCapacity - elementData.length > 0)
             grow(minCapacity);
     }
 
     private void grow(int minCapacity) {
         // overflow-conscious code
         int oldCapacity = elementData.length;
         int newCapacity = oldCapacity + (oldCapacity >> 1);
         if (newCapacity - minCapacity < 0)
             newCapacity = minCapacity;
         if (newCapacity - MAX_ARRAY_SIZE > 0)
             newCapacity = hugeCapacity(minCapacity);
         // minCapacity is usually close to size, so this is a win:
         elementData = Arrays.copyOf(elementData, newCapacity);
     }
            

(2) 理論分析

在ArrayList的add操作中,變更size和增加資料操作,不是原子操作。

關于并發程式設計與線程安全的思考與實踐



(3)問題複現

複制源碼建立自定義類,為友善複現并發問題,增加停頓

public boolean addAll(Collection<? extends E> c) {
         Object[] a = c.toArray();
         int numNew = a.length;
         //第1次停頓,擷取目前size
         try {
             Thread.sleep(1000*timeout1);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         ensureCapacityInternal(size + numNew); // Increments modCount
 
         //第2次停頓,等待copy
         try {
             Thread.sleep(1000*timeout2);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         System.arraycopy(a, 0, elementData, size, numNew);
 
         //第3次停頓,等待size+=
         try {
             Thread.sleep(1000*timeout3);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         size += numNew;
         return numNew != 0;
     }           
關于并發程式設計與線程安全的思考與實踐



3.3 解決問題

使用線程安全工具 Collections.synchronizedList 建立 ArrayList :

List<DoctorDiagImButtonInfoDTO> multiButtonList = Collections.synchronizedList(new ArrayList<>());            

上線觀察後正常。

3.4 總結反思

使用多線程處理問題已經變得很普遍,但是對于多線程共同操作的對象必須使用線程安全的類。

另外,還要搞清楚幾個靈魂問題:

(1)JMM的靈魂:Happens-before 原則

(2)并發工具類的靈魂:volatile變量的讀/寫 和 CAS

繼續閱讀