天天看點

Python 使用python-kafka類庫開發kafka生産者&消費者&用戶端

使用python-kafka類庫開發kafka生産者&消費者&用戶端

  By: 授客 QQ:1033553122

1.測試環境

python 3.4

zookeeper-3.4.13.tar.gz

下載下傳位址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

下載下傳位址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

kafka_2.12-2.1.0.tgz

http://kafka.apache.org/downloads.html

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

pip-18.1.tar.gz

下載下傳位址:https://pan.baidu.com/s/1VpYk8JvMuztzbvEF8mQoRw

說明:實踐中發現,pip版本比較舊的話,沒法安裝whl檔案

kafka_python-1.4.4-py2.py3-none-any.whl

https://pypi.org/project/kafka-python/#files

https://files.pythonhosted.org/packages/5f/89/f13d9b1f32cc37168788215a7ad1e4c133915f6853660a447660393b577d/kafka_python-1.4.4-py2.py3-none-any.whl

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

python_snappy-0.5.3-cp34-cp34m-win_amd64.whl

https://www.lfd.uci.edu/~gohlke/pythonlibs/

說明:

kafka-python支援gzip壓縮/解壓縮。如果要消費lz4方式壓縮的消息,則需要安裝python-lz4,如果要支援snappy方式壓縮/解壓縮則需要安裝,否則可能會報錯:kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found.

建構生産者對象時,可通過compression_type 參數指定由對應生産者生産的消息資料的壓縮方式,或者在producer.properties配置中配置compression.type參數。

參考連結:

https://pypi.org/project/kafka-python/#description

https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install

2.代碼實踐

#-*- encoding:utf-8 -*-

__author__ = 'shouke'

from kafka import KafkaProducer

import json

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

for i in range(0, 100):

    producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)

# Block直到單條消息發送完或者逾時

future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')

result = future.get(timeout=60)

print(result)

# Block直到所有阻塞的消息發送到網絡

# 注意: 該操作不保證傳輸或者消息發送成功,僅在配置了linger_ms的情況下有用。(It is really only useful if you configure internal batching using linger_ms

# 序列化json資料

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('MY_TOPIC1', {'shouke':'kafka'})

# 序列化字元串key

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)

producer.send('MY_TOPIC1', b'shouke', key='strKey')

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')

for i in range(2):

    producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))

# 消息記錄攜帶header

producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])

# 擷取性能資料(注意,實踐發現分區較多的情況下,該操作比較耗時

metrics = producer.metrics()

print(metrics)

producer.flush()

實踐中遇到錯誤: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解決方案如下:

進入到配置目錄(config),編輯server.properties檔案,

查找并設定listener,配置監聽端口,格式:listeners = listener_name://host_name:port,供kafka用戶端連接配接用的ip和端口,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

API及常用參數說明:

class kafka.KafkaProducer(**configs)

bootstrap_servers –'host[:port]'字元串,或者由'host[:port]'組成的字元串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka叢集中的單台伺服器)位址,預設值為 localhost, port預設值為9092,這裡可以不用填寫所有broker的host和port,但必須保證至少有一個broker)

key_serializer (可調用對象) –用于轉換使用者提供的key值為位元組,必須傳回位元組資料。 如果為None,則等同調用f(key)。 預設值: None.

value_serializer(可調用對象) – 用于轉換使用者提供的value消息值為位元組,必須傳回位元組資料。 如果為None,則等同調用f(value)。 預設值: None.

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

topic(str) – 設定消息将要釋出到的主題,即消息所屬主題

value(可選) – 消息内容,必須為位元組資料,或者通過value_serializer序列化後的位元組資料。如果為None,則key必填,消息等同于“删除”。( If value is None, key is required and message acts as a ‘delete’)

partition (int, 可選) – 指定分區。如果未設定,則使用配置的partitioner

key (可選) – 和消息對應的key,可用于決定消息發送到哪個分區。如果平partition為None,則相同key的消息會被釋出到相同分區(但是如果key為None,則随機選取分區)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必須為位元組資料或者通過配置的key_serializer序列化後的位元組資料.

headers (可選) – 設定消息header,header-value鍵值對表示的list。list項為元組:格式 (str_header,bytes_value)

timestamp_ms (int, 可選) –毫秒數 (從1970 1月1日 UTC算起) ,作為消息時間戳。預設為目前時間

