天天看點

建構實時日志處理平台:從零開始基于Kafka打造高效企業級架構

作者:一個即将退役的碼農

#頭條創作挑戰賽#

我們要實作一些大資料元件的組合,就如同玩樂高玩具一樣,把它們“插”在一起,“拼”成一個更大一點的玩具。

在任何一個企業中,伺服器每天都會産生很多的日志資料。這些資料内容非常豐富,包含了我們的線上業務資料、使用者行為資料以及後端系統資料。實時分析這些資料,能夠幫助我們更快地洞察潛在的趨勢,進而有針對性地做出決策。今天,我們就使用 Kafka 搭建一個這樣的平台。

流處理架構

如果在網上搜尋實時日志流處理,你應該能夠搜到很多教你搭建實時流處理平台做日志分析的教程。這些教程使用的技術棧大多是 Flume+Kafka+Storm、Spark Streaming 或 Flink。特别是 Flume+Kafka+Flink 的組合,逐漸成為了實時日志流處理的标配。不過,要搭建這樣的處理平台,你需要用到 3 個架構才能實作,這既增加了系統複雜度,也提高了運維成本。

今天,我來示範一下如何使用 Apache Kafka 這一個架構,實作一套實時日志流處理系統。換句話說,我使用的技術棧是 Kafka Connect+Kafka Core+Kafka Streams 的組合。

下面這張圖展示了基于 Kafka 的實時日志流處理平台的流程。

建構實時日志處理平台:從零開始基于Kafka打造高效企業級架構

從圖中我們可以看到,日志先從 Web 伺服器被不斷地生産出來,随後被實時送入到 Kafka Connect 元件,Kafka Connect 元件對日志進行處理後,将其灌入 Kafka 的某個主題上,接着發送到 Kafka Streams 元件,進行實時分析。最後,Kafka Streams 将分析結果發送到 Kafka 的另一個主題上。

我在之前的文章簡單介紹過 Kafka Connect 和 Kafka Streams 元件,前者可以實作外部系統與 Kafka 之間的資料互動,而後者可以實時處理 Kafka 主題中的消息。

現在,我們就使用這兩個元件,結合前面學習的所有 Kafka 知識,一起建構一個實時日志分析平台。

Kafka Connect 元件

我們先利用 Kafka Connect 元件收集資料。如前所述,Kafka Connect 元件負責連通 Kafka 與外部資料系統。連接配接外部資料源的元件叫連接配接器(Connector)。常見的外部資料源包括資料庫、KV 存儲、搜尋系統或檔案系統等。

今天我們使用檔案連接配接器(File Connector)實時讀取 Nginx 的 access 日志。假設 access 日志的格式如下:

10.10.13.41 - - [13/Aug/2019:03:46:54 +0800] "GET /v1/open/product_list?user_key=****&user_phone=****&screen_height=1125&screen_width=2436&from_page=1&user_type=2&os_type=ios HTTP/1.0" 200 1321
           

在這段日志裡,請求參數中的 os_type 字段目前有兩個值:ios 和 android。我們的目标是實時計算當天所有請求中 ios 端和 android 端的請求數。

啟動 Kafka Connect

目前,Kafka Connect 支援單機版(Standalone)和叢集版(Cluster),我們用叢集的方式來啟動 Connect 元件。

首先,我們要啟動 Kafka 叢集,假設 Broker 的連接配接位址是 localhost:9092。

啟動好 Kafka 叢集後,我們啟動 Connect 元件。在 Kafka 安裝目錄下有個 config/connect-distributed.properties 檔案,你需要修改下列項:

bootstrap.servers=localhost:9092
rest.host.name=localhost
rest.port=8083
           

第 1 項是指定要連接配接的 Kafka 叢集,第 2 項和第 3 項分别指定 Connect 元件開放的 REST 服務的主機名和端口。儲存這些變更之後,我們運作下面的指令啟動 Connect。

cd kafka_2.12-2.3.0
bin/connect-distributed.sh config/connect-distributed.properties
           

如果一切正常,此時 Connect 應該就成功啟動了。現在我們在浏覽器通路 localhost:8083 的 Connect REST 服務,應該能看到下面的傳回内容:

{"version":"2.3.0","commit":"fc1aaa116b661c8a","kafka_cluster_id":"XbADW3mnTUuQZtJKn9P-hA"}
           

添加 File Connector

看到該 JSON 串,就表明 Connect 已經成功啟動了。此時,我們打開一個終端,運作下面這條指令來檢視一下目前都有哪些 Connector。

$ curl http://localhost:8083/connectors
[]
           

結果顯示,目前我們沒有建立任何 Connector。

現在,我們來建立對應的 File Connector。該 Connector 讀取指定的檔案,并為每一行文本建立一條消息,并發送到特定的 Kafka 主題上。建立指令如下:

$ curl -H "Content-Type:application/json" -H "Accept:application/json" http://localhost:8083/connectors -X POST --data '{"name":"file-connector","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","file":"/var/log/access.log","tasks.max":"1","topic":"access_log"}}'
{"name":"file-connector","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","file":"/var/log/access.log","tasks.max":"1","topic":"access_log","name":"file-connector"},"tasks":[],"type":"source"}
           

