Background
Linux layer
For reading files, Linux provides several different functions. Quoting a reference:
When doing random reads or writes through the same file handle (on Windows) or file descriptor (on Linux), each operation involves two steps: positioning the file pointer, then reading or writing. Because these two steps are not one atomic operation, the following can happen: process A seeks to offset f1 in a file and is then interrupted; process B seeks to offset f2 in the same file and is then interrupted; when process A resumes, it reads or writes from the file's current pointer, which is no longer f1, and gets an unexpected result. (Note that opening the same file twice yields two different handles or descriptors, so that case does not have this problem.)
The solution:
On Linux, pread seems purpose-built for this problem: it is atomic, performing the pointer positioning and the read in a single step, and the read does not change the file pointer.
In short, the two common approaches are seek()+read() and pread(), with these trade-offs:
seek()+read() is not thread-safe, but since it reuses the file pointer kept in the file descriptor, it does not need to re-position on every read and is therefore more efficient; multi-threaded access must be synchronized at the application level.
pread() is atomic and thread-safe, but it has to position the file pointer on every call, so reads are somewhat less efficient.
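The difference is easy to see in Java (the language of the HBase code below), where FileChannel offers both styles: read(ByteBuffer, long) is a positional read like pread() and leaves the channel's position untouched, while position() plus read(ByteBuffer) behaves like seek()+read(). A minimal sketch:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class PreadDemo {
    // pread-style: read `len` bytes at `offset` without moving the channel position.
    static String preadString(FileChannel ch, long offset, int len) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(len);
        ch.read(buf, offset);          // positional read; ch.position() is unchanged
        return new String(buf.array());
    }

    // seek()+read()-style: move the shared pointer, then read from it.
    static String seekReadString(FileChannel ch, long offset, int len) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(len);
        ch.position(offset);           // equivalent of lseek(); racy if threads share ch
        ch.read(buf);                  // advances ch.position() by the bytes read
        return new String(buf.array());
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("pread-demo", ".dat");
        try {
            Files.write(tmp, "hello world".getBytes());
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
                System.out.println(preadString(ch, 6, 5));    // world
                System.out.println(ch.position());            // 0: positional read left it alone
                System.out.println(seekReadString(ch, 0, 5)); // hello
                System.out.println(ch.position());            // 5: seek+read moved it
            }
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```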
HDFS layer
Building on these Linux functions, HDFS provides corresponding implementations; the relevant issue is https://issues.apache.org/jira/browse/HADOOP-519:
HDFS File API should be extended to include positional read
HDFS Input streams should support positional read. Positional read (such as the pread syscall on linux) allows reading for a specified offset without affecting the current file offset. Since the underlying file state is not touched, pread can be used efficiently in multi-threaded programs.
Here is how I plan to implement it.
Provide PositionedReadable interface, with the following methods:
int read(long position, byte[] buffer, int offset, int length);
void readFully(long position, byte[] buffer, int offset, int length);
void readFully(long position, byte[] buffer);
Abstract class FSInputStream would provide default implementation of the above methods using getPos(), seek() and read() methods. The default implementation is inefficient in multi-threaded programs since it locks the object while seeking, reading, and restoring to old state.
DFSClient.DFSInputStream, which extends FSInputStream will provide an efficient non-synchronized implementation for above calls.
In addition, FSDataInputStream, which is a wrapper around FSInputStream, will provide wrapper methods for above read methods as well.
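The locking default implementation described in the issue can be sketched in plain Java (class names here are illustrative, not the actual Hadoop code): positional read is emulated by saving the current offset, seeking, reading, and restoring, with the whole sequence holding the object lock.

```java
// Sketch of the FSInputStream-style default: positional read built on
// getPos()/seek()/read(). Correct, but serialized across threads by the lock.
abstract class SeekableInput {
    abstract long getPos();
    abstract void seek(long pos);
    abstract int read(byte[] buf, int off, int len);

    // Default positional read: thread-safe only because the whole
    // seek/read/restore sequence holds the object lock.
    public synchronized int read(long position, byte[] buffer, int offset, int length) {
        long oldPos = getPos();
        try {
            seek(position);
            return read(buffer, offset, length);
        } finally {
            seek(oldPos); // restore, so callers see an unchanged stream offset
        }
    }
}

// Toy in-memory stream used only to exercise the default method.
class ByteArrayInput extends SeekableInput {
    private final byte[] data;
    private int pos = 0;
    ByteArrayInput(byte[] data) { this.data = data; }
    long getPos() { return pos; }
    void seek(long p) { pos = (int) p; }
    int read(byte[] buf, int off, int len) {
        if (pos >= data.length) return -1; // end of stream
        int n = Math.min(len, data.length - pos);
        System.arraycopy(data, pos, buf, off, n);
        pos += n;
        return n;
    }
}
```

DFSInputStream avoids exactly this serialization by issuing the positional read on its own, without touching the shared stream state.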
Usage in HBase
HBase defines two distinct ReadTypes, PREAD and STREAM, corresponding to pread() and seek()+read() respectively:
@InterfaceAudience.Public
public enum ReadType {
  DEFAULT, STREAM, PREAD
}
An HFile is read through a scanner, and when the StoreFileScanner is created, the ReadType determines which code path is taken:
for (int i = 0, n = files.size(); i < n; i++) {
  HStoreFile sf = sortedFiles.remove();
  StoreFileScanner scanner;
  if (usePread) {
    scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn);
  } else {
    scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i,
      canOptimizeForNonNullColumn);
  }
  scanners.add(scanner);
}
Here, getPreadScanner simply returns the shared reader object, so these scanners all share the same underlying input stream:
/**
 * Get a scanner which uses pread.
 * <p>
 * Must be called after initReader.
 */
public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
    boolean canOptimizeForNonNullColumn) {
  return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
    canOptimizeForNonNullColumn);
}

/**
 * @return Current reader. Must call initReader first else returns null.
 * @see #initReader()
 */
public StoreFileReader getReader() {
  return this.reader;
}
getStreamScanner, on the other hand, creates a brand-new reader: fileInfo.open opens a new input stream and then reads the HFile's metadata, and if prefetch-on-open is enabled it also triggers reading of data blocks:
/**
 * Get a scanner which uses streaming read.
 * <p>
 * Must be called after initReader.
 */
public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
    boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
    throws IOException {
  return createStreamReader(canUseDropBehind).getStoreFileScanner(cacheBlocks, false,
    isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
}

private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
  initReader();
  StoreFileReader reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind, -1L,
    primaryReplica, refCount, false);
  reader.copyFields(this.reader);
  return reader;
}
/**
 * Open a Reader for the StoreFile
 * @param fs The current file system to use.
 * @param cacheConf The cache configuration and block cache reference.
 * @return The StoreFile.Reader for the file
 */
public StoreFileReader open(FileSystem fs, CacheConfig cacheConf, boolean canUseDropBehind,
    long readahead, boolean isPrimaryReplicaStoreFile, AtomicInteger refCount, boolean shared)
    throws IOException {
  FSDataInputStreamWrapper in;
  FileStatus status;
  final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
  if (this.link != null) {
    // HFileLink
    in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind, readahead);
    status = this.link.getFileStatus(fs);
  } else if (this.reference != null) {
    // HFile Reference
    Path referencePath = getReferredToFile(this.getPath());
    in = new FSDataInputStreamWrapper(fs, referencePath, doDropBehind, readahead);
    status = fs.getFileStatus(referencePath);
  } else {
    in = new FSDataInputStreamWrapper(fs, this.getPath(), doDropBehind, readahead);
    status = fs.getFileStatus(initialPath);
  }
  long length = status.getLen();
  hdfsBlocksDistribution = computeHDFSBlocksDistribution(fs);
  StoreFileReader reader = null;
  if (this.coprocessorHost != null) {
    reader = this.coprocessorHost.preStoreFileReaderOpen(fs, this.getPath(), in, length,
      cacheConf, reference);
  }
  if (reader == null) {
    if (this.reference != null) {
      reader = new HalfStoreFileReader(fs, this.getPath(), in, length, cacheConf, reference,
        isPrimaryReplicaStoreFile, refCount, shared, conf);
    } else {
      reader = new StoreFileReader(fs, status.getPath(), in, length, cacheConf,
        isPrimaryReplicaStoreFile, refCount, shared, conf);
    }
  }
  if (this.coprocessorHost != null) {
    reader = this.coprocessorHost.postStoreFileReaderOpen(fs, this.getPath(), in, length,
      cacheConf, reference, reader);
  }
  return reader;
}
Two questions arise here.
1. Where does the shared reader come from?
When a region is opened, or when a flush or bulk load produces a new HFile, the HFile gets opened and a reader is created to read its metadata. That reader is the shared reader, and its shared field is set to true:
StoreFileReader.java
// indicate that whether this StoreFileReader is shared, i.e., used for pread. If not, we will
// close the internal reader when readCompleted is called.
@VisibleForTesting
final boolean shared;
2. When is pread used, and when stream?
By default, get requests use pread and compaction scans use stream.
For user scans, the rules are:
- If the client explicitly specifies a readType, that type is used directly.
- If the client does not specify one, the server starts in pread mode and switches to stream once more than 4 block sizes' worth of data has been read; this threshold is configured via hbase.storescanner.pread.max.bytes.
- To keep the server from making this switch and always use pread, set hbase.storescanner.use.pread to true.
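The switching rule above can be sketched as follows (class and field names are illustrative, not the actual HBase implementation):

```java
// Sketch of the server-side rule: a scan with no explicit read type starts in
// pread mode and flips to stream mode once it has read past the byte threshold
// (hbase.storescanner.pread.max.bytes, defaulting to 4 * blocksize); setting
// hbase.storescanner.use.pread pins it to pread.
class ReadTypeSwitcher {
    enum ReadType { PREAD, STREAM }

    private final long preadMaxBytes;   // switch threshold in bytes
    private final boolean alwaysPread;  // hbase.storescanner.use.pread
    private long bytesRead = 0;
    private ReadType readType = ReadType.PREAD;

    ReadTypeSwitcher(long blockSize, boolean alwaysPread) {
        this.preadMaxBytes = 4 * blockSize; // default threshold
        this.alwaysPread = alwaysPread;
    }

    // Called as data is read; may switch pread -> stream, never back.
    ReadType onBytesRead(long n) {
        bytesRead += n;
        if (!alwaysPread && readType == ReadType.PREAD && bytesRead > preadMaxBytes) {
            readType = ReadType.STREAM; // reopen with a dedicated stream reader
        }
        return readType;
    }
}
```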
In addition, when reading finishes and the scanner is closed, the close path calls readCompleted, which checks shared to decide whether to close the reader it was using:
StoreFileScanner.java
@Override
public void close() {
  if (closed) return;
  cur = null;
  this.hfs.close();
  if (this.reader != null) {
    this.reader.readCompleted();
  }
  closed = true;
}
StoreFileReader.java
/**
 * Indicate that the scanner has finished reading with this reader. We need to decrement the ref
 * count, and also, if this is not the common pread reader, we should close it.
 */
void readCompleted() {
  refCount.decrementAndGet();
  if (!shared) {
    try {
      reader.close(false);
    } catch (IOException e) {
      LOG.warn("failed to close stream reader", e);
    }
  }
}
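The lifecycle implied by readCompleted can be sketched like this (a simplified illustration, not the real HBase classes): every scanner holds a reference, and only non-shared stream readers close their underlying stream.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the shared-reader lifecycle: each scanner bumps the ref count when
// it starts reading and calls readCompleted() when done. A non-shared (stream)
// reader closes its underlying stream at that point; the shared pread reader
// stays open for future gets and scans.
class SketchReader {
    final AtomicInteger refCount = new AtomicInteger();
    final boolean shared;         // true for the common pread reader
    boolean streamClosed = false;

    SketchReader(boolean shared) { this.shared = shared; }

    void beginRead() { refCount.incrementAndGet(); }

    void readCompleted() {
        refCount.decrementAndGet();
        if (!shared) {
            streamClosed = true; // a real reader closes its input stream here
        }
    }
}
```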
Problems and optimization
The code above is from version 2.0, and it has an obvious problem: many scans repeatedly execute fileInfo.open, a method that does far more work than each of them needs, causing redundant reads that hurt scan performance and waste system resources. Newer community releases optimize this; the relevant issue is
https://issues.apache.org/jira/browse/HBASE-22888.