Kafka
繼kafka基礎之後再來點進階實用的。順便做一個kafka整理總結。
檢視kafka自身維護偏移量:kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list master:9092 --topic flink --time -1
1、kafka自定義分區
分析
步驟:
1,設計一個子類繼承分區父類,重寫其中的partition方法,在該方法中定制分區規則
2,修改producer.properties檔案,指定自定的分區類
源碼以及效果
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public class MyPartition implements Partitioner {
/**
* 下述方法在消息存儲到相應分區之前,都會被回調一次。
* 原因:消息得找到自己存儲的所在。
*
* @param topic 主題名
* @param key 消息的key
* @param keyBytes key對應的位元組數組
* @param value 消息的value
* @param valueBytes value對應的位元組數組
* @param cluster kafka分布式叢集
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//思路:
//①獲得目前主題的分區數
List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
int totalPartitionNum = partitionInfos.size();
//②關于key
// a)key≠null, key的hash碼值對分區數求餘數,餘數即為目前消息所對應的分區編号
//b)key=null, 根據value來計算
//i)value ≠null, value的hash碼值對分區數求餘數,餘數即為目前消息所對應的分區編号
//ii)value=null, 直接傳回預設的分區編号,如:0.
if (key != null) {
return key.hashCode() % totalPartitionNum;
} else {
if (value != null) {
return value.hashCode() % totalPartitionNum;
} else {
return 0;
}
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
2、 消息攔截器
介紹
①在消息發送到kafka分布式叢集之前,針對于一些共通的業務處理,建議使用攔截器。如:對每條消息添加一個共通的字首(如:時間戳),對消息中包含一些反動的資訊進行篩選
②還可以監測到消息發送的狀态(成功,失敗)
③需求:實作一個簡單的雙interceptor組成的攔截鍊。第一個interceptor會在消息發送前将時間戳資訊加到消息value的最前部;第二個interceptor會在消息發送後更新成功發送消息數或失敗發送消息數。
④核心api:ProducerIntercepter
源碼以及效果
import org.apache.kafka.clients.producer.*;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Properties;
public class InteceptorUsageDemo {
public static void main(String[] args) {
//步驟:
Producer<Integer, String> producer = null;
try {
//①準備Properties的執行個體,并将資源目錄下的配置檔案producer.properties中定制的參數封裝進去
Properties properties = new Properties();
properties.load(InteceptorUsageDemo.class.getClassLoader().getResourceAsStream("producer.properties"));
//将攔截器封裝到properties執行個體中,作為參數來建構KafkaProducer
Collection<String> params = new LinkedList<>();
Collections.addAll(params, "com.intercepter.TimeIntecepter",
"com.intercepter.StatusIntecepter");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, params);
//②KafkaProducer執行個體的建立
producer = new KafkaProducer(properties);
//③通過循環模拟釋出多條消息
for (int i = 1; i <= 10; i++) {
//a)準備消息
ProducerRecord<Integer, String> record = new ProducerRecord<>("flink", i, i + "\t→ 走起!呵呵哒哒...");
//b) 釋出消息
producer.send(record, new Callback() {
/**
* 目前待發送的消息發送完畢後,下述方法會被回調執行
*
* @param metadata
* @param exception
*/
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.printf("目前的消息對應的主題是:%s,内容是:%s,所在的分區是:%d,偏移量是:%d%n",
metadata.topic(), record.value(), metadata.partition(), metadata.offset());
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//⑤資源釋放
if (producer != null) {
producer.close();
}
}
}
}
package com.intercepter;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Date;
import java.util.Map;
public class TimeIntecepter implements ProducerInterceptor<Integer, String> {
/**
* 每條消息發送之前,下述方法會被執行
* @param record
* @return
*/
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
return new ProducerRecord<Integer, String>(record.topic(), record.partition(),
record.timestamp(), record.key(),
new Date() + "→" + record.value(), record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
package com.intercepter;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class StatusIntecepter implements ProducerInterceptor<Integer, String> {
/**
* 成功消息數
*/
private int successCnt;
/**
* 失敗消息數
*/
private int failureCnt;
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
return record;
}
/**
* 每次發送完畢一條消息,下述的方法會被回調執行
* Ack: 應答機制
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
//根據參數2來判斷消息是否發送成功
//=null,成功
//≠null,失敗
if (exception == null) {
successCnt += 1;
} else {
failureCnt += 1;
}
}
@Override
public void close() {
//顯示結果
System.out.printf("成功的消息數是:%d,失敗的消息數:%d%n", successCnt, failureCnt);
}
@Override
public void configure(Map<String, ?> configs) {
}
}
1.4、 Kafka與flume的整合
說明
步驟:
1,編輯配置檔案
# flume-kafka.properties: 用來定制agent的各個元件的行為(source,channel,sink)
############################################
# 對各個元件的描述說明
# 其中a1為agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 用于描述source的,類型是netcat網絡,telnet
a1.sources.r1.type = netcat
# source監聽的網絡ip位址和端口号
a1.sources.r1.bind = NODE01
a1.sources.r1.port = 44444
# 用于描述channel,在記憶體中做資料的臨時的存儲
a1.channels.c1.type = memory
# 該記憶體中最大的存儲容量,1000個events事件
a1.channels.c1.capacity = 1000
# 能夠同時對100個events事件監管事務
a1.channels.c1.transactionCapacity = 100
# 用于描述sink,類型是日志格式,用于定制消息釋出方的參數
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hive
a1.sinks.k1.brokerList = NODE01:9092,NODE02:9092,NODE03:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# 将a1中的各個元件建立關聯關系,将source和sink都指向了同一個channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2,開啟flume日志采集服務(背景)~> -Dflume.root.logger=INFO,console
flume-ng agent --conf-file flume-kafka.properties --name a1
以背景程序的方式啟動:
nohup flume-ng agent --conf-file flume-kafka.properties --name a1 > /dev/null 2>&1 &
3,使用netcat向4444端口寫入
> nc NODE01 44444
hello are you ready?
_____________
前提:
需要安裝telnet或者是netcat
運作方式總結:
一:telnet
需要在centOS上面安裝telnet(注意:線上安裝方式 yum install telnet)
啟動flumn-agent
啟動telnet:
telnet NODE01 44444
二:netcat(注意:線上安裝方式 yum install -y nc)
安裝發給大家的nc.xx.rpm
rpm -ivh nc.xx.rpm-path
啟動flumn-agent
啟動nc程序
nc NODE01 44444
______________
4,開啟一個kafka消費的程序
kafka-console-consumer.sh --topic hive --zookeeper NODE01:2181,NODE02:2181 --from-beginning
1.5 kafka學後總結
1、Segment的概念:
一個分區被分成相同大小資料條數不相等的Segment,
每個Segment有多個index檔案和資料檔案組成
2、資料的存儲機制(就是面試題中kafka速度為什麼如此之快):
首先是Broker接收到資料後,将資料放到作業系統的緩存裡(pagecache),
pagecache會盡可能多的使用空閑記憶體,
使用sendfile技術盡可能多的減少作業系統和應用程式之間進行重複緩存,
寫入資料的時候使用順序寫入,寫入資料的速度可達600m/s
3、Consumer怎麼解決負載均衡?(rebalance)
1)擷取Consumer消費的起始分區号
2)計算出Consumer要消費的分區數量
3)用起始分區号的hashCode值模餘分區數
4、資料的分發政策?
Kafka預設調用自己分區器(DefaultPartitioner),
也可以自定義分區器,需要實作Partitioner特質,實作partition方法
5、Kafka怎麼保證資料不丢失?
Kafka接收資料後會根據建立的topic指定的副本數來存儲,
也就是副本機制保證資料的安全性
6、Kafka的應用:
①作為消息隊列的應用在傳統的業務中使用高吞吐、分布式、使得處理大量業務内容輕松自如。
②作為網際網路行業的日志行為實時分析,比如:實時統計使用者浏覽頁面、搜尋及其他行為,結合實時處理架構使用實作實時監控,或放到 hadoop/離線資料倉庫裡處理。
③作為一種為外部的持久性日志的分布式系統提供服務。主要利用節點間備份資料,檔案存儲、日志壓縮等功能。
——————
其他應用場景:
① 企業内部名額
對于某些時效性要求較高的名額,如預警名額等,必須在資料變化時
及時計算并發送資訊
② 通信服務營運商
對于使用者套餐中的剩餘量進行監控,如流量,語音通話,短信
③ 電商行業
對于吞吐量特别大和資料變動頻次較高的應用,如電商網站,必須使
用實時計算來捕捉使用者偏好
7、Kafka元件:
①每個partition在存儲層面是append log檔案。新消息都會被直接追加到log檔案的尾部,每條消息在log檔案中的位置稱為offset(偏移量)。
②每條Message包含了以下三個屬性:
1° offset 對應類型:long 此消息在一個partition中序号。可以認為offset是partition中Message的id
2° MessageSize 對應類型:int。
3° data 是message的具體内容。
③越多的partitions意味着可以容納更多的consumer,有效提升并發消費的能力。
④總之:業務區分增加topic、資料量大增加partition (副本數<=broker節點數)。
8、實時流處理架構如Storm, Spark Streaming如何實作實時處理的,底層封裝了Kafka Stream。
若是手動實作實時處理的架構,需要自己使用Kafka Stream 庫。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.2</version>
</dependency>
9、維護消息訂閱方消費的offset的方式有哪些?
①zookeeper ,參數:--zookeeper
②kafka叢集來維護,參數:--bootstrap-server
主題名:__consumer_offsets, 預設: 50個分區; 預設的副本數是:1
若是達到預設的主題__consumer_offsets的分區的ha (高容錯),需要在server.properties檔案中定制預設的副本數:
default.replication.factor=3
③手動維護偏移量 (一般使用redis存儲偏移量)
10,幾個問題:
①每次啟動一個消費者程序(kafka-console-consumer.sh),是一個單獨的程序
②手動書寫的消費者,可以通過參數來定制是從頭開始消費,還是接力消費。需要指定flg (main: args[])
③kafka-console-consumer.sh,每次開啟一個消費者程序,有一個預設的消費者組。命名方式是:console-consumer-64328
④PachCache, SendFile