天天看点

大数据之Scala并发编程

一、Scala函数

1、聚合

val arr = List(List(1, 2, 3), List(3, 4, 5), List(2), List(0))

val result=arr.aggregate(0)(+.sum, + )

2、并集

val l1 = List(5,6,4,7)

val l2 = List(1,2,3,4)

val result = l1 union l2

3、交集

val l1 = List(5,6,4,7)

val l2 = List(1,2,3,4)

val result = l1.intersect(l2)

4、差集

val l1 = List(5,6,4,7)

val l2 = List(1,2,3,4)

val result = l1.diff(l2)

二、类的定义

//在Scala中,类并不用声明为public。
//Scala源文件中可以包含多个类,所有这些类都具有公有可见性。

  class Person {
    //用val修饰的变量是只读属性,有getter但没有setter
    //(相当与Java中用final修饰的变量)
    val id = "9527"
  
    //用var修饰的变量既有getter又有setter
    var age: Int = 18
  
    //类私有字段,只能在类的内部使用
    private var name: String = "唐伯虎"
  
    //对象私有字段,访问权限更加严格的,Person类的方法只能访问到当前对象的字段
    private[this] val pet = "小强"
    
}      

三、构造器

**
  *  每个类都有主构造器,主构造器的参数直接放置类名后面,与类交织在一起,
  * 参数若是不带val或var,则为私有属性
  */
class Student(val name: String, val age: Int){

    //主构造器会执行类定义中的所有语句
    println("执行主构造器")
    
   //用this关键字定义辅助构造器
   def this(name: String, age: Int, gender: String){
      //每个辅助构造器必须以主构造器或其他的辅助构造器的调用开始
      this(name, age)
      println("执行辅助构造器")
      this.gender = gender
  }

}      

四、对象

1、单例对象:单个不可变的实例

object SingletonDemo {

    def main(args: Array[String]) {
    
      //单例对象,不需要new,用【类名.方法】调用对象中的方法
      val session = SessionFactory.getSession()
      println(session)
    }
  }
  
 //单例的SessionFactory
 object SessionFactory{
    //该部分相当于java中的静态块
    var counts = 5
    val sessions = new ArrayBuffer[Session]()
    while(counts > 0){
      sessions += new Session
      counts -= 1
    }
  
    //在object中的方法相当于java中的静态方法
    def getSession(): Session ={
      sessions.remove(0)
    }
  }      

2、伴生对象:与类名相同的对象,类和伴生对象直接可以相互访问私有方法和属性

class Dog {
    val id = 1
    private var name = "itcast"
  
    def printName(): Unit ={
      //在Dog类中可以访问伴生对象Dog的私有属性
      println(Dog.CONSTANT + name )
    }
  }
  
  /**
    * 伴生对象
    */
 object Dog {
  
    //伴生对象中的私有属性
    private val CONSTANT = "汪汪汪 : "
  
    def main(args: Array[String]) {
      val p = new Dog
      //访问私有的字段name
      p.name = "123"
      p.printName()
    }
}      

通常我们会在类的伴生对象中定义apply方法,当遇到类名(参数1,…参数n)时apply方法会被调用

object ApplyDemo {

  def main(args: Array[String]) {
  
    //调用了Array伴生对象的apply方法
    //arr1中只有一个元素5
    val arr1 = Array(5)
    println(arr1.toBuffer)

    //new了一个长度为5的array,数组里面包含5个null
    var arr2 = new Array(5)
  }
}      

3、应用程序对象:继承App类

object AppObjectDemo extends App{
    //不用写main方法
    println("I love you Scala")
}      

五、继承: 接口用traint修饰, 实现接口用with

object ClazzDemo {

    def main(args: Array[String]) {
      val h = new Human
      /rintln(h.fight)
   }
}
  
trait Flyable{
    def fly(): Unit ={
      println("I can fly")
    }
  
}
  
abstract class Animal {

    def run(): Int
    val name: String
}

//继承animal类,实现Flyable接口
class Human extends Animal with Flyable{
  
    val name = "abc"
  
