天天看點

swoole TCP 叢集 MQTT消息通信伺服器

消息伺服器使用socket,為避免伺服器過載,單台隻允許500個socket連接配接,當一台不夠的時候,擴充消息伺服器是必然,問題來了,如何讓連結在不同消息伺服器上的使用者可以實作消息發送呢?

要實作消息互通就必須要讓這些消息伺服器本身能互通,想了兩個方式,一種是消息伺服器之間交叉連結,另一種是增加一個特殊的消息伺服器,這個消息伺服器不對外開放,隻負責消息轉發和推送。

下列測試不考慮防火牆等。僅測試可行性和效率。

測試環境

消息伺服器

ip位址:192.168.0.201 端口:9501 
ip位址:192.168.0.202 端口:9501
           

轉發伺服器

ip位址:192.168.0.203  端口:9501
           

公共緩存(redis)

ip位址:192.168.0.231  端口:6379
           

軟體環境

centos 6.5 mini swoole php
           

流程說明

  • client1(伺服器1 - 使用者user)可向client2(伺服器2 - 使用者user)或者其他client(伺服器 - 使用者user)發送消息,并接收其他client(伺服器 - 使用者user)發送的消息。
  • Redis中儲存client(伺服器 - 使用者user)連接配接的資訊,給每個使用者配置設定唯一的key,包括連結的哪台伺服器,轉發伺服器定時檢測消息伺服器,如消息伺服器挂掉,由轉發伺服器清理掉Redis已經挂掉的所有連結。

完整流程

  1. Client1給Client2發送一條消息
  2. Socket1接收到消息,根據key從Redis取出Client2的連接配接資訊,連接配接在本機,直接推送給Client2,流程結束。
  3. 如果連接配接不在本機,把消息推送到轉發伺服器,由轉發伺服器把該消息推送給連接配接所在消息伺服器,消息伺服器接收消息,推送給Client2。
  4. 消息發送結束。

程式設計實作

socket1

伺服器上建立一個server.php,内容如下:

<?php //服務端 
   $serv = new swoole_server("0.0.0.0", 9501);
 
  //redis
  $redis = new \Redis();        
  $redis->connect("192.168.0.231", 6379);
  
  //client
  $proxy = new swoole_client(SWOOLE_TCP | SWOOLE_KEEP);
  $proxy->connect("192.168.0.203", 9501);
  
  $serv->on('start', function($serv) {
      echo "Service:Start...";
  });
  $serv->on('connect', function ($serv, $fd) {
  
  });
  $serv->on('receive', function ($serv, $fd, $from_id, $data) {
      global $redis;
  
      $data = (array) json_decode($data);
      $cmd = $data['cmd'];
 
     switch ($cmd) {
 
          case "login"://登陸
              //儲存連接配接資訊
              $save = array(
                  'fd' => $fd,
                  'socket_ip' => "192.168.0.201"
              );
              $redis->set($data['name'], serialize($save));
              break;
 
          case "chat":
              $recv = unserialize($redis->get($data['recv']));
              if ($recv['socket_ip'] != "192.168.0.201") {
                  //需要轉發
                  $data['cmd'] = 'forward';
                  $data['recv_ip'] = $recv['socket_ip'];
                  $serv->task(json_encode($data));
              } else {
                  //直接發送
                  $serv->send($recv['fd'], "{$data['send']}給您發了消息:{$data['content']}");
              }
          break;
 
          case "forward":
              //接收轉發消息
              $recv = unserialize($redis->get($data['recv']));
              $serv->send($recv['fd'], "{$data['send']}給您發了消息:{$data['content']}");
 
          break;
      }
      //$serv->send($fd, 'Swoole: ' . $data);
  });
  
  //任務推送
  $serv->on('task', function ($serv, $task_id, $from_id, $data) {
      global $proxy;
      $proxy->send($data);
  });
 
  $serv->on('finish', function ($serv, $task_id, $data) {
 
  });
  $serv->on('close', function ($serv, $fd) {
      echo "Client: Close.\n";
  });
 
  $serv->set(array('task_worker_num' => 4));
 
  $serv->start();
           

Socket2

上隻需把ip變更一下即可。192.168.0.201變更為192.168.0.202

【主要制分發伺服器】Proxy

在轉發伺服器上建立腳本proxy.php,内容如下:

$serv = new swoole_server("0.0.0.0", 9501); //服務端
 $serv->on('start', function($serv) {
     echo "Service:Start...";
 });
 $serv->on('connect', function ($serv, $fd) {
 
 });
 $serv->on('receive', function ($serv, $fd, $from_id, $data) {
     global $redis;
     $serv->task($data);
 });
 $serv->on('task', function ($serv, $task_id, $from_id, $data) {
     $forward = (array) json_decode($data);
     $client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
 
     $client->connect($forward['recv_ip'], 9501);
     unset($forward['recv_ip']);
     $client->send(json_encode($forward));
     $client->close();
 });
 
 $serv->on('finish', function ($serv, $task_id, $data) {
 
 });
 $serv->on('close', function ($serv, $fd) {
     echo "Client: Close.\n";
 });
 
 $serv->set(array('task_worker_num' => 4));
 
 $serv->start();
           

測試

注意開啟順序

  1. 開啟轉發伺服器php proxy.php
  2. 分别開啟socket伺服器php server.php
  3. 開始測試,分别打開兩個telnet,連接配接兩個消息伺服器,發送消息測試:

    登陸

可以在轉發伺服器上看到兩個消息伺服器已經連接配接

基于強大的swoole擴充,讓php高效的實作這些成為可能,目前消息伺服器到轉發伺服器是長連接配接,轉發伺服器到消息伺服器是短連接配接,存在性能瓶頸,也浪費了連接配接資源。下一步改造成長連接配接,消息伺服器的client使用異步。

redis sub/pub 模型

原理

使用redis釋出訂閱模型實作中間件消息通信。

改進版

swoole + aliyun - 消息隊列 RocketMQ 版

模型圖

swoole TCP 叢集 MQTT消息通信伺服器

原文連結