天天看点

【Elasticsearch选主流程】

Discovery模块负责发现集群中的节点,以及选择主节点。ES支持多种不同的Discovery选择,内置的实现称为Zen Discovery,其封装了节点发现(ping)、选主等实现过程。本文基于ES 6.7。

这三种Node你晓得伐

【Elasticsearch选主流程】
Node Name Node Role 看不懂英文? 配置(其他配成false)
Master-eligible node A node that has node.master set to true (default), which makes it eligible to be elected as the master node, which controls the cluster. 主节点:负责集群层面的相关操作,管理集群变更。尽可能做少量的工作,生产环境应该尽量分离主节点和数据节点。 node.master: true
Data node A node that has node.data set to true (default). Data nodes hold data and perform data related operations such as CRUD, search, and aggregations. 数据节点:负责保存数据、执行数据相关操作:CRUD、搜索、聚合等。对CPU、内存、IO要求较高。一般情况下,数据读写流程只和数据节点交互,不会和主节点打交道(异常情况除外)。 node.data: true
Ingest node A node that has node.ingest set to true (default). Ingest nodes are able to apply an ingest pipeline to a document in order to transform and enrich the document before indexing. With a heavy ingest load, it makes sense to use dedicated ingest nodes and to mark the master and data nodes as node.ingest: false. 预处理节点:从5.0引入的概念。通过定义一系列的processors处理器和pipeline管道,对数据进行某种转换、富化。 node.ingest: true

Master节点的特殊性

ES中有一项工作是Master独有的:维护集群状态。集群状态信息,只由Master节点进行维护,并且同步到集群中所有节点,其他节点只负责接收从Master同步过来的集群信息而没有维护的权利。集群状态包括以下信息:

  • 集群层面的配置
  • 集群内有哪些节点
  • 各索引的设置,映射,分析器和别名等
  • 索引内各分片所在的节点位置
【思维拓展】ES集群中的每个节点都会存储集群状态,知道索引内各分片所在的节点位置,因此在整个集群中的任意节点都可以知道一条数据该往哪个节点分片上存储。反之也知道该去哪个分片读。所以,Elasticsearch不需要将读写请求发送到Master节点,任何节点都可以作为数据读写的切入点对请求进行响应。这样进一步减轻了Master节点的网络压力,同时提高了集群的整体路由性能。

主从模式 VS. 无主模式

分布式系统的集群方式大致可以分为主从模式(Master-Slave)和无主模式。

模式 代表组件 优点 缺点
主从模式 ES/HDFS/HBase 简化系统设计,Master作为权威节点,负责维护集群原信息。 Master节点存在单点故障,需要解决在被问题,并且集群规模会受限于Master节点的管理能力。
无主模式 Cassandra 分布式哈希表(DHT),支持每小时数千个节点的离开和加入。集群没有master的概念,所有节点都是同样的角色,彻底避免了整个系统的单点问题导致的不稳定性。 多个节点可能操作同一条数据,数据一致性上可能比较难以保证。

为什么主从模式更适合ES

ES的典型场景中的节点数没有那么多(目前官方推荐是几百节点)。一般情况下,节点的数量远远小于单个节点能够维护的连接数,并且网络环境下不必经常处理节点的加入和离开。这就是为什么主从模式更加适合ES。

主节点的选举机制

【举个栗子】通常一个HBase集群存在多个HMaster节点(有资格成为Active HMaster),每个节点都会向ZooKeeper注册,在正常情况下有且仅有一个节点会成为Active Master,其余都为Backup Master。它们将一直处于阻塞状态,直至/hbase/master节点发生delete事件,当Zookeeper Watcher监听到此事件,回唤醒阻塞的Backup Master再次去/master节点注册,如果注册成功就会成为Active HMaster,对外提供服务;如果注册失败,说明已经有节点注册成功,就只能再次阻塞等待被唤醒。

