天天看點

MongoDB分片遷移原理與源碼(2)MongoDB分片遷移原理與源碼

MongoDB分片遷移原理與源碼

源碼

下面将從源碼角度分析與遷移相關的若幹過程,源碼基于MongoDB-4.0.3版本。

split chunk

split chunks 一般是在插入、更新、删除資料時,由 mongos 發出到分片的 splitVector 指令,此時分片才會判斷是否需要 split。

_runAutosplit()函數

//預設的chunk最大位元組數。該大小可以調整,範圍為[1,1024]M
const uint64_t ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes{64 * 1024 * 1024};
/*系統會排程一個自動split的任務,而任務會調用下述接口。該接口會确定是否應該分割指定的塊,然後執行任何必要的分割。它還可以執行“top chunk”優化,其中包含MaxKey或MinKey的結果塊将被移到另一個碎片上,以減輕原始所有者的負載*/
void _runAutosplit(const NamespaceString& nss,
                       const BSONObj& min,
                       const BSONObj& max,
                       long dataWritten) {
    //......
    
    //經過一些參數判斷,比如判斷根據min擷取的chunk包含的range是否與要split的range相同;是否打開了自動split等;

    //調用splitVector來判斷是否需要split
    auto splitPoints = uassertStatusOK(splitVector(opCtx.get(),
                                                       nss,
                                                       cm->getShardKeyPattern().toBSON(),
                                                       chunk.getMin(),
                                                       chunk.getMax(),
                                                       false,
                                                       boost::none,
                                                       boost::none,
                                                       boost::none,
                                                       maxChunkSizeBytes));

    if (splitPoints.size() <= 1) {
        /*沒有分割點意味着沒有足夠的資料可供分割;一個分割點意味着我們有一半的塊大小到完整的塊大小,是以還沒有必要分割*/
        return;
    }
    
    //......
    //進行實際的split操作
    uassertStatusOK(splitChunkAtMultiplePoints(opCtx.get(),
                                               chunk.getShardId(),
                                               nss,
                                               cm->getShardKeyPattern(),
                                               cm->getVersion(),
                                               chunkRange,
                                               splitPoints));
                                                   
    //判斷是否需要進行balance;包括判斷支援的balance設定為kAutoSplitOnly,即隻支援在自動split後balance;以及發生split的nss支援balance;
    const bool shouldBalance = isAutoBalanceEnabled(opCtx.get(), nss, balancerConfig);
                           
    //如果啟用了autobalance選項,并且作為頂部塊優化的一部分在集合的第一個塊或最後一個塊進行分割,則平衡結果塊。
    if (!shouldBalance || topChunkMinKey.isEmpty()) {
        return;
    }
    
    //嘗試将頂部塊移出shard,以防止熱點停留在單個shard上。這是基于以下假設:後續插入将落在頂部塊上。這是因為split觸發的一次move。
    moveChunk(opCtx.get(), nss, topChunkMinKey);
}           

splitVector()函數

