注:本文所有的代碼使用的使Akka的Actor架構,而沒有使用Scala原生的Actor架構(deprecated)。
本文隻做了前兩題,actor程式設計還是很清晰的,我的架構可能不是最合理的,feel free to put your architect.
第一題
import CoordinatorActor.{CalRequest, CalRespond, StartCal}
import akka.actor.{Actor, ActorSystem, Props}
import scala.util.Random
object CoordinatorActor{
def props(arr: Array[Int], n: Int): Props = Props(new CoordinatorActor(arr, n))
case object StartCal
final case class CalRequest(arr: Array[Int], spos: Int, epos: Int)
final case class CalRespond(ans: Long)
}
class CoordinatorActor(arr: Array[Int], n: Int) extends Actor{
val cal_cnt = 4
val interval = n / cal_cnt
var sum:Long = 0l
var respondCnt = 0
var tstamp1: Long = _
override def receive: Receive = {
case StartCal =>
tstamp1 = System.currentTimeMillis()
var last = 1
for(i <- 1 to (cal_cnt - 1)){
val calActor = context.actorOf(Props[CalActor], s"cal-$i")
calActor ! CalRequest(arr, last, last + interval)
last += (interval + 1)
}
val lastActor = context.actorOf(Props[CalActor], s"cal-$cal_cnt")
lastActor ! CalRequest(arr, last, n)
case CalRespond(partSum) =>
sum += partSum
respondCnt += 1
if(respondCnt == cal_cnt){
println(s"[Actor] total: $sum, average: ${sum / n}")
val tstamp2 = System.currentTimeMillis()
println(s"[Actor] cost time: ${tstamp2 - tstamp1}ms")
context.stop(self)
}
}
}
class CalActor extends Actor{
import CoordinatorActor._
override def receive: Receive = {
case CalRequest(arr, spos, epos) =>
var sum: Long = 0l
for(i <- spos to epos) sum += arr(i)
sender() ! CalRespond(sum)
}
}
object p1 extends App {
val n = 10000000
val rand = Random
val arr = new Array[Int](n + 10)
for(i <- 1 to n) arr(i) = rand.nextInt(100)
var sum1: Long = 0l
val tstamp1 = System.currentTimeMillis()
for(i <- 1 to n) sum1 += arr(i)
println(s"total: $sum1, average: ${sum1 / n}")
val tstamp2 = System.currentTimeMillis()
println(s"cost time: ${tstamp2 - tstamp1}ms")
val system = ActorSystem("average-cal")
val coordinatorActor = system.actorOf(CoordinatorActor.props(arr, n), "cal-coordinator")
coordinatorActor ! StartCal
}
CalActor:執行計算任務
CoordinatorActor:執行排程管理任務,建立CalActor,執行結果統計。
執行結果:
n = 10000000
total: 495159047, average: 49
cost time: 95ms
[Actor] total: 495159047, average: 49
[Actor] cost time: 51ms
[INFO] [07/21/2017 16:51:27.273] [Thread-0] [CoordinatedShutdown(akka://average-cal)] Starting coordinated shutdown from JVM shutdown hook
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
第二題
import java.awt.image.BufferedImage
import java.io.{File}
import CoActor.{OpRespond, Start}
import OperActor.StartOp
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
// 執行者:執行像素操作
object OperActor{
def props(img: BufferedImage, spos: Int, epos: Int, width: Int): Props = Props(new OperActor(img, spos, epos, width))
// 開始操作指令
case object StartOp
}
class OperActor(img: BufferedImage, spos: Int, epos: Int, width: Int) extends Actor with ActorLogging{
override def receive: Receive = {
case StartOp =>
log.info(s"start operation between $spos and $epos")
for(i <- spos to epos){
for(j <- 0 until width) {
val ori = img.getRGB(j, i)
img.setRGB(j, i, ~ori)
}
}
// 向排程者報告完成
sender() ! OpRespond
log.info(s"end operation between $spos and $epos")
}
}
// 排程者:對執行者進行排程
object CoActor{
def props(img: BufferedImage) : Props = Props(new CoActor((img)))
// 開始排程指令
case object Start
// 完成報告指令
case object OpRespond
}
class CoActor(img: BufferedImage) extends Actor with ActorLogging {
val height = img.getHeight()
val width = img.getWidth()
val actor_cnt = 5
val interval = height / actor_cnt
var response_cnt = 0
override def receive: Receive = {
case Start =>
log.info("start operation")
var last = 0
for(i <- 1 to (actor_cnt - 1)) {
val opActor = context.actorOf(OperActor.props(img, last, last + interval, width))
last += (interval + 1)
opActor ! StartOp
}
// 開始一個操作者
val lastActor = context.actorOf(OperActor.props(img, last, height - 1, width))
lastActor ! StartOp
case OpRespond =>
response_cnt += 1
// 所有操作者全部完成任務,結束執行
if(response_cnt == actor_cnt){
log.info("end operation")
val outFile = new File("resource/newImage.jpg")
javax.imageio.ImageIO.write(img, "jpg", outFile)
context.stop(self)
}
}
}
object p2 extends App {
// println(System.getProperty("user.dir"))
val image: BufferedImage = javax.imageio.ImageIO.read(new File("resource/image.jpg"))
val system = ActorSystem("image-op")
// 開始一個排程者
val coActor = system.actorOf(CoActor.props(image), "coordinator")
coActor ! Start
}
