天天看点

Spark源码分析之BlockManager通信机制:一、BlockManagerMasterEndpoint;二、BlockManagerSlaveEndpoint

BlockManagerMasterEndpoint主要用于向BlockManagerSlaveEndpoint发送消息。下面主要分析它们分别接收哪些消息,以及接收到消息之后如何处理。

一BlockManagerMasterEndpoint

首先它维护了3个重要映射:

维护一个<BlockManagerId,BlockManagerInfo>的映射

维护一个<ExecutorId,BlockManagerId>的映射

维护一个<BlockId,Set<BlockManagerId>>的映射,记录每个blockId存放在哪些BlockManager上(同一个block可能被多个BlockManager持有)

1.1receiveAndReply接收消息

//接收消息并返回结果

/**
 * Handles the request/response messages sent to the block manager master.
 * Every case delegates to a helper of this endpoint and sends the outcome
 * back to the caller through `context.reply`.
 */
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Register a BlockManager, then acknowledge.
  case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
    register(blockManagerId, maxMemSize, slaveEndpoint)
    context.reply(true)

  // Update block info: reply with the result first, then publish the update on the listener bus.
  case _updateBlockInfo @ UpdateBlockInfo(
      blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
    val updated = updateBlockInfo(
      blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize)
    context.reply(updated)
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))

  // Locations of a single block (delegates to getLocations).
  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))

  // Locations for a list of block ids (delegates to getLocationsMultipleBlockIds).
  case GetLocationsMultipleBlockIds(blockIds) =>
    context.reply(getLocationsMultipleBlockIds(blockIds))

  // Peer block managers of the given one (delegates to getPeers).
  case GetPeers(blockManagerId) =>
    context.reply(getPeers(blockManagerId))

  // RPC host and port for an executor (delegates to getRpcHostPortForExecutor).
  case GetRpcHostPortForExecutor(executorId) =>
    context.reply(getRpcHostPortForExecutor(executorId))

  // Memory status.
  case GetMemoryStatus =>
    context.reply(memoryStatus)

  // Storage status.
  case GetStorageStatus =>
    context.reply(storageStatus)

  // Status of a block across the block managers (delegates to blockStatus).
  case GetBlockStatus(blockId, askSlaves) =>
    context.reply(blockStatus(blockId, askSlaves))

  // Block ids matching the given filter (delegates to getMatchingBlockIds).
  case GetMatchingBlockIds(filter, askSlaves) =>
    context.reply(getMatchingBlockIds(filter, askSlaves))

  // Remove the blocks of an RDD.
  case RemoveRdd(rddId) =>
    context.reply(removeRdd(rddId))

  // Remove the blocks of a shuffle.
  case RemoveShuffle(shuffleId) =>
    context.reply(removeShuffle(shuffleId))

  // Remove the blocks of a broadcast.
  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    context.reply(removeBroadcast(broadcastId, removeFromDriver))

  // Ask the workers (slaves) holding the block to remove it, then acknowledge.
  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    context.reply(true)

  // Remove an executor from the master's bookkeeping, then acknowledge.
  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    context.reply(true)

  // Acknowledge before stopping this endpoint.
  case StopBlockManagerMaster =>
    context.reply(true)
    stop()

  // Heartbeat from a BlockManager.
  case BlockManagerHeartbeat(blockManagerId) =>
    context.reply(heartbeatReceived(blockManagerId))

  // True only when the executor maps to a registered BlockManager whose cachedBlocks is non-empty.
  case HasCachedBlocks(executorId) =>
    val hasCached = blockManagerIdByExecutor.get(executorId)
      .flatMap(bmId => blockManagerInfo.get(bmId))
      .exists(_.cachedBlocks.nonEmpty)
    context.reply(hasCached)
}

1.2removeRdd 删除该rdd对应的所有block

首先删除和该rdd相关的元数据信息;然后再向BlockManager从节点发送RemoveRdd进行具体的删除

/**
 * Removes every block belonging to the given RDD: first drops the master-side
 * bookkeeping, then asks each registered slave endpoint to remove the RDD.
 *
 * @param rddId id of the RDD whose blocks are removed
 * @return a future collecting the `Int` reply of every slave endpoint
 */
private def removeRdd(rddId: Int): Future[Seq[Int]] = {
  // Keep only the block ids that convert to an RDD block id carrying this rddId.
  val rddBlocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)

  // Drop each block from every registered BlockManagerInfo that holds it,
  // then forget the block's locations altogether.
  for (rddBlock <- rddBlocks) {
    val holders: mutable.HashSet[BlockManagerId] = blockLocations.get(rddBlock)
    for (holder <- holders; info <- blockManagerInfo.get(holder)) {
      info.removeBlock(rddBlock)
    }
    blockLocations.remove(rddBlock)
  }

  // Send RemoveRdd to the slave endpoint of every registered block manager.
  val removeMsg = RemoveRdd(rddId)
  val replies = blockManagerInfo.values.map(_.slaveEndpoint.ask[Int](removeMsg)).toSeq
  Future.sequence(replies)
}

