天天看點

StreamingListener技術點

以下是對StreamingListener的研究,由於比較簡單,故只貼代碼,不做解釋。

/**
  * Created by gabry.wu on 2016/5/27.
  *
  * A [[StreamingListener]] that logs Spark Streaming lifecycle events so the
  * state of a streaming job can be monitored:
  *  - receiver started / error / stopped,
  *  - batch submitted / started / completed,
  *  - output operation started / completed.
  *
  * Every event is logged at WARN level. Presumably this keeps the output visible
  * when the log threshold is raised above INFO to silence Spark's own logging.
  * TODO confirm intent.
  *
  * @param ssc the StreamingContext being monitored. It is kept so that a handler
  *            (e.g. onReceiverError) could stop the streaming job on some fatal
  *            error. NOTE(review): no handler in this class uses it yet.
  */
class StreamingMonitor(ssc:StreamingContext) extends StreamingListener{
  // Class-local import so the helper signatures below compile regardless of
  // how the enclosing file imports the scheduler package.
  import org.apache.spark.streaming.scheduler.{BatchInfo, OutputOperationInfo, ReceiverInfo}

  private val log =  LoggerFactory.getLogger("SparkStreamingMonitor")

  /** Renders the fields of a ReceiverInfo as comma-separated key=value pairs. */
  private def describeReceiver(info: ReceiverInfo): String =
    s"active=${info.active},executorId=${info.executorId}," +
      s"lastError=${info.lastError},lastErrorMessage=${info.lastErrorMessage}," +
      s"location=${info.location},name=${info.name}," +
      s"streamId=${info.streamId}"

  /** Renders the fields of a BatchInfo as comma-separated key=value pairs. */
  private def describeBatch(info: BatchInfo): String =
    s"batchTime=${info.batchTime},numRecords=${info.numRecords}," +
      s"processingDelay=${info.processingDelay},processingEndTime=${info.processingEndTime}," +
      s"processingStartTime=${info.processingStartTime},schedulingDelay=${info.schedulingDelay}," +
      s"submissionTime=${info.submissionTime},totalDelay=${info.totalDelay}"

  /** Renders the fields of an OutputOperationInfo as comma-separated key=value pairs. */
  private def describeOutputOperation(info: OutputOperationInfo): String =
    s"batchTime=${info.batchTime},description=${info.description}," +
      s"duration=${info.duration},endTime=${info.endTime}," +
      s"failureReason=${info.failureReason},id=${info.id}," +
      s"name=${info.name},startTime=${info.startTime}"

  // A receiver has started.
  override def onReceiverStarted(receiverStarted : StreamingListenerReceiverStarted): Unit = {
    log.warn("onReceiverStarted")
    log.warn(describeReceiver(receiverStarted.receiverInfo))
  }

  // A receiver reported an error.
  override def onReceiverError(receiverError : StreamingListenerReceiverError): Unit = {
    log.warn("onReceiverError")
    // Receiver failures can be handled here (e.g. by stopping ssc).
    log.warn(describeReceiver(receiverError.receiverInfo))
  }

  // A receiver has stopped.
  override def onReceiverStopped(receiverStopped : StreamingListenerReceiverStopped): Unit = {
    log.warn("onReceiverStopped")
    log.warn(describeReceiver(receiverStopped.receiverInfo))
  }

  // A batch has been submitted.
  override def onBatchSubmitted(batchSubmitted : StreamingListenerBatchSubmitted): Unit = {
    log.warn("onBatchSubmitted")
    // The amount of data is already known before the batch's jobs are submitted:
    // batchInfo.numRecords is the record count of this batch.
    log.warn(describeBatch(batchSubmitted.batchInfo))
  }

  // A batch has started processing.
  override def onBatchStarted(batchStarted : StreamingListenerBatchStarted): Unit = {
    log.warn("onBatchStarted")
    // batchInfo.schedulingDelay: time between submission and the actual start of the batch.
    log.warn(describeBatch(batchStarted.batchInfo))
  }

  // A batch has completed.
  override def onBatchCompleted(batchCompleted : StreamingListenerBatchCompleted): Unit = {
    log.warn("onBatchCompleted")
    // batchInfo.processingDelay: time spent processing the batch.
    // batchInfo.totalDelay: total time from submission to the end of the batch.
    log.warn(describeBatch(batchCompleted.batchInfo))
  }

  // An output operation has started.
  override def onOutputOperationStarted(outputOperationStarted : StreamingListenerOutputOperationStarted): Unit = {
    log.warn("onOutputOperationStarted")
    // outputOperationInfo.description: essentially part of the call stack, useful
    // for locating which output action in the code this is.
    // outputOperationInfo.name: the function name of the action.
    log.warn(describeOutputOperation(outputOperationStarted.outputOperationInfo))
  }

  // An output operation has completed.
  override def onOutputOperationCompleted(outputOperationCompleted : StreamingListenerOutputOperationCompleted): Unit = {
    log.warn("onOutputOperationCompleted")
    // outputOperationInfo.duration: time taken by the action.
    // outputOperationInfo.failureReason: why the action failed; batch failures
    // can be handled here.
    log.warn(describeOutputOperation(outputOperationCompleted.outputOperationInfo))
  }
}
      

下面是添加StreamingListener的代碼:

// Build the StreamingContext, then register a StreamingMonitor on it so the
// monitor receives receiver, batch and output-operation events.
// NOTE(review): assumes batchDuration is a millisecond count, which is what
// Duration's constructor argument represents — confirm at the definition site.
val ssc = new StreamingContext(sparkConf, new Duration(batchDuration))
val streamingMonitor = new StreamingMonitor(ssc)
ssc.addStreamingListener(streamingMonitor)
      

  

各個函數的調用順序

onReceiverStarted->[接收到資料]->onBatchSubmitted->onBatchStarted->onOutputOperationStarted->onOutputOperationCompleted->onBatchCompleted->[接收到資料]->onBatchSubmitted->onBatchStarted->onOutputOperationStarted->onOutputOperationCompleted->onBatchCompleted->.......->onReceiverStopped

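其中[接收到資料]是可選項,並不是每次都會接收到資料。另外,每個輸出操作都會各自觸發一次onOutputOperationStarted/onOutputOperationCompleted,若程式中有多個輸出操作,這一對回調在同一個batch內會出現多次;而onReceiverStarted/onReceiverStopped只適用於基於Receiver的輸入流。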

繼續閱讀