天天看点

HBase数据达到Memstore的过程

往HBase表中写入数据的主要流程。

1 RegionServer启动的初始化工作

启动

RegionServer

进程的入口类是

HRegionServer

,通过执行该类的

main

方法开始。

其简单流程如下:

//通过main方法的执行流程
//main()
new HRegionServerCommandLine(regionServerClass).doMain(args);

//doMain
int ret = ToolRunner.run(HBaseConfiguration.create(), this, args);

//ToolRunner.run, 启动的tool就是HRegionServerCommandLine对象
run(tool.getConf);

//HRegionServerCommandLine.run  HRegionServerCommandLine.start
//构造一个HRegionServer线程对象,并启动
HRegionServer hrs = HRegionServer.constructRegionServer(regionServerClass, conf);
hrs.start();
handleReportForDutyResponse(w);
startServices();
initializeMemStoreChunkCreator();

//这里定义chunksize的默认大小为2M
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
// init the chunkCreator
ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
                        initialCountPercentage, this.hMemManager);

//initialize
instance = new ChunkCreator(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
            initialCountPercentage, heapMemoryManager,
            MemStoreLABImpl.INDEX_CHUNK_PERCENTAGE_DEFAULT);

//ChunkCreator构造方法
this.chunkSize = chunkSize; // in case pools are not allocated
initializePools(chunkSize, globalMemStoreSize, poolSizePercentage, indexChunkSizePercentage,
                initialCountPercentage, heapMemoryManager);

//initializePools
//在这里定义了dataChunk、indexChunk的size大小,默认为2M
this.dataChunksPool = initializePool("data", globalMemStoreSize,
            (1 - indexChunkSizePercentage) * poolSizePercentage,
            initialCountPercentage, chunkSize, heapMemoryManager);
// The index chunks pool is needed only when the index type is CCM.
// Since the pools are not created at all when the index type isn't CCM,
// we don't need to check it here.
this.indexChunksPool = initializePool("index", globalMemStoreSize,
                                      indexChunkSizePercentage * poolSizePercentage,
                                      initialCountPercentage, (int) (indexChunkSizePercentage * chunkSize),
                                      heapMemoryManager);
           

2 put操作的流程

当往HBase表中put数据时,最终会调用

HRgion

中的

doMiniBatchMutate

方法,接下来的调用流程如下所示。

//doMiniBatchMutate
//在此方法内,会先将修改写入到WAl中
// STEP 3. Build WAL edit
List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);

// STEP 4. Append the WALEdits to WAL and sync.
for(Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
    Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
    walEdit = nonceKeyWALEditPair.getSecond();
    NonceKey nonceKey = nonceKeyWALEditPair.getFirst();

    if (walEdit != null && !walEdit.isEmpty()) {
        writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
               nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
	}
    
    // Complete mvcc for all but last writeEntry (for replay case)
    if (it.hasNext() && writeEntry != null) {
        mvcc.complete(writeEntry);
        writeEntry = null;
    }
}

//再将数据写入到Memstore中
writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);

//writeMiniBatchOperationsToMemStore
super.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry.getWriteNumber());

//writeMiniBatchOperationsToMemStore
applyFamilyMapToMemStore(familyCellMaps[index], memStoreAccounting);

//applyFamilyMapToMemStore
region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting);

//applyToMemStore  修改或者添加分别走upsert add逻辑,其实差不多,这里以add为例
if (upsert) {
    store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);
} else {
    store.add(cells, memstoreAccounting);
}

//add
this.memstore.add(cell, memstoreSizing);

//add  遍历Cells集合,对每一个cell进行处理
for (Cell cell : cells) {
    add(cell, memstoreSizing);
}

// add -> doAddOrUpsert -> doAdd
//第一步尝试去为Cell分配一个Chunk
Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
//将Cell写入到Chunk中,并最终加入到ConcurrentSkipList对象中
internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
           

从这里开始就分开了,先看下是如何分配Chunk对象的。

2.1 分配Chunk
//maybeCloneWithAllocator
currentActive.maybeCloneWithAllocator(cell, forceCloneOfBigCell);

//maybeCloneWithAllocator -> copyCellInto -> copyCellInto
private Cell copyCellInto(Cell cell, int maxAlloc) {
    //...
    Chunk c = null;
    int allocOffset = 0;
    while (true) {
        //开始分配一个Chunk
        c = getOrMakeChunk();
        
        //为Chunk申请内存空间
        if (c != null) {
            allocOffset = c.alloc(size);
            //一旦申请完毕,就跳出循环
            if (allocOffset != -1) {
                break;
            }
            //否则尝试重新分配
            tryRetireChunk(c);
        }
    }
    //将Cell中的数据拷贝到新申请的Chunk上
    return copyToChunkCell(cell, c.getData(), allocOffset, size);
}
           