1.3removeShuffle

只是向slave发送RemoveShuffle消息,让slave去删除shuffle相关的block

/**
 * Sends a RemoveShuffle message to the slave endpoint of every registered
 * block manager; no master-side state is touched here.
 *
 * @param shuffleId id of the shuffle to remove
 * @return a future collecting the `Boolean` reply of every slave endpoint
 */
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
  val removeMsg = RemoveShuffle(shuffleId)
  val replies = blockManagerInfo.values.map(_.slaveEndpoint.ask[Boolean](removeMsg)).toSeq
  Future.sequence(replies)
}

1.4removeBlockManager 删除BlockManager

/**
 * Removes a block manager from the master's bookkeeping: the executor mapping,
 * the BlockManagerInfo map, and every block-location set that mentions it.
 *
 * @param blockManagerId the block manager to remove; it is looked up with
 *                       `blockManagerInfo(...)`, so it must be registered
 */
private def removeBlockManager(blockManagerId: BlockManagerId): Unit = {
  // Grab the info before it is dropped from the map; its block list is needed below.
  val removedInfo = blockManagerInfo(blockManagerId)

  // Drop the executor -> block manager mapping and the block manager entry itself.
  blockManagerIdByExecutor.remove(blockManagerId.executorId)
  blockManagerInfo.remove(blockManagerId)

  // Walk over every block this manager tracked and detach the manager from its location set.
  val blockIds = removedInfo.blocks.keySet.iterator
  while (blockIds.hasNext) {
    val blockId = blockIds.next
    val holders = blockLocations.get(blockId)
    holders.remove(blockManagerId)
    // No block manager holds the block any more, so stop tracking it.
    if (holders.isEmpty) {
      blockLocations.remove(blockId)
    }
  }

  listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
  logInfo(s"Removing block manager $blockManagerId")
}

1.5removeBlockFromWorkers 从worker节点(slave节点)删除对应block

/**
 * Asks every registered block manager that holds the block to remove it.
 * The futures returned by `ask` are discarded, so no reply is awaited here.
 *
 * @param blockId the block to remove from the workers (slaves)
 */
private def removeBlockFromWorkers(blockId: BlockId): Unit = {
  // blockLocations.get yields null for an untracked block; Option(...) turns that into a no-op.
  Option(blockLocations.get(blockId)).foreach { holders =>
    for {
      holderId <- holders
      // Skip holders that are no longer present in blockManagerInfo.
      info <- blockManagerInfo.get(holderId)
    } info.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
  }
}

1.6blockStatus 返回所有block manager的block状态

/**
 * Collects the status of a block from every registered block manager.
 *
 * @param blockId   the block to look up
 * @param askSlaves when true, each slave endpoint is asked via GetBlockStatus;
 *                  otherwise the master's own BlockManagerInfo is consulted inside a Future
 * @return one future status per registered block manager, keyed by its id
 */
private def blockStatus(
    blockId: BlockId,
    askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
  val statusRequest = GetBlockStatus(blockId)
  val perManager = for (info <- blockManagerInfo.values) yield {
    val status =
      if (askSlaves) info.slaveEndpoint.ask[Option[BlockStatus]](statusRequest)
      else Future { info.getStatus(blockId) }
    info.blockManagerId -> status
  }
  perManager.toMap
}

1.7register 注册

/**
 * Registers a block manager with the master. If the id is already registered
 * nothing is changed, but the "added" event is still posted in either case.
 *
 * @param id            id of the block manager being registered
 * @param maxMemSize    maximum memory size reported by the block manager
 * @param slaveEndpoint RPC reference used to reach the block manager's slave endpoint
 */
private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
  val time = System.currentTimeMillis()
  val alreadyRegistered = blockManagerInfo.contains(id)
  if (!alreadyRegistered) {
    // A different block manager already mapped to this executor is treated as stale:
    // the executor is removed before the new block manager is recorded.
    blockManagerIdByExecutor.get(id.executorId).foreach { oldId =>
      logError("Got two different block manager registrations on same executor - "
          + s" will replace old one $oldId with new one $id")
      removeExecutor(id.executorId)
    }
    logInfo("Registering block manager %s with %s RAM, %s".format(
      id.hostPort, Utils.bytesToString(maxMemSize), id))
    // Record executor -> block manager, then block manager -> info.
    blockManagerIdByExecutor.update(id.executorId, id)
    val info = new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
    blockManagerInfo.update(id, info)
  }
  listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}

