天天看點

HBase的scan源碼分析用戶端部分之整體流程(一)

        scan的調用代碼示例如下:

        下面,我們對scan的整個流程進行分析。

        首先從Table的getScanner(Scan scan)方法入手,它的定義如下:

        它的實作是由HTable來完成的,源碼如下:

      這裡,我們先隻研究ClientScanner,其他三種以後再說。

        ClientScanner的構造方法中,首先是對各種成員變量指派,比如scan、tableName、connection等,然後是處理maxScannerResultSize、scannerTimeout、caching等scan需要用到的各種參數,這些都沒有什麼好說的。

        接下來,是初始化兩個重要的變量caller和rpcControllerFactory,caller為RpcRetryingCaller類型的,rpcControllerFactory為RpcControllerFactory類型的。

        最後調用initializeScannerInConstruction()方法,ok,我們也跟着繼續。

       緊接着,調用nextScanner()方法,注意,傳入兩個參數,一個是ClientScanner對象生成時的caching,另外一個是false。

        這個caching,如果在構造Scan對象時沒有設定,則取參數hbase.client.scanner.caching配置的值,參數未配置則預設為100,它的含義是每次RPC請求的最大行數。

        繼續追蹤nextScanner()方法,完整的代碼如下:

        我們一步步分析,由于ClientScanner在構造時,并沒有初始化callable成員變量,是以它必定為null,第一部分代碼略過,我們以後再講。

        接下來,由于currentRegion也沒有被初始化,是以,程式走的是else分支,也就是,将localStartKey設定為scan的startRow。

        緊接着,調用getScannerCallable()方法為成員變量callable指派,入參為行localStartKey和行數nbRows,這個nbRows即為ClientScanner構造時的caching值。方法定義如下:

        首先,把localStartKey設定為scan的startRow,後續每次疊代處理時,我們就能知道scan的起始行。

        然後,生成一個ScannerCallable類型的對象s,這個s是要作為ScannerCallableWithReplicas類型的sr的成員變量currentScannerCallable來使用的,實際與Region所在RegionServer通信的正是這個對象,并且,這個ScannerCallable對象中有一個byte[]類型的row成員變量,它會被初始化為scan的startRow,被用來進行Region的定位和其行号的定位。

        構造ScannerCallableWithReplicas類型的對象sr并傳回,其成員變量currentScannerCallable為上面的ScannerCallable類型的對象s。

        getScannerCallable()方法執行完後,緊接着會調用ClientScanner的call()方法,代碼如下:

        之前我們已知道,caller為RpcRetryingCaller類型,它的方法定義如下:

        由代碼可以看出,它實際上的處理流程為先調用callable的prepare()方法,再調用callable的call()方法。

        接下來,我們轉入callable的分析,這個callable為ScannerCallableWithReplicas類型,它的prepare()方法為空,我們重點分析call()方法,代碼如下:

        又是一個比較長的方法,我們還是一步步分析。

        首先,currentScannerCallable雖然已被初始化,但是它的closed還是為false,那麼方法的第一塊代碼會被跳過。

        接下來,調用RpcRetryingCallerWithReadReplicas的getRegionLocations()方法,利用cConnection、tableName和currentScannerCallable.getRow()定位Region,得到Region的位置RegionLocations類型的rl。

        然後,構造一個BoundedCompletionService類型的資料結構cs,cs中包含需要執行的task、已經完成的task和線程池executor,并提供了submit、poll、take等方法。依靠它,利用線程池完成任務的排程與執行,并同步擷取執行結果。

        addCallsForCurrentReplica方法就是實作上述邏輯的方法,代碼如下:

        它将currentScannerCallable封裝出成為一個RetryingRPC對象,送出到cs中執行,并添加到資料結構outstandingCallables中。

        我們知道,這個currentScannerCallable對象是ScannerCallable類型的,它被線程池排程執行時,依靠call()方法完成業務邏輯,call方法定義如下:

        進入該方法時,closed由于為false,是以執行else分支,并且,由于scannerId未被指派,是以它最終執行的是openScanner()方法,打開一個Scanner,并将ID指派給scannerId。

        接下來,cs.poll()方法的調用,則是從cs的BlockingQueue<Future<V>>類型的completed中擷取任務完成情況l。

        然後,調用updateCurrentlyServingReplica()方法,

        至此,整個Table.getScanner()方法分析完畢。總結起來,它主要完成了ResultScanner的初始化工作,并未真正請求資料。同時,它還做了以下幾件事情:

        1、生成一個實作了ResultScanner接口的對象,一般為ClientScanner(其他三個類型以後再分析);

        2、ClientScanner對象中callable被指派,

        接下來,我們繼續分析之後的工作。在完成了ResultScanner的初始化後,資料是如何擷取的呢?

        我們知道,ResultScanner繼承自Iterable接口,那麼在其實作ClientScanner的抽象父類AbstractClientScanner類中,定義了iterator()方法的實作,主要是通過hasNext()和next()方法來完成周遊的。那我們就來看下ClientScanner的next()方法,代碼如下:

        非常簡單,如果scanner的closed為true,并且cache沒有資料,則直接傳回null,如果closed為false,并且cache沒有資料,那麼通過loadCache()方法加載資料,然後其他的情況是,隻要cache有資料,則直接傳回。

        在這裡,我們就能知道,scan實際上并不是把資料直接全部加載到用戶端的,而是在用到的時候才去取。這麼做有什麼好處呢?

        1、避免全部資料尤其是大資料量的情況下全部緩存在用戶端,造成用戶端的壓力;

        2、如果等到全部資料都擷取後才可以用,用戶端的IO、叢集網絡IO會在資料擷取階段居高不下,延遲較高,而且不如這種類似懶加載的機制,資料邊用邊效果好的多。

        我們繼續分析loadCache()方法。

        首先,初始化兩個變量,remainingResultSize和countdown,這兩個變量的含義分别是一次RPC調用擷取的資料總大小和總行數,當然,如果remainingResultSize減去已整數行資料大小,比一行資料小,且還有資料要擷取,總行數又低于countdown的話,那麼我們還是要擷取完整的一行資料的,這兩個變量随着資料的擷取,不斷的遞減,并參與到是否應該擷取資料的邏輯判斷,相應代碼如下:

        接下來,進入一個do-while循環,因為skipFirst初始值為false,是以,第一次會直接調用call()方法,結果又到了執行ScannerCallableWithReplicas的call()方法。我們再次分析,同樣是先定位Region,擷取Region的位置資訊,并執行currentScannerCallable即ScannerCallable的call()方法,隻不過這次調用,因為有了scannerId,而實際調用RegionServer上的RPC服務,真正去擷取資料。代碼如下:

        這段代碼的最後,如果不能擷取到資料,則将scannerId恢複為-1,并将closed設定為true,以便下次調用時,将上次的callable關閉,并重新生成一個scanner。

        後續的邏輯是,如果scanner還有需要擷取的資料,重新定位scan的startRow,發送RPC請求至RegionSever,繼續擷取該Region上的資料,否則,關閉上一個callable,并重新生成一個scanner和callable,并重複以前的邏輯,繼續擷取資料,直到檢索到endRow,或者比endRow大的資料。

        還有很多細節和特殊情況沒有分析,留待以後慢慢分析吧~~