天天看點

《快學Scala》第20章部分習題參考解答(Actor)

注:本文所有的代碼使用的使Akka的Actor架構,而沒有使用Scala原生的Actor架構(deprecated)。

本文隻做了前兩題,actor程式設計還是很清晰的,我的架構可能不是最合理的,feel free to put your architect.

第一題

import CoordinatorActor.{CalRequest, CalRespond, StartCal}
import akka.actor.{Actor, ActorSystem, Props}

import scala.util.Random

object CoordinatorActor{
  def props(arr: Array[Int], n: Int): Props = Props(new CoordinatorActor(arr, n))

  case object StartCal
  final case class CalRequest(arr: Array[Int], spos: Int, epos: Int)
  final case class CalRespond(ans: Long)
}

class CoordinatorActor(arr: Array[Int], n: Int) extends Actor{
  val cal_cnt = 4
  val interval = n / cal_cnt
  var sum:Long = 0l
  var respondCnt = 0
  var tstamp1: Long = _

  override def receive: Receive = {
    case StartCal =>
      tstamp1 = System.currentTimeMillis()
      var last = 1
      for(i <- 1 to (cal_cnt - 1)){
        val calActor = context.actorOf(Props[CalActor], s"cal-$i")
        calActor ! CalRequest(arr, last, last + interval)
        last += (interval + 1)
      }
      val lastActor = context.actorOf(Props[CalActor], s"cal-$cal_cnt")
      lastActor ! CalRequest(arr, last, n)

    case CalRespond(partSum) =>
      sum += partSum
      respondCnt += 1

      if(respondCnt == cal_cnt){
        println(s"[Actor] total: $sum, average: ${sum / n}")
        val tstamp2 = System.currentTimeMillis()
        println(s"[Actor] cost time: ${tstamp2 - tstamp1}ms")
        context.stop(self)
      }
  }
}

class CalActor extends Actor{
  import CoordinatorActor._
  override def receive: Receive = {
    case CalRequest(arr, spos, epos) =>
      var sum: Long = 0l
      for(i <- spos to epos) sum += arr(i)
      sender() ! CalRespond(sum)
  }
}

object p1 extends App {

  val n = 10000000

  val rand = Random

  val arr = new Array[Int](n + 10)

  for(i <- 1 to n) arr(i) = rand.nextInt(100)

  var sum1: Long = 0l

  val tstamp1 = System.currentTimeMillis()
  for(i <- 1 to n) sum1 += arr(i)
  println(s"total: $sum1, average: ${sum1 / n}")
  val tstamp2 = System.currentTimeMillis()

  println(s"cost time: ${tstamp2 - tstamp1}ms")


  val system = ActorSystem("average-cal")
  val coordinatorActor = system.actorOf(CoordinatorActor.props(arr, n), "cal-coordinator")

  coordinatorActor ! StartCal


}
           

CalActor:執行計算任務

CoordinatorActor:執行排程管理任務,建立CalActor,執行結果統計。

執行結果:

n = 10000000

total: 495159047, average: 49
cost time: 95ms
[Actor] total: 495159047, average: 49
[Actor] cost time: 51ms
[INFO] [07/21/2017 16:51:27.273] [Thread-0] [CoordinatedShutdown(akka://average-cal)] Starting coordinated shutdown from JVM shutdown hook

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
           

第二題

import java.awt.image.BufferedImage
import java.io.{File}

import CoActor.{OpRespond, Start}
import OperActor.StartOp
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}

// 執行者:執行像素操作
object OperActor{
  def props(img: BufferedImage, spos: Int, epos: Int, width: Int): Props = Props(new OperActor(img, spos, epos, width))

  // 開始操作指令
  case object StartOp
}

class OperActor(img: BufferedImage, spos: Int, epos: Int, width: Int) extends Actor with ActorLogging{
  override def receive: Receive = {
    case StartOp =>
      log.info(s"start operation between $spos and $epos")
      for(i <- spos to epos){
        for(j <- 0 until width) {
          val ori = img.getRGB(j, i)
          img.setRGB(j, i, ~ori)
        }
      }
      // 向排程者報告完成
      sender() ! OpRespond
      log.info(s"end operation between $spos and $epos")
  }
}

// 排程者:對執行者進行排程
object CoActor{
  def props(img: BufferedImage) : Props = Props(new CoActor((img)))

  // 開始排程指令
  case object Start
  // 完成報告指令
  case object OpRespond
}

class CoActor(img: BufferedImage) extends Actor with ActorLogging {
  val height = img.getHeight()
  val width = img.getWidth()

  val actor_cnt = 5
  val interval = height / actor_cnt
  var response_cnt = 0

  override def receive: Receive = {
    case Start =>
      log.info("start operation")
      var last = 0
      for(i <- 1 to (actor_cnt - 1)) {
        val opActor = context.actorOf(OperActor.props(img, last, last + interval, width))
        last += (interval + 1)
        opActor ! StartOp
      }
      // 開始一個操作者
      val lastActor = context.actorOf(OperActor.props(img, last, height - 1, width))
      lastActor ! StartOp

    case OpRespond =>
      response_cnt += 1

      // 所有操作者全部完成任務,結束執行
      if(response_cnt == actor_cnt){
        log.info("end operation")
        val outFile = new File("resource/newImage.jpg")
        javax.imageio.ImageIO.write(img, "jpg", outFile)

        context.stop(self)
      }
  }
}

object p2 extends App {
//  println(System.getProperty("user.dir"))
  val image: BufferedImage = javax.imageio.ImageIO.read(new File("resource/image.jpg"))

  val system = ActorSystem("image-op")

  // 開始一個排程者
  val coActor = system.actorOf(CoActor.props(image), "coordinator")
  coActor ! Start
}
           
《快學Scala》第20章部分習題參考解答(Actor)
《快學Scala》第20章部分習題參考解答(Actor)