天天看點

并發程式設計(5)—— 常用方法

上一篇:并發程式設計(4)—— 線程的生命周期

一、線程優先級(1-10 預設值5)

在不同的作業系統中,設定線程優先級是一個不穩定的,因為有的作業系統會忽略調優先級的設定,使用方式: thread.setPriority(int newPriority);

二、守護線程

守護線程和主線程是共死的,主線程退出,守護線程肯定會退出,守護線程一般做的是支援行工作,例如記憶體方面的回收,對資源的清理;

測試代碼:

public class TestDaemo {

    public static void main(String[] args) {
        Thread thread = new DaemonThread();
        thread.setName("DaemonThread");
        thread.start();
    }

    public static class DaemonThread extends Thread{
        @Override
        public void run(){
            int a = 1;
            while (true){
                a++;
                System.out.println(Thread.currentThread().getName() + a);
            }
        }

    }
}
           

運作結果:

DaemonThread2
DaemonThread3
DaemonThread4
DaemonThread5
DaemonThread6
DaemonThread7
DaemonThread8
DaemonThread9
	  .
	  .
	  .
	  .
           

沒有任何操作的的情況下線程會一直運作下去。

如果我們把它設定為守護線程:

public class TestDaemo {

public static void main(String[] args) {
        Thread thread = new DaemonThread();
        thread.setName("DaemonThread");
        thread.setDaemon(true);
        thread.start();
        System.out.println("end");
    }

    public static class DaemonThread extends Thread{
        @Override
        public void run(){
            int a = 1;
            while (true){
                a++;
                System.out.println(Thread.currentThread().getName() + a);
            }
        }

    }
}
           

運作結果:

并發程式設計(5)—— 常用方法

子線程還沒來的及運作主線程就運作完了,是以子線程也不會繼續運作。

在守護線程中,finally代碼塊也不一定會執行

在設定守護線程的時候一定要在start()方法之前;

三、内置鎖 synchronized

synchronized詳解

四、wait()、notify()、notifyAll();

  • wait():讓線程處于等待狀态(調用前必須持有鎖,調用之後會自動釋放鎖,當wati()傳回的時候目前線程會持有鎖,這兩步由虛拟機自動執行)。
  • notify():喚醒等待狀态中的某一個線程(調用前也必須持有鎖,等同步代碼塊跑完才會釋放鎖)。
  • notifyAll():喚醒等待中的所有線程(調用前也必須持有鎖,等同步代碼塊跑完才會釋放鎖)。

代碼示例(模拟快遞發送通知):

快遞實體類:

public class Express {

    public static final String city = "上海市";
    private int km;
    private String site;
    public Express() {
    }
    public Express(int km, String site) {
        this.km = km;
        this.site = site;
    }

    public synchronized void setSite(String site) {
        this.site = site;
        notifyAll();
    }

