天天看點

gearman 執行個體一枚

有問題多看日志

gearman.log和gearmand.log

* related error messages: http://tech-lightnight.blogspot.jp/2013/04/start-with-gearman.html

* useful information:

https://groups.google.com/forum/#!topic/gearman/2LLx2iq1uhA

http://stackoverflow.com/questions/16815420/when-php-gearman-client-fails-to-connect-to-the-server-i-get-the-same-code-as-wh

start gearman : gearmand -d

stop gearman: gearadmin --shutdown (it seems that it doesn't work)

other commands: gearadmin --status  --workers

watch -n 1 "(echo status; sleep 0.1) | nc 127.0.0.1 4730"

----------------

有一個問題一直沒有解決:

$ gearmand -d

$ command for starting gearman worker

$ waiting for gearman client to run tasks

開始時運作正常,client添加的任務都可以被worker及時處理,但是過了一段時間(可能是幾天)後,client再發任務,worker就隻能收到一部分或收不到任務。

-----------------

下面是一段調通的gearman程式:

1. GearmanClient

從資料庫中取出一堆人checkins,将這堆人分成(小于等于)20組,然後為GearmanClient添加(小于等于)20個背景任務。

public function action_send_notification() {
        
        $event_id = Arr::get($_GET, 'event_id');
        $is_drill = Arr::get($_GET, 'is_drill');
    
        $sql = 'SELECT CheckIn.id as checkin_id, User.id as user_id, User.email, User.phonenumber, User.subscription, User.topicarn from CheckIn, User where CheckIn.eventID = :event_id and CheckIn.EmailStatus = 0 and CheckIn.SMSStatus = 0 and CheckIn.UserID = User.id';  
        $query = DB::query(Database::SELECT, $sql);  
        $query->param(':event_id', $event_id);  
        $checkins = $query->execute()->as_array(); 
        
        if (count($checkins) > 0) {
            $task_arr = $this->array_split($checkins, 20);

            try {
                $gmclient = new GearmanClient();
                $gmclient->addServer();
      
                for ($i=0; $i < count($task_arr); $i++) { 
                    $gmclient->addTaskBackground("sendmail", json_encode($task_arr[$i]));
                }

                $gmclient->runTasks();
            } catch (Exception $e) {
                $response_data['error'] = $e->getMessage();
            }
        }
        $response_data['status'] = 'client success';
        $this->response->body(json_encode($response_data));
    }

    private function array_split($array, $parts) {
        return array_chunk($array, ceil(count($array) / $parts));
    }
           

2. GearmanWorker

将worker設計為常駐程序,在父程序中fork四個子程序,每個子程序啟動一個worker,不停地等待并處理任務。

一旦worker不能正常工作,則所在子程序exit,父程序中的wait将收到子程序退出的通知,于是重新fork一個子程序并啟動補上一個worker。

</pre><p><pre name="code" class="php"><?php defined('SYSPATH') or die('No direct script access.');

    

class Task_Processmail extends Minion_Task {
    
    protected function _execute(array $params)
    {
        for ($i=0; $i < 4; $i++) { 
            $pid = pcntl_fork();
            if (0 == $pid) {      
                $gmworker = new GearmanWorker();
                $gmworker->addServer();
                $gmworker->addFunction("sendmail", "sendmail_fn");
                while (1) {
                    $gmworker->work();
                    if($gmworker->returnCode() != GEARMAN_SUCCESS)
                        break;
                }
                exit();
            } 
        }
    
        // 父程序
        while(TRUE) {
            $pid = pcntl_wait($status);
            
            // 重新開機關閉的子程序
            posix_kill($pid, SIGKILL);
            $pid = pcntl_fork();
            if (0 == $pid) {
                $gmworker = new GearmanWorker();
                $gmworker->addServer();
                $gmworker->addFunction("sendmail", "sendmail_fn");
                while (1) {
                    $gmworker->work();
                    if($gmworker->returnCode() != GEARMAN_SUCCESS)
                        break;
                }
                exit();
            } 
        }
    }

}

function sendmail_fn($job) {
    $workload = json_decode($job->workload(), true);
    // print_r($workload);

    foreach ($workload as $key => $value) {
        // $value = $workload;
        $checkin;
        try {
            $checkin = ORM::factory('Checkin')->where('id', '=', $value['checkin_id'])->find();
        } catch (Exception $e) {
            echo $e->getMessage();
            continue;
        }
 

        if ($checkin->loaded()) {  
            try {
                $content = array();
                $content['subject'] = "Emergency ".(!$value['is_drill'] ? "Situation Now" : "Drill Now");
                $content['content'] = "There is an emergency ".(!$value['is_drill'] ? "situation" : "drill").". Please click <a href=\"http://52.22.36.108/pinpoint/index.php/checkin?eventID=".$value['event_id']."&userID=".$value['user_id']."\">here</a> to checkin with Pinpoint.";

                $email = array();
                $email[] = $value['email'];
                AmazonSMS::sendmail($email, $content);

                //change status
                $checkin->EmailStatus = true;
                
                echo "send email ok.\n";
            } catch (Exception $e) {
                LOG::Instance()->add(Log::ERROR, 'Could not send email to :'.$value['email']);

                //change status
                $checkin->EmailStatus = false;

                echo "**send email fail: ";
                echo $e->getMessage();
            }              
        

            if (!empty($value['phonenumber']) && strlen($value['phonenumber']) > 8) {
                if (!empty($value['subscription']) && strlen($value['subscription'])>0) {
                    // LOG::Instance()->add(Log::INFO, 'SMS sent to :'.$value['phonenumber'], $value);
                    try {
                        AmazonSMS::sendMessage(array(
                            'arn' => $value['topicarn'],
                            'message' => "There is an emergency ".(!$Drill ? "situation" : "drill").". Please checkin with Pinpoint http://52.22.36.108/pinpoint/index.php/checkin?eventID=$event_id&userID=".$value['user_id']
                        ));

                        //change status
                        $checkin->SMSStatus = true;
                        
                    } catch (Exception $e) {
                        // LOG::Instance()->add(Log::ERROR, 'Could not send sms to :'.$value['phonenumber'], $e);

                        //change status
                        $checkin->SMSStatus = false;
                    }
                }
            }

            $checkin->save();
        } else {
            echo "ORM load fail";
        }
    }
}
           

下面的兩幅圖是間隔幾個小時的兩次程序查詢:

gearman 執行個體一枚
gearman 執行個體一枚

可見,第四列資料(程序号)已經改變了,說明在此期間有程序exit并重新fork了新程序。