天天看點

多線程高并發程式設計(1) -- 基礎及詳解

多線程高并發程式設計

背景:

  程序和線程的差別:

多線程高并發程式設計(1) -- 基礎及詳解

  程序的記憶體大小為:堆記憶體+線程數量*棧記憶體,即線程數量 =( 最大位址空間[MaxProcessMemory] - JVM堆記憶體 - 系統保留記憶體[ReservedOsMemory] )/ ThreadStackSize(XSS),從中可以看出,線程的數量随棧記憶體的增多而減少。

  線程是程式執行的一個路徑,每一個線程都有自己的局部變量表、程式計數器(指向正在執行的指令指針)以及各自的生命周期。當啟動了一個Java虛拟機(JVM)時,從作業系統開始就會建立一個新的程序(JVM程序),JVM程序将會派生或者建立很多線程。

  • 一個線程的建立肯定是由另一個線程完成的;
  • 被建立線程的父線程是建立它的線程;

  線程會帶來額外的開銷,如CPU排程時間、并發控制開銷等;每個線程在自己的工作記憶體互動,加載和存儲主記憶體控制不當會造成資料不一緻。

一.線程建立方式:

  • 構造Thread類:實作線程的執行單元run有兩種方式,分别是下面
    • 繼承Thread,重寫run方法:Thread實作了Runnable接口,使用start開啟線程,start開啟後線程會加入排程器,然後調用run方法,start會調用start0本地方法跟OS進行互動運作;下面是start源碼解析
      /**
       * Causes this thread to begin execution; the Java Virtual Machine
       * calls the <code>run</code> method of this thread.
       * 開啟線程,JVM會調用run方法【start使用了模闆方法】
       * <p>
       * It is never legal to start a thread more than once.
       * 不能兩次啟動線程,否則報IllegalThreadStateException異常
       * In particular, a thread may not be restarted once it has completed
       * execution.
       * 一個線程生命周期結束,也就是到了TERMINATED狀态,再次調用start方法是不允許的,
       * 也就是TERMINATED狀态沒法回到RUNNABLE/RUNNING狀态。
       *
       * @exception  IllegalThreadStateException  if the thread was already
       *               started.
       * @see        #run()
       * @see        #stop()
       */
      public synchronized void start() {//線程安全的
          /**
           * This method is not invoked for the main method thread or "system"
           * group threads created/set up by the VM. Any new functionality added
           * to this method in the future may have to also be added to the VM.
           * 這個方法不會被主線程調用或通過虛拟機系統線程組建立起來。未來任何添加到該方法裡的新功能可能需要加入到虛拟機中
           *
           * A zero status value corresponds to state "NEW".
           * 線程被構造後的new狀态,threadStatus的屬性值是0
           */
          if (threadStatus != 0)
              throw new IllegalThreadStateException();
      
          /* Notify the group that this thread is about to be started
           * so that it can be added to the group\'s list of threads
           * and the group\'s unstarted count can be decremented. 
           * 通知線程組新線程将要啟動,以便它可以添加到線程組清單并且線程組沒有開始計數*/
          group.add(this);//加入線程組
      
          boolean started = false;
          try {
              start0();//調用本地方法
              started = true;
          } finally {
              try {
                  if (!started) {//啟動失敗
                      group.threadStartFailed(this);//線程啟動失敗,從組中移除該線程
                  }
              } catch (Throwable ignore) {
                  /* do nothing. If start0 threw a Throwable then
                    it will be passed up the call stack */
              }
          }
      }
      
            
      void add(Thread t) {
          synchronized (this) {
              if (destroyed) {//線程組狀态校驗
                  throw new IllegalThreadStateException();
              }
              if (threads == null) {
                  threads = new Thread[4];//初始化長度為4的線程組
              } else if (nthreads == threads.length) {
                  threads = Arrays.copyOf(threads, nthreads * 2);//數組滿了就擴容2倍
              }
              threads[nthreads] = t;//目前線程添加到線程組中
      
              // This is done last so it doesn\'t matter in case the
              // thread is killed
              nthreads++;//線程數+1
      
              // The thread is now a fully fledged member of the group, even
              // though it may, or may not, have been started yet. It will prevent
              // the group from being destroyed so the unstarted Threads count is
              // decremented.
              nUnstartedThreads--;//未啟動線程數-1
          }
      }      
      private native void start0();//本地方法調用重寫的run方法
            
      void threadStartFailed(Thread t) {
          synchronized(this) {
              remove(t);//移除目前線程
              nUnstartedThreads++;//沒有啟動的線程數量+1
          }
      }
      
      //=======================測試============================      
      Thread t = new Thread(){
        @Override
        public void run(){
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
      };
      t.start();
      t.start();//不能兩次啟動,第二次啟動是不允許的,報IllegalThreadStateException,此時該線程是處于運作狀态
      //=======================測試==============================
      Thread t = new Thread(){
        @Override
        public void run(){
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
      };
      t.start();
      TimeUnit.SECONDS.sleep(10);//設定休眠時間,上面的線程的生命周期已經終止,下面再次啟動報IllegalThreadStateException
      t.start();      
    • 實作Runnable接口,重寫run方法并且将Runnable執行個體用作構造Thread的參數【單繼承有局限性,推薦使用接口】:将線程的控制(start)和業務邏輯(run)的運作徹底分離開來,使用的是政策模式;Thread的run方法是不能共享的,但Runnbale的run方法可以共享,使用同一個Runnable的執行個體構造不同的Thread執行個體;把實作類對象(實作Runnable接口的類的執行個體化)放入代理類對象(Thread構造方法)中,使用的是代理模式;下面是靜态代理的代碼解釋:
      public class StaticProxy {
          public static void main(String[] args) {
              new Weeding(new Me()).happyMarry();
      //        new Thread(對象).start();類似
          }
      }
      
      interface Marry {
          void happyMarry();
      }
      //真實角色
      class Me implements Marry {
          @Override
          public void happyMarry() {
              System.out.println("me will marry!");
          }
      }
      //代理對象
      class Weeding implements Marry{
          //真實角色
          private Marry marry;
          public Weeding(Marry marry){
              this.marry=marry;
          }
          @Override
          public void happyMarry() {
              System.out.println("start");
              marry.happyMarry();
              System.out.println("end");
          }
      }      
  • 實作Callable接口,重寫call方法,Future擷取傳回值:Callable能接受一個泛型,然後在call方法中傳回一個指定類型的值;
    public interface Callable<V> {
        V call() throws Exception;
    }       
    //線程池隊列開啟線程,不會産生髒讀資料
    //使用步驟:
    //1.建立目标對象new
    //2.建立執行服務線程池
    //3.送出執行submit
    //4.擷取結構get
    //5.關閉服務shutdownNow
    public class MyThread implements Callable {
        private static int count = 20;
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            MyThread m1 = new MyThread();
            MyThread m2 = new MyThread();
            MyThread m3 = new MyThread();
            MyThread m4 = new MyThread();
            ScheduledExecutorService service = new ScheduledThreadPoolExecutor(2,
                    new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build());
            Future submit = service.submit(m1);
            Future submit1 = service.submit(m2);
            Future submit2 = service.submit(m3);
            Future submit3 = service.submit(m4);
            System.out.println(submit.get());
            System.out.println(submit1.get());
            System.out.println(submit2.get());
            System.out.println(submit3.get());
            service.shutdown();
    
        }
    
        @Override
        public Object call() throws Exception {
            count--;
            return count;
        }
    }      
  • 匿名内部類;
    new Thread(){//相當于繼承Thread的方式
                public void run(){
                    System.out.println("thread1 start ... ");
                }
            }.start();
    
    
            new Thread(new Runnable() {//相當于實作Runnable接口的方式
                @Override
                public void run() {
                    System.out.println("thread2 start .... ");
                }
            }).start();      
  • 定時器(Timer);
    Timer timer = new Timer();//建立時間器
            timer.schedule(new TimerTask() {//使用schedule,參數為定時器任務并重寫run方法
                @Override
                public void run() {
                    System.out.println("timer task is run");
                }
            }, 0, 1000);      
  • 線程池(内部使用隊列,是以加入線程池的線程是順序執行):使用execute和重寫Runnbale的run方法;
    ScheduledExecutorService service = new ScheduledThreadPoolExecutor(2,
                    new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build());
            service.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("run test");
                }
            });      
  • lambda表達式;
    new Thread(()-> {
                for(int i = 1 ; i<10 ; i++){
                    System.out.println("It is a lambda function!");
                }
    
            }).start();      
  • Spring方式(@Async注解);
    @Test
        public void test() {
            run();
        }
        @Async
        public void run(){
            System.out.println("Async Test");
        }      

