天天看點

基于Spark Streaming預測股票走勢的例子(一)

  最近學習Spark Streaming,不知道是不是我搜尋的姿勢不對,總找不到具體的、完整的例子,一怒之下就決定自己寫一個出來。下面以預測股票走勢為例,總結了用Spark Streaming開發的具體步驟以及方法。

  一、資料源。

  既然預測股票走勢,當然要從網上找一下股票資料的接口,具體可以參考

http://blog.sina.com.cn/s/blog_540f22560100ba2k.html http://apistore.baidu.com/apiworks/servicedetail/115.html

。下面簡單分析一下各種資料接口的優劣以抛磚引玉:

  1、Sina股票資料接口。以字元串資料的形式範圍,簡單易用且直覺。

  2、百度資料接口。以API集市形式提供json形式的資料,比較規範,但使用起來比較繁瑣。

  簡單起見,作者使用新浪的資料接口。

  二、測試資料源

  有了股票的資料接口,以下代碼提供簡單的測試,以解析傳回的資料。

/**
  * Created by gabry.wu on 2016/2/18.
  */
package com.gabry.stock

import scala.io.Source
/** 其實這個類應該更通用一點,但目前一切以簡單為主,後期在進行重構 **/
class SinaStock
{
  var code:String="" //“sh601006”,股票代碼
  var name :String =""  //”大秦鐵路”,股票名字
  var curOpenPrice :Float =0 //”27.55″,今日開盤價
  var lstOpenPrice:Float =0 //”27.25″,昨日收盤價
  var curPrice :Float =0 //”26.91″,目前價格
  var highestPrice  :Float =0 //”27.55″,今日最高價
  var lowestPrice :Float=0 //”26.20″,今日最低價
  var bidBuyPrice:Float=0 //”26.91″,競買價,即“買一”報價
  var bidSalePrice:Float=0 //”26.92″,競賣價,即“賣一”報價
  var dealNum :Long=0 //8:”22114263″,成交的股票數,由于股票交易以一百股為基本機關,是以在使用時,通常把該值除以一百
  var dealAmount  :Float=0 //9:”589824680″,成交金額,機關為“元”,為了一目了然,通常以“萬元”為成交金額的機關,是以通常把該值除以一萬
  var bidBuy1Num :Long=0 //10:”4695″,“買一”申請4695股,即47手
  var bidBuy1Amount :Float=0 //11:”26.91″,“買一”報價
  var bidBuy2Num :Long=0
  var bidBuy2Amount :Float=0
  var bidBuy3Num :Long=0
  var bidBuy3Amount :Float=0
  var bidBuy4Num :Long=0
  var bidBuy4Amount :Float=0
  var bidBuy5Num :Long=0
  var bidBuy5Amount :Float=0
  var bidSale1Num :Long=0 //“賣一”申報3100股,即31手
  var bidSale1Amount :Float=0 //“賣一”報價
  var bidSale2Num :Long=0
  var bidSale2Amount :Float=0
  var bidSale3Num :Long=0
  var bidSale3Amount :Float=0
  var bidSale4Num :Long=0
  var bidSale4Amount :Float=0
  var bidSale5Num :Long=0
  var bidSale5Amount :Float=0
  var date:String ="" //”2008-01-11″,日期
  var time:String="" //”15:05:32″,時間
  def toDebugString =  "code[%s],name[%s],curOpenPrice [%f],lstOpenPrice[%f],curPrice [%f],highestPrice  [%f],lowestPrice [%f],bidBuyPrice[%f],bidSalePrice[%f],dealNum [%d],dealAmount  [%f],bidBuy1Num [%d],bidBuy1Amount [%f],,bidBuy2Num [%d],bidBuy2Amount [%f],bidBuy3Num [%d],bidBuy3Amount [%f],bidBuy4Num [%d],bidBuy4Amount [%f],bidBuy5Num [%d],bidBuy5Amount [%f],bidSale1Num [%d],bidSale1Amount [%f],bidSale2Num [%d],bidSale2Amount [%f],bidSale3Num [%d],bidSale3Amount [%f],bidSale4Num [%d],bidSale4Amount [%f],bidSale5Num [%d],bidSale5Amount [%f],date [%s],time [%s]" .format( this.code,    this.name,    this.curOpenPrice ,    this.lstOpenPrice,    this.curPrice ,    this.highestPrice  ,    this.lowestPrice ,    this.bidBuyPrice,    this.bidSalePrice,    this.dealNum ,    this.dealAmount  ,    this.bidBuy1Num ,    this.bidBuy1Amount ,    this.bidBuy2Num ,    this.bidBuy2Amount ,    this.bidBuy3Num ,    this.bidBuy3Amount ,    this.bidBuy4Num ,    this.bidBuy4Amount ,    this.bidBuy5Num ,    this.bidBuy5Amount ,    this.bidSale1Num ,    this.bidSale1Amount ,    this.bidSale2Num ,    this.bidSale2Amount ,    this.bidSale3Num ,    this.bidSale3Amount ,    this.bidSale4Num ,    this.bidSale4Amount ,    this.bidSale5Num ,    this.bidSale5Amount ,    this.date ,    this.time  )
  override def toString =  Array(this.code,this.name,this.curOpenPrice,this.lstOpenPrice,this.curPrice,this.highestPrice,this.lowestPrice,this.bidBuyPrice,this.bidSalePrice,this.dealNum,this.dealAmount,this.bidBuy1Num,this.bidBuy1Amount,this.bidBuy2Num,this.bidBuy2Amount,this.bidBuy3Num,this.bidBuy3Amount,this.bidBuy4Num,this.bidBuy4Amount,this.bidBuy5Num,this.bidBuy5Amount,this.bidSale1Num,this.bidSale1Amount,this.bidSale2Num,this.bidSale2Amount,this.bidSale3Num,this.bidSale3Amount,this.bidSale4Num,this.bidSale4Amount,this.bidSale5Num,this.bidSale5Amount,this.date,this.time).mkString(",")
  private var stockInfo :String =""
  def getStockInfo = stockInfo
  def this(stockInfo:String)
  {
    this()
    this.stockInfo=stockInfo
/** 根據新浪的資料接口解析資料 **/
    val stockDetail=stockInfo.split(Array(' ','_','=',',','"'))
    if (stockDetail.length>36){
      this.code=stockDetail(3)
      this.name=stockDetail(5)
      this.curOpenPrice =stockDetail(6).toFloat
      this.lstOpenPrice=stockDetail(7).toFloat
      this.curPrice =stockDetail(8).toFloat
      this.highestPrice  =stockDetail(9).toFloat
      this.lowestPrice =stockDetail(10).toFloat
      this.bidBuyPrice=stockDetail(11).toFloat
      this.bidSalePrice=stockDetail(12).toFloat
      this.dealNum =stockDetail(13).toLong
      this.dealAmount  =stockDetail(14).toFloat
      this.bidBuy1Num =stockDetail(15).toLong
      this.bidBuy1Amount =stockDetail(16).toFloat
      this.bidBuy2Num =stockDetail(17).toLong
      this.bidBuy2Amount =stockDetail(18).toFloat
      this.bidBuy3Num =stockDetail(19).toLong
      this.bidBuy3Amount =stockDetail(20).toFloat
      this.bidBuy4Num =stockDetail(21).toLong
      this.bidBuy4Amount =stockDetail(22).toFloat
      this.bidBuy5Num =stockDetail(23).toLong
      this.bidBuy5Amount =stockDetail(24).toFloat
      this.bidSale1Num =stockDetail(25).toLong
      this.bidSale1Amount =stockDetail(26).toFloat
      this.bidSale2Num =stockDetail(27).toLong
      this.bidSale2Amount =stockDetail(28).toFloat
      this.bidSale3Num =stockDetail(29).toLong
      this.bidSale3Amount =stockDetail(30).toFloat
      this.bidSale4Num =stockDetail(31).toLong
      this.bidSale4Amount =stockDetail(32).toFloat
      this.bidSale5Num =stockDetail(33).toLong
      this.bidSale5Amount =stockDetail(34).toFloat
      this.date =stockDetail(35)
      this.time =stockDetail(36)
      }
  }
}
/** SinaStock的伴生對象,此處用來替代new **/
object SinaStock
{
  def apply(stockInfo:String) :SinaStock =
  {
    new SinaStock(stockInfo)
  }
}
object StockRetrivor {
  def main(args: Array[String]): Unit = {
    println("查詢新浪股票(每小時更新) http://hq.sinajs.cn/list=sh601006,sh601007")
/** 查詢sh601006,sh601007兩隻股票 **/
    val sinaStockStream = Source.fromURL("http://hq.sinajs.cn/list=sh601006,sh601007","gbk")
    val sinaLines=sinaStockStream.getLines
    for(line <- sinaLines) {
/** 将每行資料解析成SinaStock對象,并答應對應的股票資訊 **/
      println(SinaStock(line).toString)
    }
    sinaStockStream.close()
  }
}
      

   三、Spark Streaming程式設計

   資料接口調試完畢,股票資料也解析好了,下面就開始Streaming。Spark Streaming一定會涉及資料源,且該資料源是一個主動推送的過程,即spark被動接受該資料源的資料進行分析。但Sina的接口是一個很簡單的HttpResponse,無法主動推送資料,是以我們需要實作一個Custom Receiver,可參考

