天天看點

基于swoole擴充建立websocket服務【可用于實作即時通訊、線上聊天室、消息推送、彈幕系統、直播系統】

一、思路讲解

服务端的websocket服务类似于现实生活中的邮局。

当客户端连接上服务端时,代表客户把自己的地址告知了邮局。

当客户端发送消息到服务端,则代表客户把信件投递到了邮筒。

若消息接收方在线,则websocket会将消息转发给消息接收方,类似于邮局能根据收件人地址精准投递。若消息接收方不在线,则websocket会在下一次消息接收方连接服务时进行消息推送。在现实生活中就是查无此人的情况下,邮局会在这个人出现的时候,把信件交到这个人手中。

在实际项目中,也会有其他手段来接棒用户在不线的环节,比如短信通知,邮件通知,甚至语音电话通知(钉钉)。

关于功能:

  • 即时通讯、在线聊天室、消息推送、直播系统都是实现点对点或点对面的精准消息推送,一般消息发送和接收方是建立了一定联系,如好友关系,群组关系,上下级关系、粉丝关系。
  • 弹幕系统则是无差别推送,一般消息发送方和接收方只是一种临时性的关系,一般是进行实时推送,即用户只能收到连接到服务后的消息,(因为数据量比较大,一般不会对之前的消息进行推送)。

websocket的原理是核心,掌握了核心就可以根据需求,设计好消息推送时机,消息接收方,实现不同的功能。

消息中转/推送的实现前提:

  1. 唯一性 客户端每连接一次服务端,服务端都会分配一个新的线程来处理两者之间的连接,断开连接,则挂掉对应的线程,这个是我们设计数据表时的一个重要依据(用来识别用户A是用户A,类似session的作用)。
  2. 双向性 连接在服务端的客户端,都可以向服务端发送消息,也可以接收来自服务端的消息。这一点是实现即时通讯、在线聊天室、消息推送的重要手段。

二、数据表设计

swoole线程-用户表