二.線程生命周期

  • new新生狀态:當用new建立一個Thread對象時,此時它并不處于執行狀态,因為沒有調用star啟動該線程,那麼線程的狀态為new狀态,也就是說,它隻是Thread對象的狀态,在沒有start之前,該線程是不存在的;
  • runnable就緒狀态:線程對象進入runnable就緒狀态必須調用start方法,那麼此時才是真正地在JVM程序中建立了一個線程;就緒狀态不會直接進入阻塞狀态和死亡狀态,即使是線上程的執行邏輯中調用wait、sleep或其他block的IO操作等,也必須先獲得CPU的排程執行權才可以,嚴格來說,就緒狀态的線程隻能意外終止或進入運作狀态;
  • running運作狀态:一旦CPU通過輪詢或其他方式從任務可執行隊列中選中了線程,此時它才能真正地執行自己的邏輯代碼;一個正在running狀态的線程事實上也是一個runnable的,但是反過來則不成立;
多線程高并發程式設計(1) -- 基礎及詳解
  • sleep:使目前線程進入指定毫秒級的休眠,暫停執行,但不會放棄monitor鎖的所有權,即不會釋放鎖資源;使用TimeUnit來替代Thread.sleep,省去了時間機關的換算步驟;
  • yield:屬于一種啟發式的方法,其會提醒排程器我願意放棄目前的CPU資源,如果CPU的資源不緊張,則會忽略這種提醒;yield隻是一個提示(hint),CPU排程器并不會擔保每次都能滿足yield提示;
  • sleep和yield的差別:
    • sleep會導緻目前線程暫停指定的時間,沒有CPU時間片的消耗;
    • yield隻是對CPU排程器的一個提示,如果CPU排程器沒有忽略這個提示,它會導緻線程上下文的切換;
    • sleep會使線程短暫block,會在給定的時間内釋放CPU資源;
    • yield會使running狀态的線程進入runnable狀态(如果CPU排程器沒有忽略這個提示的話);
    • sleep幾乎百分之百地完成了給定時間的休眠,但yield的提示并不能一定擔保;
    • 一個線程sleep另一個線程interrupt會捕獲到中斷信号,而yield則不會;
  • join:join某個線程A,會使目前線程B進入等待,直到線程A結束生命周期;可以使用join來達到線程順序執行的效果;
  • wait:表示線程一直等待,直到其他線程通知,與sleep不同的是它會釋放鎖;調用wait會加入wait set中,notify會随機喚醒一個,notifyAll會彈出所有線程;
