天天看點

Java多線程安全

多線程能夠提高CPU的使用效率,為使用者提供宏觀上同時執行多個動作的效果,強大的功能背後也存線上程安全性問題。多個線程同時操作共享變量導緻程式輸出結果與設計者的初衷不符,程式設計過程中可以使用以下幾種方式實作線程安全。

鎖同步機制

在多線程中很常見的問題就是生産者與消費者問題,生産者負責生産對象并且将對象放到産品隊列中,消費者負責從産品隊列中取出并消費對象;産品隊列是一個容量大小有限制的容器,如果生産出來的産品堆滿了産品隊列,生産者就需要等待有消費者取出産品,産品隊列有空位後再放入新産品;如果産品隊列裡沒有産品對象,消費者同樣要等待生産者産生新産品加入隊列。

生産者和消費者通常都被抽象成為線程對象,此時産品隊列就是一個共享的變量,要保證多個生産者消費者線程能夠正确使用産品隊列就需要做互斥操作,也就是說在有某個生産者或消費者線程操作産品隊列的時候,其他的線程都不能操作産品隊列都要等待;同時還要做好産品隊列的通知操作,也就是當有生産者放入新産品的時候要通知消費者可以繼續取産品,當有消費者取出産品後也要通知生産者産品隊列有了空位可以添加新産品。

public class SynchroinzedTest {
	
	public static void main(String[] args) {
		ProductQueue queue = new ProductQueue();
		new ProducerThread(queue).start();
		new ProducerThread(queue).start();
		new ProducerThread(queue).start();

		new ConsumerThread(queue).start();
		new ConsumerThread(queue).start();
		new ConsumerThread(queue).start();
	}
	
	static class ProductQueue {
		LinkedList<Object> list = new LinkedList<>();
		
		public synchronized void add(Object product) {
			while (list.size() >= 10) {
				ThreadUtils.wait(this);
			}
			list.addFirst(product);
			notify();
		}
		
		public synchronized Object remove() {
			while (list.isEmpty()) {
				ThreadUtils.wait(this);
			}
			notify();
			return list.removeLast();
		}
	}
	
	static class ProducerThread extends Thread {
		private ProductQueue queue;
		
		
		public ProducerThread(ProductQueue queue) {
			super();
			this.queue = queue;
		}

		@Override
		public void run() {
			while (true) {
				Object obj = new Object();
				System.out.println(ThreadUtils.getName() + ": Produce one object");
				queue.add(obj);
				ThreadUtils.sleep(500);
			}
		}
	}
	
	static class ConsumerThread extends Thread {
		private ProductQueue queue;
		
		public ConsumerThread(ProductQueue queue) {
			super();
			this.queue = queue;
		}

		@Override
		public void run() {
			while (true) {
				Object obj = queue.remove();
				System.out.println(ThreadUtils.getName() + ": Consume one object");
				ThreadUtils.sleep(2000);
			}
		}
	}
}	
           

ProductQueue就代表了産品隊列對象,add()方法會向内部的連結清單頭部添加新産品,remove()方法則負責從連結清單尾部删除産品并傳回。這兩個方法都是用了synchronized修飾,它們共同使用目前ProductQueue.this對象作為鎖對象,如果有多個生産者或者消費者線程來通路産品隊列,由于鎖對象的存在每次隻有目前擷取到this鎖的線程可以繼續執行,其他的線程需要繼續等待鎖對象釋放在參與鎖競争。

在add()和remove()方法内部還使用while循環來調用wait()方法使目前線程等待,前面介紹Thread接口的時候提到sleep()和wait()方法都可以讓目前線程暫停,它們的差別又在哪裡呢?sleep()方法會暫停目前線程但是依然會保持擷取的鎖對象,喚醒之後不需要再重新擷取鎖對象;而wait()方法在暫停線程後會釋放鎖對象,等到重新被喚醒再競争鎖對象。在add()方法中由于産品隊列已經被填滿,需要等待有消費者消費後空出新空間,而消費如果需要消費産品一定要在調用remove()方法時擷取鎖對象,是以這裡就使用wait()方法來暫停線程并釋放鎖對象,進而讓remove()方法有機會獲得鎖對象。

