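Getting Started with Apache Kafka
This Getting Started with Apache Kafka series is planned as roughly five blog posts, all fairly basic, covering the following:
- Basic configuration and running of Kafka
- Kafka commands in detail
- Basic configuration and running of Kafka-manager
- Simple usage of the Kafka API
- Integrating Kafka with Spring Boot
Kafka supports both Linux and Windows; this article uses Linux (CentOS) as its environment.
This is the second post.
Kafka command line in detail
The most commonly used commands are: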
- kafka-server-start.sh
- kafka-console-consumer.sh
- kafka-console-producer.sh
- kafka-topics.sh
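Of these commands, the first is only used to start Kafka and the two console commands are mostly used for testing. The last one has the widest range of uses, so the command sections below focus mainly on kafka-topics.sh.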
kafka-server-start.sh
Usage:
> bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
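This command accepts several arguments. The first, -daemon, is optional and runs the command as a background service. The second is required and must be the Kafka configuration file. After that you can add any number of arguments starting with --override, where property can be any of the parameters listed under Broker Configs. These extra arguments override the settings in the configuration file.
For example, the following commands use the same configuration file and start multiple brokers by overriding parameters.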
> bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=0 --override log.dirs=/tmp/kafka-logs-1 --override listeners=PLAINTEXT://:9092 --override advertised.listeners=PLAINTEXT://192.168.16.150:9092
> bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=1 --override log.dirs=/tmp/kafka-logs-2 --override listeners=PLAINTEXT://:9093 --override advertised.listeners=PLAINTEXT://192.168.16.150:9093
The usage above is for demonstration only. To actually run multiple brokers, create a separate server.properties file for each broker.
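The recommendation above can be sketched as a small helper that derives one properties file per broker from a shared base file. This is only an illustration: the function name make_broker_config, the port scheme (9092 plus the broker id), and the output file name are assumptions, while the host and log directories simply mirror the two commands above.

```shell
#!/bin/sh
# Sketch: derive a per-broker properties file from a shared base file.
# make_broker_config is a hypothetical helper, not part of Kafka.
# Assumption: keys in the base file are written as "key=value" with no
# spaces around "=", as in the server.properties shipped with Kafka.
make_broker_config() {
  mbc_base="$1"   # shared base file, e.g. config/server.properties
  mbc_id="$2"     # broker id: 0, 1, 2, ...
  mbc_host="$3"   # address advertised to clients
  mbc_out="$4"    # file to write
  mbc_port=$((9092 + mbc_id))

  # Copy the base file without the keys that must differ per broker.
  # "|| true" keeps going when grep selects no lines.
  grep -Ev '^(broker\.id|log\.dirs|listeners|advertised\.listeners)=' \
    "$mbc_base" > "$mbc_out" || true

  # Append the broker-specific values.
  {
    echo "broker.id=$mbc_id"
    echo "log.dirs=/tmp/kafka-logs-$((mbc_id + 1))"
    echo "listeners=PLAINTEXT://:$mbc_port"
    echo "advertised.listeners=PLAINTEXT://$mbc_host:$mbc_port"
  } >> "$mbc_out"
}

# Example (not executed here):
#   make_broker_config config/server.properties 1 192.168.16.150 config/server-1.properties
#   bin/kafka-server-start.sh -daemon config/server-1.properties
```

Keeping the per-broker values in files rather than on the command line means the settings survive restarts and can be reviewed or versioned like any other configuration.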
kafka-console-consumer.sh
This command simply writes messages to standard output. It supports the following options.
Option                                   Description
------                                   -----------
--blacklist <String: blacklist>          Blacklist of topics to exclude from
                                           consumption.
--bootstrap-server <String: server to    REQUIRED (unless old consumer is
  connect to>                              used): The server to connect to.
--consumer-property <String:             A mechanism to pass user-defined
  consumer_prop>                           properties in the form key=value to
                                           the consumer.
--consumer.config <String: config file>  Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.
--csv-reporter-enabled                   If set, the CSV metrics reporter will
                                           be enabled
--delete-consumer-offsets                If specified, the consumer path in
                                           zookeeper is deleted when starting up
--enable-systest-events                  Log lifecycle events of the consumer
                                           in addition to logging consumed
                                           messages. (This is specific for
                                           system tests.)
--formatter <String: class>              The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--from-beginning                         If the consumer does not already have
                                           an established offset to consume
                                           from, start with the earliest
                                           message present in the log rather
                                           than the latest message.
--key-deserializer <String:
  deserializer for key>
--max-messages <Integer: num_messages>   The maximum number of messages to
                                           consume before exiting. If not set,
                                           consumption is continual.
--metrics-dir <String: metrics           If csv-reporter-enable is set, and
  directory>                               this parameter isset, the csv
                                           metrics will be outputed here
--new-consumer                           Use the new consumer implementation.
                                           This is the default.
--offset <String: consume offset>        The offset id to consume from (a non-
                                           negative number), or 'earliest'
                                           which means from beginning, or
                                           'latest' which means from end
                                           (default: latest)
--partition <Integer: partition>         The partition to consume from.
--property <String: prop>                The properties to initialize the
                                           message formatter.
--skip-message-on-error                  If there is an error when processing a
                                           message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message is
                                           available for consumption for the
                                           specified interval.
--topic <String: topic>                  The topic id to consume on.
--value-deserializer <String:
  deserializer for values>
--whitelist <String: whitelist>          Whitelist of topics to include for
                                           consumption.
--zookeeper <String: urls>               REQUIRED (only when using old
                                           consumer): The connection string for
                                           the zookeeper connection in the form
                                           host:port. Multiple URLS can be
                                           given to
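--bootstrap-server is required, and --topic is usually specified as well to select the topic to view. To read messages from the beginning, also pass --from-beginning. A typical command looks like this.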
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
You can also view a specific partition with the following command:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --partition 0
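Because the console consumer keeps running until it is interrupted, it can be handy to read only a few messages and exit. The sketch below combines the --max-messages and --timeout-ms options from the table above. The function name peek_topic, the BOOTSTRAP_SERVER and KAFKA_DRY_RUN variables, and the 5000 ms timeout are assumptions of this example, not Kafka features.

```shell
#!/bin/sh
# Sketch: read at most N messages from a topic and then exit.
# peek_topic, BOOTSTRAP_SERVER and KAFKA_DRY_RUN are conventions of this
# sketch only. --max-messages and --timeout-ms come from the option table.
peek_topic() {
  pt_topic="$1"
  pt_count="${2:-10}"       # stop after this many messages
  pt_partition="${3:-}"     # optional: restrict to one partition

  # Build the command line in the function's positional parameters.
  set -- bin/kafka-console-consumer.sh \
    --bootstrap-server "${BOOTSTRAP_SERVER:-localhost:9092}" \
    --topic "$pt_topic" --from-beginning \
    --max-messages "$pt_count" --timeout-ms 5000
  if [ -n "$pt_partition" ]; then
    set -- "$@" --partition "$pt_partition"
  fi

  if [ "${KAFKA_DRY_RUN:-0}" = "1" ]; then
    echo "$*"      # print the command instead of running it
  else
    "$@"
  fi
}

# Example (not executed here):
#   peek_topic test 5        # first 5 messages of topic "test"
#   peek_topic test 5 0      # same, but only from partition 0
```

Setting KAFKA_DRY_RUN=1 prints the assembled command so it can be checked before it is run against a real cluster.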
kafka-console-producer.sh
This command sends the contents of a file or of standard input to the Kafka cluster. Its options are as follows.
Option                                   Description
------                                   -----------
--batch-size <Integer: size>             Number of messages to send in a single
                                           batch if they are not being sent
                                           synchronously. (default: 200)
--broker-list <String: broker-list>      REQUIRED: The broker list string in
                                           the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String:             The compression codec: either 'none',
  compression-codec]                       'gzip', 'snappy', or 'lz4'.If
                                           specified without value, then it
                                           defaults to 'gzip'
--key-serializer <String:                The class name of the message encoder
  encoder_class>                           implementation to use for
                                           serializing keys. (default: kafka.
                                           serializer.DefaultEncoder)
--line-reader <String: reader_class>     The class name of the class to use for
                                           reading lines from standard in. By
                                           default each line is read as a
                                           separate message. (default: kafka.
                                           tools.
                                           ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on       The max time that the producer will
  send>                                    block for during a send request
                                           (default: 60000)
--max-memory-bytes <Long: total memory   The total memory used by the producer
  in bytes>                                to buffer records waiting to be sent
                                           to the server. (default: 33554432)
--max-partition-memory-bytes <Long:      The buffer size allocated for a
  memory in bytes per partition>           partition. When records are received
                                           which are smaller than this size the
                                           producer will attempt to
                                           optimistically group them together
                                           until this size is reached.
                                           (default: 16384)
--message-send-max-retries <Integer>     Brokers can fail receiving the message
                                           for multiple reasons, and being
                                           unavailable transiently is just one
                                           of them. This property specifies the
                                           number of retires before the
                                           producer give up and drop this
                                           message. (default: 3)
--metadata-expiry-ms <Long: metadata     The period of time in milliseconds
  expiration interval>                     after which we force a refresh of
                                           metadata even if we haven't seen any
                                           leadership changes. (default: 300000)
--old-producer                           Use the old producer implementation.
--producer-property <String:             A mechanism to pass user-defined
  producer_prop>                           properties in the form key=value to
                                           the producer.
--producer.config <String: config file>  Producer config properties file. Note
                                           that [producer-property] takes
                                           precedence over this config.
--property <String: prop>                A mechanism to pass user-defined
                                           properties in the form key=value to
                                           the message reader. This allows
                                           custom configuration for a user-
                                           defined message reader.
--queue-enqueuetimeout-ms <Integer:      Timeout for event enqueue (default:
  queue enqueuetimeout ms>                 2147483647)
--queue-size <Integer: queue_size>       If set and the producer is running in
                                           asynchronous mode, this gives the
                                           maximum amount of messages will
                                           queue awaiting sufficient batch
                                           size. (default: 10000)
--request-required-acks <String:         The required acks of the producer
  request required acks>                   requests (default: 1)
--request-timeout-ms <Integer: request   The ack timeout of the producer
  timeout ms>                              requests. Value must be non-negative
                                           and non-zero (default: 1500)
--retry-backoff-ms <Integer>             Before each retry, the producer
                                           refreshes the metadata of relevant
                                           topics. Since leader election takes
                                           a bit of time, this property
                                           specifies the amount of time that
                                           the producer waits before refreshing
                                           the metadata. (default: 100)
--socket-buffer-size <Integer: size>     The size of the tcp RECV size.
                                           (default: 102400)
--sync                                   If set message send requests to the
                                           brokers are synchronously, one at a
                                           time as they arrive.
--timeout <Integer: timeout_ms>          If set and the producer is running in
                                           asynchronous mode, this gives the
                                           maximum amount of time a message
                                           will queue awaiting sufficient batch
                                           size. The value is given in ms.
                                           (default: 1000)
--topic <String: topic>                  REQUIRED: The topic id to produce
                                           messages to.
--value-serializer <String:              The class name of the message encoder
  encoder_class>                           implementation to use for
                                           serializing values. (default: kafka.
                                           serializer.DefaultEncoder)
Of these, --broker-list and --topic are the two required options.
Common commands are as follows.
Using standard input:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Reading from a file:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < file-input.txt
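Since the console producer treats each input line as one message by default, test data is easy to generate. The following is a minimal sketch of such a generator; the function name gen_messages and the "message-<i>" format are arbitrary choices made for this example.

```shell
#!/bin/sh
# Sketch: print N numbered test messages, one per line, for the console producer.
# gen_messages is a hypothetical helper; the "message-<i>" format is arbitrary.
gen_messages() {
  gm_n="${1:-10}"
  gm_i=1
  while [ "$gm_i" -le "$gm_n" ]; do
    echo "message-$gm_i"
    gm_i=$((gm_i + 1))
  done
}

# Example (not executed here):
#   gen_messages 100 | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#   gen_messages 100 > file-input.txt
```

Either form feeds the producer the same lines: the first through a pipe on standard input, the second through a file that can be redirected as in the command above.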
kafka-topics.sh
Compared with the occasionally used commands above, kafka-topics.sh is considerably more important. It supports the following options.
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions,
                                           replica assignment, and/or
                                           configuration for the topic.
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered.The
                                           following is a list of valid
                                           configurations:
                                             cleanup.policy
                                             compression.type
                                             delete.retention.ms
                                             file.delete.delay.ms
                                             flush.messages
                                             flush.ms
                                             follower.replication.throttled.replicas
                                             index.interval.bytes
                                             leader.replication.throttled.replicas
                                             max.message.bytes
                                             message.format.version
                                             message.timestamp.difference.max.ms
                                             message.timestamp.type
                                             min.cleanable.dirty.ratio
                                             min.compaction.lag.ms
                                             min.insync.replicas
                                             preallocate
                                             retention.bytes
                                             retention.ms
                                             segment.bytes
                                             segment.index.bytes
                                             segment.jitter.ms
                                             segment.ms
                                             unclean.leader.election.enable
                                           See the Kafka documentation for full
                                           details on the topic configs.
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment
--force                                  Suppress console prompts
--help                                   Print usage information.
--if-exists                              if set when altering or deleting
                                           topics, the action will only execute
                                           if the topic exists
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           If partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or
                                           describe. Can also accept a regular
                                           expression except for --create option
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--zookeeper <String: urls>               REQUIRED: The connection string for
                                           the zookeeper connection in the form
                                           host:port. Multiple URLS can be
                                           given to
Below are some commonly used topic commands.
Describe a topic's configuration
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test_topic
Set the retention time
# Deprecated way
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test_topic --config retention.ms=1000
# Modern way
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --add-config retention.ms=1000
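If you need to delete all messages in a topic, you can take advantage of the retention time. First set the retention time to a very low value (1000 ms), wait a few seconds, and then restore the retention time to its previous value.
Note: unless the broker configuration overrides it, Kafka's default retention time is 7 days (604800000 ms, from log.retention.hours=168).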
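The purge procedure described above can be sketched as a helper that prints the three steps so they can be reviewed before being run. The function name purge_topic_commands and the 60-second wait are assumptions of this example; how quickly old segments are actually removed depends on the broker's cleanup schedule. The last step uses --delete-config, which removes the topic-level override instead of writing back a remembered value; if the topic had its own retention.ms before, set that value again with --add-config instead.

```shell
#!/bin/sh
# Sketch: print the commands for emptying a topic via a temporary
# retention override. purge_topic_commands is hypothetical, and the
# 60-second wait is an assumption, not a value taken from Kafka.
purge_topic_commands() {
  ptc_topic="$1"
  ptc_zk="${2:-localhost:2181}"
  ptc_base="bin/kafka-configs.sh --zookeeper $ptc_zk --entity-type topics --entity-name $ptc_topic --alter"

  echo "$ptc_base --add-config retention.ms=1000"   # 1. shrink retention
  echo "sleep 60"                                    # 2. wait for cleanup
  echo "$ptc_base --delete-config retention.ms"      # 3. remove the override
}

# Example (not executed here): review the output first, then pipe it to sh.
#   purge_topic_commands test_topic
#   purge_topic_commands test_topic | sh
```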
Delete a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_topic
Note: topics can only be deleted if delete.topic.enable=true is set in the broker's configuration file, server.properties.
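Before issuing a delete, it can help to confirm that the setting from the note above is actually present. The following is a minimal sketch; the function name topic_delete_enabled is hypothetical, and the check only inspects the given file, so it cannot see values passed with --override.

```shell
#!/bin/sh
# Sketch: check whether a broker properties file enables topic deletion.
# topic_delete_enabled is a hypothetical helper. It returns success (0)
# only if an uncommented "delete.topic.enable=true" line is present.
topic_delete_enabled() {
  grep -Eq '^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*true[[:space:]]*$' "$1"
}

# Example (not executed here):
#   topic_delete_enabled config/server.properties \
#     || echo "delete.topic.enable=true is not set in config/server.properties"
```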
Topic information
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
Add partitions
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test_topic --partitions 3
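The --partitions help text warns that adding partitions affects keyed topics. The sketch below illustrates why: when a partition is chosen as a hash of the key modulo the partition count, changing the count can send the same key to a different partition. The function name partition_for is hypothetical, and cksum is used only as a stand-in hash; Kafka's own partitioner uses a different hash function, so these numbers will not match a real cluster.

```shell
#!/bin/sh
# Sketch: show why adding partitions can move a key to another partition.
# partition_for is hypothetical. It uses the POSIX cksum CRC as a stand-in
# hash; only the "hash modulo partition count" idea carries over to Kafka.
partition_for() {
  pf_key="$1"
  pf_partitions="$2"
  pf_hash=$(printf '%s' "$pf_key" | cksum | cut -d ' ' -f 1)
  echo $((pf_hash % pf_partitions))
}

# Compare the partition chosen for the same keys with 2 and 3 partitions.
for pf_k in user-1 user-2 user-3 user-4; do
  echo "$pf_k: $(partition_for "$pf_k" 2) -> $(partition_for "$pf_k" 3)"
done
```

Messages written for a key before the change stay in the old partition, while new ones may land elsewhere, which is why per-key ordering is no longer guaranteed across the change.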
Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test_topic
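In setup scripts it is often useful for the create command to be safe to re-run. A minimal sketch follows, using the --if-not-exists option from the table above; the function name create_topic_command, the ZOOKEEPER variable, and the defaults of 3 partitions and replication factor 1 (taken from the command above) are assumptions of this example.

```shell
#!/bin/sh
# Sketch: build a create command that is safe to run more than once.
# create_topic_command and the ZOOKEEPER variable are hypothetical;
# --if-not-exists comes from the kafka-topics.sh option table.
create_topic_command() {
  ctc_topic="$1"
  ctc_partitions="${2:-3}"
  ctc_replication="${3:-1}"
  echo "bin/kafka-topics.sh --create --if-not-exists" \
    "--zookeeper ${ZOOKEEPER:-localhost:2181}" \
    "--replication-factor $ctc_replication" \
    "--partitions $ctc_partitions" \
    "--topic $ctc_topic"
}

# Example (not executed here): print the command, or pipe it to sh to run it.
#   create_topic_command test_topic
#   create_topic_command test_topic 6 1 | sh
```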
List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
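Source for the topic-related content: http://ronnieroller.com/kafka/cheat-sheet
With so many commands, how do you remember them?
Kafka's command-line tools print very detailed usage information, so you only need to remember the rough usage shown above. When you need a particular command, let its usage messages guide you.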
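When the usage text is long, it can help to reduce it to just the option names first. The following is a minimal sketch; the function name list_options is hypothetical, and it assumes each option starts at the beginning of a line, as in the usage output reproduced in this post.

```shell
#!/bin/sh
# Sketch: reduce a tool's usage text to a sorted list of option names.
# list_options is a hypothetical helper. Assumption: every option starts
# at the beginning of a line; indented continuation lines are ignored.
list_options() {
  sed -n 's/^\(--[a-z][a-z.-]*\).*/\1/p' | sort -u
}

# Example (not executed here); 2>&1 is used in case the usage text is
# written to standard error rather than standard output:
#   bin/kafka-configs.sh 2>&1 | list_options
```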
For example, how do you use kafka-configs.sh to view the configuration of a topic?
First, run bin/kafka-configs.sh on the command line, which prints the following usage information.
Add/Remove entity config for a topic, client, user or broker
Option                      Description
------                      -----------
--add-config <String>       Key Value pairs of configs to add. Square brackets
                              can be used to group values which contain commas:
                              'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a
                              list of valid configurations: For entity_type
                              'topics':
                                cleanup.policy
                                compression.type
                                delete.retention.ms
                                file.delete.delay.ms
                                flush.messages
                                flush.ms
                                follower.replication.throttled.replicas
                                index.interval.bytes
                                leader.replication.throttled.replicas
                                max.message.bytes
                                message.format.version
                                message.timestamp.difference.max.ms
                                message.timestamp.type
                                min.cleanable.dirty.ratio
                                min.compaction.lag.ms
                                min.insync.replicas
                                preallocate
                                retention.bytes
                                retention.ms
                                segment.bytes
                                segment.index.bytes
                                segment.jitter.ms
                                segment.ms
                                unclean.leader.election.enable
                              For entity_type 'brokers':
                                follower.replication.throttled.rate
                                leader.replication.throttled.rate
                              For entity_type 'users':
                                producer_byte_rate
                                SCRAM-SHA-256
                                SCRAM-SHA-512
                                consumer_byte_rate
                              For entity_type 'clients':
                                producer_byte_rate
                                consumer_byte_rate
                              Entity types 'users' and 'clients' may be specified
                              together to update config for clients of a
                              specific user.
--alter                     Alter the configuration for the entity.
--delete-config <String>    config keys to remove 'k1,k2'
--describe                  List configs for the given entity.
--entity-default            Default entity name for clients/users (applies to
                              corresponding entity type in command line)
--entity-name <String>      Name of entity (topic name/client id/user principal
                              name/broker id)
--entity-type <String>      Type of entity (topics/clients/users/brokers)
--force                     Suppress console prompts
--help                      Print usage information.
--zookeeper <String: urls>  REQUIRED: The connection string for the zookeeper
                              connection in the form host:port. Multiple URLS
                              can be given to
The first line shows that this command can modify the configuration of a topic, client, user, or broker.
To work with topics, set entity-type to topics by entering the following command:
> bin/kafka-configs.sh --entity-type topics
Command must include exactly one action: --describe, --alter
The message says that an action must be specified (and not only the two actions named in the message). Add --describe and try again:
> bin/kafka-configs.sh --entity-type topics --describe
[root@localhost kafka_2.11-0.10.2.1]# bin/kafka-configs.sh --entity-type topics --describe
Missing required argument "[zookeeper]"
Next, add --zookeeper:
> bin/kafka-configs.sh --entity-type topics --describe --zookeeper localhost:2181
Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Because no topic name was specified, this shows the information for __consumer_offsets. Now try specifying a topic.
> bin/kafka-configs.sh --entity-type topics --describe --zookeeper localhost:2181 --entity-name test
Configs for topic 'test' are
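The comma-separated output shown above is hard to scan once a topic has several overrides. The sketch below prints one key=value pair per line. The function name split_configs is hypothetical, and it assumes the output has the form seen in the samples above and that no value contains a comma.

```shell
#!/bin/sh
# Sketch: print one key=value per line from "kafka-configs.sh --describe".
# split_configs is a hypothetical helper. Assumption: the output has the form
#   Configs for topic '<name>' are k1=v1,k2=v2,...
# as in the samples above, and no value contains a comma.
split_configs() {
  sed -n "s/^Configs for topic '[^']*' are *//p" | tr ',' '\n' | sed '/^$/d'
}

# Example (not executed here):
#   bin/kafka-configs.sh --entity-type topics --describe \
#     --zookeeper localhost:2181 --entity-name test | split_configs
```

A topic without overrides, such as test in the last command above, produces no output lines at all.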