多線程高并發程式設計(1) -- 基礎及詳解
  • notify:喚醒一個處于等待狀态的線程;
  • notifyAll:喚醒同一個對象上所有調用wait方法的線程,優先級高的線程優先排程;
  • synchronized:同步,内置鎖、互斥鎖、可重入鎖,鎖定共享資源(共享資源對象不能為null,使用static修飾保持對象引用位址隻有一份),依賴JVM,JVM指令是monitor enter和monitor exit;synchronized的指令嚴格遵守java happens-before規則,一個monitor exit指令之前必定要有一個monitor enter;不可中斷鎖,适合競争不激烈,可讀性好;
    • 由于同一程序的多個線程共享同一塊存儲空間,在帶來友善的同時,也帶來了通路沖突的問題。為了保證資料在方法中被通路時的正确性,在通路時加入鎖機制(synchronized),當一個線程獲得對象的排他鎖,獨占資源,其他線程必須等待,使用後釋放鎖即可。但存在以下問題:
      • 一個線程持有鎖會導緻其他所有需要此鎖的線程挂起;
      • 在多線程競争下,加鎖、釋放鎖會導緻比較多的上下文切換和排程延時,引起性能問題;
      • 如果一個優先級高的線程等待一個優先級低的線程釋放鎖會導緻優先級倒置,引起性能問題;
    • 鎖資訊存在對象頭中:
      • Mark Word
        • 線程id
        • Epoch
        • 對象的分代年齡資訊
        • 是否是偏向鎖
        • 鎖标志位
      • Class Metadata Address
    • 使用範圍:
      • 修飾代碼塊:大括号括起來的代碼,作用于調用的對象;
        @Override
            public void run() {
                synchronized (this) {//類A實作了Runnable,重寫了run方法,執行個體化對象a,b分别加入到Thread構造方法中并開啟線程,this是兩個不同的對象;
                    System.out.println(this.hashCode());
                }
            }
                MyThread thread = new MyThread();
                MyThread thread2 = new MyThread();
                Thread t1 = new Thread(thread, "t1");
                Thread t2 = new Thread(thread2, "t2");
                Thread t3 = new Thread(thread, "t3");
                Thread t4 = new Thread(thread, "t4");
        
                t1.start();
                t2.start();
                t3.start();
                t4.start();
        ===========結果=================
        693024158
        1259146238
        1259146238
        1259146238      
      • 修飾方法:整個方法,作用于調用的對象;
        @Override
            public synchronized void run() {//類A實作了Runnable,重寫了run方法,執行個體化對象a,b分别加入到Thread構造方法中并開啟線程,this是兩個不同的對象;
                System.out.println(this.hashCode());
            }
                MyThread thread = new MyThread();
                MyThread thread2 = new MyThread();
                Thread t1 = new Thread(thread, "t1");
                Thread t2 = new Thread(thread2, "t2");
                Thread t3 = new Thread(thread, "t3");
                Thread t4 = new Thread(thread, "t4");
        
                t1.start();
                t2.start();
                t3.start();
                t4.start();
        ===============結果===================
        487590100
        697138600
        697138600
        697138600      
      • 修飾靜态方法:整個靜态方法,作用于所有對象;
        @Override
            public void run() {
                    test1();
            }
            public static synchronized void test1(){
                System.out.println(MyThread.class.hashCode());
            }
                MyThread thread = new MyThread();
                MyThread thread2 = new MyThread();
                Thread t1 = new Thread(thread, "t1");
                Thread t2 = new Thread(thread2, "t2");
                Thread t3 = new Thread(thread, "t3");
                Thread t4 = new Thread(thread, "t4");
        
                t1.start();
                t2.start();
                t3.start();
                t4.start();
        ===============結果===================
        6566818
        6566818
        6566818
        6566818      
      • 修飾類:括号括起來的部分,作用于所有對象;
        @Override
            public void run() {
                    test1();
            }
            public static  void test1(){
                synchronized (MyThread.class) {
                    System.out.println(MyThread.class.hashCode());
                }
            }
                MyThread thread = new MyThread();
                MyThread thread2 = new MyThread();
                Thread t1 = new Thread(thread, "t1");
                Thread t2 = new Thread(thread2, "t2");
                Thread t3 = new Thread(thread, "t3");
                Thread t4 = new Thread(thread, "t4");
        
                t1.start();
                t2.start();
                t3.start();
                t4.start();
        ==========結果=============
        6566818
        6566818
        6566818
        6566818      
  • Lock:顯示鎖,依賴特殊的CPU指令;可中斷鎖,多樣化同步,競争激烈時能維持常态;
    public class MyLock implements Lock {//自定義Lock
        private boolean isLocked = false;
        @Override
        public void lock() {
            while (isLocked){//已經獲得鎖
                try {
                    wait();//等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            isLocked=true;//獲鎖成功
    
        }
    
        @Override
        public void unlock() {
            isLocked = false;//釋放鎖
            notify();//喚醒等待線程
        }
    }