最近做了一下pykafka的性能測試,主要涉及到
use_greenlets
、
use_rdkafka
、
sync
這三個參數。
1. 測試的資料
我用一個770MB的日志檔案來作為測試資料,檔案包含的行數為10175702 行。
2. 測試的demo
在寫測試demo的時候遇到了幾個問題,别看這麼簡單、很短的代碼卻也遇到了幾個”棘手”的問題。
#!env python
#coding=utf-8
#
# Author: [email protected]
#
# Created Time: 2017年05月10日 星期三 21時58分38秒
#
# FileName: test_pykafka.py
#
# Description:
#
# ChangeLog:
import time
import pykafka
import traceback
if __name__ == '__main__':
global producer
client = pykafka.KafkaClient(hosts = 'xx.xx.xx.xx:9092')
producer = client.topics['test_pykafka_out'].get_producer()
fp = open('/home/lxg/logs/access.log')
i =
for line in fp:
producer.produce(line)
首先這個demo就是要測試預設參數的情況下pykafka的發送性能,我用這個demo跑了幾遍測試資料,其中出現了幾次
ReferenceError: weakly-referenced object no longer exists
錯誤,而且都是在資料全部已經發送完程式要退出的時候,根據error從google裡面搜出來的一個pykafka的issue ReferenceError: weakly-referenced object no longer exists #422,簡單點說就是我們因為使用的是異步發送資料,pykafka内部會有一個隊列緩存我們發送的資料,但是我們的程式提前退出而緩存隊列裡面還有未完全發送完的資料,這樣就導緻了這個error。修改的方法其實很簡單,就是在退出之前調用
producer.stop()
,其實我在正式的代碼中是有這麼一行的,隻是在寫demo的時候沒有意識到這句話的重要性。
其實這個問題我覺得應該也算是一個比較重要的一類問題,就是我們在使用一些api庫的時候一定要先仔細的閱讀api的文檔,别看你隻要用到api中很簡單的幾個接口,但是很有可能你就會引入一個隐藏很深的嚴重問題。如果你實在沒空去仔細閱讀api文檔,那麼在你動手之前一定要先看看api提供的demo代碼并且按照demo的代碼流程來組織你自己的代碼,因為demo代碼是這個api庫提供的最标準的代碼。
修改了上面的這個問題,然後接着跑用
use_rdkafka = True
參數的用例(要使用
use_rdkafka = True
必須先安裝librdkafka這個庫),這個時候又出現了另外的一個問題,程式出現了
pykafka.exceptions.ProducerQueueFullError
錯誤。
看到這個錯誤的時候我首先想到的就是
block_on_queue_full
參數,但是這個參數預設就是
True
,也就是說在發送緩存隊列滿了的時候producer是等待而不是抛出異常,為了萬無一失我在代碼中還是明确設定
block_on_queue_full = True
,結果還是會抛出異常。
貌似是這個參數在rdkafka中無效,在
pykafka/rdkafka/producer.py
中的
_mk_rdkafka_config_lists()
函數中可以證明我們的猜想,這個函數是形成librdkafka的配置參數,在這個函數中沒有
block_on_queue_full
參數,也就是說即使我們在producer中設定
block_on_queue_full
也會不生效。在librdkafka producer-api中有提到:
rd_kafka_produce() is a non-blocking API, it will enqueue the message on an internal queue and return immediately. If the number of queued messages would exceed the “queue.buffering.max.messages” configuration property then rd_kafka_produce() returns -1 and sets errno to ENOBUFS, thus providing a backpressure mechanism.
結合源碼
_rd_kafkamodule.c
:
Py_BEGIN_ALLOW_THREADS
res = rd_kafka_produce(self->rdk_topic_handle,
p_id,
, /* ie don't copy and don't dealloc v */
v, v_len,
pk, pk_len,
(void *)message);
Py_END_ALLOW_THREADS
if (res == -) {
rd_kafka_resp_err_t err = rd_kafka_errno2err(errno);
if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
set_pykafka_error("ProducerQueueFullError", "");
goto failed;
} else {
/* Any other errors should go through the report queue,
* because that's where pykafka.Producer would put them */
PyObject *put_func = (PyObject *)rd_kafka_opaque(self->rdk_handle);
if (- == Producer_delivery_report_put(put_func, message, err)) {
goto failed;
}
}
Py_DECREF(message); /* There won't be a delivery-callback */
}
當
rd_kafka_produce
傳回-1的時候會抛出
ProducerQueueFullError
異常,也就是說我們沒有辦法不讓
ProducerQueueFullError
異常抛出,我們隻能捕獲異常,然後重新發送消息,修改後的demo代碼如下:
#!env python
#coding=utf-8
#
# Author: [email protected]
#
# Created Time: 2017年05月10日 星期三 21時58分38秒
#
# FileName: test_pykafka.py
#
# Description:
#
# ChangeLog:
import time
import pykafka
import traceback
if __name__ == '__main__':
global producer
client = pykafka.KafkaClient(hosts = 'xx.xx.xx.xx:9092')
producer = client.topics['test_pykafka_out'].get_producer()
fp = open('/home/lxg/logs/access.log')
i =
for line in fp:
while i < :
try:
producer.produce(line)
break
except pykafka.exceptions.ProducerQueueFullError,e:
time.sleep()
i +=
i =
producer.stop()
3.測試結果
a). 如果設定了producer的
sync = True
來同步發送資料,那麼最後的時間超過了30min,因為運作時間太長是以我沒有等程式運作完就Ctrl+C結束了任務,因為完全不能忍受。
b). 如果設定KafkaClient的
use_greenlets=True
,并且安裝greenlets,最後運作的時間如下(多次結果基本一緻):
第一次運作的結果
real 5m18.654s
user 4m21.136s
sys 0m1.412s
第二次運作的結果
real 5m20.063s
user 4m22.368s
sys 0m1.228s
c). 如果設定producer的
use_rdkafka = True
,并且在安裝pykafka之前已經安裝了librdkafka,最後運作的時間如下:
第一次運作的結果
real 2m11.423s
user 1m57.708s
sys 0m31.984s
第二次運作的結果
real 2m10.071s
user 2m0.128s
d). 如果采用預設的參數,也就是用threading的方式,運作的時間如下:
第一次運作的結果
real 5m49.165s
user 5m12.560s
sys 0m32.904s
第二次運作的結果
real 5m48.189s
user 5m15.112s
sys 0m31.220s
4.結論
雖然用greenlets能提高pykafka的效率,但是提升有限,但是如果使用rdkafka的話效率有成倍的提升。
PS:pykafka的consumer就隻有simpleconsumer能用rdkafka,balanced_consumer就不支援rdkafka了。