關于spark streaming中的任務有如下幾個概念:
batch
job
stage
task
job的并行度複雜些,由兩個配置決定:
spark.scheduler.mode(fifo/fair)
spark.streaming.concurrentjobs
我們知道一個batch可能會有多個action執行,比如你注冊了多個kafka資料流,每個action都會産生一個job,是以一個batch有可能是一批job,也就是jobset的概念,這些job由jobexecutor依次送出執行,而jobexecutor是一個預設池子大小為1的線程池,是以隻能執行完一個job再執行另外一個job。這裡說的池子,他的大小就是由<b>spark.streaming.concurrentjobs </b>控制的。
concurrentjobs 其實決定了向spark core送出job的并行度。送出一個job,必須等這個執行完了,才會送出第二個。假設我們把它設定為2,則會并發的把job送出給spark core,spark 有自己的機制決定如何運作這兩個job,這個機制其實就是fifo或者fair(決定了資源的配置設定規則)。預設是fifo,也就是先進先出,你把concurrentjobs設定為2,但是如果底層是fifo,那麼會優先執行先送出的job,雖然如此,如果資源夠兩個job運作,還是會并行運作兩個job。
我們搞個例子來論證下上面的結論:
<a href="https://github.com/allwefantasy/spark-ml-example/blob/master/src/main/java/exmaple/jobtest.scala" target="_blank"><b>源碼github位址</b></a>
上面的testinputstream的簽名如下:
是以testinputstream其實就是我mock的一個資料源,最後numpartitions表示的是分區數。這裡,我們把concurrentjobs設定為2,意味着taskscheduler接受到了兩個job,然後setmaster[local(2)]表示隻可以并發執行兩個task。
因為input,input1每個batch至少都有3個元素,每個元素需要運作5秒,是以有一個task需要運作兩個元素,那麼第一次input1需要運作10秒。input1在運作五秒後,空出了一個線程,這個時候input的job開始運作,到第十秒的時候,input1完成,input開始運作也已經完成一個元素的計算,這個時候啟動另外兩個元素運作。是以input1花了10秒,input花了15秒,但是因為input被延時了五秒才得以運作,是以input1其實相當于花了20秒。
這裡你會好奇,為啥我先聲明的input,接着再申明的input1,但是input1卻先運作呢?因為這兩個資料源對應的job是被并發送出的,有一定的随機性。如果你多啟動幾次,你會發現input對應job id有可能是0,也有可能是1。
還有兩點值的注意的是:
job id的産生是在job送出的時候才産生,而不是job在産生的時候生成的。
job被送出後會直接進入scheduler的pool,在scheduler給你配置設定資源的時候,雖然說fifo是先按job id 小的優先處理,但是job id大的先進來,在配置設定資源的時候,小的還沒進來呢,是以job id 大的可能被優先執行了。
上面的流程解說解釋的是下面這張圖:

接着呢,input2在剩下兩條記錄處理的10秒過程中,其實第二個周期已經開始了,input的任務又得以開始運作,這個時候因為隻有一個線程可以用,是以運作了兩個元素,input1處理完成,空出線程,第二個周期的input1繼續排程,input的剩下的一個元素也繼續運作,最後input,input1都花了15秒。
有點繞,如果大家迷惑,可以把代碼貼在自己的ide上運作一下,然後觀察他們的交錯時間。
如果我們再做個調整:
你會發現,不同batch的job其實也可以并行運作的,這裡需要有幾個條件:
有延時發生了,batch無法在本batch完成
concurrentjobs > 1
如果scheduler mode 是fifo則需要某個job無法一直消耗掉所有資源
mode是fair則盡力保證你的job是并行運作的,毫無疑問是可以并行的。
回到我們的标題,不同batch的job有可能會同時在運作麼,隻要滿足我前面提到的三個條件,就有可能。