Kafka簡介及使用PHP處理Kafka消息 Kafka 是一種高吞吐的分布式消息系統,能夠替代傳統的消息隊列用于解耦合資料處理,緩存未處理消息等,同時具有更高的吞吐率,支援分區、多副本、備援,是以被廣泛用于大規模消息資料處理應用。
Kafka的特點:
以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的通路性能。
高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條以上消息的傳輸。【據了解,Kafka每秒可以生産約25萬消息(50 MB),每秒處理55萬消息(110 MB)】
支援Kafka Server間的消息分區,同時保證每個Partition内的消息順序傳輸。
分布式系統,易于向外擴充。所有的producer、broker和consumer都會有多個,均為分布式的。無需停機即可擴充機器。
消息被處理的狀态是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。
同時支援離線資料處理和實時資料處理。
Kafka的架構:

kafka架構圖
Kafka的整體架構非常簡單,producer、broker(kafka)和consumer都可以有多個。Producer,consumer實作Kafka注冊的接口,資料從producer發送到broker,broker承擔一個中間緩存和分發的作用。broker分發注冊到系統中的consumer。broker的作用類似于緩存,即活躍的資料和離線處理系統之間的緩存。用戶端和伺服器端的通信,是基于簡單,高性能,且與程式設計語言無關的TCP協定。
Kafka基本概念:
Topic:特指Kafka處理的消息源(feeds of messages)的不同分類。
Partition:Topic實體上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被配置設定一個有序的id(offset)。
Message:消息,是通信的基本機關,每個producer可以向一個topic(主題)釋出一些消息。
Producers:消息和資料生産者,向Kafka的一個topic釋出消息的過程叫做producers。
Consumers:消息和資料消費者,訂閱topics并處理其釋出的消息的過程叫做consumers。
Broker:緩存代理,Kafa叢集中的一台或多台伺服器統稱為broker。
Kafka消息發送的流程:
Kafka消息發送
下面是PHP生産、消費Kafka消息的例子(假設已經配置好Kafka):
1.從zookeeper源碼src/c/src安裝zookeeper c client
cd zookeeper-3.4.8/src/c
./configure
make && make install
2.編譯php libzookper擴充
git clone https://github.com/Timandes/libzookeeper.git
cd libzookeeper
phpize
./configure--with-libzookeeper=/usr/local/bin/cli_mt
make && makeinstall
3.編譯php zookeeper擴充
git clone https://github.com/andreiz/php-zookeeper.git
cd php-zookeeper
make && make install
4.修改php.ini配置,添加libzookeeper和php-zookeeper擴充
extension=libzookeeper.so
extension=zookeeper.so
PHP處理Kafka消息:
1.啟動zookeeper和kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
2.建立由2個partition組成的、名為testtopic的topic
3.composer安裝nmred/kafka-php
1
composer require "nmred/kafka-php"
4.producer.php代碼
<php
require_once('./vendor/autoload.php');
$produce=/Kafka/Produce::getInstance('localhost:2181',3000);
$produce->setRequireAck(-1); $topicName='testtopic';
//擷取到topic下可用的partitions
$partitions=$produce->getAvailablePartitions($topicName);
$partitionCount=count($partitions);
$count=1;//可以處理的消費者數量(可以了解為server數量)
while(true){ $message=json_encode(array('uid'=>$count,'age'=>$count%100,'datetime'=>date('Y-m-d H:i:s')));
//發送消息到不同的partition
$partitionId=$count%$partitionCount;
$produce->setMessages('testtopic',$partitionId,array($message));
$result=$produce->send();
var_dump($result);
$count++;
echo"producer sleeping/n";
sleep(1);
}
5、consumer.php代碼
<?php
//擷取需要處理的partitionId
$partitionId = isset($argv[1]) ? intval($argv[1]) :0;
$consumer =/Kafka/Consumer::getInstance('localhost:2181');
$consumer->setGroup('test-consumer-group');
$consumer->setPartition('testtopic', $partitionId);
$consumer->setFromOffset(true);
$consumer->setMaxBytes(102400);
while(true){
$topic = $consumer->fetch();
foreach ($topic as $topicName => $partition{
foreach ($partition as $partId => $messageSet{
foreach ($messageSet as $message){
var_dump($message);
}
}
echo"consumer sleeping/n";
sleep(1);
6、在3個終端界面分别運作
php producer.php
php consumer.php
7、兩個consumer腳本依次收到producer發送的消息