1. Spark底层通信原理(Akka)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPR90drRVTwklaNBDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL0MTNzUDOzQTM4ITNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
1.1 原理
- 主节点Master,从节点Worker
- 每个节点底层都是Akka原理,主从之间通信,其实就是actor之间通信。
- 老大ActorSystem
- 老大负责创建actor
- worker通信给master,需要使用master的引用,master给worker通信,直接sender。
1.2 RPC
Akka底层也是RPC原理,RPC(Remote Procedure Call)-远程过程调用,他是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。
2. Akka实现Spark通信编程-项目概述
2.1 需求
- 目前大多数的分布式架构底层通信都是通过RPC实现的,RPC框架非常大。
- Hadoop项目底层也是RPC通信框架,但是Hadoop在设计之初就是为了运行长达数小时的批量而设计的。在某些极端的情况下,任务提交的延迟很高,所以Hadoop的RPC显得有些笨重。
- Spark的RPC是通过Akka类库实现的,Akka用Scala语言开发,基于Actor并发模式实现,Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松实现分布式RPC功能。
2.2 Akka简介
- Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。
- Actor模型:在计算机科学领域,Actor模型是一个并行计算(Concurrent Comutation)模型,它把Actor作为并行计算的基本元素来对象:为了响应一个接收到的消息,一个Actor能够自己做出一些决策,如创建更多的Actor,或发送更多的消息,或者确定如何让去响应接到到的下一个消息。
Scala实战编程、Akka实现一个简易版的spark通信编程、spark原理、Akka简介、启动Akka在IDEA中配置参数 271. Spark底层通信原理(Akka)2. Akka实现Spark通信编程-项目概述3. Akka简易版Spark通信编程,实现两个进程间的通信。 - Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:
- 提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发。
- 提供了异步非阻塞的、高性能的时间驱动编程模式
- 超轻量级事件处理(每GB堆内存几百万Actor)
3. Akka简易版Spark通信编程,实现两个进程间的通信。
3.1 重要类介绍
- ActorSystem:在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。
- 注意:
- ActorSystem是一个进程中的老大,它负责创建和监督Actor。
- ActorSystem是一个单例对象
- Actor负责通讯。
3.2 Actor
在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。
- **preStart()方法:**该方法在Actor对象构造执行后执行,整个Actor生命周期仅执行一次。
- **receive()方法:**该方法子Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。
3.3 具体代码
- Master类
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
/**
* spark的rpc通信底层是Akka, akka的底层就是actor模型(并行计算)
* Master
* 1. 继承Actor
* 2. 重写preStart()方法
* 重写receive()方法, 接收消息
* 3. 半生对象中创建ActorSystem老大, 老大能够创建actor
*/
//1. 继承Actor
class SimpleMaster extends Actor{
println("master constructor invoked")
//2. 重写初始化方法, 这个方法只会被调用一次
override def preStart(): Unit = {
println("master 初始化方法被调用")
}
//3. 重写receive方法,接收消息,发一条消息,这个方法被调用一次
override def receive: Receive = {
//接收消息
case "connect" => {
println("master 接收到注册消息")
sender ! "register success"
}
}
}
//伴生对象中创建akka
object SimpleMaster {
def main(args: Array[String]): Unit = {
//接收参数
val host = args(0)
val port = args(1)
//准备配置文件信息
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//创建配置对象
val config: Config = ConfigFactory.parseString(configStr)
//3. 创建actorSystem入口类, 老大
val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem",config)
//通过老大创建actor对象
masterActorSystem.actorOf(Props(new SimpleMaster), "masterActor")
}
}
- Worker类
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
/**
*
*/
//1. 继承Actor
class SimpleWorker extends Actor {
println("worker 构造器别调用")
//2. 重写两个方法, 一个初始化方法,一个接收歇息的方法
override def preStart(): Unit = {
println("worker 初始化方法被调用")
//a. 需要通过context上下文对象获取到actorMaster的引用
val masterSelection: ActorSelection = context.actorSelection("akka.tcp://[email protected]:8888/user/masterActor")
//b. 发送消息到master
masterSelection ! "connect"
}
override def receive: Receive = {
case "register success" => {
println("worker 接收到master反馈的消息")
}
}
}
//3. 创建actorSystem, 创建actor
object SimpleWorker {
def main(args: Array[String]): Unit = {
//接收外界参数
val host = args(0)
val port = args(1)
//创建配置文件
val configStr: String =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//通过配置工厂创建配置对象
val config: Config = ConfigFactory.parseString(configStr)
//创建actorSystem老大对象
val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", config)
//通过老大创建actor对象
val workerActor: ActorRef = workerActorSystem.actorOf(Props(new SimpleWorker), "workerActor")
}
}
-
Worker配置启动参数
*
Scala实战编程、Akka实现一个简易版的spark通信编程、spark原理、Akka简介、启动Akka在IDEA中配置参数 271. Spark底层通信原理(Akka)2. Akka实现Spark通信编程-项目概述3. Akka简易版Spark通信编程,实现两个进程间的通信。 Scala实战编程、Akka实现一个简易版的spark通信编程、spark原理、Akka简介、启动Akka在IDEA中配置参数 271. Spark底层通信原理(Akka)2. Akka实现Spark通信编程-项目概述3. Akka简易版Spark通信编程,实现两个进程间的通信。 - 启动:先启动Master,后启动Worker。