Elasticsearch不像Solr,HDFS和HBase依赖于ZooKeeper,Elasticsearch自己有一套选举机制来保证集群的协同服务。

  1. Bully算法

    Leader选举的基本算法之一,优点是易于实现,该算法和Solr Leader Shard选举非常相似。

    该算法假定所有节点都有一个唯一的ID,使用该ID对节点进行排序,选择最小的节点作为Master。参考ElectMasterService的函数electMaster

    但是节点处于不稳定状态下会出问题,比如Master负载过重而假死(推迟选举解决假死 + 法定得票过半解决脑裂)。

  • 防止脑裂、防止数据丢失的极其重要的参数:

    discovery.zen.minimum_master_nodes=(master_eligible_nodes)/2+1

  • 这个参数的实际作用早已超越了其表面的含义(那建议换一个更霸气侧漏的名字以彰显其重要性),会用于至少以下多个重要时机的判断:

    1. 触发选主:进入选举临时的Master之前,参选的节点数需要达到法定人数。

    2. 决定Master:选出临时的Master之后,得票数需要达到法定人数,才确认选主成功。

    3. gateway选举元信息:向有Master资格的节点发起请求,获取元数据,获取的响应数量必须达到法定人数,也就是参与元信息选举的节点数。

    4. Master发布集群状态:成功向节点发布集群状态信息的数量要达到法定人数。

    5. NodesFaultDetection事件中是否触发rejoin:当发现有节点连不上时,会执行removeNode。接着审视此时的法定人数是否达标(discovery.zen.minimum_master_nodes),不达标就主动放弃Master身份执行rejoin以避免脑裂。

  • Master扩容场景:目前有3个master_eligible_nodes,可以配置quorum为2。如果将master_eligible_nodes扩容到4个,那么quorum就要提高到3。此时需要先把discovery.zen.minimum_master_nodes配置设置为3,再扩容Master节点。这个配置可以动态设置:

    PUT /_cluster/settings

    {

    “persistent”: {

    “discovery.zen.minimum_master_nodes”: 3

    }

    }

  • Master减容场景:缩容与扩容是完全相反的流程,需要先缩减Master节点,再把quorum数降低。
  • 修改Master以及集群相关的配置一定要非常谨慎!配置错误很有可能导致脑裂,甚至数据写坏、数据丢失等场景。
  • 注意:最新版本ES 7已经移除minimum_master_nodes配置,让Elasticsearch自己选择可以形成仲裁的节点。
  1. Paxos算法

    非常强大,选举的灵活性比简单的Bully算法有很大的优势。但Paxos实现起来非常复杂。

流程解析

【Elasticsearch选主流程】

【举个栗子】节点启动场景

Node.java -> start()

ZenDiscovery.java -> startInitialJoin() -> innerJoinCluster()

  1. ping所有节点,并获取PingResponse返回结果(findMaster)
  2. 过滤出具有Master资格的节点(filterPingResponses)
  3. 选出临时Master。根据PingResponse结果构建两个列表:activeMasters和masterCandidates。

    – 如果activeMasters非空,则从activeMasters中选择最合适的作为Master;

    – 如果activeMasters为空,则从masterCandidates中选举,结果可能选举成功,也可能选举失败。

  4. 判断临时Master是否是本节点。

    – 如果临时Master是本节点:则等待其他节点选我,默认30秒超时,成功的话就发布新的clusterState。(当选总统候选人,只等选票过半了)

    – 如果临时Master是其他节点:则不再接受其他节点的join请求,并向Master节点发送加入请求。(没资格选举,就只能送人头了)

private DiscoveryNode findMaster() {
	logger.trace("starting to ping");
	List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList(); // ping所有节点,并获取返回结果
	if (fullPingResponses == null) {
		logger.trace("No full ping responses");
		return null;
	}
	if (logger.isTraceEnabled()) {
		StringBuilder sb = new StringBuilder();
		if (fullPingResponses.size() == 0) {
			sb.append(" {none}");
		} else {
			for (ZenPing.PingResponse pingResponse : fullPingResponses) {
				sb.append("\n\t--> ").append(pingResponse);
			}
		}
		logger.trace("full ping responses:{}", sb);
	}

	final DiscoveryNode localNode = transportService.getLocalNode();

	// add our selves
	assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
		.filter(n -> n.equals(localNode)).findAny().isPresent() == false;

	fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

	// filter responses 选出具有Master资格的节点
	final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

	List<DiscoveryNode> activeMasters = new ArrayList<>();
	for (ZenPing.PingResponse pingResponse : pingResponses) {
		// We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
		// any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
		if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
			activeMasters.add(pingResponse.master());
		}
	}

	// nodes discovered during pinging
	List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
	for (ZenPing.PingResponse pingResponse : pingResponses) {
		if (pingResponse.node().isMasterNode()) {
			masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
		}
	}

	if (activeMasters.isEmpty()) {
		if (electMaster.hasEnoughCandidates(masterCandidates)) {
			final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
			logger.trace("candidate {} won election", winner);
			return winner.getNode();
		} else {
			// if we don't have enough master nodes, we bail, because there are not enough master to elect from
			logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
						masterCandidates, electMaster.minimumMasterNodes());
			return null;
		}
	} else {
		assert !activeMasters.contains(localNode) :
			"local node should never be elected as master when other nodes indicate an active master";
		// lets tie break between discovered nodes
		return electMaster.tieBreakActiveMasters(activeMasters);
	}
}
           