wait()/notify()方法都是繼承自Object類的方法,也就是說Java中的對象都可以作為鎖對象存在,而且能夠支援鎖同步操作。在調用wait()方法為何外面要用while循環來檢測等待條件呢,直接使用if判斷來檢測可不可以呢?檢視Object.wait()方法的注釋會發現下面這段話:

As in the one argument version, interrupts and spurious wakeups are possible, 
and this method should always be used in a loop:
synchronized (obj) {
while (condition does not hold)
 obj.wait();
 	// Perform action appropriate to condition
 }
           

這段注釋的意思是中斷操作和僞喚醒操作都會導緻wait()方法傳回,而這兩種情況下可能等待條件并沒有被破壞,因而在wait()傳回之後還要重新判斷目前的等待條件是否滿足。

ProductQueue的add() 和remove()方法最後都會調用notify()方法來喚醒一個線程繼續執行,Object中其實還有一個notifyAll()的方法,用來喚醒所有等待在鎖對象上的線程。生産者和消費者問題裡由于一次隻能生産或者消費一個産品,後續隻需要喚醒一個生産者或者消費者線程,即使把所有的線程都喚醒也隻有一個能夠執行,這裡就隻調用了notify()喚醒一個線程。

鎖同步機制能夠很好的保證多線程操作的安全性,但是鎖機制引入了鎖對象,鎖競争機制,線程等待隊列等等,這些都需要消耗記憶體和CPU資源,特别是在切換線程後由于沒有獲得鎖對象浪費了系統配置設定給線程的時間片資源,因而鎖同步機制是一種相對昂貴的線程安全機制。

不可變對象

共享變量在多線程操作會出現語義錯誤問題很大部分是因為共同修改共享變量導緻的,如果共享變量是不可變的,沒有線程能夠修改它的值,所有的線程都隻是讀取它的值,那麼就不會存線上程不安全問題。在Java中有不少類就被設計為不可變對象,它們可以安全的在對個線程之間共享。比如String類對象就是不可以修改的,隻要對String做修改就會産生一個新的String對象。

String text = “abc1234”;

String tmp = text;

text = text.replace(“abc”, “xyz”);

System.out.println(text == tmp); // false

如果能夠确定共享變量目前并不需要修改,可以将共享變量做成不可變對象,所有的線程都隻是共享的讀取它的内容,這樣不需要鎖同步機制就保證了共享變量的多線程安全。

線程本地量

前面讨論的都是共享變量導緻的線程不安全性,如果不共享變量那是不是就不存線上程不安全問題了呢,不共享的變量隻會在目前線程裡操作,相當于單線程操作,自然也就不存在問題了。Java當中為開發者提供了ThreadLocal線程本地量的機制,當修改線程本地量的時候實際上操作的隻是綁定在目前線程的變量,其他線程的變量依然保持不變。

public class ThreadLocalTest {
	private static ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
		@Override
		protected Integer initialValue() {
			return 0;
		}
	};
	public static void main(String[] args) {
		for (int i = 0; i < 50; i++) {
			new MyThread().start();
		}
	}
	
	static class MyThread extends Thread {
		@Override
		public void run() {
			for (int i = 0; i < 200; i++) {
				int x = threadLocal.get();
				x++;
				threadLocal.set(x);
			}
			
			System.out.println(threadLocal.get());
		}
	}
}
           

上面的代碼定義了共享變量threadLocal,它會在多個線程中被擷取被執行,但是最終每個線程列印出來的結果都是200,并沒有發生多線程語義問題。ThreadLocal内部是如何實作的呢,檢視JDK的Thread實作代碼會發現它擁有一個ThreadLocal.ThreadMap的屬性變量。

public class Thread implements Runnable {
    ThreadLocal.ThreadLocalMap threadLocals = null;
}
           

threadLocals成員并不是在Thread類中被建立的,而是在ThreadLocal類中被建立的,當使用者第一次從ThreadLocal對象中擷取本地變量的時候會檢視目前Thread内部是否已經建立了ThreadLocalMap對象,如果沒建立就會new ThreadLocalMap()并指派給Thread.threadLocals屬性。