1.8updateBlockInfo 更新数据块信息

/**
 * Applies a block update reported by a block manager to the master's bookkeeping.
 *
 * @param blockManagerId         the block manager reporting the update
 * @param blockId                the updated block; `null` only refreshes the last-seen time
 * @param storageLevel           new storage level; an invalid level removes the location
 * @param memSize                passed through to BlockManagerInfo.updateBlockInfo
 * @param diskSize               passed through to BlockManagerInfo.updateBlockInfo
 * @param externalBlockStoreSize passed through to BlockManagerInfo.updateBlockInfo
 * @return false only when the block manager is unregistered and is not a
 *         driver running in non-local mode; true otherwise
 */
private def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long,
    externalBlockStoreSize: Long): Boolean = {
  if (!blockManagerInfo.contains(blockManagerId)) {
    // We intentionally do not register the master (except in local mode),
    // so an update from an unregistered driver is not reported as a failure.
    blockManagerId.isDriver && !isLocal
  } else if (blockId == null) {
    // No block to update: only refresh the last-seen timestamp.
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    true
  } else {
    val managerInfo = blockManagerInfo(blockManagerId)
    managerInfo.updateBlockInfo(blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)

    // Reuse the tracked location set for this block, or start tracking a new empty one.
    val holders: mutable.HashSet[BlockManagerId] =
      if (blockLocations.containsKey(blockId)) {
        blockLocations.get(blockId)
      } else {
        val fresh = new mutable.HashSet[BlockManagerId]
        blockLocations.put(blockId, fresh)
        fresh
      }

    // A valid storage level adds this block manager as a holder; an invalid one removes it.
    if (storageLevel.isValid) {
      holders.add(blockManagerId)
    } else {
      holders.remove(blockManagerId)
    }

    // Stop tracking the block once no block manager holds it.
    if (holders.isEmpty) {
      blockLocations.remove(blockId)
    }
    true
  }
}

1.9 getPeers 获取除Driver和指定blockManagerId之外的其他BlockManagerId(即其余Executor上的BlockManager)

/**
 * Lists the peers of a block manager: every registered block manager id that
 * is not a driver and is not the given id itself.
 *
 * @param blockManagerId the block manager asking for its peers
 * @return the peer ids, or an empty sequence when the given id is not registered
 */
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  val registered = blockManagerInfo.keySet
  if (!registered.contains(blockManagerId)) {
    Seq.empty
  } else {
    // Two passes, as before: first drop drivers, then drop the requester itself.
    val executorsOnly = registered.filterNot(_.isDriver)
    executorsOnly.filter(_ != blockManagerId).toSeq
  }
}

二BlockManagerSlaveEndpoint

接收BlockManagerMasterEndpoint发送过来的指令,然后执行该指令

2.1 receiveAndReply接收消息

/**
 * Handles the commands sent to this slave endpoint. Removal commands run
 * through `doAsync`; status queries reply directly on `context`.
 */
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // RemoveBlock: remove the block through the local BlockManager and report true.
  case RemoveBlock(blockId) =>
    doAsync[Boolean](s"removing block $blockId", context) {
      blockManager.removeBlock(blockId)
      true
    }

  // RemoveRdd: the result of blockManager.removeRdd is the reply value.
  case RemoveRdd(rddId) =>
    doAsync[Int](s"removing RDD $rddId", context) {
      blockManager.removeRdd(rddId)
    }

  // RemoveShuffle: unregister from the MapOutputTracker when one is set,
  // then unregister from the shuffle manager, whose result is the reply value.
  case RemoveShuffle(shuffleId) =>
    doAsync[Boolean](s"removing shuffle $shuffleId", context) {
      Option(mapOutputTracker).foreach(_.unregisterShuffle(shuffleId))
      SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
    }

  // RemoveBroadcast: the second message field is ignored; tellMaster is always true here.
  case RemoveBroadcast(broadcastId, _) =>
    doAsync[Int](s"removing broadcast $broadcastId", context) {
      blockManager.removeBroadcast(broadcastId, tellMaster = true)
    }

  // GetBlockStatus: reply with the local BlockManager's status for the block.
  case GetBlockStatus(blockId, _) =>
    context.reply(blockManager.getStatus(blockId))

  // GetMatchingBlockIds: reply with the local block ids matching the filter.
  case GetMatchingBlockIds(filter, _) =>
    context.reply(blockManager.getMatchingBlockIds(filter))
}

继续阅读