天天看點

PHP并發 任務隊列(2)

 例如:郵件隊列、任務隊列、消息隊列、feed隊列 ,手機短信,紀錄日志,轉換視訊格式,資料挖掘采集 ,為了增強使用者體驗可以用隊列或異步處理

php短耗時異步處理

PHP并發 任務隊列(2)

<?php  

echo '執行完畢提示';  

fastcgi_finish_request(); /* 響應完成, 關閉連接配接 */  

echo '不會輸出';  

sleep(3); //sleep模拟一些耗時的操作  

/* 記錄日志 */  

file_put_contents('log.txt', '生存還是毀滅,這是個問題.');  

?>  

這個部署有一個好處,就是可以不至于讓一個頁面的響應時間太久,直接傳回給使用者一個執行完畢的提示,但php背景實際還在繼續執行,這樣做可能會有使用者體驗上的舒服感。

php 長耗時用隊列

http://blog.s135.com/httpsqs/

記憶體隊列

PHP并發 任務隊列(2)

<?php    

/**   

* 使用共享記憶體的php循環記憶體隊列實作   

* 支援多程序, 支援各種資料類型的存儲   

* 注: 完成入隊或出隊操作,盡快使用unset(), 以釋放臨界區   

*   

* @author [email protected]   

* @created 2009-12-23   

*/    

class shmqueue    

{    

    private $maxqsize = 0; // 隊列最大長度    

    private $front = 0; // 隊頭指針    

    private $rear = 0;  // 隊尾指針    

    private $blocksize = 256;  // 塊的大小(byte)    

    private $memsize = 25600;  // 最大共享記憶體(byte)    

    private $shmid = 0;    

    private $fileptr = './shmq.ptr';    

    private $semid = 0;    

    public function __construct()    

    {            

        $shmkey = ftok(__file__, 't');    

        $this->shmid = shmop_open($shmkey, "c", 0644, $this->memsize );    

        $this->maxqsize = $this->memsize / $this->blocksize;    

         // 申請一個信号量    

        $this->semid = sem_get($shmkey, 1);    

        sem_acquire($this->semid); // 申請進入臨界區            

        $this->init();    

    }    

    private function init()    