http://spark.apache.org/docs/latest/streaming-custom-receivers.html

   下面是具體的代碼,其實定制化一個Receiver簡單來說就是實作onStart/onStop。onStart用來初始化資源,給擷取資料做準備,擷取到的資料用store發送給SparkStreaming即可;onStop用來釋放資源

package com.gabry.stock

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

import scala.io.Source

/**
  * Created by gabry.wu on 2016/2/19.
  * 簡單起見,隻擷取新浪股票資料,後續再進行重構
  */
class SinaStockReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging{
  def onStart() {
    /* 建立一個線程用來查詢新浪股票資料,并将資料發送給Spark Streaming */
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself isStopped() returns false
  }
  private def receive(): Unit = {
      try{
        while(!isStopped ) {
          var stockIndex = 1
          while(stockIndex!=0){
            val stockCode = 601000+stockIndex
            val url="http://hq.sinajs.cn/list=sh%d".format(stockCode)
            logInfo(url)
            val sinaStockStream = Source.fromURL(url,"gbk")
            val sinaLines=sinaStockStream.getLines
            for(line <- sinaLines) {
              logInfo(line)
              store(line)
            }
            sinaStockStream.close()
            stockIndex= (stockIndex+1)%1
          }
       
        }

        logInfo("Stopped receiving")
        restart("Trying to connect again")
      } catch {
        case e: java.net.ConnectException =>
          restart("Error connecting to", e)
        case t: Throwable =>
          restart("Error receiving data", t)
      }
    }
}
      

   Receiver搞定之後就可以開始編寫股票預測的main函數了,貼代碼之前說明一下,股票預測的方法之一,就是統計一段時間内股票上漲的次數,并展示上漲次數TopN的股票資訊,但本文一切從簡,并沒有實作全部的功能,隻是統計了股票上漲的次數,也就是對上漲與否進行WordCount。