// ThreadLocal的源代碼
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t); // 擷取目前Thread.threadLocals屬性對象
     if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
         if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
   }
   return setInitialValue(); // 如果Thread.threadLocals屬性為空,初始化
}

private T setInitialValue() {
    T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t); //  擷取目前Thread.threadLocals屬性對象
    if (map != null)
        map.set(this, value);
    else
       createMap(t, value); // 建立ThreadLocallMap對象
    return value;
}

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
           

ThreadLocalMap類是一個類似于HashMap的資料結構,但是它支援的操作比HashMap要少,可以認為是一個精簡版的HashMap。ThreadLocalMap内部會包含一個ThreadLocalMap.Entry類型的數組,ThreadLocalMap.Entry類包含兩個屬性,ThreadLocal類型的key鍵屬性,Object類型的value值屬性。這樣當ThreadLocal線程本地量對象擷取屬性的時候就從目前所線上程Thread.threadLocals的Map對象中拿出鍵為ThreadLocal對象的對應的值。

線程封閉

共享對象由于在多個線程之間操作容易出現語義問題,如果共享變量隻在一個線程之中操作,其他線程想要操作共享變量都委托給這個單獨的線程來執行,這種處理方式就稱作線程封閉。在Android系統中最常見的線程封閉執行個體就是主線程UI操作,Android規定所有的UI界面更新操作必須要全部都要在主線程中進行,在子線程中操作UI對象會抛出異常,屬于非法操作。

其實仔細考慮就會發現UI對象也是一種共享變量,它裡面的内容和普通的變量一樣都是可以被修改的,如果多個線程同時修改UI對象的内容,為了避免前面出現的語義性錯誤必須要使用鎖同步機制確定正确的更新操作;但是Android系統規定了所有的UI操作都必須在UI線程中執行,其他的線程想通路UI對象都會委托UI線程來執行,這就是Android UI對象的線程封閉。

在Android系統中有一個封裝了Looper的線程類HandlerThread,它在啟動的時候會在内部開啟一個消息循環隊列,其他線程向消息隊列投遞消息,所有的消息處理都可以在在HandlerThread線程裡執行,現在就用該類負責共享資料的遞增操作,不需要任何鎖同步機制就能保證線程安全性。

public class IncrementThread extends HandlerThread {
    private static final String TAG = "IncrementThread";

    public IncrementThread() {
        super("IncrementThread");
    }

    private int count = 0;

    public void increment() {
        for (int i = 0; i < 200; i++) {
            count++;
        }
        print();
    }

    public void print() {
        Log.e(TAG, "count = " + count);
    }
}
           

上面定義了IncrementThread繼承自HandlerThread,它的内部有一個count變量,這個變量隻有IncrementThread變量可以通路修改,其他線程如果想要修改它必須向IncrementThread線程發起修改請求。Android中的Handler可以綁定不同的線程的Looper對象,這樣Handler就能夠向不同的線程發送消息,并且Handler.handleMessage()方法會在開啟Looper對象的線程執行。

public class MainActivity extends AppCompatActivity {
    private static final String TAG = "MainActivity";
    private IncrementThread mIncrementThread;
    private Handler mIncrementHandler;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        mIncrementThread = new IncrementThread();
        mIncrementThread.start();
        mIncrementHandler = new Handler(mIncrementThread.getLooper()) {
            @Override
            public void handleMessage(Message msg) {
                if (msg.what == 0) {
                    mIncrementThread.increment();
                }
            }
        };
    }

    class UserThread extends Thread {
        @Override
        public void run() {
            mIncrementHandler.sendEmptyMessage(0);
        }
    }

    public void startCount(View view) {
        for (int i = 0; i < 100; i++) {
            new UserThread().start();
        }
    }
}
           

MainActivity 當中會有startCount()方法,使用者點選按鈕後會觸發,在這裡會建立100個UserThread線程,每個UserThread都會想IncrementThread發起一次增加count變量的請求,多次測試會發現count每次增長都是20000,沒有出現增長值丢失的情況。

繼續閱讀