CREATE TABLE `blog_ws_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL DEFAULT '0' COMMENT '调起websocket服务的用户id',
  `f_id` int(11) NOT NULL DEFAULT '0' COMMENT 'swoole线程id',
  `ip` char(15) NOT NULL DEFAULT '0.0.0.0' COMMENT '客户端IP地址',
  `dtime` datetime NOT NULL DEFAULT '2019-01-01 00:00:00' COMMENT '调起时间',
  `time` int(11) NOT NULL DEFAULT '0' COMMENT '调起时间戳',
  `is_online` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否在线,0-下线,1-上线',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=132 DEFAULT CHARSET=utf8mb4;
           

chat聊天消息记录表

CREATE TABLE `blog_chats` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `from_id` int(11) NOT NULL DEFAULT '0' COMMENT '消息发送方id',
  `to_id` int(11) NOT NULL DEFAULT '0' COMMENT '消息接收方id',
  `group_id` int(11) NOT NULL DEFAULT '0' COMMENT '群组id',
  `content` varchar(255) NOT NULL DEFAULT ' ' COMMENT '消息内容',
  `img` varchar(255) NOT NULL DEFAULT ' ' COMMENT '图片地址',
  `dtime` datetime NOT NULL DEFAULT '2018-01-01 00:00:00' COMMENT '发送时间',
  `time` int(11) NOT NULL DEFAULT '0' COMMENT '反馈/回复时间戳',
  `is_send` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否已发送,0-未发送,1-已发送',
  `is_del` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否删除,0-否,1-删除',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=125 DEFAULT CHARSET=utf8 COMMENT='聊天记录表';
           

三、服务端代码:

ws.php:

<?php
namespace app\network\controller;

/**
 * @Filename      :Ws_log.php
 * @CreateTime    :2019 2019/1/1 13:29
 * @Author        :Robin
 * @Description   :
 */
class Ws
{
    public $host = '0.0.0.0';
    public $ip = '127.0.0.1';
    public $port = 8812;
    public $username = 'root';
    public $password = 'root';
    public $database = 'blog';
    public $ws;
    public $db;
    public function __construct()
    {
        $this->ws = new \swoole_websocket_server($this->host, $this->port);
        //面向对象的回调
        $this->ws->on('open',[$this,'onOpen']);
        $this->ws->on('message',[$this,'onMessage']);
        $this->ws->on('close',[$this,'onClose']);
        //开启websocket服务
        $this->ws->start();
    }


    /**
     * @FunctionName        :onOpen
     * @CreateTime          :2019 2019/1/1 13:31
     * @Author              :Robin
     * @Descript            监听ws连接事件
     * @param $ws
     * @param $request
     */
    public function onOpen($ws,$request)
    {
        $f_id = $request->fd;
        $info = $request->server;
        $ip = $info['remote_addr'];
        $request_uri = trim($info['request_uri'],'/');
        $data = explode('/',$request_uri);
        $dtime = date('Y-m-d H:i:s');
        $time = time();
        //连接数据库
        $this->db = mysqli_connect($this->ip,$this->username,$this->password) or die(mysqli_connect_error());
        mysqli_select_db($this->db,$this->database) or die();
        $sql = "insert into blog_ws_log(user_id,f_id,ip,dtime,time,is_online) values($data[0],$f_id,'$ip','$dtime',$time,1)";
        $this->db->query($sql);
        //如果有未读消息,进行消息展示
        $sql = "select count(id),from_id from blog_chats where to_id = $f_id and is_send = 0 group by from_id";
        $result = $this->db->query($sql);
        if($result->num_rows > 0){
            $data = mysqli_fetch_all($result);
            $ws->push($f_id,json_encode($data));
        }
        $this->db->close();
    }

    /**
     * @FunctionName        :onMessage
     * @CreateTime          :2019 2019/1/1 13:32
     * @Author              :Robin
     * @Descript            监听ws消息事件
     * @param $ws
     * @param $frame
     */
    public function onMessage($ws,$frame)
    {
        //$frame->fd;发送消息的线程
        $data = json_decode($frame->data,true);
        //连接数据库
        $this->db = mysqli_connect($this->ip,$this->username,$this->password) or die(mysqli_connect_error());
        mysqli_select_db($this->db,$this->database) or die();
        $content = htmlspecialchars($data['content']);
        if(strpos($content,'data:image') === false){
            $img = '';
        }else{
            //上传图片,另作处理
            $content = '';
            $img = '';
        }
        $dtime = date('Y-m-d H:i:s');
        $time = time();
        $sql = "insert into blog_chats(from_id,to_id,content,img,dtime,time) values($data[from_id],$data[to_id],'$content','$img','$dtime',$time)";
        $message_id = $this->db->query($sql);
        //如果消息接收方在线,向客户端发送消息
        $sql = "select f_id from blog_ws_log where is_online = 1 and user_id = $data[to_id] order by id desc limit 1";
        $res = $this->db->query($sql);
        if($res->num_rows){
            $to_id = mysqli_fetch_assoc($res)['f_id'];
            $push_data = json_encode(['from_id' => $data['from_id'],'content' => htmlspecialchars_decode($content),'img' => $img]);
            $ws->push($to_id,$push_data);
            $sql = "update blog_chats set is_send = 1 where id = $message_id";
            $this->db->query($sql);
        }
        $this->db->close();
    }

    /**
     * @FunctionName        :onClose
     * @CreateTime          :2019 2019/1/1 13:35
     * @Author              :Robin
     * @Descript            监听ws关闭事件
     * @param $ws
     * @param $fd
     */
    public function  onClose($ws,$fd)
    {
        $this->db = mysqli_connect($this->ip,$this->username,$this->password) or die(mysqli_connect_error());
        mysqli_select_db($this->db,$this->database) or die();
        $sql = "update blog_ws_log set is_online = 0 where f_id = $fd";
        $this->db->query($sql);
        $this->db->close();
    }

    public function __destruct()
    {
        $this->db = mysqli_connect($this->ip,$this->username,$this->password) or die(mysqli_connect_error());
        mysqli_select_db($this->db,$this->database) or die();
        $sql = "update blog_ws_log set is_online = 0";
        $this->db->query($sql);
        $this->db->close();
    }
}

$obj = new Ws();
           

四、客户端代码

监听消息推送事件的ws.js

var admin_id = $('#admin_id').val();
var url = "ws://ip:port/";
var ws = new WebSocket(url+admin_id);

/**
 * 监听连接打开事件
 * @param evt
 */
ws.onopen = function(evt){
    var data = JSON.parse(evt.data);
};

/**
 * 监听消息推送事件,当服务端向客户端推送消息时,调起该方法
 * @param evt
 */
ws.onmessage = function(evt){
    var data = JSON.parse(evt.data);
    var png = "/static/network/img/icon01.png";
    var date = new Date();
    var Y = date.getFullYear() + '-';
    var M = (date.getMonth() + 1 < 10 ? '0' + (date.getMonth() + 1) : date.getMonth() + 1) + '-';
    var D = date.getDate() < 10 ? '0' + date.getDate() : date.getDate() + ' ';
    var hh = date.getHours() < 10 ? '0' + date.getHours() : date.getHours() + ':';
    var mm = date.getMinutes() < 10 ? '0' + date.getMinutes() : date.getMinutes() + ':';
    var ss = date.getSeconds() < 10 ? '0' + date.getDate() : date.getSeconds() ;
    var demo = "<div class='clearfloat'><div class='author-name'><small class='chat-date'>" + Y + M + D +' ' + hh + mm + ss +"</small></div><div class='left'><div class='chat-avatars'><img src='" + png + "' alt='头像'></div>";
    if(data.img == ''){
        demo += "<div class='chat-message'>" + data.content + "</div></div></div";
    }else{
        demo += "<div class='chat-message'><img src='" + data.img + "' alt=''></div></div></div>";
    }
    $(".chatBox-content-demo").append(demo);
};

/**
 * 监听连接错误事件
 * @param evt
 */
ws.onerror = function(evt){

};
           

进入聊天列表界面:chat.js

$(".chat-list-people").click(function () {
        var n = $(this).index();
        $(".chatBox-head-one").toggle();
        $(".chatBox-head-two").toggle();
        $(".chatBox-list").fadeToggle();
        $(".chatBox-kuang").fadeToggle();
        $('#to_id').val($(this).attr('data-id'));
        //传名字
        $(".ChatInfoName").text($(this).children(".chat-name").children("p").eq(0).html());

        //传头像
        $(".ChatInfoHead>img").attr("src", $(this).children().eq(0).children("img").attr("src"));

        //聊天框默认最底部
        $(document).ready(function () {
            $("#chatBox-content-demo").scrollTop($("#chatBox-content-demo")[0].scrollHeight);
        });
        $.post('message_list',{'to_id':admin_id,'from_id':$(this).attr('data-id')},function (res) {
            var data = JSON.parse(res);
            var png = "/static/network/img/icon01.png";
            console.dir(data);
            $(".chatBox-content-demo").children().remove();
            for(var i=0;i<data.length;i++){
                var demo = "<div class='clearfloat'><div class='author-name'><small class='chat-date'>" + data[i].dtime +"</small></div>";
                if(data[i].from_id == admin_id){//本人发的
                    demo += "<div class='right'><div class='chat-message'>";
                    if(data[i].img == ''){
                        demo += data[i].content;
                    }else{
                        demo += "<img src='" + data[i].img +"'";
                    }
                    demo += "</div><div class='chat-avatars'><img src='" + png + "' alt='头像'></div></div></div>"
                }else{//对方发的
                    demo += "<div class='left'><div class='chat-avatars'><img src='" + png + "' alt='头像'></div><div class='chat-message'>";
                    if(data[i].img == ''){
                        demo += data[i].content;
                    }else{
                        demo += "<img src='" + data[i].img +"'";
                    }
                    demo += "</div></div></div>"
                }
                $(".chatBox-content-demo").append(demo);
            }

        });
    });
           

五、一些说明

有几点比较重要的地方:

  1. 客户端连接服务端时,要把websocket的线程和客户标识形成绑定关系,并且在客户端断开时,及时在数据表中更新状态。
  2. 客户端发送消息时,应该记录消息发送方和接收方,就像写信一样,如果发件人,寄件人都不知道,邮局该怎么处理信件呢?有些功能的实现不需要这两个参数,但是记录下来总是好的。写入消息表的应该是消息发送方和接收方的用户标识,而不是线程id,因为线程id是波动性很大的,临时性的,上一次连接和下一次连接得到的线程id是不一样的。

能力有限,可能代码中有疏漏的地方,欢迎大家交流指正!