參考,
<a href="https://cwiki.apache.org/confluence/display/KAFKA/System+Tools">https://cwiki.apache.org/confluence/display/KAFKA/System+Tools</a>
<a href="https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools">https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools</a>
<a href="http://kafka.apache.org/documentation.html#quickstart">http://kafka.apache.org/documentation.html#quickstart</a>
<a href="http://kafka.apache.org/documentation.html#operations">http://kafka.apache.org/documentation.html#operations</a>
為了便于使用,kafka提供了比較強大的Tools,把經常需要使用的整理一下
開關kafka Server
topic相關
describe topic的詳細情況
修改topic的partition,隻能增加
到0.8.2才正式支援删除topic,目前是beta版
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
檢視有問題的partition
叢集擴充
叢集擴充,對于broker還是比較簡單的,但是現有的topic上的partition是不會做自動遷移的
需要手工做遷移,但kafka提供了比較友善的工具,
--generate,生成參考的遷移計劃
given a list of topics and a list of brokers,工具會給出遷徙方案
把topic完全遷移到新的brokers
<a></a>
給出目前的assignment情況和,遷移方案
我們可以同時儲存目前的assignment情況和遷移方案,目前的assignment情況可以用于rollback
--execute,開始執行遷移
--verify,check目前的遷移狀态
選擇topic的某個partition的某些replica進行遷徙
moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3
brokers下線
目前版本不支援下線的規劃,需要到0.8.2才支援,這需要把一個broker上的replica清空
增加replication factor
partition 0的replica數從1增長到3,目前replica存在broker5,在broker6,7上增加replica
Producer console
後面可以任意的輸入message,都會發到broker的topic中
Comsumer console
從頭讀這個topic,可以重複讀到所有資料
我在想為啥,每次都能replay,原來每次都是随機産生一個groupid
consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
Consumer Offset Checker
這個會顯示出consumer group的offset情況, 必須參數為--group, 不指定--topic,預設為所有topic
Displays the: Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group
<code>bin/kafka-run-</code><code>class</code><code>.sh kafka.tools.ConsumerOffsetChecker</code>
required argument: [group]
Option Description
------ -----------
--broker-info Print broker info
--group Consumer group.
--help Print this message.
--topic Comma-separated list of consumer
topics (all topics if absent).
Example,
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
Export Zookeeper Offsets
将Zk中的offset資訊以下面的形式打到file裡面去
A utility that retrieves the offsets of broker partitions in ZK and prints to an output file in the following format:
/consumers/group1/offsets/topic1/1-0:286894308
/consumers/group1/offsets/topic1/2-0:284803985
<code>bin/kafka-run-</code><code>class</code><code>.sh kafka.tools.ExportZkOffsets</code>
required argument: [zkconnect]
--output-file Output file
Update Offsets In Zookeeper
這個挺有用,用于replay, kafka的文檔有點坑爹,看了不知道咋用,還是看源碼才看明白
A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.
<code>bin/kafka-run-</code><code>class</code><code>.sh kafka.tools.UpdateOffsetsInZK</code>
USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic
Example,
bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
可以看到offset已經被清0,Lag=logSize
更加直接的方式是,直接去Zookeeper裡面看
通過zkCli.sh連上後,通過ls檢視
Broker Node Registry
Broker Topic Registry
Consumer Id Registry
Consumer Offset Tracking
Partition Owner registry