函數傳回FutureRecordMetadata類型的RecordMetadata資料

flush(timeout=None)

發送所有可以立即擷取的緩沖消息(即時linger_ms大于0),線程block直到這些記錄發送完成。當一個線程等待flush調用完成而block時,其它線程可以繼續發送消息。

注意:flush調用不保證記錄發送成功

metrics(raw=False)

擷取生産者性能名額。

參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

注:生産者代碼是線程安全的,支援多線程,而消費者則不然

from kafka import KafkaConsumer

from kafka import TopicPartition

consumer = KafkaConsumer('MY_TOPIC1',

                         bootstrap_servers=['127.0.0.1:9092'],

                         #auto_offset_reset='',

                         auto_offset_reset='latest',# 消費kafka中最近的資料,如果設定為earliest則消費最早的資料,不管這些資料是否消費

                         enable_auto_commit=True, # 自動送出消費者的offset

                         auto_commit_interval_ms=3000, ## 自動送出消費者offset的時間間隔

                         group_id='MY_GROUP1',

                         consumer_timeout_ms= 10000, # 如果10秒内kafka中沒有可供消費的資料,自動退出

                         client_id='consumer-python3'

                         )

for msg in consumer:

    print (msg)

    print('topic: ', msg.topic)

    print('partition: ', msg.partition)

    print('key: ', msg.key, 'value: ', msg.value)

    print('offset:', msg.offset)

    print('headers:', msg.headers)

# Get consumer metrics

metrics = consumer.metrics()

運作效果

通過assign、subscribe兩者之一為消費者設定消費的主題

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],

                         auto_offset_reset='latest',

                         enable_auto_commit=True, # 自動送出消費資料的offset

                         consumer_timeout_ms= 10000, # 如果1秒内kafka中沒有可供消費的資料,自動退出

                         value_deserializer=lambda m: json.loads(m.decode('ascii')), #消費json 格式的消息

# consumer.assign([TopicPartition('MY_TOPIC1', 0)])

# msg = next(consumer)

# print(msg)

consumer.subscribe('MY_TOPIC1')

class kafka.KafkaConsumer(*topics, **configs)

*topics (str) – 可選,設定需要訂閱的topic,如果未設定,需要在消費記錄前調用subscribe或者assign。

client_id (str) – 用戶端名稱,預設值: ‘kafka-python-{version}’

group_id (str or None) – 消費組名稱。如果為None,則通過group coordinator auto-partition分區配置設定,offset送出被禁用。預設為None

auto_offset_reset (str) – 重置offset政策: 'earliest'将移動到最老的可用消息, 'latest'将移動到最近消息。 設定為其它任何值将抛出異常。預設值:'latest'。

enable_auto_commit (bool) –  如果為True,将自動定時送出消費者offset。預設為True。

auto_commit_interval_ms (int) – 自動送出offset之間的間隔毫秒數。如果enable_auto_commit 為true,預設值為: 5000。

value_deserializer(可調用對象) - 攜帶原始消息value并傳回反序列化後的value

subscribe(topics=(), pattern=None, listener=None)

訂閱需要的主題

topics (list) – 需要訂閱的主題清單

pattern (str) – 用于比對可用主題的模式,即正規表達式。注意:必須提供topics、pattern兩者參數之一,但不能同時提供兩者。

擷取消費者性能名額。

參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

from kafka.client import KafkaClient

client = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)

# 擷取所有broker

brokers = client.cluster.brokers()

for broker in brokers:

    print('broker: ', broker)

    print('broker nodeId: ', broker.nodeId)

# 擷取主題的所有分區

topic = 'MY_TOPIC1'

partitions = client.cluster.available_partitions_for_topic(topic)

print(partitions)

partition_dict = {}

partition_dict[topic] = [partition for partition in partitions]

print(partition_dict)

運作結果:

broker:  BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)

broker nodeId:  0

{0}

{'MY_TOPIC1': [0]}

class kafka.client.KafkaClient(**configs)

request_timeout_ms (int) – 用戶端請求逾時時間,機關毫秒。預設值: 30000.

參考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

brokers()

擷取所有broker中繼資料

available_partitions_for_topic(topic)

傳回主題的所有分區

參考API: https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html

繼續閱讀