/**
 * the main function of a join thread. This function is guaranteed to join the cluster
 * or spawn a new join thread upon failure to do so.
 */
private void innerJoinCluster() {
	DiscoveryNode masterNode = null;
	final Thread currentThread = Thread.currentThread();
	nodeJoinController.startElectionContext();
	while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
		masterNode = findMaster();
	}

	if (!joinThreadControl.joinThreadActive(currentThread)) {
		logger.trace("thread is no longer in currentJoinThread. Stopping.");
		return;
	}

	if (transportService.getLocalNode().equals(masterNode)) { // 如果是本节点当选为Master
		final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
		logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
		nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout, // (1)等待其他节点的投票数超过requiredJoins数(即为discovery.zen.minimum_master_nodes配置数)。
				new NodeJoinController.ElectionCallback() {
					@Override
					public void onElectedAsMaster(ClusterState state) { // (2)成功选举自己为Master之后,发送集群状态到所有节点
						synchronized (stateMutex) {
							joinThreadControl.markThreadAsDone(currentThread);
						}
					}

					@Override
					public void onFailure(Throwable t) { // (3)如果等待超时后投票数没有超过半数,则认为选举失败,重新开始
						logger.trace("failed while waiting for nodes to join, rejoining", t);
						synchronized (stateMutex) {
							joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
						}
					}
				}

		);
	} else { // 如果是其他节点当选为Master
		// process any incoming joins (they will fail because we are not the master)
		nodeJoinController.stopElectionContext(masterNode + " elected"); // (1)不再接受其他节点的join请求。

		// send join request
		final boolean success = joinElectedMaster(masterNode); // (2)向Master发送请求,申请加入集群。最终当选的Master会先发布集群状态,才确认客户的join请求。

		synchronized (stateMutex) {
			if (success) {
				DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
				if (currentMasterNode == null) { // 检查收到的集群状态中的Master节点如果为空,则重新选举。
					// Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
					// a valid master.
					logger.debug("no master node is set, despite of join request completing. retrying pings.");
					joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
				} else if (currentMasterNode.equals(masterNode) == false) { // 检查当选的Master是不是之前选择的节点,不符合的话则重新选举。
					// update cluster state
					joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
				}

				joinThreadControl.markThreadAsDone(currentThread);
			} else { // 获取集群状态,如果集群状态中与选择的Master不一致,则重新开始
				// failed to join. Try again...
				joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
			}
		}
	}
}
           
/**
 * checks if there is an on going request to become master and if it has enough pending joins. If so, the node will
 * become master via a ClusterState update task.
 */
private synchronized void checkPendingJoinsAndElectIfNeeded() { // waitToBeElectedAsMaster等待时间结束,检查投票数是否足够。
	assert electionContext != null : "election check requested but no active context";
	final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
	if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) { // 选票不够,需要进行新一轮选举。
		if (logger.isTraceEnabled()) {
			logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
				electionContext.requiredMasterJoins);
		}
	} else { // 票数过半,即将成为Master。
		if (logger.isTraceEnabled()) {
			logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
				electionContext.requiredMasterJoins);
		}
		electionContext.closeAndBecomeMaster();
		electionContext = null; // clear this out so future joins won't be accumulated
	}
}
           
public synchronized void addIncomingJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
	ensureOpen();
	joinRequestAccumulator.computeIfAbsent(node, n -> new ArrayList<>()).add(callback);
}

// 查看投票数是否已经足够,标准是达到requiredMasterJoins数(即为discovery.zen.minimum_master_nodes配置数)
public synchronized boolean isEnoughPendingJoins(int pendingMasterJoins) {
	final boolean hasEnough;
	if (requiredMasterJoins < 0) {
		// requiredMasterNodes is unknown yet, return false and keep on waiting
		hasEnough = false;
	} else {
		assert callback != null : "requiredMasterJoins is set but not the callback";
		hasEnough = pendingMasterJoins >= requiredMasterJoins;
	}
	return hasEnough;
}