    {    

        if ( file_exists($this->fileptr) ){    

            $contents = file_get_contents($this->fileptr);    

            $data = explode( '|', $contents );    

            if ( isset($data[0]) && isset($data[1])){    

                $this->front = (int)$data[0];    

                $this->rear  = (int)$data[1];    

            }    

        }    

    public function getlength()    

        return (($this->rear - $this->front + $this->memsize) % ($this->memsize) )/$this->blocksize;    

    public function enqueue( $value )    

        if ( $this->ptrinc($this->rear) == $this->front ){ // 隊滿    

            return false;    

        $data = $this->encode($value);    

        shmop_write($this->shmid, $data, $this->rear );    

        $this->rear = $this->ptrinc($this->rear);    

        return true;    

    public function dequeue()    

        if ( $this->front == $this->rear ){ // 隊空    

        $value = shmop_read($this->shmid, $this->front, $this->blocksize-1);    

        $this->front = $this->ptrinc($this->front);    

        return $this->decode($value);    

    private function ptrinc( $ptr )    

        return ($ptr + $this->blocksize) % ($this->memsize);    

    private function encode( $value )    

        $data = serialize($value) . "__eof";    

        if ( strlen($data) > $this->blocksize -1 ){    

            throw new exception(strlen($data)." is overload block size!");    

        return $data;    

    private function decode( $value )    

        $data = explode("__eof", $value);    

        return unserialize($data[0]);            

    public function __destruct()    

        $data = $this->front . '|' . $this->rear;    

        file_put_contents($this->fileptr, $data);    

        sem_release($this->semid); // 出臨界區, 釋放信号量    

}    

//使用的樣例代碼如下:  

// 進隊操作  

$shmq = new shmqueue();    

$data = 'test data';    

$shmq->enqueue($data);   

$data = 'test';    

$shmq->enqueue($data);    

unset($shmq);    

// 出隊操作    

$data = $shmq->dequeue();    

unset($shmq);  

memcache隊列 可以使用比較專業的工具,例如:apache activemq、memcacheq,下面是兩個基本簡單的實作方式:

PHP并發 任務隊列(2)

/** 

 * php memcache 隊列類 

 * @author lkk/lianq.net 

 * @version 0.3 

 * @修改說明: 

 * 1.放棄了之前的ab面輪值思路,使用類似數組的構造,重寫了此類. 

 * 2.隊列預設先進先出,但增加了反向讀取功能. 

 * @example: 

    $obj = new memcachequeue('duilie'); 

    for ($i = 1; $i <= 10; $i++) { 

        $obj->add($i . 'asdf'); 

    } 

    $count = $obj->getqueuelength(); 

    $list = $obj->read(10); //讀資料10條,沒有出棧 

    $poplist = $obj->get(8); //出棧 

 */  

class memcachequeue  

{  

    public static $client; //memcache用戶端連接配接  

    public $access; //隊列是否可更新  

    private $expire; //過期時間,秒,1~2592000,即30天内  

    private $sleeptime; //等待解鎖時間,微秒  

    private $queuename; //隊列名稱,唯一值  

    private $retrynum; //重試次數,= 10 * 理論并發數  

    public $currenthead; //目前隊首值  

    public $currenttail; //目前隊尾值  

    const maxnum = 20000; //最大隊列數,建議上限10k  

    const head_key = '_lkkqueuehead_'; //隊列首key  

    const tail_key = '_lkkqueuetail_'; //隊列尾key  

    const valu_key = '_lkkqueuevalu_'; //隊列值key  

    const lock_key = '_lkkqueuelock_'; //隊列鎖key  

    /** 

     * 構造函數 

     * @param string $queuename 隊列名稱 

     * @param int $expire 過期時間 

     * @param array $config memcache配置 

     * 

     * @return <type> 

     */  

    public function __construct($queuename = '', $expire = 0, $config = '')  

    {  

        if (empty($config)) {  

            self::$client = memcache_pconnect('127.0.0.1', 11211);  

        } elseif (is_array($config)) { //array('host'=>'127.0.0.1','port'=>'11211')  

            self::$client = memcache_pconnect($config['host'], $config['port']);  

        } elseif (is_string($config)) { //"127.0.0.1:11211"  

            $tmp = explode(':', $config);  

            $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';  

            $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';  

            self::$client = memcache_pconnect($conf['host'], $conf['port']);  

        }  

        if (!self::$client) return false;  

        ignore_user_abort(true); //當客戶斷開連接配接,允許繼續執行  

        set_time_limit(0); //取消腳本執行延時上限  

        $this->access = false;  

        $this->sleeptime = 1000;  

        $expire = empty($expire) ? 3600 : intval($expire) + 1;  

        $this->expire = $expire;  

        $this->queuename = $queuename;  

        $this->retrynum = 1000;  

        $this->head_key = $this->queuename . self::head_key;  

        $this->tail_key = $this->queuename . self::tail_key;  

        $this->lock_key = $this->queuename . self::lock_key;  

        $this->_initsetheadntail();  

    }  

     * 初始化設定隊列首尾值 

    private function _initsetheadntail()  

        //目前隊列首的數值  

        $this->currenthead = memcache_get(self::$client, $this->head_key);  

        if ($this->currenthead === false) $this->currenthead = 0;  

        //目前隊列尾的數值  

        $this->currenttail = memcache_get(self::$client, $this->tail_key);  

        if ($this->currenttail === false) $this->currenttail = 0;  

     * 當取出元素時,改變隊列首的數值 

     * @param int $step 步長值 

    private function _changehead($step = 1)  

        $this->currenthead += $step;  

        memcache_set(self::$client, $this->head_key, $this->currenthead, false, $this->expire);  

     * 當添加元素時,改變隊列尾的數值 

     * @param bool $reverse 是否反向 

     * @return null 

    private function _changetail($step = 1, $reverse = false)  

        if (!$reverse) {  

            $this->currenttail += $step;  

        } else {  

            $this->currenttail -= $step;  

        memcache_set(self::$client, $this->tail_key, $this->currenttail, false, $this->expire);  

     * 隊列是否為空 

     * @return bool 

    private function _isempty()  

        return (bool)($this->currenthead === $this->currenttail);  

     * 隊列是否已滿 

    private function _isfull()  

        $len = $this->currenttail - $this->currenthead;  

        return (bool)($len === self::maxnum);  

     * 隊列加鎖 

    private function _getlock()  

        if ($this->access === false) {  

            while (!memcache_add(self::$client, $this->lock_key, 1, false, $this->expire)) {  

                usleep($this->sleeptime);  

                @$i++;  

                if ($i > $this->retrynum) { //嘗試等待n次  

                    return false;  

                    break;  

                }  

            }  

            $this->_initsetheadntail();  

            return $this->access = true;  

        return $this->access;  

     * 隊列解鎖 

    private function _unlock()  

        memcache_delete(self::$client, $this->lock_key, 0);  

     * 擷取目前隊列的長度 

     * 該長度為理論長度,某些元素由于過期失效而丢失,真實長度<=該長度 

     * @return int 

    public function getqueuelength()  

        return intval($this->currenttail - $this->currenthead);  

     * 添加隊列資料 

     * @param void $data 要添加的資料 

    public function add($data)  

        if (!$this->_getlock()) return false;  

        if ($this->_isfull()) {  

            $this->_unlock();  

            return false;  

        $value_key = $this->queuename . self::valu_key . strval($this->currenttail + 1);  

        $result = memcache_set(self::$client, $value_key, $data, memcache_compressed, $this->expire);  

        if ($result) {  

            $this->_changetail();  

        $this->_unlock();  

        return $result;  

     * 讀取隊列資料 

     * @param int $length 要讀取的長度(反向讀取使用負數) 

     * @return array 

    public function read($length = 0)  

        if (!is_numeric($length)) return false;  

        if ($this->_isempty()) {  

        if (empty($length)) $length = self::maxnum; //預設所有  

        $keyarr = array();  

        if ($length > 0) { //正向讀取(從隊列首向隊列尾)  

            $tmpmin = $this->currenthead;  

            $tmpmax = $tmpmin + $length;  

            for ($i = $tmpmin; $i <= $tmpmax; $i++) {  

                $keyarr[] = $this->queuename . self::valu_key . $i;  

        } else { //反向讀取(從隊列尾向隊列首)  

            $tmpmax = $this->currenttail;  

            $tmpmin = $tmpmax + $length;  

            for ($i = $tmpmax; $i > $tmpmin; $i--) {  

        $result = @memcache_get(self::$client, $keyarr);  

     * 取出隊列資料 

     * @param int $length 要取出的長度(反向讀取使用負數) 

    public function get($length = 0)  

        $length = intval($length);  

            $this->_changehead($length);  

            $this->_changetail(abs($length), true);  

        foreach ($keyarr as $v) { //取出之後删除  

            @memcache_delete(self::$client, $v, 0);  

     * 清空隊列 

    public function clear()  

        $tmpmin = $this->currenthead--;  

        $tmpmax = $this->currenttail++;  

        for ($i = $tmpmin; $i <= $tmpmax; $i++) {  

            $tmpkey = $this->queuename . self::valu_key . $i;  

            @memcache_delete(self::$client, $tmpkey, 0);  

        $this->currenttail = $this->currenthead = 0;  

    /* 

     * 清除所有memcache緩存資料 

    public function memflush()  

        memcache_flush(self::$client);  

}