天天看點

pykafka性能比較

最近做了一下pykafka的性能測試,主要涉及到

use_greenlets

use_rdkafka

sync

這三個參數。

1. 測試的資料

我用一個770MB的日志檔案來作為測試資料,檔案包含的行數為10175702 行。

2. 測試的demo

在寫測試demo的時候遇到了幾個問題,别看這麼簡單、很短的代碼卻也遇到了幾個”棘手”的問題。

#!env python
#coding=utf-8
# 
# Author:       [email protected]
# 
# Created Time: 2017年05月10日 星期三 21時58分38秒
# 
# FileName:     test_pykafka.py
# 
# Description:  
# 
# ChangeLog:

import time
import pykafka
import traceback

if __name__ == '__main__':
    global producer
    client = pykafka.KafkaClient(hosts = 'xx.xx.xx.xx:9092')
    producer = client.topics['test_pykafka_out'].get_producer()
    fp = open('/home/lxg/logs/access.log')
    i = 
    for line in fp:
        producer.produce(line)
           

首先這個demo就是要測試預設參數的情況下pykafka的發送性能,我用這個demo跑了幾遍測試資料,其中出現了幾次

ReferenceError: weakly-referenced object no longer exists

錯誤,而且都是在資料全部已經發送完程式要退出的時候,根據error從google裡面搜出來的一個pykafka的issue ReferenceError: weakly-referenced object no longer exists #422,簡單點說就是我們因為使用的是異步發送資料,pykafka内部會有一個隊列緩存我們發送的資料,但是我們的程式提前退出而緩存隊列裡面還有未完全發送完的資料,這樣就導緻了這個error。修改的方法其實很簡單,就是在退出之前調用

producer.stop()

,其實我在正式的代碼中是有這麼一行的,隻是在寫demo的時候沒有意識到這句話的重要性。

其實這個問題我覺得應該也算是一個比較重要的一類問題,就是我們在使用一些api庫的時候一定要先仔細的閱讀api的文檔,别看你隻要用到api中很簡單的幾個接口,但是很有可能你就會引入一個隐藏很深的嚴重問題。如果你實在沒空去仔細閱讀api文檔,那麼在你動手之前一定要先看看api提供的demo代碼并且按照demo的代碼流程來組織你自己的代碼,因為demo代碼是這個api庫提供的最标準的代碼。

修改了上面的這個問題,然後接着跑用

use_rdkafka = True

參數的用例(要使用

use_rdkafka = True

必須先安裝librdkafka這個庫),這個時候又出現了另外的一個問題,程式出現了

pykafka.exceptions.ProducerQueueFullError

錯誤。

看到這個錯誤的時候我首先想到的就是

block_on_queue_full

參數,但是這個參數預設就是

True

,也就是說在發送緩存隊列滿了的時候producer是等待而不是抛出異常,為了萬無一失我在代碼中還是明确設定

block_on_queue_full = True

,結果還是會抛出異常。

貌似是這個參數在rdkafka中無效,在

pykafka/rdkafka/producer.py

中的

_mk_rdkafka_config_lists()

函數中可以證明我們的猜想,這個函數是形成librdkafka的配置參數,在這個函數中沒有

block_on_queue_full

參數,也就是說即使我們在producer中設定

block_on_queue_full

也會不生效。在librdkafka producer-api中有提到:

rd_kafka_produce() is a non-blocking API, it will enqueue the message on an internal queue and return immediately. If the number of queued messages would exceed the “queue.buffering.max.messages” configuration property then rd_kafka_produce() returns -1 and sets errno to ENOBUFS, thus providing a backpressure mechanism.

結合源碼

_rd_kafkamodule.c

:

Py_BEGIN_ALLOW_THREADS
        res = rd_kafka_produce(self->rdk_topic_handle,
                               p_id,
                               ,  /* ie don't copy and don't dealloc v */
                               v, v_len,
                               pk, pk_len,
                               (void *)message);
    Py_END_ALLOW_THREADS
    if (res == -) {
        rd_kafka_resp_err_t err = rd_kafka_errno2err(errno);
        if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            set_pykafka_error("ProducerQueueFullError", "");
            goto failed;
        } else {
            /* Any other errors should go through the report queue,
             * because that's where pykafka.Producer would put them */
            PyObject *put_func = (PyObject *)rd_kafka_opaque(self->rdk_handle);
            if (- == Producer_delivery_report_put(put_func, message, err)) {
                goto failed;
            }
        }
        Py_DECREF(message);  /* There won't be a delivery-callback */
    }
           

rd_kafka_produce

傳回-1的時候會抛出

ProducerQueueFullError

異常,也就是說我們沒有辦法不讓

ProducerQueueFullError

異常抛出,我們隻能捕獲異常,然後重新發送消息,修改後的demo代碼如下:

#!env python
#coding=utf-8
# 
# Author:       [email protected]
# 
# Created Time: 2017年05月10日 星期三 21時58分38秒
# 
# FileName:     test_pykafka.py
# 
# Description:  
# 
# ChangeLog:

import time
import pykafka
import traceback

if __name__ == '__main__':
    global producer
    client = pykafka.KafkaClient(hosts = 'xx.xx.xx.xx:9092')
    producer = client.topics['test_pykafka_out'].get_producer()
    fp = open('/home/lxg/logs/access.log')
    i = 
    for line in fp:
        while i < :
            try:
                producer.produce(line)
                break
            except pykafka.exceptions.ProducerQueueFullError,e:
                time.sleep()
                i += 

        i = 
    producer.stop()
           

3.測試結果

a). 如果設定了producer的

sync = True

來同步發送資料,那麼最後的時間超過了30min,因為運作時間太長是以我沒有等程式運作完就Ctrl+C結束了任務,因為完全不能忍受。

b). 如果設定KafkaClient的

use_greenlets=True

,并且安裝greenlets,最後運作的時間如下(多次結果基本一緻):

第一次運作的結果

real 5m18.654s

user 4m21.136s

sys 0m1.412s

第二次運作的結果

real 5m20.063s

user 4m22.368s

sys 0m1.228s

c). 如果設定producer的

use_rdkafka = True

,并且在安裝pykafka之前已經安裝了librdkafka,最後運作的時間如下:

第一次運作的結果

real 2m11.423s

user 1m57.708s

sys 0m31.984s

第二次運作的結果

real 2m10.071s

user 2m0.128s

d). 如果采用預設的參數,也就是用threading的方式,運作的時間如下:

第一次運作的結果

real 5m49.165s

user 5m12.560s

sys 0m32.904s

第二次運作的結果

real 5m48.189s

user 5m15.112s

sys 0m31.220s

4.結論

雖然用greenlets能提高pykafka的效率,但是提升有限,但是如果使用rdkafka的話效率有成倍的提升。

PS:pykafka的consumer就隻有simpleconsumer能用rdkafka,balanced_consumer就不支援rdkafka了。