天天看點

php連接配接kafka

1、首先安裝kafka擴充

2、生産者代碼示例

  $rcf = new RdKafka\Conf();

    $rcf->set('group.id', 'test');  //topicname

    $cf = new RdKafka\TopicConf();

    $cf->set('offset.store.method', 'broker');

    $cf->set('auto.offset.reset', 'smallest');

    $rk = new RdKafka\Producer($rcf);

    $rk->setLogLevel(LOG_DEBUG);

    $rk->addBrokers("127.0.0.1"); //brokeraddr

    $topic = $rk->newTopic("test", $cf);  //topicname

    for($i = 0; $i < 10; $i++) {

       $topic->produce(0,0,'test' . $i);

     }

3、消費者代碼示例

    $rcf = new RdKafka\Conf();

    $rcf->set('group.id', 'test');

    $rcf->set('broker.version.fallback', '0.8.2');  //brokername,kafkaversion

    $cf->set('auto.commit.enable', true);

    $rk = new RdKafka\Consumer($rcf);

    $topic = $rk->newTopic("test", $cf);  //topicname,topicobject

    $topic->consumeStart(0,10);  //partition,offset

    $msg = $topic->consume(0, 1000);   //partition,timeout

    var_dump($msg);

本文轉自 無心低語 51CTO部落格,原文連結:http://blog.51cto.com/fengzhankui/1933340,如需轉載請自行聯系原作者