    //打印几次"ABC"?
    val t1,t2,(a, b, c) = {
      println("ABC")
      (1,2,3)
    }
  
    println(a)
    println(t1._1)
  
    //在Scala中重写一个非抽象方法必须用override修饰
    override def fight(): String = {
      "fight with 棒子"
    }
    //在子类中重写超类的抽象方法时,不需要使用override关键字,写了也可以
    def run(): Int = {
      1
    }
}      

六、模式匹配: match…case匹配

1、匹配字符串

object CaseDemo01 extends App{

    val arr = Array("YoshizawaAkiho", "YuiHatano", "AoiSola")
    val name = arr(Random.nextInt(arr.length))
    name match {
      case "YoshizawaAkiho" => println("吉泽老师...")
      case "YuiHatano" => println("波多老师...")
      case _ => println("真不知道你们在说什么...")
    }
}      

2、匹配类型

object CaseDemo01 extends App{

    //val v = if(x >= 5) 1 else if(x < 2) 2.0 else "hello"
    val arr = Array("hello", 1, 2.0, CaseDemo)
    val v = arr(Random.nextInt(4))
    println(v)
    v match {
      case x: Int => println("Int " + x)
      case y: Double if(y >= 0) => println("Double "+ y)
      case z: String => println("String " + z)
      case _ => throw new Exception("not match exception")
    }
}      

3、匹配数组、元组

object CaseDemo03 extends App{

    val arr = Array(1, 3, 5)
    arr match {
      case Array(1, x, y) => println(x + " " + y)
      case Array(0) => println("only 0")
      case Array(0, _*) => println("0 ...")
      case _ => println("something else")
    }
  
    val lst = List(3, -1)
    lst match {
      case 0 :: Nil => println("only 0")
      case x :: y :: Nil => println(s"x: $x y: $y")
      case 0 :: tail => println("0 ...")
      case _ => println("something else")
    }
  
    val tup = (2, 3, 7)
    tup match {
      case (1, x, y) => println(s"1, $x , $y")
      case (_, z, 5) => println(z)
      case  _ => println("else")
    }
}      

注意:在Scala中列表要么为空(Nil表示空列表)要么是一个head元素加上一个tail列表。

9 :: List(5, 2) :: 操作符是将给定的头和尾创建一个新的列表

注意::: 操作符是右结合的,如9 :: 5 :: 2 :: Nil相当于 9 :: (5 :: (2 :: Nil))

4、样例类

case class是多例的,后面要跟构造参数,case object是单例的

case class SubmitTask(id: String, name: String)
case class HeartBeat(time: Long)
case object CheckTimeOutTask

object CaseDemo04 extends App{

      val arr = Array(CheckTimeOutTask, HeartBeat(12333), SubmitTask("0001", "task-0001"))

    arr(Random.nextInt(arr.length)) match {
      case SubmitTask(id, name) => {
        println(s"$id, $name")//前面需要加上s, $id直接取id的值
      }
      case HeartBeat(time) => {
        println(time)
      }
      case CheckTimeOutTask => {
        println("check")
      }
    }
}      

5、Option类型:能存在或也可能不存在的值

Some包装了某个值,None表示没有值

object OptionDemo {
    def main(args: Array[String]) {
    
      val map = Map("a" -> 1, "b" -> 2)
      val v = map.get("b") match {
        case Some(i) => i
        case None => 0
      }
      println(v)
      
      //更好的方式
      val v1 = map.getOrElse("c", 0)
      println(v1)
    }
}      

6、偏函数:没有match的一组case语句

它是PartialFunction[A, B]的一个实例,A代表参数类型,B代表返回类型

object PartialFuncDemo  {

      //方式一
    def func1: PartialFunction[String, Int] = {
      case "one" => 1
      case "two" => 2
      case _ => -1
    }
  
    //方式二
    def func2(num: String) : Int = num match {
      case "one" => 1
      case "two" => 2
      case _ => -1
    }
  
    def main(args: Array[String]) {
      println(func1("one"))
      println(func2("one"))
    }
}      