/**
  * Created by gabry.wu on 2016/2/19.
  */
package com.gabry.stock

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StockTrend {
  def updatePriceTrend( newValue:Seq[(Float,Int)],preValue :Option[(Float,Int)]):Option[(Float,Int)] = {
    if (newValue.length>0){
      val priceDiff=newValue(0)._1 - preValue.getOrElse((newValue(0)._1 ,0))._1
      // ("update state: new Value "+newValue(0) + ",pre Value " + preValue.getOrElse((newValue(0)._1 ,0)))
      Some((newValue(0)._1,priceDiff.compareTo(0.0f)))
    }else preValue
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    Logger.getRootLogger.setLevel(Level.WARN)
    ssc.checkpoint("./tmp")
    /* 建立股票的輸入流,該輸入流是自定義的 */
    val lines = ssc.receiverStream(new SinaStockReceiver())
    /** 将資料的每一行映射成一個SinaStock對象。注意此處的每一行資料都是SinaStockReceiver對象調用store傳過來的 **/
    val words = lines.map(SinaStock(_))
    import scala.util.Random
    /* reduce從左到右進行折疊。其實就是先處理t-6,t-5的RDD,将結果與t-4的RDD再次調用reduceFunc,依次類推直到目前RDD */
    def reduceFunc( left :(Float,Int),right:(Float,Int)):(Float,Int) = {
      println("left "+left+"right "+right)
      (right._1,left._2+right._2)
    }

    /* 3點之後股票價格不在變化,故為了測試,此處使用随機數修改股票目前價格 */
    /* 根據上一次股票價格更新股票的變化方向 */
    /** 由于股票資訊隻有目前價格,如果要判斷股票上漲與否就要記錄上一次的股票價格,是以此處使用updateStateByKey更新目前股票價格是否上漲。
    若上漲則記為1,不變記為0,否則記為1
      **/
    val stockState = words.map(sinaStock => (sinaStock.name, (sinaStock.curPrice+Random.nextFloat,-1))).filter(stock=>stock._1.isEmpty==false)
                   .updateStateByKey(updatePriceTrend)
    /* 每3秒,處理過去6秒的資料,對資料進行變化的累加 */
    val stockTrend=stockState.reduceByKeyAndWindow(reduceFunc(_,_),Seconds(6),Seconds(3))
    /* 每3秒,處理過去6秒的資料,對資料進行正向變化的累加 */
    //val stockPosTrend=stockState.filter(x=>x._2._2>=0).reduceByKeyAndWindow(reduceFunc(_,_),Seconds(6),Seconds(3))
    stockState.print()
    stockTrend.print()
    //stockPosTrend.print()
    ssc.start()
    ssc.awaitTermination()
    println("StockTrend")
  }
}
      

   四、運作結果分析

  下面是某次運作的列印結果,對其進行簡單的分析。

  由于ssc的時間間隔為1,是以每秒都會查詢大同煤業的股票資料,這就是下面每個Time列印的第一行資料(因為stockState先進行print,是以每次查詢的股票資料是第一行);又因為slide設定為3,是以每隔3秒會進行reduceFunc計算,該函數處理windowsize個RDD(此處設定為6),對這6個RDD按照時間先後順序進行reduce。

  需要特别說明的是spark的reduce預設從左到右進行fold(折疊),從最左邊取兩個數進行reduce計算産生臨時結果,再與後面的資料進行reduce,以此類推進行計算,其實就是foldLeft。

  下面标紅色的資料,其實就是對(5.387682,0),(5.9087195,1),(5.7605586,-1),(5.278526,-1),(5.4471517,1),(5.749305,1)進行reduce的過程。

