背景介绍
为了保证服务可用性和数据可靠性,一些重要业务会将存储系统部署在多地域多机房。比如在北京,上海,深圳 每个地域的机房各存储一份数据副本,保证即使某个地域的机房无法提供访问,也不会影响业务的正常使用。
在多机房部署时,需要考虑多机房之间的网络延迟问题。以作者的ping测试结果为例,上海<-->深圳的网络延迟约为30ms,而在一个机房内部,网络延迟仅在0.1ms左右。
腾讯云MongoDB在架构上,结合L5就近接入以及内部的“nearest”访问模式,实现了业务对机房的就近访问,避免了多机房带来的网络延迟问题。整体架构如下图所示,其中mongos为接入节点,可以理解为proxy;mongod为存储节点,存储用户的实际数据,并通过 1 Primary 多 Secondary的模式形成多个副本,分散到多个机房中

下面主要对腾讯云MongoDB中nearest模式的实现和使用方式做详细介绍。
2. 什么是nearest访问模式
MongoDB中,副本集 是指保存相同数据的多个副本节点的集合。用户可以通过Driver直接访问副本集,或者通过mongos访问副本集。如下图所示
副本集内部通过raft算法来选主,通过oplog同步数据。
MongoDB默认读写都在Primary节点上执行,但是也提供了接口进行读写请求分离,充分发挥系统的整体性能。读写分离的关键在于设置请求的readPreference。这个参数标识了用户希望读取哪种节点,目前可配置的类型共5种,如下所示
2.3 读写一致性保证
有些读者可能已经产生了疑问:如果Secondary节点从Primary同步数据可能存在延迟,如何保证在从节点能够读取到刚刚写入的数据?解决方法是:设置写入操作的WriteConcern,保证数据写入到全部节点之后再返回,此时再去从节点,肯定可以读取到最新的数据。
写操作是需要跨机房同步数据的,所以如果业务模型是写多读少,需要谨慎考虑。
3. nearest实现原理解析
如果业务通过mongos接入(腾讯云MongoDB架构常用方式),则mongos侧来完成到mongod的就近访问。如果业务直接接入副本集,则在driver层会完成到mongod的就近访问。
下面会结合mongos(腾讯云MongoDB代码),mgo-driver,以及官方最新发布的go-driver,来分析如何实现nearest访问,并给出一些使用上的建议。
mongos <code>每隔5秒</code>会对集群中的每个副本集启动探测线程,执行 <code>isMaster命令</code>并采集自己到每个节点的网络延迟情况,采集方式如下所示:
然后根据本次采集的延迟进行平滑更新, 核心如下所示:
节点选取的算法,可以参考SetState::getMatchingHost方法。大致的选取流程为:按照每个节点的延迟升序排序 -> 排除延迟太高的节点(比最近节点的延迟大15ms)-> 随机返回一个符合条件的节点。
<code>case ReadPreference::SecondaryOnly:</code>
<code>case ReadPreference::Nearest: {</code>
<code> BSONForEach(tagElem, criteria.tags.getTagBSON()) {</code>
<code> uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj());</code>
<code> BSONObj tag = tagElem.Obj();</code>
<code> std::vector<const Node*> matchingNodes;</code>
<code> for (size_t i = 0; i < nodes.size(); i++) {</code>
<code> // 如果是SecondaryOnly模式,需要进行过滤</code>
<code> if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag)) {</code>
<code> matchingNodes.push_back(&nodes[i]);</code>
<code> }</code>
<code> }</code>
<code> // don't do more complicated selection if not needed</code>
<code> if (matchingNodes.empty())</code>
<code> continue;</code>
<code> if (matchingNodes.size() == 1)</code>
<code> return matchingNodes.front()->host;</code>
<code> // order by latency and don't consider hosts further than a threshold from the</code>
<code> // closest.</code>
<code> // 对候选节点按延迟进行排序</code>
<code> std::sort(matchingNodes.begin(), matchingNodes.end(), compareLatencies);</code>
<code> for (size_t i = 1; i < matchingNodes.size(); i++) {</code>
<code> int64_t distance =</code>
<code> matchingNodes[i]->latencyMicros - matchingNodes[0]->latencyMicros;</code>
<code> if (distance >= latencyThresholdMicros) {</code>
<code> // this node and all remaining ones are too far away</code>
<code> // 剔除延迟超过阈值(默认15ms,可配置)的节点</code>
<code> matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end());</code>
<code> break;</code>
<code> // of the remaining nodes, pick one at random (or use round-robin)</code>
<code> if (ReplicaSetMonitor::useDeterministicHostSelection) {</code>
<code> // only in tests</code>
<code> return matchingNodes[roundRobin++ % matchingNodes.size()]->host;</code>
<code> } else {</code>
<code> // normal case</code>
<code> // 从剩余的候选节点中,随机选取一个返回</code>
<code> return matchingNodes[rand.nextInt32(matchingNodes.size())]->host;</code>
<code> };</code>
<code> }</code>
<code> return HostAndPort();</code>
<code>}</code>
可以注意到mongos代码中有一个 <code>默认的15ms配置</code>,含义为:如果有一个节点的延迟比最近节点的延迟还要大15ms,则认为这个节点不应该被nearest策略选中。但是15ms并不是对每一个业务都合理。如果业务对延迟非常敏感,可以根据自己的需要来进行设置方法是在mongos配置文件中添加下面配置选项:
mgo driver <code>每隔15秒</code>会通过 <code>ping命令</code>采集自己到mongod节点的网络延迟状况,并将最近6次采集结果的最大值作为当前网络延迟的参考值。代码如下所示:
<code>for {</code>
<code> if loop {</code>
<code> time.Sleep(delay) // 每隔一段时间(默认15秒)采集一次</code>
<code> op := op</code>
<code> socket, _, err := server.AcquireSocket(0, delay)</code>
<code> if err == nil {</code>
<code> start := time.Now()</code>
<code> _, _ = socket.SimpleQuery(&op) // 执行ping命令</code>
<code> delay := time.Now().Sub(start) // 并统计耗时</code>
<code> server.pingWindow[server.pingIndex] = delay</code>
<code> server.pingIndex = (server.pingIndex + 1) % len(server.pingWindow)</code>
<code> server.pingCount++</code>
<code> var max time.Duration</code>
<code> for i := 0; i < len(server.pingWindow) && uint32(i) < server.pingCount; i++ {</code>
<code> if server.pingWindow[i] > max {</code>
<code> max = server.pingWindow[i] // 统计最近6次(默认)采集的最大值</code>
<code> socket.Release()</code>
<code> server.Lock()</code>
<code> if server.closed {</code>
<code> loop = false</code>
<code> server.pingValue = max // 将最大值作为网络延迟统计,作为后续选择节点时的评估依据</code>
<code> server.Unlock()</code>
<code> logf("Ping for %s is %d ms", server.Addr, max/time.Millisecond)</code>
<code> } else if err == errServerClosed {</code>
<code> return</code>
<code> if !loop {</code>
和mongos相同,会排除延迟太高(>15ms)的节点。但是区别在于不是随机返回一个满足条件的节点,而是尽量返回当前压力比较小的节点(通过当前使用的连接数来判定),这样可以尽量做到负载均衡。代码如下所示:
官方go driver <code>每隔10秒</code>会通过 <code>isMaster</code>命令采集自己到mongod节点的网络延迟状况:
<code>now := time.Now() // 开始统计耗时</code>
<code>// 去对应的节点上执行isMaster命令</code>
<code>isMasterCmd := &command.IsMaster{Compressors: s.cfg.compressionOpts}</code>
<code>isMaster, err := isMasterCmd.RoundTrip(ctx, conn)</code>
<code>...</code>
<code>delay := time.Since(now) // 得到耗时统计</code>
<code>desc = description.NewServer(s.address, isMaster).SetAverageRTT(s.updateAverageRTT(delay)) // 进行平滑统计</code>
采集完成后,会结合历史数据进行平滑统计,如下:
以Find命令为例,go driver会生成一个 <code>复合选择器</code>,复合选择器会依次执行各项选择算法,得到一个候选节点列表:
其中对于节点延迟的选择主要依赖于 <code>LatencySelector</code>。大致流程为:统计到所有节点的最小延迟min-->计算延迟满足标准:min+15ms(默认)-->返回所有满足延迟标准的节点列表。核心代码如下:
<code>func (ls *latencySelector) SelectServer(t Topology, candidates []Server) ([]Server, error) {</code>
<code> if ls.latency < 0 {</code>
<code> return candidates, nil</code>
<code> switch len(candidates) {</code>
<code> case 0, 1:</code>
<code> default:</code>
<code> min := time.Duration(math.MaxInt64)</code>
<code> for _, candidate := range candidates {</code>
<code> if candidate.AverageRTTSet { // 计算所有候选节点的最小延迟</code>
<code> if candidate.AverageRTT < min {</code>
<code> min = candidate.AverageRTT</code>
<code> }</code>
<code> if min == math.MaxInt64 {</code>
<code> return candidates, nil</code>
<code> // 用最小延迟加阈值配置(默认15ms)作为最大容忍延迟</code>
<code> max := min + ls.latency</code>
<code> var result []Server</code>
<code> if candidate.AverageRTTSet {</code>
<code> if candidate.AverageRTT <= max {</code>
<code> // 返回所有符合延迟标准(最大容忍延迟)的节点</code>
<code> result = append(result, candidate)</code>
<code> return result, nil</code>
最后根据选择得到的候选列表,随机返回一个正常节点作为目标节点。核心代码如下:
<code> // 根据前面介绍的“复合选择器”,得到候选节点列表</code>
<code> suitable, err := t.selectServer(ctx, sub.C, ss, ssTimeoutCh)</code>
<code> if err != nil {</code>
<code> return nil, err</code>
<code> // 随机选择一个作为目标节点</code>
<code> selected := suitable[rand.Intn(len(suitable))]</code>
<code> selectedS, err := t.FindServer(selected)</code>
<code> switch {</code>
<code> case err != nil:</code>
<code> case selectedS != nil:</code>
<code> return selectedS, nil</code>
<code> // We don't have an actual server for the provided description.</code>
<code> // This could happen for a number of reasons, including that the</code>
<code> // server has since stopped being a part of this topology, or that</code>
<code> // the server selector returned no suitable servers.</code>
关于上述15ms的默认配置,官方go driver也提供了设置接口。对于延迟敏感的业务,可以通过这个接口配置ClientOptions,降低阈值。
4. 总结
MongoDB通过nearest模式支持多机房部署场景中客户端driver->mongod以及mongos->mongod的就近读。本文结合腾讯云MongoDB内核代码和常用的go driver代码对nearest的原理进行分析,并给出了一些使用建议。