When something goes wrong, check the logs first:
gearman.log and gearmand.log
* related error messages: http://tech-lightnight.blogspot.jp/2013/04/start-with-gearman.html
* useful information:
https://groups.google.com/forum/#!topic/gearman/2LLx2iq1uhA
http://stackoverflow.com/questions/16815420/when-php-gearman-client-fails-to-connect-to-the-server-i-get-the-same-code-as-wh
start gearman: gearmand -d
stop gearman: gearadmin --shutdown (this does not seem to work)
other commands: gearadmin --status, gearadmin --workers
poll the queue status every second: watch -n 1 "(echo status; sleep 0.1) | nc 127.0.0.1 4730"
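The `status` admin command polled above returns one tab-separated line per function (function name, jobs in the queue, jobs running, capable workers) and ends with a line containing a single `.`. A minimal sketch of parsing that text in PHP follows. The helper name `parse_gearman_status` and the sample input are made up for illustration and are not part of gearmand or the PHP gearman extension.

```php
<?php
// Hypothetical helper (not part of gearmand or the PHP gearman extension).
// Parses the text returned by the "status" admin command, where each line is
// "FUNCTION<TAB>TOTAL<TAB>RUNNING<TAB>AVAILABLE_WORKERS" and a lone "." ends the list.
function parse_gearman_status(string $raw): array
{
    $rows = [];
    foreach (preg_split('/\r?\n/', trim($raw)) as $line) {
        if ($line === '.') {
            break; // end-of-response marker
        }
        $fields = explode("\t", $line);
        if (count($fields) !== 4) {
            continue; // skip anything that is not a status row
        }
        [$name, $total, $running, $workers] = $fields;
        $rows[$name] = [
            'total'   => (int) $total,
            'running' => (int) $running,
            'workers' => (int) $workers,
        ];
    }
    return $rows;
}

// Sample input typed by hand for illustration, not captured from a real server.
$sample = "sendmail\t3\t1\t4\n.\n";
$status = parse_gearman_status($sample);
echo $status['sendmail']['total'] - $status['sendmail']['running'], " job(s) waiting\n";
```

Feeding it the text read from port 4730 gives an array keyed by function name, which is easier to log or compare over time than the raw `watch` output.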
----------------
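One problem has never been resolved:
$ gearmand -d
$ command for starting gearman worker
$ waiting for gearman client to run tasks
At first everything works: tasks added by the client are processed promptly by the workers. But after some time (possibly a few days), when the client submits tasks again, the workers receive only some of them, or none at all.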
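The cause is unknown, but one way to narrow it down might be to check, when the problem appears, whether jobs are waiting in the queue while workers are still registered. The sketch below classifies a single function's status row. The function name `classify_queue` and the row shape (`total`, `running`, `workers`, modelled on the columns of the `status` command) are assumptions made for illustration. A single reading can be a momentary state, so only a result that persists is meaningful.

```php
<?php
// Hypothetical diagnostic; the row shape (total/running/workers) is an assumption
// modelled on the columns of the gearmand "status" command.
function classify_queue(array $row): string
{
    $waiting = $row['total'] - $row['running'];
    if ($waiting <= 0) {
        return 'ok';            // nothing is waiting in the queue
    }
    if ($row['workers'] === 0) {
        return 'no_workers';    // jobs are queued but no worker is registered
    }
    if ($row['running'] < $row['workers']) {
        return 'idle_workers';  // jobs wait although some registered workers are idle
    }
    return 'busy';              // every registered worker is already running a job
}

$sample = ['total' => 5, 'running' => 0, 'workers' => 4]; // made-up numbers
echo classify_queue($sample), "\n";
```

A persistent `idle_workers` result would suggest the workers are still connected but no longer picking up jobs, while `no_workers` would suggest their connections to gearmand were lost.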
-----------------
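Below is a Gearman program that I got working:
1. GearmanClient
Fetch a batch of people (checkins) from the database, split them into at most 20 groups, and then add at most 20 background tasks to the GearmanClient.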
public function action_send_notification() {
    $event_id = Arr::get($_GET, 'event_id');
    $is_drill = Arr::get($_GET, 'is_drill');
    $response_data = array();

    // Select the check-ins of this event that have not been notified by email or SMS yet.
    $sql = 'SELECT CheckIn.id as checkin_id, User.id as user_id, User.email, User.phonenumber, User.subscription, User.topicarn from CheckIn, User where CheckIn.eventID = :event_id and CheckIn.EmailStatus = 0 and CheckIn.SMSStatus = 0 and CheckIn.UserID = User.id';
    $query = DB::query(Database::SELECT, $sql);
    $query->param(':event_id', $event_id);
    $checkins = $query->execute()->as_array();

    if (count($checkins) > 0) {
        // The worker reads event_id and is_drill from every row, so attach them to the workload.
        foreach ($checkins as &$checkin) {
            $checkin['event_id'] = $event_id;
            $checkin['is_drill'] = $is_drill;
        }
        unset($checkin);

        $task_arr = $this->array_split($checkins, 20);
        try {
            $gmclient = new GearmanClient();
            $gmclient->addServer(); // defaults to 127.0.0.1:4730
            foreach ($task_arr as $task) {
                $gmclient->addTaskBackground("sendmail", json_encode($task));
            }
            $gmclient->runTasks();
        } catch (Exception $e) {
            $response_data['error'] = $e->getMessage();
        }
    }
    // Report success only if no exception was recorded.
    if (!isset($response_data['error'])) {
        $response_data['status'] = 'client success';
    }
    $this->response->body(json_encode($response_data));
}

// Split $array into at most $parts chunks of nearly equal size.
private function array_split($array, $parts) {
    return array_chunk($array, (int) ceil(count($array) / $parts));
}
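To make the grouping step concrete, here is a standalone sketch of the same chunking idea as `array_split()`. The function name `split_into_groups`, the guard for empty input, and the sample data are additions for illustration and are not in the original controller.

```php
<?php
// Standalone version of the array_split() idea, with a guard (an addition for
// this sketch) for empty input and non-positive $parts.
function split_into_groups(array $items, int $parts): array
{
    if ($items === [] || $parts < 1) {
        return [];
    }
    // Chunk size is rounded up, so the number of chunks never exceeds $parts.
    $size = (int) ceil(count($items) / $parts);
    return array_chunk($items, $size);
}

// 45 made-up items: ceil(45 / 20) = 3 per group, so 15 groups are produced.
$groups = split_into_groups(range(1, 45), 20);
echo count($groups), " groups, first has ", count($groups[0]), " items\n";
```

Because the chunk size is rounded up, the result is "at most 20" groups rather than exactly 20, which matches the description of the client above.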
2. GearmanWorker
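The worker is designed as a long-running daemon: the parent process forks four child processes, and each child starts one worker that loops forever, waiting for tasks and processing them.
As soon as a worker stops working properly, its child process exits. The wait call in the parent process is notified of the exit, so the parent forks a new child and starts a replacement worker.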
<?php defined('SYSPATH') or die('No direct script access.');
class Task_Processmail extends Minion_Task {

    const WORKER_COUNT = 4;

    protected function _execute(array $params)
    {
        // Start the initial pool of workers.
        for ($i = 0; $i < self::WORKER_COUNT; $i++) {
            $this->spawn_worker();
        }
        // Parent process: block until a child exits, then fork a replacement.
        while (TRUE) {
            $pid = pcntl_wait($status);
            if ($pid <= 0) {
                // No children left, or wait failed: stop instead of spinning.
                break;
            }
            // pcntl_wait() has already reaped the exited child, so it must not be
            // signalled again (posix_kill() with a stale or -1 pid is dangerous).
            $this->spawn_worker();
        }
    }

    // Fork one child that runs a Gearman worker until work() fails.
    private function spawn_worker()
    {
        $pid = pcntl_fork();
        if (-1 == $pid) {
            echo "fork failed\n";
            return;
        }
        if (0 == $pid) {
            // Child process
            $gmworker = new GearmanWorker();
            $gmworker->addServer();
            $gmworker->addFunction("sendmail", "sendmail_fn");
            while (1) {
                $gmworker->work();
                if ($gmworker->returnCode() != GEARMAN_SUCCESS)
                    break;
            }
            exit();
        }
    }
}
function sendmail_fn($job) {
    $workload = json_decode($job->workload(), true);
    // print_r($workload);
    foreach ($workload as $key => $value) {
        try {
            $checkin = ORM::factory('Checkin')->where('id', '=', $value['checkin_id'])->find();
        } catch (Exception $e) {
            echo $e->getMessage();
            continue;
        }
        if ($checkin->loaded()) {
            // event_id and is_drill must be present in every workload row.
            try {
                $content = array();
                $content['subject'] = "Emergency ".(!$value['is_drill'] ? "Situation Now" : "Drill Now");
                $content['content'] = "There is an emergency ".(!$value['is_drill'] ? "situation" : "drill").". Please click <a href=\"http://52.22.36.108/pinpoint/index.php/checkin?eventID=".$value['event_id']."&userID=".$value['user_id']."\">here</a> to checkin with Pinpoint.";
                $email = array();
                $email[] = $value['email'];
                AmazonSMS::sendmail($email, $content);
                // change status
                $checkin->EmailStatus = true;
                echo "send email ok.\n";
            } catch (Exception $e) {
                Log::instance()->add(Log::ERROR, 'Could not send email to :'.$value['email']);
                // change status
                $checkin->EmailStatus = false;
                echo "**send email fail: ";
                echo $e->getMessage();
            }
            if (!empty($value['phonenumber']) && strlen($value['phonenumber']) > 8) {
                if (!empty($value['subscription'])) {
                    // Log::instance()->add(Log::INFO, 'SMS sent to :'.$value['phonenumber'], $value);
                    try {
                        // Use the per-row values; $Drill and $event_id were undefined in this scope.
                        AmazonSMS::sendMessage(array(
                            'arn' => $value['topicarn'],
                            'message' => "There is an emergency ".(!$value['is_drill'] ? "situation" : "drill").". Please checkin with Pinpoint http://52.22.36.108/pinpoint/index.php/checkin?eventID=".$value['event_id']."&userID=".$value['user_id']
                        ));
                        // change status
                        $checkin->SMSStatus = true;
                    } catch (Exception $e) {
                        // Log::instance()->add(Log::ERROR, 'Could not send sms to :'.$value['phonenumber'], $e);
                        // change status
                        $checkin->SMSStatus = false;
                    }
                }
            }
            $checkin->save();
        } else {
            echo "ORM load fail";
        }
    }
}
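The two figures below show the process list queried twice, a few hours apart:
As they show, the values in the fourth column (the process IDs) have changed, which means that during this period some processes exited and new ones were forked.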
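The respawn behaviour described above (keep a pool of children, and start a replacement whenever one exits) can be sketched without actually forking. In the sketch below, `supervise`, `$spawn`, `$waitAny` and `$maxRestarts` are hypothetical names introduced for illustration. In the real task the two callables would wrap `pcntl_fork()` and `pcntl_wait()`, and the restart cap is an assumption added so that the example terminates.

```php
<?php
// Sketch of the respawn bookkeeping only. $spawn and $waitAny are hypothetical
// callables: in the real task they would wrap pcntl_fork() and pcntl_wait().
function supervise(int $poolSize, callable $spawn, callable $waitAny, int $maxRestarts): array
{
    $alive = [];
    for ($i = 0; $i < $poolSize; $i++) {
        $alive[$spawn()] = true;       // remember the PID of each child
    }
    $restarts = 0;
    while ($alive !== []) {
        $pid = $waitAny();             // PID of a child that exited
        if ($pid <= 0) {
            break;                     // nothing more to wait for
        }
        unset($alive[$pid]);
        if ($restarts < $maxRestarts) {
            $alive[$spawn()] = true;   // replace the child that exited
            $restarts++;
        }
    }
    return ['restarts' => $restarts, 'alive' => array_keys($alive)];
}

// Fake PIDs and a scripted list of exits, made up for illustration.
$next = 100;
$exits = [101, 103];
$result = supervise(
    4,
    function () use (&$next) { return $next++; },
    function () use (&$exits) { return $exits === [] ? 0 : array_shift($exits); },
    10
);
print_r($result);
```

In this run the children with fake PIDs 101 and 103 exit and are replaced by 104 and 105, which is the same effect as the changed process IDs observed in the process list.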