天天看点

pykafka性能比较

最近做了一下pykafka的性能测试,主要涉及到

use_greenlets

use_rdkafka

sync

这三个参数。

1. 测试的数据

我用一个770MB的日志文件来作为测试数据,文件包含的行数为10175702 行。

2. 测试的demo

在写测试demo的时候遇到了几个问题,别看这么简单、很短的代码却也遇到了几个”棘手”的问题。

#!env python
#coding=utf-8
# 
# Author:       [email protected]
# 
# Created Time: 2017年05月10日 星期三 21时58分38秒
# 
# FileName:     test_pykafka.py
# 
# Description:  
# 
# ChangeLog:

import time
import pykafka
import traceback

if __name__ == '__main__':
    global producer
    client = pykafka.KafkaClient(hosts = 'xx.xx.xx.xx:9092')
    producer = client.topics['test_pykafka_out'].get_producer()
    fp = open('/home/lxg/logs/access.log')
    i = 
    for line in fp:
        producer.produce(line)
           

首先这个demo就是要测试默认参数的情况下pykafka的发送性能,我用这个demo跑了几遍测试数据,其中出现了几次

ReferenceError: weakly-referenced object no longer exists

错误,而且都是在数据全部已经发送完程序要退出的时候,根据error从google里面搜出来的一个pykafka的issue ReferenceError: weakly-referenced object no longer exists #422,简单点说就是我们因为使用的是异步发送数据,pykafka内部会有一个队列缓存我们发送的数据,但是我们的程序提前退出而缓存队列里面还有未完全发送完的数据,这样就导致了这个error。修改的方法其实很简单,就是在退出之前调用

producer.stop()

,其实我在正式的代码中是有这么一行的,只是在写demo的时候没有意识到这句话的重要性。

其实这个问题我觉得应该也算是一个比较重要的一类问题,就是我们在使用一些api库的时候一定要先仔细的阅读api的文档,别看你只要用到api中很简单的几个接口,但是很有可能你就会引入一个隐藏很深的严重问题。如果你实在没空去仔细阅读api文档,那么在你动手之前一定要先看看api提供的demo代码并且按照demo的代码流程来组织你自己的代码,因为demo代码是这个api库提供的最标准的代码。

修改了上面的这个问题,然后接着跑用

use_rdkafka = True

参数的用例(要使用

use_rdkafka = True

必须先安装librdkafka这个库),这个时候又出现了另外的一个问题,程序出现了

pykafka.exceptions.ProducerQueueFullError

错误。

看到这个错误的时候我首先想到的就是

block_on_queue_full

参数,但是这个参数默认就是

True

,也就是说在发送缓存队列满了的时候producer是等待而不是抛出异常,为了万无一失我在代码中还是明确设定

block_on_queue_full = True

,结果还是会抛出异常。

貌似是这个参数在rdkafka中无效,在

pykafka/rdkafka/producer.py

中的

_mk_rdkafka_config_lists()

函数中可以证实我们的猜想,这个函数是形成librdkafka的配置参数,在这个函数中没有

block_on_queue_full

参数,也就是说即使我们在producer中设置

block_on_queue_full

也会不生效。在librdkafka producer-api中有提到:

rd_kafka_produce() is a non-blocking API, it will enqueue the message on an internal queue and return immediately. If the number of queued messages would exceed the “queue.buffering.max.messages” configuration property then rd_kafka_produce() returns -1 and sets errno to ENOBUFS, thus providing a backpressure mechanism.

结合源码

_rd_kafkamodule.c

:

Py_BEGIN_ALLOW_THREADS
        res = rd_kafka_produce(self->rdk_topic_handle,
                               p_id,
                               ,  /* ie don't copy and don't dealloc v */
                               v, v_len,
                               pk, pk_len,
                               (void *)message);
    Py_END_ALLOW_THREADS
    if (res == -) {
        rd_kafka_resp_err_t err = rd_kafka_errno2err(errno);
        if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            set_pykafka_error("ProducerQueueFullError", "");
            goto failed;
        } else {
            /* Any other errors should go through the report queue,
             * because that's where pykafka.Producer would put them */
            PyObject *put_func = (PyObject *)rd_kafka_opaque(self->rdk_handle);
            if (- == Producer_delivery_report_put(put_func, message, err)) {
                goto failed;
            }
        }
        Py_DECREF(message);  /* There won't be a delivery-callback */
    }
           

rd_kafka_produce

返回-1的时候会抛出

ProducerQueueFullError

异常,也就是说我们没有办法不让

ProducerQueueFullError

异常抛出,我们只能捕获异常,然后重新发送消息,修改后的demo代码如下:

#!env python
#coding=utf-8
# 
# Author:       [email protected]
# 
# Created Time: 2017年05月10日 星期三 21时58分38秒
# 
# FileName:     test_pykafka.py
# 
# Description:  
# 
# ChangeLog:

import time
import pykafka
import traceback

if __name__ == '__main__':
    global producer
    client = pykafka.KafkaClient(hosts = 'xx.xx.xx.xx:9092')
    producer = client.topics['test_pykafka_out'].get_producer()
    fp = open('/home/lxg/logs/access.log')
    i = 
    for line in fp:
        while i < :
            try:
                producer.produce(line)
                break
            except pykafka.exceptions.ProducerQueueFullError,e:
                time.sleep()
                i += 

        i = 
    producer.stop()
           

3.测试结果

a). 如果设置了producer的

sync = True

来同步发送数据,那么最后的时间超过了30min,因为运行时间太长所以我没有等程序运行完就Ctrl+C结束了任务,因为完全不能忍受。

b). 如果设置KafkaClient的

use_greenlets=True

,并且安装greenlets,最后运行的时间如下(多次结果基本一致):

第一次运行的结果

real 5m18.654s

user 4m21.136s

sys 0m1.412s

第二次运行的结果

real 5m20.063s

user 4m22.368s

sys 0m1.228s

c). 如果设置producer的

use_rdkafka = True

,并且在安装pykafka之前已经安装了librdkafka,最后运行的时间如下:

第一次运行的结果

real 2m11.423s

user 1m57.708s

sys 0m31.984s

第二次运行的结果

real 2m10.071s

user 2m0.128s

d). 如果采用默认的参数,也就是用threading的方式,运行的时间如下:

第一次运行的结果

real 5m49.165s

user 5m12.560s

sys 0m32.904s

第二次运行的结果

real 5m48.189s

user 5m15.112s

sys 0m31.220s

4.结论

虽然用greenlets能提高pykafka的效率,但是提升有限,但是如果使用rdkafka的话效率有成倍的提升。

PS:pykafka的consumer就只有simpleconsumer能用rdkafka,balanced_consumer就不支持rdkafka了。