這條指令本質上是向 Connect REST 服務發送了一個 POST 請求,去建立對應的 Connector。在這個例子中,我們的 Connector 類是 Kafka 預設提供的FileStreamSourceConnector。我們要讀取的日志檔案在 /var/log 目錄下,要發送到 Kafka 的主題名稱為 access_log。

現在,我們再次運作 curl http: // localhost:8083/connectors, 驗證一下剛才的 Connector 是否建立成功了。

$ curl http://localhost:8083/connectors
["file-connector"]
           

顯然,名為 file-connector 的新 Connector 已經建立成功了。如果我們現在使用 Console Consumer 程式去讀取 access_log 主題的話,應該會發現 access.log 中的日志行資料已經源源不斷地向該主題發送了。

如果你的生産環境中有多台機器,操作也很簡單,在每台機器上都建立這樣一個 Connector,隻要保證它們被送入到相同的 Kafka 主題以供消費就行了。

Kafka Streams 元件

資料到達 Kafka 還不夠,我們還需要對其進行實時處理。下面我示範一下如何編寫 Kafka Streams 程式來實時分析 Kafka 主題資料。

我們知道,Kafka Streams 是 Kafka 提供的用于實時流處理的元件。

與其他流處理架構不同的是,它僅僅是一個類庫,用它編寫的應用被編譯打包之後就是一個普通的 Java 應用程式。你可以使用任何部署架構來運作 Kafka Streams 應用程式。

同時,你隻需要簡單地啟動多個應用程式執行個體,就能自動地獲得負載均衡和故障轉移,是以,和 Spark Streaming 或 Flink 這樣的架構相比,Kafka Streams 自然有它的優勢。

下面這張來自 Kafka 官網的圖檔,形象地展示了多個 Kafka Streams 應用程式組合在一起,共同實作流處理的場景。圖中清晰地展示了 3 個 Kafka Streams 應用程式執行個體。一方面,它們形成一個組,共同參與并執行流處理邏輯的計算;另一方面,它們又都是獨立的實體,彼此之間毫無關聯,完全依靠 Kafka Streams 幫助它們發現彼此并進行協作。

建構實時日志處理平台:從零開始基于Kafka打造高效企業級架構

關于 Kafka Streams 的原理,我會在專欄後面進行詳細介紹。今天,我們隻要能夠學會利用它提供的 API 編寫流處理應用,幫我們找到剛剛提到的請求日志中 ios 端和 android 端發送請求數量的占比資料就行了。

編寫流處理應用

要使用 Kafka Streams,你需要在你的 Java 項目中顯式地添加 kafka-streams 依賴。我以最新的 2.3 版本為例,分别示範下 Maven 和 Gradle 的配置方法。

Maven:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.3.0</version>
</dependency>
Gradle:
compile group: 'org.apache.kafka', name: 'kafka-streams', version: '2.3.0'
           

現在,我先給出完整的代碼,然後我會詳細解釋一下代碼中關鍵部分的含義。

package com.geekbang.kafkalearn;
 
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.WindowedSerdes;
 
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
public class OSCheckStreaming {
 
    public static void main(String[] args) {
 
 
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "os-check-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class.getName());
 
        final Gson gson = new Gson();
        final StreamsBuilder builder = new StreamsBuilder();
 
        KStream<String, String> source = builder.stream("access_log");
        source.mapValues(value -> gson.fromJson(value, LogLine.class)).mapValues(LogLine::getPayload)
                .groupBy((key, value) -> value.contains("ios") ? "ios" : "android")
                .windowedBy(TimeWindows.of(Duration.ofSeconds(2L)))
                .count()
                .toStream()
                .to("os-check", Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));
 
