天天看點

kafka提高篇總結

Kafka

繼kafka基礎之後再來點進階實用的。順便做一個kafka整理總結。

檢視kafka自身維護偏移量:kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list master:9092 --topic flink --time -1

1、kafka自定義分區

分析

步驟:
    1,設計一個子類繼承分區父類,重寫其中的partition方法,在該方法中定制分區規則
    2,修改producer.properties檔案,指定自定的分區類
           

源碼以及效果

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class MyPartition implements Partitioner {
    /**
     * 下述方法在消息存儲到相應分區之前,都會被回調一次。
     * 原因:消息得找到自己存儲的所在。
     *
     * @param topic      主題名
     * @param key        消息的key
     * @param keyBytes   key對應的位元組數組
     * @param value      消息的value
     * @param valueBytes value對應的位元組數組
     * @param cluster    kafka分布式叢集
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //思路:
        //①獲得目前主題的分區數
        List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
        int totalPartitionNum = partitionInfos.size();
        //②關于key
        // a)key≠null, key的hash碼值對分區數求餘數,餘數即為目前消息所對應的分區編号
        //b)key=null, 根據value來計算
        //i)value ≠null, value的hash碼值對分區數求餘數,餘數即為目前消息所對應的分區編号
        //ii)value=null, 直接傳回預設的分區編号,如:0.

        if (key != null) {
            return key.hashCode() % totalPartitionNum;
        } else {
            if (value != null) {
                return value.hashCode() % totalPartitionNum;
            } else {
                return 0;
            }
        }
    }

    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

           

2、 消息攔截器

介紹

①在消息發送到kafka分布式叢集之前,針對于一些共通的業務處理,建議使用攔截器。如:對每條消息添加一個共通的字首(如:時間戳),對消息中包含一些反動的資訊進行篩選
②還可以監測到消息發送的狀态(成功,失敗)
③需求:實作一個簡單的雙interceptor組成的攔截鍊。第一個interceptor會在消息發送前将時間戳資訊加到消息value的最前部;第二個interceptor會在消息發送後更新成功發送消息數或失敗發送消息數。
④核心api:ProducerIntercepter
           

源碼以及效果

import org.apache.kafka.clients.producer.*;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Properties;

public class InteceptorUsageDemo {
    public static void main(String[] args) {
        //步驟:
        Producer<Integer, String> producer = null;
        try {
            //①準備Properties的執行個體,并将資源目錄下的配置檔案producer.properties中定制的參數封裝進去
            Properties properties = new Properties();
          properties.load(InteceptorUsageDemo.class.getClassLoader().getResourceAsStream("producer.properties"));
            //将攔截器封裝到properties執行個體中,作為參數來建構KafkaProducer
            Collection<String> params = new LinkedList<>();
            Collections.addAll(params, "com.intercepter.TimeIntecepter",
                    "com.intercepter.StatusIntecepter");
            properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, params);
            //②KafkaProducer執行個體的建立
            producer = new KafkaProducer(properties);
            //③通過循環模拟釋出多條消息
            for (int i = 1; i <= 10; i++) {
                //a)準備消息
                ProducerRecord<Integer, String> record = new ProducerRecord<>("flink", i, i + "\t→ 走起!呵呵哒哒...");
                //b) 釋出消息
                producer.send(record, new Callback() {
                    /**
                     * 目前待發送的消息發送完畢後,下述方法會被回調執行
                     *
                     * @param metadata
                     * @param exception
                     */
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.printf("目前的消息對應的主題是:%s,内容是:%s,所在的分區是:%d,偏移量是:%d%n",
                          metadata.topic(), record.value(), metadata.partition(), metadata.offset());
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //⑤資源釋放
            if (producer != null) {
                producer.close();
            }
        }
    }
}
           
package com.intercepter;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Date;
import java.util.Map;

public class TimeIntecepter implements ProducerInterceptor<Integer, String> {
    /**
     * 每條消息發送之前,下述方法會被執行
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        return new ProducerRecord<Integer, String>(record.topic(), record.partition(),
                record.timestamp(), record.key(),
                new Date() + "→" + record.value(), record.headers());
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

           
package com.intercepter;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class StatusIntecepter implements ProducerInterceptor<Integer, String> {
    /**
     * 成功消息數
     */
    private int successCnt;

