僅供娛樂:Tokyo Tyrant的協定是先請求再傳回資料,想給它加上主動推資料。c語言不熟,原來計劃少改幾行c代碼,再加上Tokyo Tyrant的lua語言擴充就可以搞定。後來發現功能可以實作,要解決通信和線程的一些問題,可就麻煩了。我用的版本是tokyotyrant-1.1.36,主要看ttserver.c和ttutil.c,代碼寫的很好。我說好,意思是java程式員能看懂。
從ttserver.c裡邊main函數為線索,雖然沒什麼注釋,但是從代碼裡的log字元串也能看出一段代碼的功能。main主要處理指令行參數,然後調用proc函數,其它的先不管,主要關心proc調用的ttservstart函數,來看這個函數,主要看我加的中文注釋:
/* Start the service of a server object. */
bool ttservstart(TTSERV *serv){
assert(serv);
int lfd;
if(serv->port < 1){
lfd = ttopenservsockunix(serv->host);//建立server socket
if(lfd == -1){
ttservlog(serv, TTLOGERROR, "ttopenservsockunix failed");
return false;
}
} else {//建立server socket
lfd = ttopenservsock(serv->addr[0] != '\0' ? serv->addr : NULL, serv->port);
if(lfd == -1){
ttservlog(serv, TTLOGERROR, "ttopenservsock failed");
return false;
}
}
int epfd = epoll_create(TTEVENTMAX);//epoll建立
if(epfd == -1){
close(lfd);
ttservlog(serv, TTLOGERROR, "epoll_create failed");
return false;
}
ttservlog(serv, TTLOGSYSTEM, "service started: %d", getpid());
bool err = false;
for(int i = 0; i < serv->timernum; i++){
TTTIMER *timer = serv->timers + i;
timer->alive = false;
timer->serv = serv;
if(pthread_create(&(timer->thid), NULL, ttservtimer, timer) == 0){
ttservlog(serv, TTLOGINFO, "timer thread %d started", i + 1);
timer->alive = true;
} else {
ttservlog(serv, TTLOGERROR, "pthread_create (ttservtimer) failed");
err = true;
}
}
int thnum = serv->thnum;
TTREQ reqs[thnum];
for(int i = 0; i < thnum; i++){
reqs[i].alive = true;
reqs[i].serv = serv;
reqs[i].epfd = epfd;
reqs[i].mtime = tctime();
reqs[i].keep = false;
reqs[i].idx = i;
//建立處理用戶端請求的線程
if(pthread_create(&reqs[i].thid, NULL, ttservdeqtasks, reqs + i) == 0){
ttservlog(serv, TTLOGINFO, "worker thread %d started", i + 1);
} else {
reqs[i].alive = false;
err = true;
ttservlog(serv, TTLOGERROR, "pthread_create (ttservdeqtasks) failed");
}
}
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
ev.data.fd = lfd;
if(epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev) != 0){//開始監聽伺服器事件
err = true;
ttservlog(serv, TTLOGERROR, "epoll_ctl failed");
}
ttservlog(serv, TTLOGSYSTEM, "listening started");
while(!serv->term){
struct epoll_event events[TTEVENTMAX];
int fdnum = epoll_wait(epfd, events, TTEVENTMAX, TTWAITREQUEST * 1000);//等待epoll事件
if(fdnum != -1){
for(int i = 0; i < fdnum; i++){
if(events[i].data.fd == lfd){//是伺服器事件
char addr[TTADDRBUFSIZ];
int port;
int cfd;
if(serv->port < 1){
cfd = ttacceptsockunix(lfd);//接受連接配接
sprintf(addr, "(unix)");
port = 0;
} else {
cfd = ttacceptsock(lfd, addr, &port);//接受連接配接
}
if(epoll_reassoc(epfd, lfd) != 0){
if(cfd != -1) close(cfd);
cfd = -1;
}
if(cfd != -1){
ttservlog(serv, TTLOGINFO, "connected: %s:%d", addr, port);
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN | EPOLLONESHOT;//讀 一次
ev.data.fd = cfd;
if(epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev) != 0){//開始監聽用戶端事件
close(cfd);
err = true;
ttservlog(serv, TTLOGERROR, "epoll_ctl failed");
}
} else {
err = true;
ttservlog(serv, TTLOGERROR, "ttacceptsock failed");
}
} else {//用戶端事件
int cfd = events[i].data.fd;
if(pthread_mutex_lock(&serv->qmtx) == 0){
tclistpush(serv->queue, &cfd, sizeof(cfd));//加入處理隊列
if(pthread_mutex_unlock(&serv->qmtx) != 0){
err = true;
ttservlog(serv, TTLOGERROR, "pthread_mutex_unlock failed");
}
if(pthread_cond_signal(&serv->qcnd) != 0){
err = true;
ttservlog(serv, TTLOGERROR, "pthread_cond_signal failed");
}
} else {
err = true;
ttservlog(serv, TTLOGERROR, "pthread_mutex_lock failed");
}
}
}
。。。。。。。。。。。。。。。。。
}
來看看如果處理用戶端的請求,從ttservdeqtasks開始,我還是隻關心用中文注釋出來的關鍵點,其它細節暫時用不到:
/* Dequeue tasks of a server object and dispatch them.
`argp' specifies the argument structure of the server object.
The return value is `NULL' on success and other on failure. */
static void *ttservdeqtasks(void *argp){
TTREQ *req = argp;
TTSERV *serv = req->serv;
bool err = false;
if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0){
err = true;
ttservlog(serv, TTLOGERROR, "pthread_setcancelstate failed");
}
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGPIPE);
sigset_t oldsigset;
sigemptyset(&sigset);
if(pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset) != 0){
err = true;
ttservlog(serv, TTLOGERROR, "pthread_sigmask failed");
}
bool empty = false;
while(!serv->term){
if(pthread_mutex_lock(&serv->qmtx) == 0){//得到鎖的才能操作隊列
struct timeval tv;
struct timespec ts;
if(gettimeofday(&tv, NULL) == 0){
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000.0 + TTWAITREQUEST * 1000000000.0;
if(ts.tv_nsec >= 1000000000){
ts.tv_nsec -= 1000000000;
ts.tv_sec++;
}
} else {
ts.tv_sec = (1ULL << (sizeof(time_t) * 8 - 1)) - 1;
ts.tv_nsec = 0;
}
int code = empty ? pthread_cond_timedwait(&serv->qcnd, &serv->qmtx, &ts) : 0;
if(code == 0 || code == ETIMEDOUT || code == EINTR){
void *val = tclistshift2(serv->queue);//待處理用戶端出隊
if(pthread_mutex_unlock(&serv->qmtx) != 0){
err = true;
ttservlog(serv, TTLOGERROR, "pthread_mutex_unlock failed");
}
if(val){
empty = false;
int cfd = *(int *)val;
tcfree(val);
pthread_cleanup_push((void (*)(void *))close, (void *)(intptr_t)cfd);
TTSOCK *sock = ttsocknew(cfd);//用戶端socket
pthread_cleanup_push((void (*)(void *))ttsockdel, sock);
bool reuse;
do {
if(serv->timeout > 0) ttsocksetlife(sock, serv->timeout);
req->mtime = tctime();
req->keep = false;
ttservtask(sock, req);//調用ttserver.c裡的do_task來處理具體請求
reuse = false;
if(sock->end){
req->keep = false;
} else if(sock->ep > sock->rp){
reuse = true;
}
} while(reuse);
pthread_cleanup_pop(1);
pthread_cleanup_pop(0);
if(req->keep){//再把用戶端加入epoll事件監聽,對通信來說這裡很重要
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
[b]ev.events = EPOLLIN | EPOLLONESHOT;[/b]
ev.data.fd = cfd;
if([b]epoll_ctl(req->epfd, EPOLL_CTL_MOD, cfd, &ev)[/b] != 0){
close(cfd);
err = true;
ttservlog(serv, TTLOGERROR, "epoll_ctl failed");
}
} else {
if(epoll_ctl(req->epfd, EPOLL_CTL_DEL, cfd, NULL) != 0){
err = true;
ttservlog(serv, TTLOGERROR, "epoll_ctl failed");
}
if(!ttclosesock(cfd)){
err = true;
ttservlog(serv, TTLOGERROR, "close failed");
}
ttservlog(serv, TTLOGINFO, "connection finished");
}
。。。。。。。。。
}
上面函數調用的do_task函數就簡單了,先是處理二進制協定的各種指令,然後是對memcached協定和HTTP的支援
/* handle a task and dispatch it */
static void do_task(TTSOCK *sock, void *opq, TTREQ *req){
TASKARG *arg = (TASKARG *)opq;
int c = ttsockgetc(sock);
if(c == TTMAGICNUM){
switch(ttsockgetc(sock)){//處理二進制協定的各種指令
case TTCMDPUT:
do_put(sock, arg, req);
break;
case TTCMDPUTKEEP:
do_putkeep(sock, arg, req);
break;
。。。。。。。。。
case TTCMDEXT:
do_ext(sock, arg, req);//對這個比較感興趣,還要往下看
break;
。。。。。。。。。
case TTCMDREPL:
do_repl(sock, arg, req);
break;
default:
ttservlog(g_serv, TTLOGINFO, "unknown command");
break;
}
} else {
ttsockungetc(sock, c);
char *line = ttsockgets2(sock);
if(line){
pthread_cleanup_push(tcfree, line);
int tnum;
char **tokens = tokenize(line, &tnum);
pthread_cleanup_push(tcfree, tokens);
if(tnum > 0){
const char *cmd = tokens[0];
if(!strcmp(cmd, "set")){//[b]memcached協定[/b]
do_mc_set(sock, arg, req, tokens, tnum);
} else if(!strcmp(cmd, "add")){
do_mc_add(sock, arg, req, tokens, tnum);
。。。。。。。。。。。。。。。。
} else if(!strcmp(cmd, "quit")){
do_mc_quit(sock, arg, req, tokens, tnum);
} else if(tnum > 2 && tcstrfwm(tokens[2], "HTTP/1.")){
int ver = tcatoi(tokens[2] + 7);
const char *uri = tokens[1];
if(tcstrifwm(uri, "http://")){
const char *pv = strchr(uri + 7, '/');
if(pv) uri = pv;
}
if(!strcmp(cmd, "GET")){//[b]HTTP的支援[/b]
do_http_get(sock, arg, req, ver, uri);
} else if(!strcmp(cmd, "HEAD")){
do_http_head(sock, arg, req, ver, uri);
} else if(!strcmp(cmd, "PUT")){
do_http_put(sock, arg, req, ver, uri);
} else if(!strcmp(cmd, "POST")){
do_http_post(sock, arg, req, ver, uri);
} else if(!strcmp(cmd, "DELETE")){
do_http_delete(sock, arg, req, ver, uri);
} else if(!strcmp(cmd, "OPTIONS")){
do_http_options(sock, arg, req, ver, uri);
}
}
}
pthread_cleanup_pop(1);
pthread_cleanup_pop(1);
}
}
}
要給用戶端推送它事先訂閱的内容,要有個地方記錄,不能用lua的全局變量,可以用stash,全局的,記憶體的。想擴充又想盡量少改C代碼,ext command 可以調LUA腳本,腳本語言會簡單一些吧?再說ext command的本意就是用來擴充的。lua擴充裡沒有fd,是以要改C。要保證多線程下,推資料和拉的資料不發生混亂,那個以我的c水準搞不定。
/* handle the ext command */
static void do_ext(TTSOCK *sock, TASKARG *arg, TTREQ *req){
ttservlog(g_serv, TTLOGDEBUG, "doing ext command");
arg->counts[TTSEQNUM*req->idx+TTSEQEXT]++;
uint64_t mask = arg->mask;
pthread_mutex_t *rmtxs = arg->rmtxs;
void *scr = arg->screxts[req->idx];
int nsiz = ttsockgetint32(sock);
int opts = ttsockgetint32(sock);//opts是用戶端指定的,1左移0位和1位分别代表全局鎖和記錄鎖
int ksiz = ttsockgetint32(sock);
int vsiz = ttsockgetint32(sock);
if(ttsockcheckend(sock) || nsiz < 0 || nsiz >= TTADDRBUFSIZ ||
ksiz < 0 || ksiz > MAXARGSIZ || vsiz < 0 || vsiz > MAXARGSIZ){
ttservlog(g_serv, TTLOGINFO, "do_ext: invalid parameters");
//要注意位元組序啊,要不然也會出這個無效參數問題
return;
}
int rsiz = nsiz + ksiz + vsiz;
char stack[TTIOBUFSIZ];
char *buf = (rsiz < TTIOBUFSIZ) ? stack : tcmalloc(rsiz + 1);
pthread_cleanup_push(free, (buf == stack) ? NULL : buf);
if(ttsockrecv(sock, buf, rsiz) && !ttsockcheckend(sock)){
char name[TTADDRBUFSIZ];
memcpy(name, buf, nsiz);
name[nsiz] = '\0';
const char *kbuf = buf + nsiz;
const char *vbuf = kbuf + ksiz;
int xsiz = 0;
char *xbuf = NULL;
if(mask & ((1ULL << TTSEQEXT) | (1ULL << TTSEQALLORG))){
ttservlog(g_serv, TTLOGINFO, "do_ext: forbidden");
} else if(scr){
if(opts & RDBXOLCKGLB){//全局鎖
bool err = false;
for(int i = 0; i < RECMTXNUM; i++){
if(pthread_mutex_lock(rmtxs + i) != 0){
ttservlog(g_serv, TTLOGERROR, "do_ext: pthread_mutex_lock failed");
while(--i >= 0){
pthread_mutex_unlock(rmtxs + i);
}
err = true;
break;
}
}
if(!err){//調用lua腳本去處理key和value
xbuf = scrextcallmethod(scr, name, kbuf, ksiz, vbuf, vsiz, &xsiz);
for(int i = RECMTXNUM - 1; i >= 0; i--){
if(pthread_mutex_unlock(rmtxs + i) != 0)
ttservlog(g_serv, TTLOGERROR, "do_ext: pthread_mutex_unlock failed");
}
}
} else if(opts & RDBXOLCKREC){//記錄鎖
int mtxidx = recmtxidx(kbuf, ksiz);
if(pthread_mutex_lock(rmtxs + mtxidx) == 0){//調用lua腳本去處理key和value
xbuf = scrextcallmethod(scr, name, kbuf, ksiz, vbuf, vsiz, &xsiz);
if(pthread_mutex_unlock(rmtxs + mtxidx) != 0)
ttservlog(g_serv, TTLOGERROR, "do_ext: pthread_mutex_unlock failed");
} else {
ttservlog(g_serv, TTLOGERROR, "do_ext: pthread_mutex_lock failed");
}
} else {//沒鎖 調用lua腳本去處理key和value
xbuf = scrextcallmethod(scr, name, kbuf, ksiz, vbuf, vsiz, &xsiz);
}
}
if(xbuf){//腳本處理的傳回值,要發給用戶端
int rsiz = xsiz + sizeof(uint8_t) + sizeof(uint32_t);
char *rbuf = (rsiz < TTIOBUFSIZ) ? stack : tcmalloc(rsiz);
pthread_cleanup_push(free, (rbuf == stack) ? NULL : rbuf);
*rbuf = 0;
uint32_t num;
num = TTHTONL((uint32_t)xsiz);
memcpy(rbuf + sizeof(uint8_t), &num, sizeof(uint32_t));
memcpy(rbuf + sizeof(uint8_t) + sizeof(uint32_t), xbuf, xsiz);
tcfree(xbuf);
if(ttsocksend(sock, rbuf, rsiz)){//發給用戶端
req->keep = true;
} else {
ttservlog(g_serv, TTLOGINFO, "do_ext: response failed");
}
pthread_cleanup_pop(1);
} else {
uint8_t code = 1;
if(ttsocksend(sock, &code, sizeof(code))){
req->keep = true;
} else {
ttservlog(g_serv, TTLOGINFO, "do_ext: response failed");
}
}
} else {
ttservlog(g_serv, TTLOGINFO, "do_ext: invalid entity");
}
pthread_cleanup_pop(1);
}
opts是用戶端指定的,1左移0位和1位分别代表全局鎖和記錄鎖,下邊用左移2位和3位辨別不同操作。
要想儲存fd,在判斷全局鎖之前加入如下代碼:
if(opts & (1<<2) ){
char *tbuf = buf + nsiz + ksiz;
sprintf( tbuf, "%d", sock->fd);
vsiz = strlen(tbuf);
//以上代碼把用戶端的fd做為value傳到腳本裡,腳本可以把它儲存到stash
}
腳本處理的傳回值,要發給特定的用戶端,比如從stash裡拿出的,可以這樣試一下,在 if(xbuf){ 之後加上:
if(opts & (1<<3) ){
int fd2 = atoi(xbuf);
sock->fd = fd2;//把從stash裡取出的值當作sock的fd
xsiz = vsiz;
}
以上隻是用最少的改動驗證一些想法是否可行,僅供娛樂,記着編譯要./configure --enable-lua。
現在還少相應lua腳本,這個也是簡單改了一下[url=http://github.com/igrigorik/tokyo-recipes]開源的代碼[/url],lua腳本咱也不熟。(什麼也不熟也出來混?隻想看下學一門新技術要多長時間,下次我用java做,也有個比較。)
local SEP = '\n'
function _set_len(stream)
local count = 0
if stream then
count = table.getn(_split(stream, SEP))
end
return count
end
function set_length(key, value)
return _set_len(_stashget(key))
end
function set_get(key)
return _stashget(key)
end
function set_append(key, value)
local stream = _stashget(key)
if not stream then
_stashput(key, value)
else
local set_len = _set_len(stream)
if set_len == 1 then
if stream == value then return nil end
elseif set_len > 1 then
for _, element in ipairs(_split(stream, SEP)) do
if element == value then return nil end
end
end
if not _stashput(key, stream .. SEP .. value) then
return nil
end
end
return value
end
function set_delete(key, value)
local stream = _stashget(key)
if stream then
local set_len = _set_len(stream)
if set_len == 1 and stream == value then
if _stashout(key) then return value end
elseif set_len > 1 then
local found = -1
local set_list = _split(stream, SEP)
for i, element in ipairs(set_list) do
if element == value then
found = i
break
end
end
if found > -1 then
table.remove(set_list, found)
if _stashput(key, table.concat(set_list, SEP)) then return value end
end
end
end
return nil
end
function set_delete_all()
_stashvanish()
end
還有用戶端的代碼,我用actionscript,代碼太長,隻貼片段:
function testext(){
var fn:String = "set_append";//調用lua腳本set_append函數,向stash裡
var key:String = "600000"; //以600000為key的"set"裡放
var val:String = "flash"; //字元串值
var ha:ByteArray=new ByteArray();
ha.endian = Endian.BIG_ENDIAN;
ha.writeByte(0xc8);
ha.writeByte(0x68);
ha.writeUnsignedInt(fn.length);
ha.writeUnsignedInt(5); //101 C語言裡讀取到的opts值
ha.writeUnsignedInt(key.length);
ha.writeUnsignedInt(val.length);
ha.writeUTFBytes(fn);
ha.writeUTFBytes(key);
ha.writeUTFBytes(val);
socket.writeBytes(ha,0,ha.length);
socket.flush();
}
function testpush(){
var fn:String = "set_get";//調用lua腳本set_get函數,向stash裡取
var key:String = "600000";//以600000為key的"set"裡所有值
var val:String = "flash";
var ha:ByteArray=new ByteArray();
ha.endian = Endian.BIG_ENDIAN;
ha.writeByte(0xc8);
ha.writeByte(0x68);
ha.writeUnsignedInt(fn.length);
ha.writeUnsignedInt(9); //1001 C語言裡讀取到的opts值
ha.writeUnsignedInt(key.length);
ha.writeUnsignedInt(val.length);
ha.writeUTFBytes(fn);
ha.writeUTFBytes(key);
ha.writeUTFBytes(val);
socket.writeBytes(ha,0,ha.length);
socket.flush();
}
再想解決一些深入的問題就不是這麼簡單了,我甯願用java來做,是以就到這了。