天天看点

Scala实战编程、Akka实现一个简易版的spark通信编程、spark原理、Akka简介、启动Akka在IDEA中配置参数 271. Spark底层通信原理(Akka)2. Akka实现Spark通信编程-项目概述3. Akka简易版Spark通信编程,实现两个进程间的通信。

1. Spark底层通信原理(Akka)

Scala实战编程、Akka实现一个简易版的spark通信编程、spark原理、Akka简介、启动Akka在IDEA中配置参数 271. Spark底层通信原理(Akka)2. Akka实现Spark通信编程-项目概述3. Akka简易版Spark通信编程,实现两个进程间的通信。

1.1 原理

  • 主节点Master,从节点Worker
  • 每个节点底层都是Akka原理,主从之间通信,其实就是actor之间通信。
    • 老大ActorSystem
    • 老大负责创建actor
  • worker通信给master,需要使用master的引用,master给worker通信,直接sender。

1.2 RPC

Akka底层也是RPC原理,RPC(Remote Procedure Call)-远程过程调用,他是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

2. Akka实现Spark通信编程-项目概述

2.1 需求

  • 目前大多数的分布式架构底层通信都是通过RPC实现的,RPC框架非常大。
  • Hadoop项目底层也是RPC通信框架,但是Hadoop在设计之初就是为了运行长达数小时的批量而设计的。在某些极端的情况下,任务提交的延迟很高,所以Hadoop的RPC显得有些笨重。
  • Spark的RPC是通过Akka类库实现的,Akka用Scala语言开发,基于Actor并发模式实现,Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松实现分布式RPC功能。

2.2 Akka简介

  • Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。
  • Actor模型:在计算机科学领域,Actor模型是一个并行计算(Concurrent Comutation)模型,它把Actor作为并行计算的基本元素来对象:为了响应一个接收到的消息,一个Actor能够自己做出一些决策,如创建更多的Actor,或发送更多的消息,或者确定如何让去响应接到到的下一个消息。
    Scala实战编程、Akka实现一个简易版的spark通信编程、spark原理、Akka简介、启动Akka在IDEA中配置参数 271. Spark底层通信原理(Akka)2. Akka实现Spark通信编程-项目概述3. Akka简易版Spark通信编程,实现两个进程间的通信。
  • Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:
    • 提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发。
    • 提供了异步非阻塞的、高性能的时间驱动编程模式
    • 超轻量级事件处理(每GB堆内存几百万Actor)

3. Akka简易版Spark通信编程,实现两个进程间的通信。

Scala实战编程、Akka实现一个简易版的spark通信编程、spark原理、Akka简介、启动Akka在IDEA中配置参数 271. Spark底层通信原理(Akka)2. Akka实现Spark通信编程-项目概述3. Akka简易版Spark通信编程,实现两个进程间的通信。

3.1 重要类介绍

  • ActorSystem:在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。
  • 注意:
    • ActorSystem是一个进程中的老大,它负责创建和监督Actor。
    • ActorSystem是一个单例对象
    • Actor负责通讯。

3.2 Actor

在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。

  • **preStart()方法:**该方法在Actor对象构造执行后执行,整个Actor生命周期仅执行一次。
  • **receive()方法:**该方法子Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。

3.3 具体代码

  • Master类
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

/**
  * spark的rpc通信底层是Akka, akka的底层就是actor模型(并行计算)
  * Master
  *   1. 继承Actor
  *   2. 重写preStart()方法
  *      重写receive()方法, 接收消息
  *   3. 半生对象中创建ActorSystem老大, 老大能够创建actor
  */
//1. 继承Actor
class SimpleMaster extends Actor{
  println("master constructor invoked")

  //2. 重写初始化方法, 这个方法只会被调用一次
  override def preStart(): Unit = {
    println("master 初始化方法被调用")
  }

  //3. 重写receive方法,接收消息,发一条消息,这个方法被调用一次
  override def receive: Receive = {
    //接收消息
    case "connect" => {
      println("master 接收到注册消息")

      sender ! "register success"
    }
  }
}

//伴生对象中创建akka
object SimpleMaster {
  def main(args: Array[String]): Unit = {
    //接收参数
    val host = args(0)
    val port = args(1)

    //准备配置文件信息
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    //创建配置对象
    val config: Config = ConfigFactory.parseString(configStr)

    //3. 创建actorSystem入口类, 老大
    val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem",config)

    //通过老大创建actor对象
    masterActorSystem.actorOf(Props(new SimpleMaster), "masterActor")


  }
}
           
  • Worker类
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

/**
  *
  */

//1. 继承Actor
class SimpleWorker extends Actor {
  println("worker 构造器别调用")

  //2. 重写两个方法, 一个初始化方法,一个接收歇息的方法
  override def preStart(): Unit = {
    println("worker 初始化方法被调用")
    //a. 需要通过context上下文对象获取到actorMaster的引用
    val masterSelection: ActorSelection = context.actorSelection("akka.tcp://[email protected]:8888/user/masterActor")

    //b. 发送消息到master
    masterSelection ! "connect"


  }

  override def receive: Receive = {
    case "register success" => {
      println("worker 接收到master反馈的消息")
    }
  }

}

//3. 创建actorSystem, 创建actor
object SimpleWorker {
  def main(args: Array[String]): Unit = {
    //接收外界参数
    val host = args(0)
    val port = args(1)

    //创建配置文件
    val configStr: String =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin

    //通过配置工厂创建配置对象
    val config: Config = ConfigFactory.parseString(configStr)

    //创建actorSystem老大对象
    val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", config)

    //通过老大创建actor对象
    val workerActor: ActorRef = workerActorSystem.actorOf(Props(new SimpleWorker), "workerActor")

  }
}

           
  • Worker配置启动参数

    *

    Scala实战编程、Akka实现一个简易版的spark通信编程、spark原理、Akka简介、启动Akka在IDEA中配置参数 271. Spark底层通信原理(Akka)2. Akka实现Spark通信编程-项目概述3. Akka简易版Spark通信编程,实现两个进程间的通信。
    Scala实战编程、Akka实现一个简易版的spark通信编程、spark原理、Akka简介、启动Akka在IDEA中配置参数 271. Spark底层通信原理(Akka)2. Akka实现Spark通信编程-项目概述3. Akka简易版Spark通信编程,实现两个进程间的通信。
  • 启动:先启动Master,后启动Worker。