七、Scala并发编程:Actor

Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的并发机制,Scala是运用消息(message)的发送、接收来实现多线程的。使用Scala能够更容易地实现多线程应用的开发。

我们现在学的Scala Actor是scala 2.10.x版本及以前版本的Actor

发送消息的方式:

! 发送异步消息,没有返回值。

!? 发送同步消息,等待返回值。

!! 发送异步消息,返回值是 Future[Any]。

1、案例一:两个单例对象

object MyActor1 extends Actor{

    //重新act方法
    def act(){
      for(i <- 1 to 10){
        println("actor-1 " + i)
        Thread.sleep(2000)
      }
    }
  }
  
  object MyActor2 extends Actor{
    //重新act方法
    def act(){
      for(i <- 1 to 10){
        println("actor-2 " + i)
        Thread.sleep(2000)
      }
    }
  }
  
  object ActorTest extends App{
    //启动Actor
    MyActor1.start()
    MyActor2.start()
}      

2、案例二:(可以不断地接收消息)

class MyActor extends Actor {
  
    override def act(): Unit = {
      while (true) {
        receive {
          case "start" => {
            println("starting ...")
            Thread.sleep(5000)
            println("started")
          }
          
          case "stop" => {
            println("stopping ...")
            Thread.sleep(5000)
            println("stopped ...")
          }
        }
      }
    }
  }
  
  object MyActor {
    def main(args: Array[String]) {
      val actor = new MyActor
      actor.start()
      actor ! "start"
      actor ! "stop"
      println("消息发送完成!")
    }
}      

3、案例三:(react方式会复用线程,比receive更高效)

class YourActor extends Actor {
  
    override def act(): Unit = {
      loop {
        react {
          case "start" => {
            println("starting ...")
            Thread.sleep(5000)
            println("started")
          }
          case "stop" => {
            println("stopping ...")
            Thread.sleep(8000)
            println("stopped ...")
          }
        }
      }
    }
  }
  
  
  object YourActor {
    def main(args: Array[String]) {
      val actor = new YourActor
      actor.start()
      actor ! "start"
      actor ! "stop"
      println("消息发送完成!")
    }
}      
package cn.itcast.actor

import scala.actors.{Actor, Future}
import scala.collection.mutable.{HashSet, ListBuffer}
import scala.io.Source

/**
  * Created by root on 2016/5/11.
  */
class Task extends Actor {
    override def act(): Unit = {
      loop {
        react {
          case SubmitTask(filename) => {
            //局部统计, 结果是Map[String, Int]
            val result = Source.fromFile(filename).getLines().flatMap(_.split(" ")).map((_, 1)).toList.groupBy(_._1).mapValues(_.size)
            sender ! ResultTask(result) //发送ResultTask, 用它来包装result
          }
          case StopTask => {
            exit()
          }
        }
      }
    }
}

case class SubmitTask(filename: String)
case class ResultTask(reslut : Map[String, Int])
case object StopTask

object ActorWordCount {

  def main(args: Array[String]) {
  
      val replySet = new HashSet[Future[Any]]()
      val resultList = new ListBuffer[ResultTask]()
  
      val files = Array[String]("c://words.txt", "c://words.log")
      for (f <- files) {
        val actor = new Task
        val reply = actor.start() !! SubmitTask(f)  //启动, 并发送消息,返回Future
        replySet += reply //把这些Future放到集合中
      }
  
      while(replySet.size > 0) {
      
        val toCompute = replySet.filter(_.isSet)  //取出有效的结果, 待处理的数据
        for(f <- toCompute) {
          val result = f().asInstanceOf[ResultTask] //获取实例, 注意f后要加(), 调用apply(), 否则会报转换异常
          resultList += result
          replySet -= f
        }
        Thread.sleep(100)
    }

      //汇总的功能
      //List((hello, 5), (tom,3), (helllo, 2), (jerry, 2))
      val fr = resultList.flatMap(_.reslut).groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2))
      println(fr)

  }
}      
上一篇: ACM相关tips
下一篇: 大数据之YARN