天天看點

Scala實戰程式設計、Akka實作一個簡易版的spark通信程式設計、spark原理、Akka簡介、啟動Akka在IDEA中配置參數 271. Spark底層通信原理(Akka)2. Akka實作Spark通信程式設計-項目概述3. Akka簡易版Spark通信程式設計,實作兩個程序間的通信。

1. Spark底層通信原理(Akka)

Scala實戰程式設計、Akka實作一個簡易版的spark通信程式設計、spark原理、Akka簡介、啟動Akka在IDEA中配置參數 271. Spark底層通信原理(Akka)2. Akka實作Spark通信程式設計-項目概述3. Akka簡易版Spark通信程式設計,實作兩個程式間的通信。

1.1 原理

  • 主節點Master,從節點Worker
  • 每個節點底層都是Akka原理,主從之間通信,其實就是actor之間通信。
    • 老大ActorSystem
    • 老大負責建立actor
  • worker通信給master,需要使用master的引用,master給worker通信,直接sender。

1.2 RPC

Akka底層也是RPC原理,RPC(Remote Procedure Call)-遠端過程調用,他是一種通過網絡從遠端計算機程式上請求服務,而不需要了解底層網絡技術的協定。

2. Akka實作Spark通信程式設計-項目概述

2.1 需求

  • 目前大多數的分布式架構底層通信都是通過RPC實作的,RPC架構非常大。
  • Hadoop項目底層也是RPC通信架構,但是Hadoop在設計之初就是為了運作長達數小時的批量而設計的。在某些極端的情況下,任務送出的延遲很高,是以Hadoop的RPC顯得有些笨重。
  • Spark的RPC是通過Akka類庫實作的,Akka用Scala語言開發,基于Actor并發模式實作,Akka具有高可靠、高性能、可擴充等特點,使用Akka可以輕松實作分布式RPC功能。

2.2 Akka簡介

  • Akka基于Actor模型,提供了一個用于建構可擴充的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平台。
  • Actor模型:在計算機科學領域,Actor模型是一個并行計算(Concurrent Comutation)模型,它把Actor作為并行計算的基本元素來對象:為了響應一個接收到的消息,一個Actor能夠自己做出一些決策,如建立更多的Actor,或發送更多的消息,或者确定如何讓去響應接到到的下一個消息。
    Scala實戰程式設計、Akka實作一個簡易版的spark通信程式設計、spark原理、Akka簡介、啟動Akka在IDEA中配置參數 271. Spark底層通信原理(Akka)2. Akka實作Spark通信程式設計-項目概述3. Akka簡易版Spark通信程式設計,實作兩個程式間的通信。
  • Actor是Akka中最核心的概念,它是一個封裝了狀态和行為的對象,Actor之間可以通過交換消息的方式進行通信,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及線程管理,可以非常容易地開發出正确地并發程式和并行系統,Actor具有如下特性:
    • 提供了一種進階抽象,能夠簡化在并發(Concurrency)/并行(Parallelism)應用場景下的程式設計開發。
    • 提供了異步非阻塞的、高性能的時間驅動程式設計模式
    • 超輕量級事件處理(每GB堆記憶體幾百萬Actor)

3. Akka簡易版Spark通信程式設計,實作兩個程序間的通信。

Scala實戰程式設計、Akka實作一個簡易版的spark通信程式設計、spark原理、Akka簡介、啟動Akka在IDEA中配置參數 271. Spark底層通信原理(Akka)2. Akka實作Spark通信程式設計-項目概述3. Akka簡易版Spark通信程式設計,實作兩個程式間的通信。

3.1 重要類介紹

  • ActorSystem:在Akka中,ActorSystem是一個重量級的結構,他需要配置設定多個線程,是以在實際應用中,ActorSystem通常是一個單例對象,我們可以使用這個ActorSystem建立很多Actor。
  • 注意:
    • ActorSystem是一個程序中的老大,它負責建立和監督Actor。
    • ActorSystem是一個單例對象
    • Actor負責通訊。

3.2 Actor

在Akka中,Actor負責通信,在Actor中有一些重要的生命周期方法。

  • **preStart()方法:**該方法在Actor對象構造執行後執行,整個Actor生命周期僅執行一次。
  • **receive()方法:**該方法子Actor的preStart方法執行完成後執行,用于接收消息,會被反複執行。

3.3 具體代碼

  • Master類
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

/**
  * spark的rpc通信底層是Akka, akka的底層就是actor模型(并行計算)
  * Master
  *   1. 繼承Actor
  *   2. 重寫preStart()方法
  *      重寫receive()方法, 接收消息
  *   3. 半生對象中建立ActorSystem老大, 老大能夠建立actor
  */
//1. 繼承Actor
class SimpleMaster extends Actor{
  println("master constructor invoked")

  //2. 重寫初始化方法, 這個方法隻會被調用一次
  override def preStart(): Unit = {
    println("master 初始化方法被調用")
  }

  //3. 重寫receive方法,接收消息,發一條消息,這個方法被調用一次
  override def receive: Receive = {
    //接收消息
    case "connect" => {
      println("master 接收到注冊消息")

      sender ! "register success"
    }
  }
}

//伴生對象中建立akka
object SimpleMaster {
  def main(args: Array[String]): Unit = {
    //接收參數
    val host = args(0)
    val port = args(1)

    //準備配置檔案資訊
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    //建立配置對象
    val config: Config = ConfigFactory.parseString(configStr)

    //3. 建立actorSystem入口類, 老大
    val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem",config)

    //通過老大建立actor對象
    masterActorSystem.actorOf(Props(new SimpleMaster), "masterActor")


  }
}
           
  • Worker類
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

/**
  *
  */

//1. 繼承Actor
class SimpleWorker extends Actor {
  println("worker 構造器别調用")

  //2. 重寫兩個方法, 一個初始化方法,一個接收歇息的方法
  override def preStart(): Unit = {
    println("worker 初始化方法被調用")
    //a. 需要通過context上下文對象擷取到actorMaster的引用
    val masterSelection: ActorSelection = context.actorSelection("akka.tcp://[email protected]:8888/user/masterActor")

    //b. 發送消息到master
    masterSelection ! "connect"


  }

  override def receive: Receive = {
    case "register success" => {
      println("worker 接收到master回報的消息")
    }
  }

}

//3. 建立actorSystem, 建立actor
object SimpleWorker {
  def main(args: Array[String]): Unit = {
    //接收外界參數
    val host = args(0)
    val port = args(1)

    //建立配置檔案
    val configStr: String =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin

    //通過配置工廠建立配置對象
    val config: Config = ConfigFactory.parseString(configStr)

    //建立actorSystem老大對象
    val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", config)

    //通過老大建立actor對象
    val workerActor: ActorRef = workerActorSystem.actorOf(Props(new SimpleWorker), "workerActor")

  }
}

           
  • Worker配置啟動參數

    *

    Scala實戰程式設計、Akka實作一個簡易版的spark通信程式設計、spark原理、Akka簡介、啟動Akka在IDEA中配置參數 271. Spark底層通信原理(Akka)2. Akka實作Spark通信程式設計-項目概述3. Akka簡易版Spark通信程式設計,實作兩個程式間的通信。
    Scala實戰程式設計、Akka實作一個簡易版的spark通信程式設計、spark原理、Akka簡介、啟動Akka在IDEA中配置參數 271. Spark底層通信原理(Akka)2. Akka實作Spark通信程式設計-項目概述3. Akka簡易版Spark通信程式設計,實作兩個程式間的通信。
  • 啟動:先啟動Master,後啟動Worker。