-------------------------------------------

Time: 1455888254000 ms

(大同煤業,(5.387682,0))

Time: 1455888255000 ms

(大同煤業,(5.9087195,1))

Time: 1455888256000 ms

(大同煤業,(5.7605586,-1))

left (5.387682,0)right (5.9087195,1)

left (5.9087195,1)right (5.7605586,-1)

(大同煤業,(5.7605586,0))

Time: 1455888257000 ms

(大同煤業,(5.278526,-1))

Time: 1455888258000 ms

(大同煤業,(5.4471517,1))

Time: 1455888259000 ms

(大同煤業,(5.749305,1))

left (5.7605586,0)right (5.278526,-1)

left (5.278526,-1)right (5.4471517,1)

left (5.4471517,0)right (5.749305,1)

Time: 1455888260000 ms

Time: 1455888261000 ms

(大同煤業,(5.748391,-1))

Time: 1455888262000 ms

(大同煤業,(5.395269,-1))

left (5.749305,1)right (5.749305,1)

left (5.749305,2)right (5.748391,-1)

left (5.748391,1)right (5.395269,-1)

(大同煤業,(5.395269,0))

Time: 1455888263000 ms

(大同煤業,(5.5215807,1))

Time: 1455888264000 ms

(大同煤業,(5.945005,1))

Time: 1455888265000 ms

(大同煤業,(5.2400274,-1))

left (5.749305,1)right (5.748391,-1)

left (5.748391,0)right (5.395269,-1)

left (5.395269,-1)right (5.5215807,1)

left (5.5215807,0)right (5.945005,1)

left (5.945005,1)right (5.2400274,-1)

(大同煤業,(5.2400274,0))

Time: 1455888266000 ms

(大同煤業,(5.1895638,-1))

Time: 1455888267000 ms

(大同煤業,(5.1885605,-1))

Time: 1455888268000 ms

(大同煤業,(5.9881735,1)) 

Process finished with exit code -1

   五、總結

  本文以股票預測為例簡單描述了SparkStreaming程式設計的步驟及其注意點,希望抛磚引玉,也算彌補了網上沒有完整例子的遺憾。但由于作者重代碼、輕描述,估計會有一些不易了解的地方,還望各位讀者留言讨論。最後附上源碼的git位址:http://git.oschina.net/gabry_wu/BigDataPractice

PS:未經允許,禁止轉載,否則将追究法律責任!

繼續閱讀