(摘自王家林)
日常工作、生活中資料來源很多不同的地方。例如:工業時代的汽車制造、監控裝置、工業裝置會産生很多源資料;資訊時代的電商網站、日志伺服器、社交網絡、金融交易系統、黑客攻擊、垃圾郵件、交通監控等;通信時代的手機、平闆、智能裝置、物聯網等會産生很多實時資料,資料流無處不在。
Streaming發揮強大流處理的地方。
大資料時代,資料價值一般怎麼定義?
所有沒經過流處理的資料都是無效資料或沒有價值的資料;資料産生之後立即處理産生的價值是最大的,資料放置越久或越滞後其使用價值越低。以前絕大多數電商網站盈利走的是網絡流量(即使用者的通路量),如今,電商網站不僅僅需要關注流量、交易量,更重要的是要通過資料流技術讓電商網站的各種資料流動起來,通過實時流動的資料及時分析、挖掘出各種有價值的資料;比如:對不同交易量的使用者指定使用者畫像,進而提供不同服務品質;準對使用者通路電商網站闆塊愛好及時推薦相關的資訊。
Spark Streaming是一個準實時流處理架構,而Hadoop MR是一個離線、批處理架構;很顯然,在資料的價值性角度,Spark Streaming完勝于Hadoop MR。
SparkStreaming VS Storm:
Spark Streaming是一個準實時流處理架構,處理響應時間一般以分鐘為機關,也就是說處理實時資料的延遲時間是秒級别的;Storm是一個實時流處理架構,處理響應是毫秒級的。是以在流架構選型方面要看具體業務場景。需要澄清的是現在很多人認為Spark Streaming流處理運作不穩定、資料丢失、事務性支援不好等等,那是因為很多人不會駕馭Spark Streaming及Spark本身。在Spark Streaming流處理的延遲時間方面,Spark定制版本,會将Spark Streaming的延遲從秒級别推進到100毫秒之内甚至更少。
SparkStreaming優點:
1、提供了豐富的API,企業中能快速實作各種複雜的業務邏輯。
3、Spark Streaming基于Spark優秀的血統。
SparkStreaming能不能像Storm一樣,一條一條處理資料?
Storm處理資料的方式是以條為機關來一條一條處理的,而Spark Streaming基于機關時間處理資料的,SparkStreaming能不能像Storm一樣呢?答案是:可以的。
業界一般的做法是Spark Streaming和Kafka搭檔即可達到這種效果,入下圖:

