1、首先安裝kafka擴充
2、生産者代碼示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test'); //topicname
$cf = new RdKafka\TopicConf();
$cf->set('offset.store.method', 'broker');
$cf->set('auto.offset.reset', 'smallest');
$rk = new RdKafka\Producer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname
for($i = 0; $i < 10; $i++) {
$topic->produce(0,0,'test' . $i);
}
3、消費者代碼示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test');
$rcf->set('broker.version.fallback', '0.8.2'); //brokername,kafkaversion
$cf->set('auto.commit.enable', true);
$rk = new RdKafka\Consumer($rcf);
$topic = $rk->newTopic("test", $cf); //topicname,topicobject
$topic->consumeStart(0,10); //partition,offset
$msg = $topic->consume(0, 1000); //partition,timeout
var_dump($msg);
本文轉自 無心低語 51CTO部落格,原文連結:http://blog.51cto.com/fengzhankui/1933340,如需轉載請自行聯系原作者