
Connecting to Kafka from PHP

1. First, install the Kafka extension (php-rdkafka, which provides the RdKafka classes)
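A common way to install the extension is `pecl install rdkafka` on top of the librdkafka C library, followed by enabling `extension=rdkafka.so` in php.ini; the exact steps depend on your system. The sketch below only checks that the extension is actually loaded before any RdKafka class is used. The helper name `rdkafkaStatus` is made up for this illustration and is not part of the extension.

```php
<?php
// Hypothetical helper (not part of php-rdkafka): reports whether the extension is loaded.
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka extension is not loaded';
    }

    // phpversion() returns the version string of a loaded extension.
    return 'rdkafka extension loaded, version ' . phpversion('rdkafka');
}

echo rdkafkaStatus(), PHP_EOL;
```

Running this from the command line (`php check.php`) and from the web server can give different results, because the two often use different php.ini files.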

2. Producer code example

    <?php
    // Global configuration. A producer does not need group.id or the
    // offset settings; those only apply to consumers.
    $rcf = new RdKafka\Conf();
    $rcf->set('log_level', (string) LOG_DEBUG);

    $rk = new RdKafka\Producer($rcf);
    $rk->addBrokers("127.0.0.1"); // broker address

    $topic = $rk->newTopic("test"); // topic name

    for ($i = 0; $i < 10; $i++) {
        // partition, message flags, payload
        $topic->produce(0, 0, 'test' . $i);
        $rk->poll(0);
    }

    // produce() only queues messages; wait until the queue is empty
    // so that nothing is lost when the script exits.
    while ($rk->getOutQLen() > 0) {
        $rk->poll(50);
    }
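Because produce() is asynchronous, a successful call does not mean the broker has received the message. One way to find out, sketched here under the assumption that a broker is reachable at 127.0.0.1 and a topic named "test" exists, is to register a delivery report callback with `setDrMsgCb()`. The variable names are illustrative only.

```php
<?php
$conf = new RdKafka\Conf();

// Invoked from poll(), once per produced message, with the delivery result.
$conf->setDrMsgCb(function ($kafka, $message) {
    if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        echo 'Delivery failed: ', rd_kafka_err2str($message->err), PHP_EOL;
    } else {
        echo 'Delivered to partition ', $message->partition, PHP_EOL;
    }
});

$producer = new RdKafka\Producer($conf);
$producer->addBrokers('127.0.0.1'); // assumed broker address

$topic = $producer->newTopic('test'); // assumed topic name

// RD_KAFKA_PARTITION_UA lets the partitioner choose the partition.
$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'hello');

// Serve delivery callbacks until every queued message has been handled.
while ($producer->getOutQLen() > 0) {
    $producer->poll(50);
}
```

The callback only runs while poll() is being called, so the final loop both drains the queue and gives the callback a chance to fire.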

3. Consumer code example

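    $rcf = new RdKafka\Conf();
    $rcf->set('group.id', 'test'); // consumer group
    $rcf->set('broker.version.fallback', '0.8.2'); // property name, Kafka version of the broker

    // Topic configuration; $cf must be created before it is used.
    $cf = new RdKafka\TopicConf();
    $cf->set('auto.commit.enable', 'true'); // values are passed as strings
    $cf->set('offset.store.method', 'broker'); // deprecated in recent librdkafka releases
    $cf->set('auto.offset.reset', 'smallest');

    $rk = new RdKafka\Consumer($rcf);
    $rk->addBrokers("127.0.0.1"); // broker address

    $topic = $rk->newTopic("test", $cf); // topic name, topic configuration

    // partition, start offset (RD_KAFKA_OFFSET_BEGINNING reads from the start)
    $topic->consumeStart(0, 10);
    $msg = $topic->consume(0, 1000); // partition, timeout in milliseconds
    var_dump($msg);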
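The example above reads a single message and dumps it. In practice consume() is usually called in a loop, and the result has to be checked: the low-level consumer returns null when the timeout expires, and a returned message may carry an error code instead of a payload. The following is a minimal sketch of such a loop, assuming a broker at 127.0.0.1 and a topic named "test"; the limit of 10 messages is arbitrary.

```php
<?php
$conf = new RdKafka\Conf();
$conf->set('group.id', 'test');

$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers('127.0.0.1'); // assumed broker address

$topic = $consumer->newTopic('test'); // assumed topic name

// Start at the oldest available message on partition 0.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

// Read at most 10 messages (an arbitrary limit for this sketch).
for ($received = 0; $received < 10; ) {
    $msg = $topic->consume(0, 1000); // partition, timeout in milliseconds

    if ($msg === null || $msg->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
        break; // nothing arrived within the timeout
    }
    if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        break; // reached the end of the partition
    }
    if ($msg->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        throw new RuntimeException($msg->errstr());
    }

    echo $msg->offset, ': ', $msg->payload, PHP_EOL;
    $received++;
}

$topic->consumeStop(0);
```

Stopping at end of partition suits a one-off script; a long-running worker would normally keep polling instead of breaking out of the loop.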

This article is reposted from 无心低语's 51CTO blog. Original link: http://blog.51cto.com/fengzhankui/1933340. To republish it, please contact the original author.