2.1.1 如何申请Chunk

这里的主要思路就是:

1 获取当前正在使用的Chunk对象,不为null,则表示已经申请到,直接返回

2 为null,表示当前环境中没有Chunk对象,需要申请 getChunk

3 创建一个Chunk对象,并为其分配指定大小的一段连续内存空间buf,默认2M,返回

直接获取Chunk或创建一个Chunk

private Chunk getOrMakeChunk() {
    //currChunk代表当前正在使用的chunk对象的原子引用
    Chunk c = currChunk.get();
    
    //如果已经存在Chunk,那么直接返回Chunk对象即可
    if (c != null) {
        return c;
    }
    
    //如果Chunk为null,则需要从ChunkPool中申请新的Chunk,申请Chunk需要上锁
    if (lock.tryLock()) {
        try {
            // 类似于双重锁定,防止其他线程已经成功申请了Chunk
            c = currChunk.get();
            if (c != null) {
                return c;
            }
            
            //通过chunkCreator对象去创建Chunk       
            c = this.chunkCreator.getChunk(idxType);
            //Chunk创建成功之后,就会使用currChunk来引用,表示目前正在使用的chunk
            //同时,将创建的chunk放到Set集合chunks中
            if (c != null) {
                // set the curChunk. No need of CAS as only one thread will be here
                currChunk.set(c);
                chunks.add(c.getId());
                return c;
            }
        } finally {
            lock.unlock();
        }
    }
    return null;
}
           

创建Chunk对象

ChunkCreator

创建Chunk的步骤如下

//chunkCreator.getChunk -> getChunk -> getChunk  这里以DataChunk为例
//这里需要说明的是chunksize的默认大小为2M
//dataChunksPool前面已经说过了,dataChunksPool.getChunkSize()的值默认就是chunksize
Chunk getChunk(CompactingMemStore.IndexType chunkIndexType, ChunkType chunkType) {
    switch (chunkType) {
        case INDEX_CHUNK:
            if (indexChunksPool != null) {
                return getChunk(chunkIndexType, indexChunksPool.getChunkSize());
            }
        case DATA_CHUNK:
            if (dataChunksPool == null) {
                return getChunk(chunkIndexType, chunkSize);
            } else {
                return getChunk(chunkIndexType, dataChunksPool.getChunkSize());
            }
        default:
            throw new IllegalArgumentException(
                "chunkType must either be INDEX_CHUNK or DATA_CHUNK");
    }
}

//getChunk,创建Chunk对象,完成之后,对Chunk对象进行初始化工作
if (chunk == null) {
    chunk = createChunk(false, chunkIndexType, size);
}
chunk.init();
return chunk;

//这里根据配置,可以创建两种类型的Chunk,这里以OnheapChunk
private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType, int size) {
    Chunk chunk = null;

    if (pool && this.offheap) {
        chunk = new OffheapChunk(size, id, pool);
    } else {
        chunk = new OnheapChunk(size, id, pool);
    }

    return chunk;
}

//OnHeapChunk -> Chunk 构造方法,至此创建一个成员变量size为2M的Chunk对象
//此时并没有 为Chunk对象分配2M的内存空间
//Chunk对象创建成功之后,借由上面的getChunk进行init工作,分配内存空间,并返回
public Chunk(int size, int id, boolean fromPool) {
    this.size = size;
    this.id = id;
    this.fromPool = fromPool;
}

//init   -> allocateDataBuffer
//在这里为Chunk持有的data开辟2M的内存空间
void allocateDataBuffer() {
    if (data == null) {
        data = ByteBuffer.allocate(this.size);
        data.putInt(0, this.getId());
    }
}
           

2.1.2 从Chunk中申请内存空间,用于Cell数据的写入

主要思路

1 根据Cell,从已经申请到的Chunk对象中申请一段内存空间,用于Cell数据的存储

2 获取到Chunk的buf中已经使用的偏移量offset

3 Cell中的数据,写入到buf中的offset 至 (offset+size) 的位置

申请完chunk之后,接下来就是在已经申请的这个chunk中分配一段内存空间,用于存储当前cell。

//copyCellInto
//先获取要写入的cell的大小
int size = Segment.getCellLength(cell);
//从已经申请的chunk中的分配一段内存空间,size的大小视Cell而定
allocOffset = c.alloc(size);