    /**
     * 失敗消息數
     */
    private int failureCnt;
    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        return record;
    }
    /**
     * 每次發送完畢一條消息,下述的方法會被回調執行
     * Ack: 應答機制
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        //根據參數2來判斷消息是否發送成功
        //=null,成功
        //≠null,失敗
        if (exception == null) {
            successCnt += 1;
        } else {
            failureCnt += 1;
        }
    }

    @Override
    public void close() {
        //顯示結果
        System.out.printf("成功的消息數是:%d,失敗的消息數:%d%n", successCnt, failureCnt);
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
           

1.4、 Kafka與flume的整合

說明

步驟:
1,編輯配置檔案
		# flume-kafka.properties: 用來定制agent的各個元件的行為(source,channel,sink)
		############################################
		# 對各個元件的描述說明
		# 其中a1為agent的名字
		# r1是a1的source的代号名字
		# c1是a1的channel的代号名字
		# k1是a1的sink的代号名字
		############################################
		a1.sources = r1
		a1.sinks = k1
		a1.channels = c1

		# 用于描述source的,類型是netcat網絡,telnet 
		a1.sources.r1.type = netcat
		# source監聽的網絡ip位址和端口号
		a1.sources.r1.bind = NODE01
		a1.sources.r1.port = 44444

		# 用于描述channel,在記憶體中做資料的臨時的存儲
		a1.channels.c1.type = memory
		# 該記憶體中最大的存儲容量,1000個events事件
		a1.channels.c1.capacity = 1000
		# 能夠同時對100個events事件監管事務
		a1.channels.c1.transactionCapacity = 100

		# 用于描述sink,類型是日志格式,用于定制消息釋出方的參數
		a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
		a1.sinks.k1.topic = hive
		a1.sinks.k1.brokerList = NODE01:9092,NODE02:9092,NODE03:9092
		a1.sinks.k1.requiredAcks = 1
		a1.sinks.k1.batchSize = 20

		# 将a1中的各個元件建立關聯關系,将source和sink都指向了同一個channel
		a1.sources.r1.channels = c1
		a1.sinks.k1.channel = c1

	2,開啟flume日志采集服務(背景)~> -Dflume.root.logger=INFO,console
	flume-ng agent  --conf-file flume-kafka.properties --name a1 
	以背景程序的方式啟動:
	nohup flume-ng agent  --conf-file flume-kafka.properties --name a1   > /dev/null    2>&1 &
		 
	3,使用netcat向4444端口寫入
	  > nc NODE01 44444
	    hello are you ready?
		 _____________
			前提: 
			  需要安裝telnet或者是netcat
					運作方式總結:
						一:telnet
							需要在centOS上面安裝telnet(注意:線上安裝方式 yum install telnet)
							啟動flumn-agent
							啟動telnet:
								telnet NODE01 44444
								
						二:netcat(注意:線上安裝方式 yum install -y nc)
							安裝發給大家的nc.xx.rpm
							rpm -ivh nc.xx.rpm-path
							啟動flumn-agent
							啟動nc程序
								nc NODE01 44444
			 ______________

	4,開啟一個kafka消費的程序
	  kafka-console-consumer.sh --topic hive    --zookeeper NODE01:2181,NODE02:2181  --from-beginning
           

1.5 kafka學後總結

1、Segment的概念:
	一個分區被分成相同大小資料條數不相等的Segment,
	每個Segment有多個index檔案和資料檔案組成
	
2、資料的存儲機制(就是面試題中kafka速度為什麼如此之快):
	首先是Broker接收到資料後,将資料放到作業系統的緩存裡(pagecache),
	pagecache會盡可能多的使用空閑記憶體,
	使用sendfile技術盡可能多的減少作業系統和應用程式之間進行重複緩存,
	寫入資料的時候使用順序寫入,寫入資料的速度可達600m/s

3、Consumer怎麼解決負載均衡?(rebalance)
	1)擷取Consumer消費的起始分區号
	2)計算出Consumer要消費的分區數量
	3)用起始分區号的hashCode值模餘分區數

4、資料的分發政策?
	Kafka預設調用自己分區器(DefaultPartitioner),
	也可以自定義分區器,需要實作Partitioner特質,實作partition方法
	
5、Kafka怎麼保證資料不丢失?
	Kafka接收資料後會根據建立的topic指定的副本數來存儲,
	也就是副本機制保證資料的安全性
	
6、Kafka的應用:
	①作為消息隊列的應用在傳統的業務中使用高吞吐、分布式、使得處理大量業務内容輕松自如。
	②作為網際網路行業的日志行為實時分析,比如:實時統計使用者浏覽頁面、搜尋及其他行為,結合實時處理架構使用實作實時監控,或放到 hadoop/離線資料倉庫裡處理。
	③作為一種為外部的持久性日志的分布式系統提供服務。主要利用節點間備份資料,檔案存儲、日志壓縮等功能。
	
	——————
	其他應用場景:
		① 企業内部名額
			對于某些時效性要求較高的名額,如預警名額等,必須在資料變化時
			及時計算并發送資訊
		② 通信服務營運商
			 對于使用者套餐中的剩餘量進行監控,如流量,語音通話,短信
		③ 電商行業
			對于吞吐量特别大和資料變動頻次較高的應用,如電商網站,必須使
			用實時計算來捕捉使用者偏好

7、Kafka元件:
	①每個partition在存儲層面是append log檔案。新消息都會被直接追加到log檔案的尾部,每條消息在log檔案中的位置稱為offset(偏移量)。
	②每條Message包含了以下三個屬性:
	  1° offset	對應類型:long  此消息在一個partition中序号。可以認為offset是partition中Message的id
	  2° MessageSize  對應類型:int。
	  3° data  是message的具體内容。
	③越多的partitions意味着可以容納更多的consumer,有效提升并發消費的能力。
	④總之:業務區分增加topic、資料量大增加partition (副本數<=broker節點數)。
	
8、實時流處理架構如Storm, Spark Streaming如何實作實時處理的,底層封裝了Kafka Stream。
	  若是手動實作實時處理的架構,需要自己使用Kafka Stream 庫。
   <dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-streams</artifactId>
		<version>1.0.2</version>
	</dependency>
	
9、維護消息訂閱方消費的offset的方式有哪些?
	 ①zookeeper ,參數:--zookeeper 
	 ②kafka叢集來維護,參數:--bootstrap-server
		主題名:__consumer_offsets,  預設: 50個分區; 預設的副本數是:1
		若是達到預設的主題__consumer_offsets的分區的ha (高容錯),需要在server.properties檔案中定制預設的副本數:
		default.replication.factor=3
	 ③手動維護偏移量 (一般使用redis存儲偏移量)

 10,幾個問題:
		①每次啟動一個消費者程序(kafka-console-consumer.sh),是一個單獨的程序
		②手動書寫的消費者,可以通過參數來定制是從頭開始消費,還是接力消費。需要指定flg (main: args[])
		③kafka-console-consumer.sh,每次開啟一個消費者程序,有一個預設的消費者組。命名方式是:console-consumer-64328
		④PachCache, SendFile