消息伺服器使用socket,為避免伺服器過載,單台隻允許500個socket連接配接,當一台不夠的時候,擴充消息伺服器是必然,問題來了,如何讓連結在不同消息伺服器上的使用者可以實作消息發送呢?
要實作消息互通就必須要讓這些消息伺服器本身能互通,想了兩個方式,一種是消息伺服器之間交叉連結,另一種是增加一個特殊的消息伺服器,這個消息伺服器不對外開放,隻負責消息轉發和推送。
下列測試不考慮防火牆等。僅測試可行性和效率。
測試環境
消息伺服器
ip位址:192.168.0.201 端口:9501
ip位址:192.168.0.202 端口:9501
轉發伺服器
ip位址:192.168.0.203 端口:9501
公共緩存(redis)
ip位址:192.168.0.231 端口:6379
軟體環境
centos 6.5 mini swoole php
流程說明
- client1(伺服器1 - 使用者user)可向client2(伺服器2 - 使用者user)或者其他client(伺服器 - 使用者user)發送消息,并接收其他client(伺服器 - 使用者user)發送的消息。
- Redis中儲存client(伺服器 - 使用者user)連接配接的資訊,給每個使用者配置設定唯一的key,包括連結的哪台伺服器,轉發伺服器定時檢測消息伺服器,如消息伺服器挂掉,由轉發伺服器清理掉Redis已經挂掉的所有連結。
完整流程
- Client1給Client2發送一條消息
- Socket1接收到消息,根據key從Redis取出Client2的連接配接資訊,連接配接在本機,直接推送給Client2,流程結束。
- 如果連接配接不在本機,把消息推送到轉發伺服器,由轉發伺服器把該消息推送給連接配接所在消息伺服器,消息伺服器接收消息,推送給Client2。
- 消息發送結束。
程式設計實作
socket1
伺服器上建立一個server.php,内容如下:
<?php //服務端
$serv = new swoole_server("0.0.0.0", 9501);
//redis
$redis = new \Redis();
$redis->connect("192.168.0.231", 6379);
//client
$proxy = new swoole_client(SWOOLE_TCP | SWOOLE_KEEP);
$proxy->connect("192.168.0.203", 9501);
$serv->on('start', function($serv) {
echo "Service:Start...";
});
$serv->on('connect', function ($serv, $fd) {
});
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
global $redis;
$data = (array) json_decode($data);
$cmd = $data['cmd'];
switch ($cmd) {
case "login"://登陸
//儲存連接配接資訊
$save = array(
'fd' => $fd,
'socket_ip' => "192.168.0.201"
);
$redis->set($data['name'], serialize($save));
break;
case "chat":
$recv = unserialize($redis->get($data['recv']));
if ($recv['socket_ip'] != "192.168.0.201") {
//需要轉發
$data['cmd'] = 'forward';
$data['recv_ip'] = $recv['socket_ip'];
$serv->task(json_encode($data));
} else {
//直接發送
$serv->send($recv['fd'], "{$data['send']}給您發了消息:{$data['content']}");
}
break;
case "forward":
//接收轉發消息
$recv = unserialize($redis->get($data['recv']));
$serv->send($recv['fd'], "{$data['send']}給您發了消息:{$data['content']}");
break;
}
//$serv->send($fd, 'Swoole: ' . $data);
});
//任務推送
$serv->on('task', function ($serv, $task_id, $from_id, $data) {
global $proxy;
$proxy->send($data);
});
$serv->on('finish', function ($serv, $task_id, $data) {
});
$serv->on('close', function ($serv, $fd) {
echo "Client: Close.\n";
});
$serv->set(array('task_worker_num' => 4));
$serv->start();
Socket2
上隻需把ip變更一下即可。192.168.0.201變更為192.168.0.202
【主要制分發伺服器】Proxy
在轉發伺服器上建立腳本proxy.php,内容如下:
$serv = new swoole_server("0.0.0.0", 9501); //服務端
$serv->on('start', function($serv) {
echo "Service:Start...";
});
$serv->on('connect', function ($serv, $fd) {
});
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
global $redis;
$serv->task($data);
});
$serv->on('task', function ($serv, $task_id, $from_id, $data) {
$forward = (array) json_decode($data);
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$client->connect($forward['recv_ip'], 9501);
unset($forward['recv_ip']);
$client->send(json_encode($forward));
$client->close();
});
$serv->on('finish', function ($serv, $task_id, $data) {
});
$serv->on('close', function ($serv, $fd) {
echo "Client: Close.\n";
});
$serv->set(array('task_worker_num' => 4));
$serv->start();
測試
注意開啟順序
- 開啟轉發伺服器php proxy.php
- 分别開啟socket伺服器php server.php
-
開始測試,分别打開兩個telnet,連接配接兩個消息伺服器,發送消息測試:
登陸
可以在轉發伺服器上看到兩個消息伺服器已經連接配接
基于強大的swoole擴充,讓php高效的實作這些成為可能,目前消息伺服器到轉發伺服器是長連接配接,轉發伺服器到消息伺服器是短連接配接,存在性能瓶頸,也浪費了連接配接資源。下一步改造成長連接配接,消息伺服器的client使用異步。
redis sub/pub 模型
原理
使用redis釋出訂閱模型實作中間件消息通信。
改進版
swoole + aliyun - 消息隊列 RocketMQ 版
模型圖
原文連結