//alloc
public int alloc(int size) {
    while (true) {
        //先获取当前Chunk已经使用的偏移量
        int oldOffset = nextFreeOffset.get();
        //说明Chunk还在初始化,再等等啊再等等
        if (oldOffset == UNINITIALIZED) {
            Thread.yield();
            continue;
        }
        
        if (oldOffset == OOM) {
            // doh we ran out of ram. return -1 to chuck this away.
            return -1;
        }
		
        //当当前Chunk剩余的空间不足以盛放该Cell时,返回-1
        if (oldOffset + size > data.capacity()) {
            return -1; // alloc doesn't fit
        }
        
        //计算要写入的cell要占多少空间,预留下来,不再进行分配,同时返回chunk以使用的偏移量
        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
            // we got the alloc
            allocCount.incrementAndGet();
            return oldOffset;
        }
    }
}
           

2.1.3 Cell数据写入到Chunk的buf

分配完内存空间之后,就开始往Chunk中写入Cell数据了。

//copyCellInto
return copyToChunkCell(cell, c.getData(), allocOffset, size);

//copyToChunkCell
//往Chunk(实际上是内部持有的的Buffer)中写入Cell数据
private static Cell copyToChunkCell(Cell cell, ByteBuffer buf, int offset, int len) {
    //先写入数据
    ((ExtendedCell) cell).write(buf, offset);
    
    //写入完成之后,包装成一个Chunk返回
    return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());
}
           

Cell数据写入过程

((ExtendedCell) cell).write(buf, offset);

//write
KeyValueUtil.appendTo(this, buf, offset, true);

//appendTo 
//实际的写入操作
public static int appendTo(Cell cell, ByteBuffer buf, int offset, boolean withTags) {
    //记录key的长度
    offset = ByteBufferUtils.putInt(buf, offset, keyLength(cell));// Key length
    //记录value的长度
    offset = ByteBufferUtils.putInt(buf, offset, cell.getValueLength());// Value length
    //记录key值包括时间戳、列族、列名这些信息,全部写入到key中
    offset = appendKeyTo(cell, buf, offset);
    //写入value的值
    offset = CellUtil.copyValueTo(cell, buf, offset);// Value bytes
    int tagsLength = cell.getTagsLength();
    if (withTags && (tagsLength > 0)) {
        offset = ByteBufferUtils.putAsShort(buf, offset, tagsLength);// Tags length
        offset = PrivateCellUtil.copyTagsTo(cell, buf, offset);// Tags bytes
    }
    return offset;
}
           

基于已写入Cell数据的buf,封装新Cell对象

当把Cell数据写入到buf中之后,接着封装成chunk Cell对象,并返回。

//写入完成之后,包装成一个Chunk返回
return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());

//createChunkCell
return createChunkCell(buf, offset, len, tagsLen, cell.getSequenceId());

//实际上就是基于写入数据的buf,再次将其封装为一个Cell对象,并返回
private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tagsLen,
                                    long sequenceId) {
    if (tagsLen == 0) {
        return new NoTagByteBufferChunkKeyValue(buf, offset, len, sequenceId);
    } else {
        return new ByteBufferChunkKeyValue(buf, offset, len, sequenceId);
    }
}
           
这里有个疑问,每写入一个cell,均会在这里new一个对象?那海量数据的写入,是否会导致内存激增???
2.2 将新Cell对象加入到Memostore中

封装成Cell对象返回之后,接下来就是将该cell对象加入到Memostore持有的

ConcurrentSkipListMap

中。

private void doAdd(MutableSegment currentActive, Cell cell, MemStoreSizing memstoreSizing) {
    //申请Chunk,并往其中的buf中写入Cell数据,之后封装成新的Cell对象返回
    Cell toAdd = maybeCloneWithAllocator(currentActive, cell, false);
    
    if (!mslabUsed) {
      toAdd = deepCopyIfNeeded(toAdd);
    }
    
    //开始往Memstore中添加
    internalAdd(currentActive, toAdd, mslabUsed, memstoreSizing);
  }
           

主要流程如下:

//internalAdd
currentActive.add(toAdd, mslabUsed, memstoreSizing, sizeAddedPreOperation);

//add
internalAdd(cell, mslabUsed, memStoreSizing, sizeAddedPreOperation);

//internalAdd
protected void internalAdd(Cell cell, boolean mslabUsed, MemStoreSizing memstoreSizing,
                           boolean sizeAddedPreOperation) {
    //将构建的cell添加到ConcurrentSkipListMap中
    boolean succ = getCellSet().add(cell);
    
    //接着就是更新元数据信息
    updateMetaInfo(cell, succ, mslabUsed, memstoreSizing, sizeAddedPreOperation);
}

//getCellSet().add(Cell)
public boolean add(Cell e) {
   //this.delegatee = new ConcurrentSkipListMap<>(c.getSimpleComparator());
    return this.delegatee.put(e, e) == null;
}

//todo
protected void updateMetaInfo()
           

继续阅读