天天看點

網際網路技術14——Semaphore信号量

SemaPhore信号量非常适合高并發通路,在系統上線之前,要對系統通路量記性評估,當然這個肯定不是随便寫的,是經過以往的經驗、資料、曆年的通路量,已經推廣力度進行一個合理的評估,當然評估标準不能太大也不能太小。太大的話則投入的資源得不到實際的效果,會浪費資源。當評估值調小則某個時間點一個高峰值的通路量上回直接壓垮系統。

解決高并發關鍵在業務,要将業務進行劃分,劃分為不同的層次和子產品,針對某個具體的子產品,使用Nginx等進行分流,分給部署同一個系統的多個Tomcat伺服器,使其負載均衡。整個架構可能包括商品系統。促銷系統、購物車系統。支付系統等,在網絡入口處視同nginx,将流量配置設定到不同的子產品,分别進行處理。下面是網際網路企業關系的一些相關概念,Nginx理論上可以大概處理2000萬的并發類,若并發量更大,可以考慮在Nginx之前,添加LVS。

pv(page view):網站的總通路量,頁面浏覽量或點選量,使用者每重新整理一次就會被記錄一次。

UV(unique visitor):通路網站的一台電腦用戶端為一個方可,一般來講,時間上以00.00-24“00之間相同的id的用戶端隻記錄一次。

QPB(query per second):每秒查詢書,qps很大程度上代表了系統業務上的繁忙程度,每次請求的背後,可能對應着多次磁盤io、多次忘了請求,多個cpu時間片等。一旦目前qps超過鎖設定的預警門檻值,可以考慮增機器對叢集擴容,一面壓力過大導緻當機,可以根據前期的壓力測試得到估值,在結合後期綜合運維的情況,估算出門檻值。

RT(response time)請求的相應時間,這個名額非常關鍵,直接說明前段使用者的體驗。是以任何系統設計師動想降低rt的時間。

當然還涉及cpu、記憶體、網絡、磁盤等情況,更多細節的問題很多,如:每秒select、update、delete操作的資料量扥資料層面的統計。

一般來講,主要上線的系統進行多輪壓力測試以後,可以對系統進行峰值評估,采用所謂的80/20原則,即80%的通路請求将在20%的時間内到達。這樣我們可以根據系統對應的pv計算出峰值qps。

峰值qps=(總PV*80%)/(60*60*24*20%)

然後再講總峰值qps除以單台機器所能承受的最高qps值,就是所需要機器的數量。

機器數=總得峰值qps/壓力測試得到的單台機器極限qps

Semaphore控制系統的流量

semaphore又稱信号量,是作業系統中的一個概念,在java并發程式設計中,信号量控制的是線程并發的數量。samephore管理一些列許可證。release釋放一個許可證。acquire():目前行代碼從samephore中獲得一個許可證,release():目前行代碼向samephore中釋放一個許可證。

semaphire經常用于限制擷取某種資源的線程數量。比如說有5條鐵路,每個鐵路上隻能走一條火車,那們就可以初始化一個new  Semaphore(5),每次火車行駛前都要執行acquire()來擷取同行許可,如果目前行駛的火車數小于5則可直接獲得許可,如果等于5,則要等待有一個火車行駛完畢,執行release()歸還許可。

示例代碼:

我直接用的随機數表示是同一個線程,雖然不嚴瑾,但是在1000以内随機,不同線程得到相同随機數機率很小,作為示範,還是可以接受。

package com.company.concurrentTest;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created by BaiTianShi on 2018/8/23.
 */
public class SemaphoreTest {

    public static void main(String[] args) {
         Semaphore semaphore = new Semaphore(1);
         ExecutorService executorService = Executors.newCachedThreadPool();
        for(  int i=0;i<6;i++){
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        int x=(int)(Math.random()*1000);
                        semaphore.acquire();
                        System.out.println("我産生的随機數是"+x+",開始執行任務");
                        Thread.sleep(1000);
                        System.out.println("我産生的随機數是"+x+",任務執行結束");
                        Thread.sleep(1000);
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }



    }

}
           

運作結果:永遠都是一個線程執行完畢,在執行另一個線程。

我是随機98線程開始
我是随機98線程結束
我是随機977線程開始
我是随機977線程結束
我是随機173線程開始
我是随機173線程結束
我是随機503線程開始
我是随機503線程結束
我是随機422線程開始
我是随機422線程結束
我是随機764線程開始
我是随機764線程結束
           

把new Semaphore(1) 改成 new Semaphore(3),再看運作結果

網際網路技術14——Semaphore信号量

可以看見,此時是前三個線程同時執行,然後一個線程執行完畢後釋放一個信号,再開始一個等待着的線程。

同時samephore的acquire()方法和release()可以傳入一個int類型的參數,用以指定一次性擷取或釋放許可證的數量。

同時samephore()還有兩個兩個重要的方法

1.avaliablePermits():傳回可用許可數量

2.drainPermits():傳回所有可用的許可數量,并将許可數置0

avaliablePermits示例:

package com.company.concurrentTest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created by BaiTianShi on 2018/8/23.
 */
public class SemaphoreTest2 {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        try {
            semaphore.acquire(2);
            System.out.println(semaphore.availablePermits());
            semaphore.release();
            System.out.println(semaphore.availablePermits());
            semaphore.release(5);
            System.out.println(semaphore.availablePermits());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

}
           

運作結果:

網際網路技術14——Semaphore信号量

drainPermits()使用示例:

package com.company.concurrentTest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * Created by BaiTianShi on 2018/8/23.
 */
public class SemaphoreTest2 {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        try {
            semaphore.acquire(2);
            //期望結果 1
            System.out.println(semaphore.availablePermits());
            semaphore.release();
            //期望結果 2
            System.out.println(semaphore.availablePermits());
            semaphore.release(5);
            //期望結果 7
            System.out.println(semaphore.availablePermits());

            //期望結果 7
            System.out.println(semaphore.drainPermits());
            //期望結果 0
            System.out.println(semaphore.availablePermits());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

}
           

運作結果:

網際網路技術14——Semaphore信号量

繼續閱讀