Kafka業界認同最主流的分布式消息架構,此架構即符合消息廣播模式又符合消息隊列模式。
Kafka内部使用的技術:
1、 Cache
2、 Interface
3、 Persistence(預設最大持久化一周)
4、 Zero-Copy技術讓Kafka每秒吞吐量幾百兆,而且資料隻需要加載一次到核心提供其他應用程式使用
外部各種源資料推進(Push)Kafka,然後再通過Spark Streaming抓取(Pull)資料,抓取的資料量可以根據自己的實際情況确定每一秒中要處理多少資料。
通過Spark Streaming動手實戰wordCount執行個體
這裡是運作一個Spark Streaming的程式:統計這個時間段内流進來的單詞出現的次數. 它計算的是:他規定的時間段内每個單詞出現了多少次。
1、先啟動下Spark叢集:
我們從叢集裡面打開下官方網站
接受這個資料進行加工,就是流處理的過程,剛才那個WordCount就是以1s做一個機關。
剛才運作的時候,為什麼沒有結果呢?因為需要資料源。
2、擷取資料源:
新開一個指令終端,然後輸入:
$ nc -lk 9999
現在我們拷貝資料源進入運作:
然後按回車運作
DStream和RDD關系:
沒有輸入資料會列印的是空結果:
但是實際上,Job的執行是Spark Streaming架構幫我們産生的和開發者自己寫的Spark代碼業務邏輯沒有關系,而且Spark Streaming架構的執行時間間隔可以手動配置,如:每隔一秒鐘就會産生一次Job的調用。是以在開發者編寫好的Spark代碼時(如:flatmap、map、collect),不會導緻job的運作,job運作是Spark Streaming架構産生的,可以配置成每隔一秒中都會産生一次job調用。
Spark Streaming流進來的資料是DStream,但Spark Core架構隻認RDD,這就産生沖突了?
Spark Streaming架構中,作業執行個體的産生都是基于rdd執行個體來産生,你寫的代碼是作業的模闆,即rdd是作業的模闆,模闆一運作rdd就會被執行,此時action必須處理資料。RDD的模闆就是DStream離散流,RDD之間存在依賴關系,DStream就有了依賴關系,也就構成了DStream 有向無環圖。這個DAG圖,是模闆。Spark Streaming隻不過是在附在RDD上面一層薄薄的封裝而已。你寫的代碼不能産生Job,隻有架構才能産生Job.
如果一秒内計算不完資料,就隻能調優了.
總結:
2、開發環境采用eclipse maven工程,需要添加Spark Streaming依賴。
3、Spark streaming 基于Spark Core進行計算,需要注意事項:
設定本地master,如果指定local的話,必須配置至少二條線程,也可通過sparkconf來設定,因為Spark Streaming應用程式在運作的時候,至少有一條線程用于不斷的循環接收資料,并且至少有一條線程用于處理接收的資料(否則的話無法有線程用于處理資料),随着時間的推移,記憶體和磁盤都會不堪重負)。
溫馨提示:
對于叢集而言,每隔exccutor一般肯定不隻一個Thread,那對于處理Spark Streaming應用程式而言,每個executor一般配置設定多少core比較合适?根據我們過去的經驗,5個左右的core是最佳的(段子:配置設定為奇數個core的表現最佳,例如:配置設定3個、5個、7個core等)
接下來,讓我們開始動手寫寫Java代碼吧!
第一步:建立SparkConf對象
第二步:建立SparkStreamingContext
我們采用基于配置檔案的方式建立SparkStreamingContext對象:
第三步,建立Spark Streaming輸入資料來源:
我們将資料來源配置為本地端口9999(注意端口要求沒有被占用):
第四步:我們就像對RDD程式設計一樣,基于DStream進行程式設計,原因是DStream是RDD産生的模闆,在Spark Streaming發生計算前,其實質是把每個Batch的DStream的操作翻譯成為了RDD操作。
1、flatMap操作:
2、 mapToPair操作:
3、reduceByKey操作:
4、print等操作:
除了print()方法将處理後的資料輸出之外,還有其他的方法也非常重要,在開發中需要重點掌握,比如SaveAsTextFile,SaveAsHadoopFile等,最為重要的是foreachRDD方法,這個方法可以将資料寫入Redis,DB,DashBoard等,甚至可以随意的定義資料放在哪裡,功能非常強大。
一、Scala方式開發
第一步,接收資料源:
第二步,flatMap操作:
第三步,map操作:
第四步,reduce操作:
第五步,print()等操作:
第六步:awaitTermination操作
一、StreamingContext功能及源碼剖析:
2、 Spark Streaming的主要功能有:
主程式的入口;
提供了各種建立DStream的方法接收各種流入的資料源(例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等);
通過構造函數執行個體化Spark Streaming對象時,可以指定master URL、appName、或者傳入SparkConf配置對象、或者已經建立的SparkContext對象;
将接收的資料流傳入DStreams對象中;
通過Spark Streaming對象執行個體的start方法啟動目前應用程式的流計算架構或通過stop方法結束目前應用程式的流計算架構;
二、DStream功能及源碼剖析:
1、 DStream是RDD的模闆,DStream是抽象的,RDD也是抽象
2、 DStream的具體實作子類如下圖所示:
3、 以StreamingContext執行個體的socketTextSteam方法為例,其執行完的結果傳回DStream對象執行個體,其源碼調用過程如下圖:
socket.getInputStream擷取資料,while循環來存儲儲蓄資料(記憶體、磁盤)
三、Receiver功能及源碼剖析:
1、Receiver代表資料的輸入,接收外部輸入的資料,如從Kafka上抓取資料;
2、Receiver運作在Worker節點上;
3、Receiver在Worker節點上抓取Kafka分布式消息架構上的資料時,具體實作類是KafkaReceiver;
4、Receiver是抽象類,其抓取資料的實作子類如下圖所示:
5、 如果上述實作類都滿足不了您的要求,您自己可以定義Receiver類,隻需要繼承Receiver抽象類來實作自己子類的業務需求。
四、StreamingContext、DStream、Receiver結合流程分析:
(1)inputStream代表了資料輸入流(如:Socket、Kafka、Flume等)
(2)Transformation代表了對資料的一系列操作,如flatMap、map等
(3)outputStream代表了資料的輸出,例如wordCount中的println方法:
資料資料在流進來之後最終會生成Job,最終還是基于Spark Core的RDD進行執行:在處理流進來的資料時是DStream進行Transformation由于是StreamingContext是以根本不會去運作,StreamingContext會根據Transformation生成”DStream的鍊條”及DStreamGraph,而DStreamGraph就是DAG的模闆,這個模闆是被架構托管的。當我們指定時間間隔的時候,Driver端就會根據這個時間間隔來觸發Job而觸發Job的方法就是根據OutputDStream中指定的具體的function,例如wordcount中print,這個函數一定會傳給ForEachDStream,它會把函數交給最後一個DStream産生的RDD,也就是RDD的print操作,而這個操作就是RDD觸發Action。
啟動HDFS,如下圖所示:
通過web端檢視節點正常啟動,如下圖所示:
2.啟動Spark叢集,如下圖所示:
通過web端檢視叢集啟動正常,如下圖所示:
3.啟動start-history-server.sh,如下圖所示:
二:HDFS的SparkStreaming案例實戰(代碼部分)
package com.dt.spark.SparkApps.sparkstreaming;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;
/**
* Created by Jonson on 2016/4/17.
*/
public class SparkStreamingOnHDFS {
public static void main(String[] args){
/**
* 第一步:配置SparkConf
* 1. 至少兩條線程:
* 因為Spark Streaming應用程式在運作的時候,至少有一條線程用于不斷的循環接收資料,
* 并且至少有一條線程用于處理接收的資料(否則的話無法有線程用于處理資料,随着時間的推移,記憶體和磁盤都不堪重負)
* 2. 對于叢集而言,每個Executor一般而言肯定不止一個線程,對于處理Spark Streaming的應用程式而言,每個Executor一般
* 配置設定多少個Core合适呢?根據我們過去的經驗,5個左右的core是最佳的(配置設定為奇數個Core為最佳)。
*/
final SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("SparkOnStreamingOnHDFS");
* 第二步:建立SparkStreamingContext,這個是Spark Streaming應用程式所有功能的起始點和程式排程的核心
* 1,SparkStreamingContext的建構可以基于SparkConf參數,也可以基于持久化SparkStreamingContext的内容
* 來恢複過來(典型的場景是Driver崩潰後重新啟動,由于Spark Streaming具有連續7*24小時不間斷運作的特征,
* 所有需要在Driver重新啟動後繼續上一次的狀态,此時狀态的恢複需要基于曾經的checkpoint)
* 2,在一個Spark Streaming應用程式中可以建立若幹個SparkStreamingContext對象,使用下一個SparkStreamingContext
* 之前需要把前面正在運作的SparkStreamingContext對象關閉掉,由此,我們獲得一個重大啟發:SparkStreamingContext
* 是Spark core上的一個應用程式而已,隻不過Spark Streaming架構箱運作的話需要Spark工程師寫業務邏輯
// JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));//Durations.seconds(5)設定每隔5秒
final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/Checkpoint_Data";
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
return createContext(checkpointDirectory,conf);
}
};
* 可以從失敗中恢複Driver,不過還需要制定Driver這個程序運作在Cluster,并且送出應用程式的時候
* 指定 --supervise;
JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
* 現在是監控一個檔案系統的目錄
* 此處沒有Receiver,Spark Streaming應用程式隻是按照時間間隔監控目錄下每個Batch新增的内容(把新增的)
* 作為RDD的資料來源生成原始的RDD
//指定從HDFS中監控的目錄
JavaDStream lines = jsc.textFileStream("hdfs://Master:9000/library/SparkStreaming/Data");
* 第四步:接下來就像對于RDD程式設計一樣基于DStreaming進行程式設計!!!
* 原因是:
* DStreaming是RDD産生的模闆(或者說類)。
* 在Spark Streaming具體發生計算前其實質是把每個batch的DStream的操作翻譯成對RDD的操作!!
* 對初始的DStream進行Transformation級别的處理,例如Map,filter等高階函數的程式設計,來進行具體的資料計算。
* 第4.1步:将每一行的字元串拆分成單個單詞
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,String>() {
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
});
* 第4.2步:對初始的JavaRDD進行Transformation級别的處理,例如map,filter等高階函數等的程式設計,來進行具體的資料計算
* 在4.1的基礎上,在單詞拆分的基礎上對每個單詞執行個體計數為1,也就是word => (word,1)
JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String,Integer>(word,1);
* 第4.3步:在每個單詞執行個體計數的基礎上統計每個單詞在檔案中出現的總次數
JavaPairDStream<String,Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
* 此處的print并不會直接觸發Job的執行,因為現在的一切都是在Spark Streaming架構控制下的,對于Spark而言具體是否
* 觸發真正的Job運作是基于設定的Duration時間間隔的
* 一定要注意的是:Spark Streaming應用程式要想執行具體的Job,對DStream就必須有output Stream操作,
* output Stream有很多類型的函數觸發,例如:print,saveAsTextFile,saveAsHadoopFiles等,其實最為重要的一個方法是
* foraeachRDD,因為Spark Streaming處理的結果一般都會放在Redis,DB,DashBoard等上面,foreachRDD主要就是用來完成這些
* 功能的,而且可以随意的自定義具體資料到底存放在哪裡!!!
wordscount.print();
/**
* Spark Streaming執行引擎也就是Driver開始運作,Driver啟動的時候是位于一條新的線程中的。
* 當然其内部有消息循環體用于接收應用程式本身或者Executor的消息;
jsc.start();
jsc.awaitTermination();
jsc.close();
}
/**
* 工廠化模式建構JavaStreamingContext
*/
private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf){
System.out.println("Creating new context");
SparkConf = conf;
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.seconds(5));
ssc.checkpoint(checkpointDirectory);
return ssc;
}
代碼打包在叢集中運作
建立目錄
2.腳本運作
腳本内容如下:
此時Spark Streaming會每隔5秒執行一次,不斷的掃描監控目錄下是否有新的檔案。
3.上傳檔案到HDFS中的Data目錄下
4.輸出結果
三:Spark Streaming on HDFS源碼解密
JavaStreamingContextFactory的create方法可以建立JavaStreamingContext
而我們在具體實作的時候覆寫了該方法,内部就是調用createContext方法來具體實作。上述實戰案例中我們實作了createContext方法。
3.checkpoint:
一方面:保持容錯
一方面保持狀态
在開始和結束的時候每個batch都會進行checkpoint
** Sets the context to periodically checkpoint the DStream operations for master
應用場景:
假設資料流進來,進行ML或者Graphx的時候有時需要很長時間,但是bacth定時定條件的清除RDD,是以就可以通過remember使得資料可以延長更長時間。/**
如果設定了checkpoint ,重新開機程式的時候,getOrCreate()會重新從checkpoint目錄中初始化出StreamingContext。
/* * Either recreate
a StreamingContext from checkpoint data or create a new StreamingContext.
Streaming會定期的進行checkpoint。
重新啟動程式的時候,他會從曾經checkpoint的目錄中,如果沒有做額外配置的時候,所有的資訊都會放在checkpoint的目錄中(包括曾經應用程式資訊),是以下次再次啟動的時候就會報錯,無法初始化ShuffleDStream。
一、什麼是Flume?
flume 作為 cloudera 開發的實時日志收集系統,受到了業界的認可與廣泛應用。Flume 初始的發行版本目前被統稱為 Flume OG(original generation),屬于 cloudera。但随着 FLume 功能的擴充,Flume OG 代碼工程臃腫、核心元件設計不合理、核心配置不标準等缺點暴露出來,尤其是在 Flume OG 的最後一個發行版本 0.94.0 中,日志傳輸不穩定的現象尤為嚴重,為了解決這些問題,2011 年 10 月 22 号,cloudera 完成了 Flume-728,對
Flume 納入 apache 旗下,cloudera Flume 改名為 Apache Flume。
flume的特點:
flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統。支援在日志系統中定制各類資料發送方,用于收集資料;同時,Flume提供對資料進行簡單處理,并寫到各種資料接受方(比如文本、HDFS、Hbase等)的能力 。
flume的資料流由事件(Event)貫穿始終。事件是Flume的基本資料機關,它攜帶日志資料(位元組數組形式)并且攜帶有頭資訊,這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,然後Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區,它将儲存事件直到Sink處理完該事件。Sink負責持久化日志或者把事件推向另一個Source。
flume的可靠性
當節點出現故障時,日志能夠被傳送到其他節點上而不會丢失。Flume提供了三種級别的可靠性保障,從強到弱依次分别為:end-to-end(收到資料agent首先将event寫到磁盤上,當資料傳送成功後,再删除;如果資料發送失敗,可以重新發送。),Store on failure(這也是scribe采用的政策,當資料接收方crash時,将資料寫到本地,待恢複後,繼續發送),Besteffort(資料發送到接收方後,不會進行确認)。
flume的可恢複性:
還是靠Channel。推薦使用FileChannel,事件持久化在本地檔案系統裡(性能較差)。
flume的一些核心概念:
Agent 使用JVM 運作Flume。每台機器運作一個agent,但是可以在一個agent中包含多個sources和sinks。
Client 生産資料,運作在一個獨立的線程。
Source 從Client收集資料,傳遞給Channel。
Sink 從Channel收集資料,運作在一個獨立線程。
Channel 連接配接 sources 和 sinks ,這個有點像一個隊列。
Events 可以是日志記錄、 avro 對象等。
Flume以agent為最小的獨立運作機關。一個agent就是一個JVM。單agent由Source、Sink和Channel三大元件構成,如下圖:
值得注意的是,Flume提供了大量内置的Source、Channel和Sink類型。不同類型的Source,Channel和Sink可以自由組合。組合方式基于使用者設定的配置檔案,非常靈活。比如:Channel可以把事件暫存在記憶體裡,也可以持久化到本地硬碟上。Sink可以把日志寫入HDFS, HBase,甚至是另外一個Source等等。Flume支援使用者建立多級流,也就是說,多個agent可以協同工作,并且支援Fan-in、Fan-out、Contextual Routing、Backup Routes,這也正是NB之處。如下圖所示:
1、Flume叢集采集外部系統的業務資訊,将采集後的資訊發生到Kafka叢集,最終提供Spark Streaming流架構計算處理,流處理完成後再将最終結果發送給Kafka存儲,架構如下圖:
2、Flume叢集采集外部系統的業務資訊,将采集後的資訊發生到Kafka叢集,最終提供Spark Streaming流架構計算處理,流處理完成後再将最終結果發送給Kafka存儲,同時将最終結果通過Ganglia監控工具進行圖形化展示,架構如下圖:
三、Kafka資料寫入Spark Streaming有二種方式:
Streaming啟動的Job來處理這些資料;然而,在預設的配置下,這種方法在失敗的情況下會丢失資料,為了保證零資料丢失,你可以在Spark Streaming中使用WAL日志功能,這使得我們可以将接收到的資料儲存到WAL中(WAL日志可以存儲在HDFS上),是以在失敗的時候,我們可以從WAL中恢複,而不至于丢失資料。
另一種是DirectAPI,産生資料和處理資料的時候是在兩台機器上?其實是在同一台資料上,由于在一台機器上有Driver和Executor,是以這台機器要足夠強悍。
Flume叢集将采集的資料放到Kafka叢集中,Spark Streaming會實時線上的從Kafka叢集中通過DirectAPI拿資料,可以通過Kafka中的topic+partition查詢最新的偏移量(offset)來讀取每個batch的資料,即使讀取失敗也可再根據偏移量來讀取失敗的資料,保證應用運作的穩定性和資料可靠性。
2、個人強烈推薦在生産環境下用DirectAPI,但是我們的發行版,會對DirectAPI進行優化,降低其延遲。
實際生産環境下,搜集分布式的日志以Kafka為核心。
本課分2部分講解:
第二部分,講解Kafka的安裝和實戰。
一、Kafka的概念、架構和用例場景
<a target="_blank" href="http://kafka.apache.org/documentation.html#introdution">http://kafka.apache.org/documentation.html#introdution</a>
1、Kafka的概念
什麼是消息元件:
且是解耦合方式傳遞資料。
完善的流處理系統的特點:
1)能線上的以非常低的延遲,來處理資料,而且是穩定可靠的
2)能對流進來的資料進行非常複雜的分析,而不是簡單的僅僅統計的分析
3)不僅能處理目前線上的資料,也能處理過去一天,一周,一個月甚至一年的資料
Apache Kafka與傳統消息系統相比,有以下不同的特點:
分布式系統,易于向外擴充;
線上低延遲,同時為釋出和訂閱提供高吞吐量;
流進來的資料一般處理完後就消失了,也可以将消息存儲到磁盤,是以可以處理1天甚至1周前内容,是以kafka不僅是一個消息中間件,還是一個存儲系統
2、Kafka的架構
Kafka既然具備消息系統的基本功能,那麼就必然會有組成消息系統的元件:
Topic,Producer和Consumer。Kafka還有其特殊的Kafka Cluster元件。
Topic主題:
代表一種資料的類别或類型,工作、娛樂、生活有不同的Topic,生産者需要說明把說明資料分别放在那些Topic中,裡面就是一個個小對象,并将資料資料推到Kafka,消費者擷取資料是pull的過程。一組相同類型的消息資料流。這些消息在Kafka會被分區存放,并且有多個副本,以防資料丢失。每個分區的消息是順序寫入的,并且不可改寫。
- Producer(生産者):把資料推到Kafka系統的任何對象。
- Kafka Cluster(Kafka叢集):把推到Kafka系統的消息儲存起來的一組伺服器,也叫Broker。因為Kafka叢集用到了Zookeeper作為底層支援架構,是以由一個選出的伺服器作為Leader來處理所有消息的讀和寫的請求,其他伺服器作為Follower接受Leader的廣播同步備份資料,以備災難恢複時用。
- Consumer(消費者):從Kafka系統訂閱消息的任何對象。
消費者可以有多個,并且某些消費者還可以組成Consumer Group。多個Consumer Group之間組成消息廣播的關系,是以各個Group可以拉相同的消息資料。在Consumer Group内部,各消費者之間對Consumer Group拉出來的消息資料是隊列先進先出的關系,某個消息資料隻能給該Group的一個消費者使用,同一個Group中的實體是互斥的,對一個消息,這樣是避免重複消費。如果有多個group,每個group中隻有一個實體,這就是隊列的方式了,因為它是互斥的。如果不是一個實體,則是廣播模式,如下圖所示,廣播隻能廣播給一個group中的一個消費實體
kafka的資料傳輸是基于kernel(核心)級别的(傳輸速度接近0拷貝-ZeroCopy)、沒有使用者空間的參與。Linux本身是軟體,軟體啟動時第一個啟動程序叫init,在init程序啟動後會進入使用者空間;kafka是用java寫的,是基于jvm虛拟機的。例如:在分布式系統中,機器A上的應用程式需要讀取機器B上的Java服務資料,由于Java程式對應的JVM是使用者空間級别而且資料在磁盤上,A上應用程式讀取資料時會首先進入機器B上的核心空間再進入機器B的使用者空間,讀取使用者空間的資料後,資料再經過B機器上的核心空間分發到網絡中(之是以要再經過B的核心,因為要通過網絡通信,不通過核心,哪裡來的網絡通信),機器A網卡接收到傳輸過來的資料後再将資料寫入A機器的核心空間,進而最終将資料傳輸給A的使用者空間進行處理。如下圖:網絡本身是一種硬體,磁盤隻是硬體的一種。
正常情況下,外部系統從Java程式中讀取資料,傳輸給核心空間并依賴網卡将資料寫入到網絡中,進而把資料傳輸出去。其實Java本身是核心的一層外衣,Java Socket程式設計,操作的各種資料都是在JVM的使用者空間中進行的。而Kafka操作資料是放在核心空間的,通常核心空間處理資料的速度比使用者空間快上萬倍,因為沒用使用者态和核心态的切換,是以通過kafka可以實作高速讀、寫資料。隻要磁盤空間足夠大,可以無限量的存儲資料,kafka的資料就是存儲在磁盤中的,不是存在核心中的。而很多消息元件是把資料存記憶體中的。kafka用zookeeperg管理中繼資料,而且按順序寫資料,比随機寫要快很多。又有副本!
3、Kafka的用例場景
類似微信,手機和郵箱等等這樣大家熟悉的消息元件,Kafka也可以:
- 支援文字/圖檔
- 可以存儲内容
- 分門别類
從内容消費的角度,Kafka把郵箱中的郵件類型看成是Topic。
二、Kafka的安裝和實戰
<a target="_blank" href="http://kafka.apache.org/documentation.html#quickstart">http://kafka.apache.org/documentation.html#quickstart</a>
1、安裝和配置Zookeeper
Kafka叢集模式需要提前安裝好Zookeeper。
- 提示:Kafka單例模式不需要安裝額外的Zookeeper,可以使用内置的Zookeeper。
- Kafka叢集模式需要至少3台伺服器。本課實戰用到的伺服器Hostname:master,slave1,slave2。
- 本課中用到的Zookeeper版本是Zookeeper-3.4.6。
1) 下載下傳Zookeeper
下載下傳zookeeper-3.4.6.tar.gz
1) 安裝Zookeeper
提示:下面的步驟發生在master伺服器。
以ubuntu14.04舉例,把下載下傳好的檔案放到/root目錄,用下面的指令解壓:
cd /root
tar -zxvf zookeeper-3.4.6.tar.gz
解壓後在/root目錄會多出一個zookeeper-3.4.6的新目錄,用下面的指令把它剪切到指定目錄即安裝好Zookeeper了:
mv zookeeper-3.4.6 /usr/local/spark
之後在/usr/local/spark目錄會多出一個zookeeper-3.4.6的新目錄。下面我們講如何配置安裝好的Zookeeper。
2) 配置Zookeeper
配置.bashrc
- 打開檔案:vi /root/.bashrc
- 在PATH配置行前添加:
export ZOOKEEPER_HOME=/usr/local/spark/zookeeper-3.4.6
- 最後修改PATH:
export PATH=${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${HIVE_HOME}/bin:${KAFKA_HOME}/bin:$PATH
- 使配置的環境變量立即生效:source /root/.bashrc
建立data目錄
- cd $ZOOKEEPER_HOME
- mkdir data
建立并打開zoo.cfg檔案
- cd $ZOOKEEPER_HOME/conf
- cp zoo_sample.cfg zoo.cfg
- vi zoo.cfg
配置zoo.cfg
# 配置Zookeeper的日志和伺服器身份證号等資料存放的目錄。
# 千萬不要用預設的/tmp/zookeeper目錄,因為/tmp目錄的資料容易被意外删除。
dataDir=../data
# Zookeeper與用戶端連接配接的端口
clientPort=2181
# 在檔案最後新增3行配置每個伺服器的2個重要端口:Leader端口和選舉端口
# server.A=B:C:D:其中 A 是一個數字,表示這個是第幾号伺服器;
# B 是這個伺服器的hostname或ip位址;
# C 表示的是這個伺服器與叢集中的 Leader 伺服器交換資訊的端口;
# D 表示的是萬一叢集中的 Leader 伺服器挂了,需要一個端口來重新進行選舉,
# 選出一個新的 Leader,而這個端口就是用來執行選舉時伺服器互相通信的端口。
# 如果是僞叢集的配置方式,由于 B 都是一樣,是以不同的 Zookeeper 執行個體通信
# 端口号不能一樣,是以要給它們配置設定不同的端口号。
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
改成如下方式:
dataDir=/usr/local/spark/zookeeper-3.4.6/data
dataLogDir=/usr/local/spark/zookeeper-3.4.6/logs
server.0=master1:2888:3888
server.1=work1:2888:3888
server.2=work2:2888:3888
建立并打開myid檔案
- cd $ZOOKEEPER_HOME/data
- touch myid
- vi myid
配置myid
按照zoo.cfg的配置,myid的内容就是1。要寫成0,和上面zoo.cfg裡面的配置server.0,server.1,server.2一緻,是以下面work1中myid内容為1,work2中myid内容為2
3) 同步master的安裝和配置到slave1和slave2
- 在master伺服器上運作下面的指令
scp ./.bashrc root@slave1:/root
scp ./.bashrc root@slave2:/root
cd /usr/local/spark
scp -r ./zookeeper-3.4.6 root@slave1:/usr/local/spark
scp -r ./zookeeper-3.4.6 root@slave2:/usr/local/spark
- 在slave1伺服器上運作下面的指令
vi $ZOOKEEPER_HOME/data/myid
按照zoo.cfg的配置,myid的内容就是1。
- 在slave2伺服器上運作下面的指令
按照zoo.cfg的配置,myid的内容就是2。
4) 啟動Zookeeper服務
zkServer.sh start
source /root/.bashrc
5) 驗證Zookeeper是否安裝和啟動成功
- 在master伺服器上運作指令:jps和zkServer.sh status
root@master:/usr/local/spark/zookeeper-3.4.6/bin# jps
3844 QuorumPeerMain
4790 Jps
zkServer.sh status
root@master:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status
JMX enabled by default
Using config: /usr/local/spark/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
- 在slave1伺服器上運作指令:jps和zkServer.sh status
root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# jps
3462 QuorumPeerMain
4313 Jps
root@slave1:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status
- 在slave2伺服器上運作指令:jps和zkServer.sh status
root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# jps
4073 Jps
3277 QuorumPeerMain
root@slave2:/usr/local/spark/zookeeper-3.4.6/bin# zkServer.sh status
Mode: leader
至此,代表Zookeeper已經安裝和配置成功。
2、安裝和配置Kafka
本課中用到的Kafka版本是Kafka-2.10-0.9.0.1。
1) 下載下傳Kafka
下載下傳kafka_2.10-0.9.0.1.tgz
1) 安裝Kafka
tar -zxvf kafka_2.10-0.9.0.1.tgz
解壓後在/root目錄會多出一個kafka_2.10-0.9.0.1的新目錄,用下面的指令把它剪切到指定目錄即安裝好Kafka了:
mv kafka_2.10-0.9.0.1 /usr/local
之後在/usr/local目錄會多出一個kafka_2.10-0.9.0.1的新目錄。下面我們講如何配置安裝好的Kafka。
2) 配置Kafka
export KAFKA_HOME=/usr/local/kafka_2.10-0.9.0.1
打開server.properties
- cd $ZOOKEEPER_HOME/config
- vi server.properties
配置server.properties
broker.id=0
port=9092
zookeeper.connect=master:2181,slave1:2181,slave2:2181
cd /usr/local
scp -r ./kafka_2.10-0.9.0.1 root@slave1:/usr/local
scp -r ./kafka_2.10-0.9.0.1 root@slave2:/usr/local
vi $KAFKA_HOME/config/server.properties
修改broker.id=1。
修改broker.id=2。
4) 啟動Kafka服務
- 在master伺服器上運作下面的指令,nohup,在叢集上終端不輸出啟動日志
cd $KAFKA_HOME/bin
nohup ./kafka-server-start.sh ../config/server.properties &
kafka-server-start.sh ../config/server.properties &
5) 驗證Kafka是否安裝和啟動成功
- 在任意伺服器上運作指令建立Topic“HelloKafka”:
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 1 --topic HelloKafka
- 在任意伺服器上運作指令為建立的Topic“HelloKafka”生産一些消息:
kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic HelloKafka
輸入下面的消息内容:
This is DT_Spark!
I’m Rocky!
Life is short, you need Spark!
- 在任意伺服器上運作指令從指定的Topic“HelloKafka”上消費(拉取)消息:
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --from-beginning --topic HelloKafka
過一會兒,你會看到列印的消息内容:
- 在任意伺服器上運作指令檢視所有的Topic名字:
kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
- 在任意伺服器上運作指令檢視指定Topic的概況:
kafka-topics.sh --describe --zookeepermaster:2181,slave1:2181,slave2:2181 --topic HelloKafka
至此,代表Kafka已經安裝和配置成功。