    public synchronized void  checkKm(Integer km){
        this.km = km;
        notifyAll();

    }
    public synchronized void waitKm(){
        while (this.km < 100){
            try {
                System.out.println("目前公裡數是"+this.km+"小于100 繼續等待!!!");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("目前公裡數是"+this.km+"大于100 進行通知!!!");
    }

    public synchronized void waitCity(){
        while (site.equals(this.city)){
            try {
                System.out.println("還沒有出"+this.city+",繼續等待");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("已到達"+this.site);
    }
}
           

測試類:

public class TestThread extends Thread {

    private static Express express = new Express(0,"上海市");

    //檢測公裡數
    public static class CheckKm extends Thread{
        @Override
        public void run(){
            express.waitKm();
        }
    }
    //檢測到了哪哥城市
    public static class ChectCity extends Thread{
        @Override
        public void run(){
            express.waitCity();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++){
            Thread checkKm = new CheckKm();
            checkKm.start();
        }

        for (int i = 0; i < 3; i++){
            Thread ChectCity = new ChectCity();
            ChectCity.start();
        }
            
        express.checkKm(10);
        Thread.sleep(3000L);
        express.checkKm(90);
        Thread.sleep(3000L);
        express.checkKm(122);
        Thread.sleep(3000L);
        express.setSite("北京");
    }
}
           

運作結果:

目前公裡數是10小于100 繼續等待!!!
目前公裡數是10小于100 繼續等待!!!
目前公裡數是10小于100 繼續等待!!!
還沒有出上海市,繼續等待
還沒有出上海市,繼續等待
還沒有出上海市,繼續等待
還沒有出上海市,繼續等待
還沒有出上海市,繼續等待
還沒有出上海市,繼續等待
目前公裡數是90小于100 繼續等待!!!
目前公裡數是90小于100 繼續等待!!!
目前公裡數是90小于100 繼續等待!!!
目前公裡數是122大于100 進行通知!!!
目前公裡數是122大于100 進行通知!!!
目前公裡數是122大于100 進行通知!!!
還沒有出上海市,繼續等待
還沒有出上海市,繼續等待
還沒有出上海市,繼續等待
已到達北京
已到達北京
已到達北京

Process finished with exit code 0
           

五、使用wait(),notify(),notifyAll()完成等待逾時模式

模拟資料庫連接配接池擷取連接配接和放回連接配接

SqlConnetion.java資料庫連接配接

public class SqlConnetion implements Connection {

    /*拿一個資料庫連接配接*/
    public static final Connection fetchConnection(){
        return new SqlConnetion();
    }


    @Override
    public Statement createStatement() throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return null;
    }

    @Override
    public String nativeSQL(String sql) throws SQLException {
        return null;
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {

    }

    @Override
    public boolean getAutoCommit() throws SQLException {
        return false;
    }

    @Override
    public void commit() throws SQLException {
        SleepTools.ms(100);
    }

    @Override
    public void rollback() throws SQLException {

    }

    @Override
    public void close() throws SQLException {

    }

    @Override
    public boolean isClosed() throws SQLException {
        return false;
    }

    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return null;
    }

    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {

    }

    @Override
    public boolean isReadOnly() throws SQLException {
        return false;
    }

    @Override
    public void setCatalog(String catalog) throws SQLException {

    }

    @Override
    public String getCatalog() throws SQLException {
        return null;
    }

    @Override
    public void setTransactionIsolation(int level) throws SQLException {

    }

    @Override
    public int getTransactionIsolation() throws SQLException {
        return 0;
    }

    @Override
    public SQLWarning getWarnings() throws SQLException {
        return null;
    }

    @Override
    public void clearWarnings() throws SQLException {

    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return null;
    }

    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {

    }

    @Override
    public void setHoldability(int holdability) throws SQLException {

    }

    @Override
    public int getHoldability() throws SQLException {
        return 0;
    }

    @Override
    public Savepoint setSavepoint() throws SQLException {
        return null;
    }

    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return null;
    }

    @Override
    public void rollback(Savepoint savepoint) throws SQLException {

    }

    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {

    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        return null;
    }

    @Override
    public Clob createClob() throws SQLException {
        return null;
    }

    @Override
    public Blob createBlob() throws SQLException {
        return null;
    }

    @Override
    public NClob createNClob() throws SQLException {
        return null;
    }

    @Override
    public SQLXML createSQLXML() throws SQLException {
        return null;
    }

    @Override
    public boolean isValid(int timeout) throws SQLException {
        return false;
    }

    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {

    }

    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {

    }

    @Override
    public String getClientInfo(String name) throws SQLException {
        return null;
    }

    @Override
    public Properties getClientInfo() throws SQLException {
        return null;
    }

    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return null;
    }

    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return null;
    }

    @Override
    public void setSchema(String schema) throws SQLException {

    }

    @Override
    public String getSchema() throws SQLException {
        return null;
    }

    @Override
    public void abort(Executor executor) throws SQLException {

    }

    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {

    }

