1. Spark底層通信原理(Akka)
1.1 原理
- 主節點Master,從節點Worker
- 每個節點底層都是Akka原理,主從之間通信,其實就是actor之間通信。
- 老大ActorSystem
- 老大負責建立actor
- worker通信給master,需要使用master的引用,master給worker通信,直接sender。
1.2 RPC
Akka底層也是RPC原理,RPC(Remote Procedure Call)-遠端過程調用,他是一種通過網絡從遠端計算機程式上請求服務,而不需要了解底層網絡技術的協定。
2. Akka實作Spark通信程式設計-項目概述
2.1 需求
- 目前大多數的分布式架構底層通信都是通過RPC實作的,RPC架構非常大。
- Hadoop項目底層也是RPC通信架構,但是Hadoop在設計之初就是為了運作長達數小時的批量而設計的。在某些極端的情況下,任務送出的延遲很高,是以Hadoop的RPC顯得有些笨重。
- Spark的RPC是通過Akka類庫實作的,Akka用Scala語言開發,基于Actor并發模式實作,Akka具有高可靠、高性能、可擴充等特點,使用Akka可以輕松實作分布式RPC功能。
2.2 Akka簡介
- Akka基于Actor模型,提供了一個用于建構可擴充的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平台。
- Actor模型:在計算機科學領域,Actor模型是一個并行計算(Concurrent Comutation)模型,它把Actor作為并行計算的基本元素來對象:為了響應一個接收到的消息,一個Actor能夠自己做出一些決策,如建立更多的Actor,或發送更多的消息,或者确定如何讓去響應接到到的下一個消息。
- Actor是Akka中最核心的概念,它是一個封裝了狀态和行為的對象,Actor之間可以通過交換消息的方式進行通信,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及線程管理,可以非常容易地開發出正确地并發程式和并行系統,Actor具有如下特性:
- 提供了一種進階抽象,能夠簡化在并發(Concurrency)/并行(Parallelism)應用場景下的程式設計開發。
- 提供了異步非阻塞的、高性能的時間驅動程式設計模式
- 超輕量級事件處理(每GB堆記憶體幾百萬Actor)
3. Akka簡易版Spark通信程式設計,實作兩個程序間的通信。
3.1 重要類介紹
- ActorSystem:在Akka中,ActorSystem是一個重量級的結構,他需要配置設定多個線程,是以在實際應用中,ActorSystem通常是一個單例對象,我們可以使用這個ActorSystem建立很多Actor。
- 注意:
- ActorSystem是一個程序中的老大,它負責建立和監督Actor。
- ActorSystem是一個單例對象
- Actor負責通訊。
3.2 Actor
在Akka中,Actor負責通信,在Actor中有一些重要的生命周期方法。
- **preStart()方法:**該方法在Actor對象構造執行後執行,整個Actor生命周期僅執行一次。
- **receive()方法:**該方法子Actor的preStart方法執行完成後執行,用于接收消息,會被反複執行。
3.3 具體代碼
- Master類
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
/**
* spark的rpc通信底層是Akka, akka的底層就是actor模型(并行計算)
* Master
* 1. 繼承Actor
* 2. 重寫preStart()方法
* 重寫receive()方法, 接收消息
* 3. 半生對象中建立ActorSystem老大, 老大能夠建立actor
*/
//1. 繼承Actor
class SimpleMaster extends Actor{
println("master constructor invoked")
//2. 重寫初始化方法, 這個方法隻會被調用一次
override def preStart(): Unit = {
println("master 初始化方法被調用")
}
//3. 重寫receive方法,接收消息,發一條消息,這個方法被調用一次
override def receive: Receive = {
//接收消息
case "connect" => {
println("master 接收到注冊消息")
sender ! "register success"
}
}
}
//伴生對象中建立akka
object SimpleMaster {
def main(args: Array[String]): Unit = {
//接收參數
val host = args(0)
val port = args(1)
//準備配置檔案資訊
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//建立配置對象
val config: Config = ConfigFactory.parseString(configStr)
//3. 建立actorSystem入口類, 老大
val masterActorSystem: ActorSystem = ActorSystem("masterActorSystem",config)
//通過老大建立actor對象
masterActorSystem.actorOf(Props(new SimpleMaster), "masterActor")
}
}
- Worker類
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
/**
*
*/
//1. 繼承Actor
class SimpleWorker extends Actor {
println("worker 構造器别調用")
//2. 重寫兩個方法, 一個初始化方法,一個接收歇息的方法
override def preStart(): Unit = {
println("worker 初始化方法被調用")
//a. 需要通過context上下文對象擷取到actorMaster的引用
val masterSelection: ActorSelection = context.actorSelection("akka.tcp://[email protected]:8888/user/masterActor")
//b. 發送消息到master
masterSelection ! "connect"
}
override def receive: Receive = {
case "register success" => {
println("worker 接收到master回報的消息")
}
}
}
//3. 建立actorSystem, 建立actor
object SimpleWorker {
def main(args: Array[String]): Unit = {
//接收外界參數
val host = args(0)
val port = args(1)
//建立配置檔案
val configStr: String =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//通過配置工廠建立配置對象
val config: Config = ConfigFactory.parseString(configStr)
//建立actorSystem老大對象
val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem", config)
//通過老大建立actor對象
val workerActor: ActorRef = workerActorSystem.actorOf(Props(new SimpleWorker), "workerActor")
}
}
-
Worker配置啟動參數
*
- 啟動:先啟動Master,後啟動Worker。