/*給定一個塊,确定它是否可以分割,如果可以,則傳回分割點。這個函數的功能相當于splitVector指令。如果指定了maxSplitPoints,并且有多個“maxSplitPoints”拆分點,則隻傳回第一個“maxSplitPoints”拆分點。
如果指定了maxChunkObjects,那麼它訓示拆分每個“maxChunkObjects”的th鍵。
預設情況下,我們将資料塊分割,這樣每個新資料塊大約有maxChunkSize資料塊一半的鍵。我們隻分割“maxChunkObjects”的第一個鍵,如果它将分割的鍵數低于預設值。maxChunkSize是塊的最大大小(以兆位元組為機關)。如果資料塊超過這個大小,我們應該分塊。雖然maxChunkSize和maxChunkSizeBytes是boost::optional,但至少必須指定一個。如果設定了force,則在塊的中點處進行分割。這也有效地使maxChunkSize等于塊的大小。
*/
StatusWith<std::vector<BSONObj>> splitVector(OperationContext* opCtx,
                                             const NamespaceString& nss,
                                             const BSONObj& keyPattern,
                                             const BSONObj& min,
                                             const BSONObj& max,
                                             bool force,
                                             boost::optional<long long> maxSplitPoints,
                                             boost::optional<long long> maxChunkObjects,
                                             boost::optional<long long> maxChunkSize,
                                             boost::optional<long long> maxChunkSizeBytes) {
    // maxChunkObjects一直有預設值。kMaxObjectPerChunk=25000
    if (!maxChunkObjects) {
        maxChunkObjects = kMaxObjectPerChunk;
    }
    //......
    
    //擷取集合相關資訊
    const long long recCount = collection->numRecords(opCtx);
    const long long dataSize = collection->dataSize(opCtx);

    /*現在我們已經有了大小估計,檢查一下其餘的參數,并應用這裡指定的最大大小限制。強制分割相當于讓maxChunkSize等于目前塊的大小,下面的邏輯将把這一大塊分成兩半*/

    if (force) {
        maxChunkSize = dataSize;
    } else if (!maxChunkSize) {
        if (maxChunkSizeBytes) {
            maxChunkSize = maxChunkSizeBytes.get();
        }
    } else {
        maxChunkSize = maxChunkSize.get() * 1 << 20;
    }

    //我們需要一個最大的塊大小,除非我們實際上不能找到任何分裂點。
    if ((!maxChunkSize || maxChunkSize.get() <= 0) && recCount != 0) {
        return {ErrorCodes::InvalidOptions, "need to specify the desired max chunk size"};
    }

    //如果沒有足夠的資料來處理多個塊,就沒有必要繼續了。
    if (dataSize < maxChunkSize.get() || recCount == 0) {
        std::vector<BSONObj> emptyVector;
        return emptyVector;
    }

    //我們将使用平均對象大小和對象數量來找到每個塊應該擁有的鍵數。如果提供了maxChunkSize或maxChunkObjects,我們将按其一半進行拆分。
    const long long avgRecSize = dataSize / recCount;

    long long keyCount = maxChunkSize.get() / (2 * avgRecSize);

    if (maxChunkObjects.get() && (maxChunkObjects.get() < keyCount)) {
        log() << "limiting split vector to " << maxChunkObjects.get() << " (from " << keyCount
              << ") objects ";
        keyCount = maxChunkObjects.get();
    }

    /*周遊索引并将第keyCount個鍵添加到結果中。如果這個鍵之前出現在結果中,我們就忽略它。這裡的不變式是,給定鍵值的所有執行個體都位于同一塊中。*/
    
    //......
    
    /*使用每個第keyCount個鍵作為一個分裂點。我們添加初始鍵作為标記,在結束時移除。如果一個鍵出現的次數超過塊上允許的條目數,我們将發出警告并對下面的鍵進行拆分。*/
    
    //......
    
    //傳回所有分裂點
}                                                        

splitChunkAtMultiplePoints()函數會調用splitChunk()函數進行分裂操作

StatusWith<boost::optional<ChunkRange>> splitChunk(OperationContext* opCtx,
                                                   const NamespaceString& nss,
                                                   const BSONObj& keyPatternObj,
                                                   const ChunkRange& chunkRange,
                                                   const std::vector<BSONObj>& splitKeys,
                                                   const std::string& shardName,
                                                   const OID& expectedCollectionEpoch) {
    //......
    
    //将split資訊送出到config伺服器,使用的“_configsvrCommitChunkSplit”指令
    auto request =
        SplitChunkRequest(nss, shardName, expectedCollectionEpoch, chunkRange, splitKeys);

    auto configCmdObj =
        request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());

    auto cmdResponseStatus =
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            opCtx,
            kPrimaryOnlyReadPreference,
            "admin",
            configCmdObj,
            Shard::RetryPolicy::kIdempotent);
    
    //......
}                                                              

balance

MongoDB balancer 是一個背景程序,它監視每個分片上的塊的數量。當給定分片上的塊數量達到特定的遷移門檻值時,平衡器嘗試在分片之間自動遷移塊,并在每個分片上達到相同數量的塊。

切分叢集的平衡過程對使用者和應用程式層是完全透明的,盡管在此過程中可能會有一些性能影響。

