天天看點

Kafka簡介及使用PHP處理Kafka消息

Kafka簡介及使用PHP處理Kafka消息 Kafka 是一種高吞吐的分布式消息系統,能夠替代傳統的消息隊列用于解耦合資料處理,緩存未處理消息等,同時具有更高的吞吐率,支援分區、多副本、備援,是以被廣泛用于大規模消息資料處理應用。

Kafka的特點:

以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上資料也能保證常數時間複雜度的通路性能。

高吞吐率。即使在非常廉價的商用機器上也能做到單機支援每秒100K條以上消息的傳輸。【據了解,Kafka每秒可以生産約25萬消息(50 MB),每秒處理55萬消息(110 MB)】

支援Kafka Server間的消息分區,同時保證每個Partition内的消息順序傳輸。

分布式系統,易于向外擴充。所有的producer、broker和consumer都會有多個,均為分布式的。無需停機即可擴充機器。

消息被處理的狀态是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。

同時支援離線資料處理和實時資料處理。

Kafka的架構:

Kafka簡介及使用PHP處理Kafka消息

kafka架構圖

Kafka的整體架構非常簡單,producer、broker(kafka)和consumer都可以有多個。Producer,consumer實作Kafka注冊的接口,資料從producer發送到broker,broker承擔一個中間緩存和分發的作用。broker分發注冊到系統中的consumer。broker的作用類似于緩存,即活躍的資料和離線處理系統之間的緩存。用戶端和伺服器端的通信,是基于簡單,高性能,且與程式設計語言無關的TCP協定。

Kafka基本概念:

Topic:特指Kafka處理的消息源(feeds of messages)的不同分類。

Partition:Topic實體上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被配置設定一個有序的id(offset)。

Message:消息,是通信的基本機關,每個producer可以向一個topic(主題)釋出一些消息。

Producers:消息和資料生産者,向Kafka的一個topic釋出消息的過程叫做producers。

Consumers:消息和資料消費者,訂閱topics并處理其釋出的消息的過程叫做consumers。

Broker:緩存代理,Kafa叢集中的一台或多台伺服器統稱為broker。

Kafka消息發送的流程:

Kafka消息發送

下面是PHP生産、消費Kafka消息的例子(假設已經配置好Kafka):

1.從zookeeper源碼src/c/src安裝zookeeper c client

cd zookeeper-3.4.8/src/c

./configure

make && make install

2.編譯php libzookper擴充

git clone https://github.com/Timandes/libzookeeper.git

cd libzookeeper

phpize

./configure--with-libzookeeper=/usr/local/bin/cli_mt

make && makeinstall

3.編譯php zookeeper擴充

git clone https://github.com/andreiz/php-zookeeper.git

cd php-zookeeper

make && make install

4.修改php.ini配置,添加libzookeeper和php-zookeeper擴充

extension=libzookeeper.so

extension=zookeeper.so

PHP處理Kafka消息:

1.啟動zookeeper和kafka

./bin/zookeeper-server-start.sh config/zookeeper.properties

./bin/kafka-server-start.sh config/server.properties

2.建立由2個partition組成的、名為testtopic的topic

3.composer安裝nmred/kafka-php

1

composer require "nmred/kafka-php"

4.producer.php代碼

<php 

require_once('./vendor/autoload.php'); 

$produce=/Kafka/Produce::getInstance('localhost:2181',3000); 

$produce->setRequireAck(-1); $topicName='testtopic';

//擷取到topic下可用的partitions

$partitions=$produce->getAvailablePartitions($topicName);

$partitionCount=count($partitions); 

$count=1;//可以處理的消費者數量(可以了解為server數量)

while(true){    $message=json_encode(array('uid'=>$count,'age'=>$count%100,'datetime'=>date('Y-m-d H:i:s')));     

//發送消息到不同的partition   

 $partitionId=$count%$partitionCount;    

$produce->setMessages('testtopic',$partitionId,array($message));   

 $result=$produce->send();    

var_dump($result);     

$count++;   

 echo"producer sleeping/n";   

 sleep(1);

}

5、consumer.php代碼

<?php 

//擷取需要處理的partitionId

$partitionId = isset($argv[1]) ? intval($argv[1]) :0; 

$consumer =/Kafka/Consumer::getInstance('localhost:2181'); 

$consumer->setGroup('test-consumer-group');

$consumer->setPartition('testtopic', $partitionId);

$consumer->setFromOffset(true);

$consumer->setMaxBytes(102400); 

while(true){    

$topic = $consumer->fetch();     

foreach ($topic as $topicName => $partition{        

foreach ($partition as $partId => $messageSet{            

foreach ($messageSet as $message){                

var_dump($message);           

}        

}    

echo"consumer sleeping/n";   

sleep(1);

6、在3個終端界面分别運作

php producer.php

php consumer.php

7、兩個consumer腳本依次收到producer發送的消息

Kafka簡介及使用PHP處理Kafka消息