編寫了獨立運作的spark application之後,需要将其送出到spark cluster中運作,一般會采用spark-submit來進行應用的送出,在使用spark-submit的過程中,有哪些事情需要注意的呢?
本文試就此做一個小小的總結。
spark-defaults.conf的作用範圍要搞清楚,編輯driver所在機器上的spark-defaults.conf,該檔案會影響 到driver所送出運作的application,及專門為該application提供計算資源的executor的啟動參數
隻需要在driver所在的機器上編輯該檔案,不需要在worker或master所運作的機器上編輯該檔案
舉個實際的例子
上述配置表示為該application提供計算資源的executor啟動時, heap memory需要有5g。
這裡需要引起注意的是,如果worker在加入cluster的時候,申明自己所在的機器隻有4g記憶體,那麼為上述的application配置設定executor是,該worker不能提供任何資源,因為4g<5g,無法滿足最低的資源需求。
spark-env.sh中最主要的是指定ip位址,如果運作的是master,就需要指定spark_master_ip,如果準備運作driver或worker就需要指定spark_local_ip,要和本機的ip位址一緻,否則啟動不了。
配置舉例如下:
啟動spark叢集
第一步啟動master
第二步啟動worker
将master替換成master實際運作的ip位址
如果想在一台機器上運作多個worker(主要是用于測試目的),那麼在啟動第二個及後面的worker時需要指定—webui-port的内容,否則會報端口已經被占用的錯誤,啟動第二個用的是8083,第三個就用8084,依此類推。
這種啟動worker的方式隻是為了測試是啟動友善,正規的方式是用spark_home/sbin/start-slaves.sh來啟動多個worker,由于涉及到ssh的配置,比較麻煩,我這是圖簡單的辦法。
用spark_home/sbin/start−slave.sh
來啟動worker時有一個預設的前提,即在每台機器上$spark_home必須在同一個目錄。
使用相同的使用者名和使用者組來啟動master和worker,否則executor在啟動後會報連接配接無法建立的錯誤。
我在實際的使用當中,遇到”no route to host”的錯誤資訊,起初還是認為網絡沒有配置好,後來網絡原因排查之後,忽然意識到有可能使用了不同的使用者名和使用者組,使用相同的使用者名/使用者組之後,問題消失。
spark叢集運作正常之後,接下來的問題就是送出application到叢集運作了。
spark-submit用于spark application的送出和運作,在使用這個指令的時候最大的困惑就是如何指定應用所需要的依賴包。
首先檢視一下spark-submit的幫助檔案:
有幾個選項可以用來指定所依賴的庫,分别為:
--driver-class-path driver所依賴的包,多個包之間用冒号(:)分割
--jars driver和executor都需要的包,多個包之間用逗号(,)分割
為了簡單起見,就通過—jars來指定依賴,運作指令如下:
需要提醒的時,這些上傳到worker的檔案,需要定時做手工清理,否則會占用許多磁盤空間
由于spark在計算的時候會将中間結果存儲到/tmp目錄,而目前linux又都支援tmpfs,其實說白了就是将/tmp目錄挂載到記憶體當中。
那麼這裡就存在一個問題,中間結果過多導緻/tmp目錄寫滿而出現如下錯誤
解決辦法就是針對tmp目錄不啟用tmpfs,修改/etc/fstab
有時可能會遇到java.lang.outofmemory, unable to create new native thread的錯誤,導緻這種錯誤的原因比較多。
有一種情況并非真的是記憶體不足引起的,而是由于超出了允許的最大檔案句柄數或最大程序數。
排查的步驟就是檢視一下允許打開的檔案句柄數和最大程序數,如果數值過低,使用ulimit将其調高之後,再試試問題是否已經解決。
修改允許打開的最大程序數
修改允許打開的檔案句柄
上面講述了spark-submit送出spark application如何解決依賴庫的問題,那如果是spark-shell的話,該怎麼辦呢?
spark-shell的話,利用--driver-class-path選項來指定所依賴的jar檔案,注意的是--driver-class-path後如果需要跟着多個jar檔案的話,jar檔案之間使用冒号(:)來分割。
本文就 spark-cassandra-connector 的一些實作細節進行探讨,主要集中于如何快速将大量的資料從cassandra 中讀取到本地記憶體或磁盤。
存儲在 cassandra 中資料的一般都會比較多,記錄數在千萬級别或上億級别是常見的事。如何将這些表中的内容快速加載到本地記憶體就是一個非常現實的問題。
解決這一挑戰的思路從大的方面來說是比較簡單的,那就是将整張表中的内容分成不同的區域,然後分區加載,不同的分區可以在不同的線程或程序中加載,利用并行化來減少整體加載時間。
順着這一思路出發,要問的問題就是 cassandra 中的資料如何才能分成不同的區域。
不同于 mysql ,在 cassandra 中是不存在 sequence id 這樣的類型的,也就是說無法簡單的使用seqid 來指定查詢或加載的資料範圍。
既然沒有 sequenceid,在 cassandra 中是否就沒有辦法了呢?答案顯然是否定的,如果隻是僅僅支援串行讀取,cassandra 早就會被扔進垃圾堆裡了。
資料分區的辦法在 cassandra 中至少有兩種辦法可以達到,一是通過 token range,另一個是 slice range。這裡主要講解利用 token range 來實作目的。
cassandra将要存儲的記錄存儲在不同的區域中,判斷某一記錄具體存儲在哪個區域的依據是主鍵的hash值。
在cassandra 1.2之前,組成cassandra叢集的所有節點(node),都需要手工的指定該節點的hash值範圍也就是token range。
手工計算token range顯然是很繁瑣,同時也不怎麼容易維護,在cassandra 1.2之後,引進了虛拟節點(vnode)的概念,主要目的是減少不必要的人工指定,同時也将token range的劃分變得更為細粒度。比如原先手工指定token range,隻能達到10000這樣一個精度,而有了vnode之後,預設安裝是每一個實體節點上有256個虛拟節點,這樣子的話每一個range的範圍就是10000/256,這樣變的更為精細。
有關token range的資訊存儲在cassandra的system命名空間(keyspace)下的local和peers兩張表中。其中local表示本節點的token range情況,而peers表示叢集中其它節點的token range情況。這兩張表中的tokens字段就存儲有詳細的資訊。如果叢集中隻由一台機器組成,那麼peers中的就會什麼内容都沒有。
簡單實驗,列出本節點的token range>
token range告訴我們cassandra的記錄是分片存儲的,也就意味着可以分片讀取。現在的問題轉換成為如何知道每一個token range的起止範圍。
cassandra支援的thrift接口中describe_ring就是用來擷取token range的具體起止範圍的。我們常用的nodetool工具使用的就是thrift接口,nodetool 中有一個describering指令使用的就是describe_ring原語。
可以做一個簡單的實驗,利用nodetool來檢視某個keyspace的token range具體情況。
在第一節中講解了cassandra中token range資訊的存儲位置,以及可以使用哪些api來擷取token range資訊。
接下來就分析spark-cassandra-connector是如何以cassandra為資料源将資料加載進記憶體的。
以簡單的查詢語句為例,假設使用者要從demo這個keyspace的tablex表中加載所有資料,用cql來表述就是
上述的查詢使用spark-cassandra-connector來表述就是
盡管上述語句沒有觸發spark job的送出,也就是說并不會将資料直正的從cassandra的tablex表中加載進來,但spark-cassandra-connector還是需要進行一些資料庫的操作。要解決的主要問題就是schema相關。
隻是說要從tablex中加載資料,并沒有告訴connector有哪些字段,每個字段的類型是什麼。這些資訊對後面使用諸如get[string](“fieldx”)來說卻是非常關鍵的。
為了擷取字段類型資訊的中繼資料,需要讀取system.schema_columns表,利用如下語句可以得到schema_columns表結構的詳細資訊
如果在conf/log4j.properties中将日志級别設定為debug, 然後再執行sc.cassandratable語句就可以看到具體的cql查詢語句是什麼。
spark-cassandra-connector添加了一種新的rdd實作,即cassandrardd。我們知道對于一個spark rdd來說,非常關鍵的就是确定getpartitions和compute函數。
getpartitions函數會調用cassandrarddpartitioner來擷取分區數目:
cassandrarddpartitioner中的partitions的處理邏輯大緻如下:
首先确定token range,使用describe_ring
然後根據cassandra中使用的partitioner來确定某一個token range中可能的記錄條數,這麼做的原因就是為進一步控制加載的資料,提高并發度。否則并發度就永遠是256了,比如有一個實體節點,其中有256個vnodes,也就是256個token分區。如果每個分區中大緻的記錄數是20,000,而每次加載最大隻允許10,00的話,整個資料就可以分成256x2=512個分區。
對describering傳回的token range進一步拆分的話,需要使用splitter,splitter的建構需要根據keyspace中使用了何種partitioner來決定,cassandra中預設的partitioner是murmur3partitioner,murmur3hash算法可以讓hash值更為均勻的分布到不同節點。
splitter中會利用到配置項spark.cassandra.input.split.size和spark.cassandra.page.row.size,分别表示一個線程最多讀取多少記錄,另一個表示每次讀取多少行。
partitions的源碼詳見casssandrarddparitioner.scala。
compute函數就利用确定的token的起止範圍來加載内容,這裡在了解的時候需要引起注意的就是flatmap是惰性執行的,也就是說隻有在真正需要值的時候才會被執行,延遲觸發。
資料真正的加載是發生在fetchtokenrange函數,這時使用到的就是cassandra java driver了,平淡無奇。
fetcchtokenrange函數使用cassandra java driver提供的api接口來讀取資料,利用java api讀取資料一般遵循以下步驟
addcontactpoint的參數是cassandra server的ip位址,在後面真正執行cql語句的時候,如果叢集有多個節點構成,那麼不同的cql就會在不同的節點上執行,自動實作了負載均衡。可以在addcontactpoint的參數中設定多個節點的位址,這樣可以防止某一節點挂掉,無法擷取叢集資訊的情況發生。
session是線程安全的,在不同的線程使用同一個session是沒有問題的,建議針對一個keyspace隻使用一個session.
在spark rdd中是無法使用sparkcontext的,否則會形成rdd嵌套的現象,因為利用sparkcontext很容易構造出rdd,如果在rdd的函數中如map中調用sparkcontext建立一個新的rdd,則形成深度嵌套進而導緻spark job有嵌套。
但在實際的情況下,我們可以需要根據rdd中的值再去對資料庫進行操作,那麼有什麼辦法來打開資料庫連接配接呢?
解決的辦法就是直接使用cassandra java driver而不再使用spark-cassandra-connector的進階封裝,因為不能像這樣子來使用cassandrardd.
如果是直接使用cassandra java driver,為了避免每個rdd中的iterator都需要打開一個session,那麼可以使用foreachpartition函數來進行操作,減少打開的session數。
其實最好的辦法是在外面建立一個session,然後在不同的partition中使用同一個session,但這種方法不行的原因是在執行的時候會需要”task not serializable”的錯誤,于是隻有在foreachpartition函數内部建立session.
盡管cassandra号稱可以做到當機時間為零,但為了謹慎起見,還是需要對資料進行備份。
cassandra提供了幾種備份的方法:
将資料導出成為json格式
利用copy将資料導出為csv格式
直接複制sstable檔案
導出成為json或csv格式,當表中的記錄非常多的時候,這顯然不是一個好的選擇。于是就隻剩下備份sstable檔案了。
問題是将sstable存儲到哪裡呢?放到hdfs當然沒有問題,哪有沒有可能對放到hdfs上的sstable直接進行讀取呢,在沒有經過任務修改的情況下,這是不行的。
試想一下,sstable的檔案會被拆分為多個塊而存儲到hdfs中,這樣會破壞記錄的完整性,hdfs在存儲的時候并不知道某一block中包含有完成的記錄資訊。
為了做到記錄資訊不會被拆分到多個block中,需要根據sstable的格式自行提取資訊,并将其存儲到hdfs上。這樣存儲之後的檔案就可以被并行通路。
cassandra中提供了工具sstablesplit來将大的sstable分割成為小的檔案。
datastax的dse企業版中提供了和hadoop及spark的緊密結合,其一個很大的基礎就是先将sstable的内容存儲到cfs中,大體的思路與剛才提及的應該差不多。
對sstable存儲結構的分析是一個研究的熱門,可以參考如下的連結。
<a href="https://www.fullcontact.com/blog/cassandra-sstables-offline/">https://www.fullcontact.com/blog/cassandra-sstables-offline/</a>
隻是以要研究備份政策是想将對資料的分析部分與業務部分相分離開,避免由于背景的資料分析導緻cassandra叢集響應變得緩慢而緻前台業務不可用,即将oltp和olap的資料源分離開。
通過近乎實時的資料備份,背景olap就可以使用spark來對資料進行分析和處理。
與傳統的rdbms相比,cassandra所能提供的查詢功能實在是弱的可以,如果想到實作非常複雜的查詢功能的,需要将cassandra和solr進行結合。
dse企業版提供了該功能,如果想手工搭建的話,可以參考下面的連結
<a href="http://www.slideshare.net/planetcassandra/an-introduction-to-distributed-search-with-cassandra-and-solr">http://www.slideshare.net/planetcassandra/an-introduction-to-distributed-search-with-cassandra-and-solr</a>
sparkcontext可以被多個線程使用,這意味着同個spark application中的job可以同時送出到spark cluster中,減少了整體的等待時間。
在同一個線程中, spark隻能逐個送出job,當job在執行的時候,driver application中的送出線程是處于等待狀态的。如果job a沒有執行完,job b就無法送出到叢集,就更不要提配置設定資源真正執行了。
那麼如何來減少等待時間呢,比如在讀取cassandra資料的過程中,需要從兩個不同的表中讀取資料,一種辦法就是先讀取完成表a與讀取表b,總的耗時是兩者之和。
如果利用共享sparkcontext的技術,在不同的線程中去讀取,則耗時隻是兩者之間的最大值。
在scala中有多種不同的方式來實作多線程,現僅以future為例來說明問題:
<b>簡要說明一下代碼邏輯:</b>
建立三個不同的rdd
在不同的線程(future)中通過count函數來送出job
使用await來等待future執行結束<b></b>