從MongoDB 3.4開始,balancer在config伺服器副本集(CSRS)的主節點上運作.

balancer 基本過程大緻相同:

  1. config.shards 讀取分片資訊;
  2. config.collections 讀取所有集合資訊,并且随機排序儲存到一個數組中;
  3. 對每個集合從 config.chunks 讀取 chunks 的資訊;
  4. 含有最多 chunks 數量 (maxChunksNum)的分片為源分片,含有最少 chunks 數量(minChunksNum)的分片為目的分片; 如果 maxChunksNum - idealNumberOfChunksPerShardForTag(每個碎片的最優塊數的上限) 大于遷移的門檻值 (threshold), 那麼就是不均衡狀态,需要遷移,源分片的 chunks 第一個 chunk 為待遷移的 chunk ,構造一個遷移任務(源分片,目的分片,chunk)。

構造遷移任務時,如果某個集合含有最多數量的分片或者最少數量 chunks 的分片,已經屬于某一個遷移任務,那麼此集合本輪 balancer 不會發生遷移,即,一個分片不能同時參與多個塊的遷移。要從一個分片遷移多個塊,平衡器一次遷移一個塊。。最後,本次檢測出的遷移任務完成以後才開始下次 balancer 過程。

balancer 過程中,會對集合做一次随機排序,當有多個集合的資料需要均衡時,遷移時也是随機的,并不是遷移完一個集合開始下一個集合。

void Balancer::_mainThread() {
    //......
    // balancer主循環
    while (!_stopRequested()) {
        BalanceRoundDetails roundDetails;

        _beginRound(opCtx.get());

        try {
            shardingContext->shardRegistry()->reload(opCtx.get());

            //判斷balance是否打開,如果沒有打開,_endRound會sleep 10s(kBalanceRoundDefaultInterval);沒有打開包括:balance為off、或者是隻在split後進行的balance、或者balance隻支援在某個視窗時間;
            if (!balancerConfig->shouldBalance()) {
                _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
                continue;
            }

            {
                //對分片的集合進行splitChunk操作
                Status status = _enforceTagRanges(opCtx.get());
                if (!status.isOK()) {
                    warning() << "Failed to enforce tag ranges" << causedBy(status);
                } else {
                    LOG(1) << "Done enforcing tag range boundaries.";
                }

                //選擇需要遷移的chunk
                const auto candidateChunks = uassertStatusOK(
                    _chunkSelectionPolicy->selectChunksToMove(opCtx.get(), _balancedLastTime));

                if (candidateChunks.empty()) {
                    LOG(1) << "no need to move any chunk";
                    _balancedLastTime = false;
                } else {
                    //為指定的塊集合安排遷移,并傳回成功處理了多少塊。
                    _balancedLastTime = _moveChunks(opCtx.get(), candidateChunks);

                    roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()),
                                              _balancedLastTime);
                }
            }

            //預設的檢測周期為 10s, 如果發生了moveChunk, 檢測周期為 1s
            //const Seconds kBalanceRoundDefaultInterval(10);
            //const Seconds kShortBalanceRoundInterval(1);
            _endRound(opCtx.get(),
                      _balancedLastTime ? kShortBalanceRoundInterval
                                        : kBalanceRoundDefaultInterval);
        } catch (const std::exception& e) {
            //......
            _endRound(opCtx.get(), kBalanceRoundDefaultInterval);
        }
    }
}           

selectChunksToMove()函數最終傳回所有需要遷移的chunk資訊

StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMove(
    OperationContext* opCtx, bool aggressiveBalanceHint) {
    //......
    //讀取分片資訊
    const auto shardStats = std::move(shardStatsStatus.getValue());
    if (shardStats.size() < 2) {
        return MigrateInfoVector{};
    }
    //usedShards儲存那些已經涉及到某一個塊遷移的(from shard, to shard)資訊,同一次balance,一個shard隻參與一個塊的遷移,不管是from shard還是to shard。
    std::set<ShardId> usedShards;
    
    //讀取所有集合資訊,并且随機排序儲存到一個數組中
    std::shuffle(collections.begin(), collections.end(), _random);
    
    for (const auto& coll : collections) {
        //如果集合已經被删了,跳過
        if (coll.getDropped()) {
            continue;
        }

        const NamespaceString nss(coll.getNs());
        //如果集合不允許balance,掉過
        if (!coll.getAllowBalance()) {
            LOG(1) << "Not balancing collection " << nss << "; explicitly disabled.";
            continue;
        }
        //擷取目前集合需要遷移的chunk資訊
        auto candidatesStatus = _getMigrateCandidatesForCollection(
                opCtx, nss, shardStats, aggressiveBalanceHint, &usedShards);
        //此處會判斷candidatesStatus結果,如果集合被删了,跳過;如果其他錯誤,列印日志後跳過
        candidateChunks.insert(candidateChunks.end(),
                               std::make_move_iterator(candidatesStatus.getValue().begin()),
                               std::make_move_iterator(candidatesStatus.getValue().end()));
    }           

_getMigrateCandidatesForCollection()函數擷取目前集合需要遷移的chunk資訊

StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection(
    OperationContext* opCtx,
    const NamespaceString& nss,
    const ShardStatisticsVector& shardStats,
    bool aggressiveBalanceHint,
    std::set<ShardId>* usedShards) {
    //......
    
    //讀取集合的所有chunk資訊
    
    //傳回該集合建議在shard之間遷移的chunk集合
    return BalancerPolicy::balance(shardStats, distribution, aggressiveBalanceHint, usedShards);
}           

balance()函數計算集合内各shard上chunk的個數,确定遷移變化的情況。

在4.0中(其實是從3.4)開始,遷移門檻值與官方文檔中的介紹不符

遷移門檻值

不符;官方文檔的描述是3.2版本中的設計。

3.2 版本, chunks 數量小于 20 的時候為 2, 小于 80 的時候為 4, 大于 80 的時候為 8 。也就是說假設兩分片叢集,某個表有 100 個chunk , 每個分片分别有 47 和 53 個chunk 。那麼此時 balance 認為是均衡的,不會發生遷移。

int threshold = 8;

if (balancedLastTime || distribution.totalChunks() < 20) threshold = 2;

else if (distribution.totalChunks() < 80)

threshold = 4;

4.0 版本,chunks 數量差距大于 2 的時候就會發生遷移。

/*傳回一組建議的塊,根據碎片的指定狀态(耗盡、達到最大大小等)和該集合的塊的數量移動碎片。如果政策不建議移動任何内容,則傳回一個空向量。vector do中的條目都是針對單獨的源/目标碎片的,是以不需要串行執行,可以并行排程。
平衡邏輯為每個區域計算每個碎片的最佳塊數,如果任何碎片的塊數足夠高,建議将塊移動到低于這個數字shard。
shouldAggressivelyBalance參數導緻塊的門檻值可能會降低碎片之間的差異。
usedShards參數是in/out,它包含一組已經用于遷移的shards。這樣我們就不會為同一個碎片傳回多個沖突遷移。*/
vector<MigrateInfo> BalancerPolicy::balance(const ShardStatisticsVector& shardStats,
                                            const DistributionStatus& distribution,
                                            bool shouldAggressivelyBalance,
                                            std::set<ShardId>* usedShards) {
    vector<MigrateInfo> migrations;
    
    // 1) Check for shards, which are in draining mode
    // 這一部分是将處于draining模式的shard中的chunk移到其他shard,從該被删除的shard上拿一個chunk,找一個chunk最少的非from shard作為to shard(即即将被移除的shard)
    
    // 2) Check for chunks, which are on the wrong shard and must be moved off of it
    // 調整因為Tag設定不比對引起的chunk内的資料分布shard轉換
    
    // 3) for each tag balance
    //shouldAggressivelyBalance由最上層的Balancer::_mainThread()中_balancedLastTime指派,表明上一次遷移round中遷移個數,0為false
    //即如果已經在一次遷移中了或集合的塊總數少于20,則遷移門檻值為1;否則為2
    const size_t imbalanceThreshold = (shouldAggressivelyBalance || distribution.totalChunks() < 20)
        ? kAggressiveImbalanceThreshold
        : kDefaultImbalanceThreshold;
    
    vector<string> tagsPlusEmpty(distribution.tags().begin(), distribution.tags().end());
    tagsPlusEmpty.push_back("");

    for (const auto& tag : tagsPlusEmpty) {
        const size_t totalNumberOfChunksWithTag =
            (tag.empty() ? distribution.totalChunks() : distribution.totalChunksWithTag(tag));

        size_t totalNumberOfShardsWithTag = 0;

        for (const auto& stat : shardStats) {
            if (tag.empty() || stat.shardTags.count(tag)) {
                totalNumberOfShardsWithTag++;
            }
        }
        
        //計算每個碎片的最優塊數的上限
        const size_t idealNumberOfChunksPerShardForTag =
            (totalNumberOfChunksWithTag / totalNumberOfShardsWithTag) +
            (totalNumberOfChunksWithTag % totalNumberOfShardsWithTag ? 1 : 0);

        while (_singleZoneBalance(shardStats,
                                  distribution,
                                  tag,
                                  idealNumberOfChunksPerShardForTag,
                                  imbalanceThreshold,
                                  &migrations,
                                  usedShards))
            ;
    }
    return migrations;
}                                                       

_singleZoneBalance()函數去尋找滿足遷移門檻值限制的from shard和to shard以及chunk

bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
                                        const DistributionStatus& distribution,
                                        const string& tag,
                                        size_t idealNumberOfChunksPerShardForTag,
                                        size_t imbalanceThreshold,
                                        vector<MigrateInfo>* migrations,
                                        set<ShardId>* usedShards) {
    //擷取含有最多chunk數量的分片為源分片,from shard
    const ShardId from = _getMostOverloadedShard(shardStats, distribution, tag, *usedShards);
    if (!from.isValid())
        return false;
    
    //最大的chunk量
    const size_t max = distribution.numberOfChunksInShardWithTag(from, tag);

    // Do not use a shard if it already has less entries than the optimal per-shard chunk count
    if (max <= idealNumberOfChunksPerShardForTag)
        return false;

    //擷取含有最少chunk數量的分片為源分片,to shard
    const ShardId to = _getLeastLoadedReceiverShard(shardStats, distribution, tag, *usedShards);
    if (!to.isValid()) {
        if (migrations->empty()) {
            log() << "No available shards to take chunks for zone [" << tag << "]";
        }
        return false;
    }

    //最小的chunk量
    const size_t min = distribution.numberOfChunksInShardWithTag(to, tag);

    // Do not use a shard if it already has more entries than the optimal per-shard chunk count
    if (min >= idealNumberOfChunksPerShardForTag)
        return false;

    //最大的chunk數與每個碎片的最優塊數的上限之間的內插補點
    const size_t imbalance = max - idealNumberOfChunksPerShardForTag;    
    
    //這個內插補點超過了遷移門檻值,之前算出來的2,則遷移
    if (imbalance < imbalanceThreshold)
        return false;                
    
    //把需要遷移的chunk,構造一個遷移任務
    const vector<ChunkType>& chunks = distribution.getChunks(from);

    unsigned numJumboChunks = 0;
    
    for (const auto& chunk : chunks) {
        if (distribution.getTagForChunk(chunk) != tag)
            continue;

        if (chunk.getJumbo()) {
            numJumboChunks++;
            continue;
        }

        migrations->emplace_back(to, chunk);
        invariant(usedShards->insert(chunk.getShard()).second);
        invariant(usedShards->insert(to).second);
        return true;
    }
}                                                   

在完成遷移塊的選擇之後,Balancer::_mainThread()會調用Balancer::_moveChunks(),_moveChunks調用MigrationManager::executeMigrationsForAutoBalance()執行moveChunk。

未完,待續

參考文檔

MongoDB官方文檔 孤兒文檔是怎樣産生的(MongoDB orphaned document) MongoDB疑難解析:為什麼更新之後負載升高了? 由資料遷移至MongoDB導緻的資料不一緻問題及解決方案