        final Topology topology = builder.build();
 
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);
 
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
 
        try {
            streams.start();
            latch.await();
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
 
 
class LogLine {
    private String payload;
    private Object schema;
 
    public String getPayload() {
        return payload;
    }
}
           

這段代碼會實時讀取 access_log 主題,每 2 秒計算一次 ios 端和 android 端請求的總數,并把這些資料寫入到 os-check 主題中。

首先,我們構造一個 Properties 對象。這個對象負責初始化 Streams 應用程式所需要的關鍵參數設定。比如,在上面的例子中,我們設定了 bootstrap.servers 參數、application.id 參數以及預設的序列化器(Serializer)和解序列化器(Deserializer)。

bootstrap.servers 參數你應該已經很熟悉了,我就不多講了。這裡的 application.id 是 Streams 程式中非常關鍵的參數,你必須要指定一個叢集範圍内唯一的字元串來辨別你的 Kafka Streams 程式。序列化器和解序列化器設定了預設情況下 Streams 程式執行序列化和反序列化時用到的類。在這個例子中,我們設定的是 String 類型,這表示,序列化時會将 String 轉換成位元組數組,反序列化時會将位元組數組轉換成 String。

建構好 Properties 執行個體之後,下一步是建立 StreamsBuilder 對象。稍後我們會用這個 Builder 去實作具體的流處理邏輯。

在這個例子中,我們實作了這樣的流計算邏輯:每 2 秒去計算一下 ios 端和 android 端各自發送的總請求數。還記得我們的原始資料長什麼樣子嗎?它是一行 Nginx 日志,隻不過 Connect 元件在讀取它後,會把它包裝成 JSON 格式發送到 Kafka,是以,我們需要借助 Gson 來幫助我們把 JSON 串還原為 Java 對象,這就是我在代碼中建立 LogLine 類的原因。

代碼中的 mapValues 調用将接收到的 JSON 串轉換成 LogLine 對象,之後再次調用 mapValues 方法,提取出 LogLine 對象中的 payload 字段,這個字段儲存了真正的日志資料。這樣,經過兩次 mapValues 方法調用之後,我們成功地将原始資料轉換成了實際的 Nginx 日志行資料。

值得注意的是,代碼使用的是 Kafka Streams 提供的 mapValues 方法。顧名思義,這個方法就是隻對消息體(Value)進行轉換,而不變更消息的鍵(Key)。

其實,Kafka Streams 也提供了 map 方法,允許你同時修改消息 Key。通常來說,我們認為mapValues 要比 map 方法更高效,因為 Key 的變更可能導緻下遊處理算子(Operator)的重分區,降低性能。如果可能的話最好盡量使用 mapValues 方法。

拿到真實日志行資料之後,我們調用 groupBy 方法進行統計計數。由于我們要統計雙端(ios 端和 android 端)的請求數,是以,我們 groupBy 的 Key 是 ios 或 android。在上面的那段代碼中,我僅僅依靠日志行中是否包含特定關鍵字的方式來确定是哪一端。更正宗的做法應該是,分析 Nginx 日志格式,提取對應的參數值,也就是 os_type 的值。

做完 groupBy 之後,我們還需要限定要統計的時間視窗範圍,即我們統計的雙端請求數是在哪個時間視窗内計算的。在這個例子中,我調用了windowedBy 方法,要求 Kafka Streams 每 2 秒統計一次雙端的請求數。設定好了時間視窗之後,下面就是調用count 方法進行統計計數了。

這一切都做完了之後,我們需要調用toStream 方法将剛才統計出來的表(Table)轉換成事件流,這樣我們就能實時觀測它裡面的内容。我會在專欄的最後幾講中解釋下流處理領域内的流和表的概念以及它們的差別。這裡你隻需要知道 toStream 是将一個 Table 變成一個 Stream 即可。

最後,我們調用to 方法将這些時間視窗統計資料不斷地寫入到名為 os-check 的 Kafka 主題中,進而最終實作我們對 Nginx 日志進行實時分析處理的需求。

啟動流處理應用

由于 Kafka Streams 應用程式就是普通的 Java 應用,你可以用你熟悉的方式對它進行編譯、打包和部署。本例中的 OSCheckStreaming.java 就是一個可執行的 Java 類,是以直接運作它即可。如果一切正常,它會将統計資料源源不斷地寫入到 os-check 主題。

檢視統計結果

如果我們想要檢視統計的結果,一個簡單的方法是使用 Kafka 自帶的 kafka-console-consumer 腳本。指令如下:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic os-check --from-beginning --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --property print.key=true --property key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer --property key.deserializer.default.windowed.key.serde.inner=org.apache.kafka.common.serialization.Serdes\$StringSerde
[android@1565743788000/9223372036854775807] 1522
[ios@1565743788000/9223372036854775807] 478
[ios@1565743790000/9223372036854775807] 1912
[android@1565743790000/9223372036854775807] 5313
[ios@1565743792000/9223372036854775807] 780
[android@1565743792000/9223372036854775807] 1949
[android@1565743794000/9223372036854775807] 37
……
           

由于我們統計的結果是某個時間視窗範圍内的,是以承載這個統計結果的消息的 Key 封裝了該時間視窗資訊,具體格式是:[ios 或 android@開始時間 / 結束時間],而消息的 Value 就是一個簡單的數字,表示這個時間視窗内的總請求數。

如果把上面 ios 相鄰輸出行中的開始時間相減,我們就會發現,它們的确是每 2 秒輸出一次,每次輸出會同時計算出 ios 端和 android 端的總請求數。接下來,你可以訂閱這個 Kafka 主題,将結果實時導出到你期望的其他資料存儲上。

小結

至此,基于 Apache Kafka 的實時日志流處理平台就簡單搭建完成了。在搭建的過程中,我們隻使用 Kafka 這一個大資料架構就完成了所有元件的安裝、配置和代碼開發。比起 Flume+Kafka+Flink 這樣的技術棧,純 Kafka 的方案在運維和管理成本上有着極大的優勢。如果你打算從 0 建構一個實時流處理平台,不妨試一下 Kafka Connect+Kafka Core+Kafka Streams 的組合。