    @Override
    public int getNetworkTimeout() throws SQLException {
        return 0;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return null;
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return false;
    }
}
           

DBPool.java 模拟資料庫連接配接池

public class DBPool {

    //模拟連接配接池
    private static LinkedList<Connection> pool = new LinkedList<>();

    //初始化鍊連接配接池
    public DBPool(int initsize){
        if(initsize > 0){
            for (int i = 0; i < initsize; i++) {
                pool.addLast(SqlConnetion.fetchConnection());
            }
        }
    }

    //從連接配接池擷取連接配接
    public Connection getConnection(long time) throws InterruptedException {
        synchronized (pool){
            if(time <= 0){
                while (pool.isEmpty()){
                    pool.wait();
                }
                return pool.removeFirst();
            }else{
                long overTime = System.currentTimeMillis() + time;//等待最終時間
                long retime = time;//需要等待多久
                while (pool.isEmpty() && retime > 0){
                    pool.wait(retime);
                    retime = overTime - System.currentTimeMillis();//計算剩餘等待時間
                }
                Connection result = null;
                if(!pool.isEmpty()){
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }

    //歸還連接配接到連接配接池
    public void releaseConnection(Connection conn){
        if(conn != null){
            synchronized (pool){
                pool.addLast(conn);
                pool.notifyAll();
            }
        }
    }
}
           

DBPoolTest.java 從連接配接池中擷取連接配接

public class DBPoolTest {
    //初始化連接配接池
    static DBPool pool  = new DBPool(10);

    private static CountDownLatch countDownLatch;

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 50;//啟動50個線程
        int workCount = 20;//每個線程操作20次
        //擷取到連接配接的次數
        AtomicInteger get = new AtomicInteger(0);
        //未擷取到的次數
        AtomicInteger notGet = new AtomicInteger(0);
        countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i< threadCount; i++){
            Thread thread = new Thread(new Worker(workCount,get,notGet),"線程"+i);
            thread.start();

        }
        countDownLatch.await();
        System.out.println("成功擷取"+get.get()+"個連結");
        System.out.println("失敗擷取"+notGet.get()+"個連結");
    }


     static class Worker implements  Runnable{
        private int count;

        private AtomicInteger get;

        private AtomicInteger notGet;

        public Worker(int count, AtomicInteger get, AtomicInteger notGet){
            this.count = count;
            this.get = get;
            this.notGet = notGet;
        }

        @Override
        public void run() {
            while (count > 0){
                try {
                    //擷取連接配接
                    Connection connection = pool.fetchConn(1000);
                    if(connection != null){
                        try {
                            connection.createStatement();//模拟操作資料庫
                            connection.commit();//模拟操作資料庫
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }finally {
                            pool.releaseConn(connection);//歸還連接配接
                            get.incrementAndGet();//記錄得到連接配接的次數
                        }

                    }else{
                        notGet.incrementAndGet();//記錄為得到連接配接的次數
                        System.out.println(Thread.currentThread().getName()+"等待逾時");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    count --;
                }
            }
            countDownLatch.countDown();
        }
    }
}
           

運作結果:

線程19等待逾時
線程32等待逾時
	.
	.
	.
	.
線程29等待逾時
成功擷取890個連結
失敗擷取110個連結

Process finished with exit code 0
每次運作結果都不一樣,大家可自己嘗試
           

wait和notify/notifyAll的原理

調用wait方法,首先會擷取螢幕鎖,獲得成功以後,會讓目前線程進入等待狀态進入等待隊列并且釋放鎖。

當其他線程調用notify後,會選擇從等待隊列中喚醒任意一個線程,而執行完notify方法以後,并不會立馬喚醒線程,原因是目前的線程仍然持有這把鎖,處于等待狀态的線程無法獲得鎖。必須要等到目前的線程執行完按monitorexit指令以後,也就是鎖被釋放以後,處于等待隊列中的線程就可以開始競争鎖了。

wait和notify/notifyAll為什麼需要在synchronized裡面?

wait方法的語義有兩個,一個是釋放目前的對象鎖、另一個是使得目前線程進入阻塞隊列,而這些操作都和螢幕是相關的,是以wait必須要獲得一個螢幕鎖。

而對于notify來說也是一樣,它是喚醒一個線程,既然要去喚醒,首先得知道它在哪裡,是以就必須要找到這個對象擷取到這個對象的鎖,然後到這個對象的等待隊列中去喚醒一個線程。

六、yeild()

yield()的作用是讓步。它能讓目前線程由“運作狀态”進入到“就緒狀态”。

yield()方法就是目前線程把占有的cpu資源讓出來,然後同優先級的所有線程再去争搶這個cpu資源,可能會有同優先級的其他線程擷取到cpu資源,也可能還是這個線程擷取到cpu資源。

yield()不會釋放鎖,隻是讓出了cup的資源,讓出了執行權

例:有A、B兩個人做搶答題,一開始A搶到了答題的機會,但是A沒有去答題,而是提出在重新進行一次搶答,這時就相當于A讓出了答題的機會,然後A

、B又開始去搶答題的機會。

測試代碼:

public class TestYeild {

    public static int a = 1;

    public static Object object = new Object();

    public static class Answer extends Thread {

        @Override
        public void run() {
            synchronized (object){
                for(int i = 0; i< 5; i++){
                    System.out.println(Thread.currentThread().getName()+"擷取到答題機會");
                    Thread.yield();
                }

                System.out.println("随便列印點啥------------------");
            }


        }
    }

    public static class Answer2 extends Thread {

        @Override
        public void run() {
            synchronized (object){
                System.out.println(Thread.currentThread().getName()+"擷取到答題機會");

            }

        }
    }

    public static void main(String[] args) {
        for (int i = 0; i< 5; i++){
            Thread a = new Answer();
            a.setName("A");

            Thread b = new Answer2();
            b.setName("B");a.start();
            b.start();

        }


    }
}
           

運作結果:

一、加鎖的運作結果

A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
随便列印點啥------------------
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
随便列印點啥------------------
B擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
随便列印點啥------------------
B擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
随便列印點啥-----------------
B擷取到答題機會
B擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
随便列印點啥------------------
B擷取到答題機會

Process finished with exit code 0
           

二、不加鎖(去掉synchronized關建字)運作結果:

A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
B擷取到答題機會
随便列印點啥------------------
A擷取到答題機會
B擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
B擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
B擷取到答題機會
随便列印點啥------------------
随便列印點啥------------------
B擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
A擷取到答題機會
随便列印點啥------------------
随便列印點啥------------------

Process finished with exit code 0
           

七、join()

join()方法使調用該方法的線程在此之前執行完畢,也就是等待該方法的線程執行完畢後再往下繼續執行。注意該方法也需要捕捉異常

示例代碼:

public class TestJoin {

    public static class JoinThread extends Thread{

        private Thread thread;
        public JoinThread(Thread thread){this.thread = thread;};
        @Override
        public void run(){
            for (int i = 0; i < 2; i++){
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("這是 "+Thread.currentThread().getName()+"線程");
            }
        }
    }

    public static class UserThread extends Thread{
        @Override
        public void run(){
            for (int a = 0; a< 5; a++)
            System.out.println("這是 UserThread 線程");
        }
    }

    public static void main(String[] args) {

        Thread t1 = new UserThread();
        Thread join = new JoinThread(t1);
        join.setName("join ");
        join.start();
        t1.start();
    }
}
           

運作結果:

這是 UserThread 線程
這是 UserThread 線程
這是 UserThread 線程
這是 UserThread 線程
這是 UserThread 線程
這是 join 線程
這是 join 線程

Process finished with exit code 0
           

代碼中可以看到,在join線程中調用的user線程的join方法,雖然我們先join.start()

但是還是等user線程運作完之後再運作的join線程