private Map<DiscoveryNode, ClusterStateTaskListener> getPendingAsTasks() {
	Map<DiscoveryNode, ClusterStateTaskListener> tasks = new HashMap<>();
	joinRequestAccumulator.entrySet().stream().forEach(e -> tasks.put(e.getKey(), new JoinTaskListener(e.getValue(), logger)));
	return tasks;
}

// 统计各个候选人的得票数,如果被推选为Master,则pendingMasterJoins自增1。
public synchronized int getPendingMasterJoinsCount() {
	int pendingMasterJoins = 0;
	for (DiscoveryNode node : joinRequestAccumulator.keySet()) {
		if (node.isMasterNode()) {
			pendingMasterJoins++;
		}
	}
	return pendingMasterJoins;
}
           

节点失效检测

选主流程之后不可或缺的步骤,不执行失效检测可能会产生脑裂现象。

定期(默认为1s)发送ping请求探测节点是否正常,当失败达到一定次数(默认为3次),或者收到节点的离线通知时,开始处理节点离开事件。

我们需要启动两种失效探测器:

  • NodesFaultDetection:在Master节点启动,简称NodesFD。定期探测加入集群的节点是否活跃。当有节点连不上时,会执行removeNode。然后需要审视此时的法定人数是否达标(没错!老坛酸菜牛肉面,仍然是那个熟悉的配方:discovery.zen.minimum_master_nodes),不达标就主动放弃Master身份执行rejoin以避免脑裂。
@Override
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
	final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
	boolean removed = false;
	for (final Task task : tasks) {
		if (currentState.nodes().nodeExists(task.node())) {
			remainingNodesBuilder.remove(task.node());
			removed = true;
		} else {
			logger.debug("node [{}] does not exist in cluster state, ignoring", task);
		}
	}

	if (!removed) {
		// no nodes to remove, keep the current cluster state
		return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
	}

	final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);

	final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
	if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
		final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
		rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
												 masterNodes, electMasterService.minimumMasterNodes()));
		return resultBuilder.build(currentState);
	} else {
		ClusterState ptasksDisassociatedState = PersistentTasksCustomMetaData.disassociateDeadNodes(remainingNodesClusterState);
		return resultBuilder.build(allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks)));
	}
}
           
  • MasterFaultDetection:在非Master节点启动,简称MasterFD。定期探测Master节点是否活跃,Master下线则触发rejoin重新选举。
private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
	if (lifecycleState() != Lifecycle.State.STARTED) {
		// not started, ignore a master failure
		return;
	}
	if (localNodeMaster()) {
		// we might get this on both a master telling us shutting down, and then the disconnect failure
		return;
	}

	logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

	synchronized (stateMutex) {
		if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
			// flush any pending cluster states from old master, so it will not be set as master again
			pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
			rejoin("master left (reason = " + reason + ")");
		}
	}
}
           

触发选举的时机

  • 集群启动,从无主状态到产生新主时
  • 集群在正常运行过程中,Master探测到节点离开时(NodesFaultDetection)
  • 集群在正常运行过程中,非Master节点探测到Master离开时(MasterFaultDetection)

遗留问题

  1. 集群状态信息的同步方式是怎么样的?
  2. ES官方推荐几百节点 -> 我们期望大集群,怎么拓展Master的管理能力?
  3. Master扩容场景资料优化
  4. Master扩容是否会触发选主?只扩容Master节点不会触发选主,只要当前设置的法定人数不变,ES集群就认为自己的选举是合法的。
  5. 现网遇到Master长时间无法选出,根因未知。目前是靠重启所有Master节点来规避。最新版本ES 7已经移除minimum_master_nodes配置。
  6. ES实例管理界面添加Master主备显示?如果有查看Master的必要,可以加上主备信息的显示。
  7. 配置discovery.zen.minimum_master_nodes修改完之后,Master重启,需要重启普通节点吗? —— 动态生效!!!推荐!可以动态生效,为什么要重启节点呢?

Reference

Solr选主流程

ES Master机制及脑裂分析

Elasticsearch Reference [7.0] » Modules » Node

Elasticsearch 7.0中引入的新集群协调子系统如何使用?

Leader Election, Why Should I Care?

开源分布式NoSQL数据库系统——Cassandra

分布式数据库的取舍——Cassandra的选择及其后果

继续阅读