天天看點

Kafka簡單示例

1.概述

在《Kafka實戰-簡單示例》一文中給大家介紹來Kafka的簡單示例,示範了如何編寫Kafka的代碼去生産資料和消費資料,今天給大家介紹如何去整 合一個完整的項目,本篇部落格我打算為大家介紹Flume+Kafka+Storm的實時日志統計,由于涉及的内容較多,這裡先給大家梳理一個項目的運用這 些技術的流程。下面是今天的内容目錄:

項目流程

Flume

Kafka

Storm

下面開始今天的内容分享。

2.項目流程

在整合這套方案的時候,項目組也是經過一番讨論,在讨論中,觀點很多,有人認為直接使用Storm進行實時處理,去掉Kafka環節;也有認為直接使用Kafka的API去消費,去掉Storm的消費環節等等,但是最終組内還是一緻決定使用這套方案,原因有如下幾點:

業務子產品化

功能元件化

我們認為,Kafka在整個環節中充當的職責應該單一,這項目的整個環節她就是一個中間件,下面用一個圖來說明這個原因,如下圖所示:

Kafka實戰-實時日志統計流程

整個項目流程如上圖所示,這樣劃分使得各個業務子產品化,功能更加的清晰明了。

Data Collection

負責從各個節點上實時收集使用者上報的日志資料,我們選用的是Apache的Flume NG來實作。

Data Access

由于收集的資料的速度和資料處理的速度不一定是一緻的,是以,這裡添加了一個中間件來做處理,所使用的是Apache的Kafka,關于Kafka叢集部署,大家可以參考我寫的《 Kafka實戰-Kafka Cluster 》。另外,有一部分資料是流向HDFS分布式檔案系統了的,友善于為離線統計業務提供資料源。

Stream Computing

在收集到資料後,我們需要對這些資料做實時處理,所選用的是Apache的Storm。關于Storm的叢集搭建部署部落格後面補上,較為簡單。

Data Output

在使用Storm對資料做處理後,我們需要将處理後的結果做持久化,由于對相應速度要求較高,這裡采用Redis+MySQL來做持久化。整個項目的流程架構圖,如下圖所示:

Kafka實戰-實時日志統計流程

3.Flume

Flume是一個分布式的、高可用的海量日志收集、聚合和傳輸日志收集系統,支援在日志系統中定制各類資料發送方(如:Kafka,HDFS 等),便于收集資料。Flume提供了豐富的日志源收集類型,有:Console、RPC、Text、Tail、Syslog、Exec等資料源的收集, 在我們的日志系統中目前我們所使用的是spooldir方式進行日志檔案采集,配置内容資訊如下所示:

producer.sources.s.type = spooldir

producer.sources.s.spoolDir = /home/hadoop/dir/logdfs

當然,Flume的資料發送方類型也是多種類型的,有:Console、Text、HDFS、RPC等,這裡我們系統所使用的是Kafka中間件來接收,配置内容如下所示:

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink

producer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092

producer.sinks.r.partition.key=0

producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class=kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks=0

producer.sinks.r.max.message.size=1000000

producer.sinks.r.producer.type=sync

producer.sinks.r.custom.encoding=UTF-8

producer.sinks.r.custom.topic.name=test

關于,Flume的詳細搭建部署,大家可以參考我寫的《 高可用Hadoop平台-Flume NG實戰圖解篇 》。這裡就不多做贅述了。

4.Kafka

Kafka是一種提供高吞吐量的分布式釋出訂閱消息系統,她的特性如下所示:

通過磁盤資料結構提供消息的持久化,這種結構對于即使資料達到TB+級别的消息,存儲也能夠保持長時間的穩定。

搞吞吐特性使得Kafka即使使用普通的機器硬體,也可以支援每秒數10W的消息。

能夠通過Kafka Cluster和Consumer Cluster來Partition消息。

Kafka的目的是提供一個釋出訂閱解決方案,他可以處理Consumer網站中的所有流動資料,在網頁浏覽,搜尋以及使用者的一些行為,這些動作 是較為關鍵的因素。這些資料通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。對于Hadoop這樣的日志資料和離線計算系統,這樣的方案是一個解 決實時處理較好的一種方案。

關于Kafka叢集的搭建部署和使用,大家可以參考我寫的:《 Kafka實戰-Kafka Cluster 》,這裡就不多做贅述了。

5.Storm

Twitter将Storm開源了,這是一個分布式的、容錯的實時計算系統,已被貢獻到Apache基金會,下載下傳位址如下所示:

http://storm.apache.org/downloads.html

Storm的主要特點如下:

簡單的程式設計模型。類似于MapReduce降低了并行批處理複雜性,Storm降低了進行實時處理的複雜性。

可以使用各種程式設計語言。你可以在Storm之上使用各種程式設計語言。預設支援Clojure、Java、Ruby和Python。要增加對其他語言的支援,隻需實作一個簡單的Storm通信協定即可。

容錯性。Storm會管理工作程序和節點的故障。

水準擴充。計算是在多個線程、程序和伺服器之間并行進行的。

可靠的消息處理。Storm保證每個消息至少能得到一次完整處理。任務失敗時,它會負責從消息源重試消息。

快速。系統的設計保證了消息能得到快速的處理,使用ØMQ作為其底層消息隊列。

本地模式。Storm有一個本地模式,可以在處理過程中完全模拟Storm叢集。這讓你可以快速進行開發和單元測試。

Storm叢集由一個主節點和多個工作節點組成。主節點運作了一個名為“Nimbus”的守護程序,用于配置設定代碼、布置任務及故障檢測。每個工作 節 點都運作了一個名為“Supervisor”的守護程序,用于監聽工作,開始并終止工作程序。Nimbus和Supervisor都能快速失敗,而且是無 狀态的,這樣一來它們就變得十分健壯,兩者的協調工作是由Apache的ZooKeeper來完成的。

Storm的術語包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被處理的資料。Spout是資料源。Bolt處理資料。Task是運作于Spout或Bolt中的 線程。Worker是運作這些線程的程序。Stream Grouping規定了Bolt接收什麼東西作為輸入資料。資料可以随機配置設定(術語為Shuffle),或者根據字段值配置設定(術語為Fields),或者 廣播(術語為All),或者總是發給一個Task(術語為Global),也可以不關心該資料(術語為None),或者由自定義邏輯來決定(術語為 Direct)。Topology是由Stream Grouping連接配接起來的Spout和Bolt節點網絡。在Storm Concepts頁面裡對這些術語有更詳細的描述。

關于Storm叢集的搭建部署,部落格在下一篇中更新,到時候會将更新位址附在這裡,這裡就先不對Storm叢集的搭建部署做過多的贅述了。

6.總結

這裡就是為大家介紹的Flume+Kafka+Storm的整體流程,後續會給大家用一個項目案例來實踐示範這個流程,包括具體的各個子產品的編碼實踐。今天大家可以先熟悉下實時計算項目的流程開發。