#include "network.h"
#include "buffer.h"
#include "websocket.h"
/* Running count of EAGAIN results; handle_read increments and prints it. */
int eag = 0;
/*
 * http_parser callback installed as the first entry of parser_settings.
 * It ignores the parser and always returns 0.
 */
static int
on_message(http_parser *parser)
{
    (void)parser; /* intentionally unused */
    return 0;
}
/*
 * http_parser path callback: records the request path and method on the
 * client attached to the parser.
 *
 * parser  parser whose `data` field points at the rio_client being served
 * at      first byte of the path inside the parse buffer (not NUL-terminated)
 * len     number of path bytes available at `at`
 *
 * The path is copied into a newly malloc'ed, NUL-terminated client->path
 * (handle_http frees it after dispatching) and parser->method is stored in
 * client->method.  Returns 0.
 */
int
on_path(http_parser *parser, const char *at, size_t len)
{
    rio_client *client = parser->data;

    client->path = malloc(len + 1);
    if (client->path == NULL) {
        /* NOTE(review): presumably error_exit() does not return -- confirm */
        error_exit("Malloc");
    }

    /* `at` is length-delimited, so copy exactly len bytes and terminate. */
    memcpy(client->path, at, len);
    client->path[len] = '\0';

    client->method = (unsigned char) parser->method;

    debug_print("HTTP-REQ method: %d\n", (int) client->method);
    /* len is a size_t, so it needs %zu rather than %d */
    debug_print("HTTP-REQ Path: %s %zu\n", client->path, len);

    return 0;
}
/*
 * NOTE(review): these are the remaining http_parser callbacks referenced by
 * parser_settings.  Their definitions are truncated in this copy (parameter
 * lists and bodies are missing) and must be restored before the file builds.
 */
int on_header_field(http_parser
int on_header_value(http_parser
int on_headers_complete(http_parser
*parser)
int on_body(http_parser
int on_message_complete(http_parser
http_parser_settings
parser_settings =
on_message,
on_path,
on_header_field,
on_header_value,
on_headers_complete,
on_body,
on_message_complete
};
/*
 * Queue a response for a client and arm its socket for writing.
 *
 * worker  worker whose epoll instance watches the client socket
 * cli     client the response belongs to
 * resp    NUL-terminated heap string holding the response; it is freed
 *         here, so the caller must not touch it afterwards
 *
 * The response text is copied into a new rio buffer stored in cli->buffer,
 * the socket's epoll registration is switched to EPOLLOUT, and the updated
 * client record is stored in the `clients` hash `h` under cli->fd.
 */
void
handle_write(rio_worker *worker, rio_client *cli, char *resp)
{
    struct epoll_event event = {0};
    size_t resp_len = strlen(resp);
    khiter_t k;
    int ret;

    cli->buffer = new_rio_buffer_size(resp_len);
    rio_buffer_copy_data(cli->buffer, resp, resp_len);
    free(resp);

    debug_print("Handle Write: %d : %s\n",
                cli->fd, rio_buffer_get_data(cli->buffer));

    /* From now on only wake up when the socket is writable. */
    event.events = EPOLLOUT;
    event.data.fd = cli->fd;
    if (epoll_ctl(worker->epoll_fd, EPOLL_CTL_MOD, cli->fd, &event) == -1) {
        debug_print("Error on epoll_ctl_mod on %d\n",
                    cli->fd, worker->epoll_fd);
    }

    /* Store the client by value so the event loop sees the new buffer. */
    k = kh_put(clients, h, cli->fd, &ret);
    kh_value(h, k) = *cli;
}
do_write(rio_worker
struct epoll_event
*event)
int sent;
debug_print("Do Write to fd: %d : %s\n",
rio_buffer_get_data(cli->buffer));
do {
sent
= send(cli->fd,
rio_buffer_get_data(cli->buffer),
cli->buffer->length,
MSG_DONTWAIT);
if (sent
< 0 &&
errno != EAGAIN)
debug_print("Do Write: send error on fd: %d errno: %d\n",
cli->fd,
errno);
break;
} else
if (sent
< 0 && errno
== EAGAIN)
debug_print("Do Write: EAGAIN on fd: %d\n",
cli->fd);
break;
> 0) {
rio_buffer_adjust(cli->buffer,
sent);
}
} while
(sent >
0 && rio_buffer_get_data(cli->buffer)
!= NULL);
debug_print("Do Write sent: %d strlen: %zu\n",
sent, cli->buffer->length);
remove_and_close(cli,
worker, event);
/*
 * handle_read: receive request bytes from a client socket into a rio buffer.
 *
 * handle_http calls it as handle_read(worker, cli, &event).
 *
 * NOTE(review): this copy of the function is incomplete.  The opening brace,
 * part of the parameter list, the left-hand side of the buffer allocation,
 * several debug_print arguments and the loop header/footer are missing, so
 * the comments below describe only what the remaining lines show.
 *
 * Visible return values: 0 when recv() reports 0 bytes, otherwise the value
 * of `received` at the end of the function.
 */
int
handle_read(rio_worker
*ev)
// maximum number of bytes requested from recv() per call
size_t len
= 4096;
ssize_t
received = 0;
total_received = 0;
//allocate space for data (4096 bytes; the assignment target is missing here)
= new_rio_buffer_size(sizeof(char)
* 4096);
debug_print("Handle Read from %d\n",
// non-blocking read straight into the buffer's storage
received
= recv(ev->data.fd,
cli->buffer->content,
len, MSG_DONTWAIT);
if (received
< 0)
// anything other than EAGAIN/EWOULDBLOCK is treated as a real error
if
(errno != EAGAIN
&& errno !=
EWOULDBLOCK) {
// NOTE(review): this test sits under `received < 0` above, so it looks
// unreachable -- confirm once the missing braces are restored
if
(received ==
0) {
//if error, remove from epoll and close socket
debug_print("Client received error: disconnected"
"errno %d\n",
}
else {
debug_print("Some other error %d\n",
}
//handle_http will take care of this :)
}
//received += 1;
debug_print("EAGAIN on recv from fd: %d\n",
//if EAGAIN, insert on epoll again
ev->events
= EPOLLIN |
EPOLLET;
//re-arm the socket in epoll (EPOLL_CTL_MOD on an already registered fd)
(epoll_ctl(worker->epoll_fd,
EPOLL_CTL_MOD,
cli->fd,
ev)
error_exit("Could not add conn_sock to epoll");
// count and print how many times EAGAIN was hit (file-level counter)
eag
+= 1;
printf("EAGAIN %d\n",
eag);
}
if (received
== 0)
//client disconnected
return
0;
total_received
+= received;
debug_print("READ AGAIN on %d\n",
(received >
debug_print("Total received %zu\n",
total_received);
// record how many bytes of the buffer are valid
cli->buffer->length
= total_received;
return received;
/*
 * Unregister a client from epoll, close its socket and free its buffer.
 *
 * client  client to tear down; client->buffer is released if it is set
 * worker  worker whose epoll instance the client is registered with
 * event   event structure passed through to epoll_ctl(EPOLL_CTL_DEL)
 *
 * Failures of epoll_ctl() and close() are only logged.  Returns the result
 * of the epoll_ctl() call.
 */
int
remove_and_close(rio_client *client, rio_worker *worker,
                 struct epoll_event *event)
{
    int rc = epoll_ctl(worker->epoll_fd, EPOLL_CTL_DEL, client->fd, event);

    if (rc == -1) {
        debug_print("[WARNING] on epoll_ctl_del on %d\n", client->fd);
    }

    if (close(client->fd) == -1) {
        debug_print("Error on close client %d\n", client->fd);
    }

    if (client->buffer != NULL) {
        rio_buffer_free(&client->buffer);
    }

    return rc;
}
/*
 * handle_http: service one epoll event for a client connection.
 *
 * On EPOLLIN it reads the request with handle_read(), parses it with a
 * freshly allocated http_parser (callbacks come from parser_settings),
 * passes cli->path to dispatch(), then frees cli->path and the parser.
 * On EPOLLOUT it calls do_write().
 *
 * NOTE(review): this copy of the function is incomplete.  The return type,
 * opening brace, first parameter's name, several conditions, braces and
 * call arguments are missing, so the comments below describe only what the
 * remaining lines show.
 */
handle_http(rio_worker
struct epoll_event event,
rio_client *cli)
int response;
// NOTE(review): no use of buf or resp is visible in this copy
char buf[4096];
char resp[1024];
size_t n;
if (event.events
& EPOLLIN)
//handle read
int
received = handle_read(worker,
cli, &event);
//create http parser
http_parser
*parser = malloc(sizeof(http_parser));
if (!parser){
error_exit("malloc error: http_parser");
http_parser_init(parser,
HTTP_REQUEST);
//set parser data (the callbacks read the client back from parser->data)
parser->data
= (void*)cli;
//execute http parsing only if data was read
> 0)
debug_print("Execute http parsing client: %d\n",
// n is the value returned by http_parser_execute for `received` input bytes
n
= http_parser_execute(parser,
&parser_settings,
rio_buffer_get_data(cli->buffer),
received);
if (parser->upgrade)
//#TODO: what to do?
{ // client disconnected!
debug_print("Client %d Disconnected!\n",
//delete fd from epoll and close
remove_and_close(cli,
worker, &event);
free(parser);
return;
// NOTE(review): n is size_t and received is int, so this comparison mixes
// signedness
if (n
!= received)
debug_print("Error parsing, closing socket n:%zu received:%d\n",
n,
received);
// the request bytes are released before dispatching
rio_buffer_free(&cli->buffer);
response
= dispatch(cli,
cli->path);
if (response
!= DISPATCH_FINISHED)
//write response
debug_print("Async Dispatch to fd: %d\n",
debug_print("Freeing %s\n",
// cli->path was allocated by the on_path callback
free(cli->path);
cli->path
= NULL;
free(parser);
} else
if (event.events
& EPOLLOUT)
//if socket is ready to write, do it!
do_write(worker,
init_clients()
h =
kh_init(clients);
free_clients()
khiter_t
element;
debug_print("Closing clients structures\n",
h);
for (element
= kh_begin(h);
element != kh_end(h);
++element)
if (kh_exist(h,
element)) {
debug_print("%d\n",
((rio_client)
kh_val(h,
element)).fd);
kh_del(clients,
h, element);
kh_destroy(clients,
socket_bind()
int server_fd;
int arg;
struct sockaddr_in
sin;
//bind
memset(&sin,
0, sizeof(struct
sockaddr_in));
sin.sin_family
= AF_INET;
sin.sin_port
= htons(80);
sin.sin_addr.s_addr
= inet_addr("0.0.0.0");
//create socket
if ((server_fd
= socket(AF_INET,
SOCK_STREAM, 0))
error_exit("Could not create socket.");
//set socket non-blocking
if (fcntl(server_fd,
F_SETFL, O_NONBLOCK)
error_exit("Could not set socket non-blocking");
//set socket options
arg =
1;
if (setsockopt
(server_fd,
SOL_SOCKET, SO_REUSEADDR,
&arg,
sizeof(arg))
error_exit("Socket options");
//bind socket to local addr
if (bind(server_fd,
(struct sockaddr
*)&sin,
sizeof(sin))
error_exit("bind");
//listen on this socket
if (listen(server_fd,
MAX_EVENTS) <
error_exit("listen");
return server_fd;
/*
 * accept_incoming_connection: accept one connection on runtime->server_fd,
 * make the new socket non-blocking, register it with epoll
 * (EPOLL_CTL_ADD) and record a client entry for it.
 *
 * NOTE(review): this copy of the function is incomplete.  The return type,
 * opening brace, the types of ev/temp_client/cli/k, the right-hand sides of
 * several assignments and parts of the conditions are missing, so the
 * comments below describe only what the remaining lines show.
 */
accept_incoming_connection(rio_runtime
*runtime,
rio_worker *worker)
int new_connection_socket;
int flags;
int ret;
unsigned
int client_len;
ev;
temp_client;
cli;
k;
// accept() is given the size of the address storage and may update it
client_len
= sizeof(temp_client);
//accept client connection
new_connection_socket
= accept(runtime->server_fd,
(struct
sockaddr *)
&temp_client,
&client_len);
if (new_connection_socket
//#TODO what to do?
error_exit("Could not accept socket");
//read the socket's current flags, then add O_NONBLOCK to them
if (-1
== (flags
= fcntl(new_connection_socket,
F_GETFL, 0)))
flags
if (fcntl(new_connection_socket,
F_SETFL, flags
| O_NONBLOCK)
error_exit("Could not set client socket non-blocking");
// the epoll event carries the new socket's fd in data.fd
ev.events
ev.data.fd
= new_connection_socket;
//add socket to epoll
EPOLL_CTL_ADD,
new_connection_socket,
&ev)
error_exit("Could not add conn_sock to epoll");
//store client information (the hash `h` is involved, keyed by the new fd)
cli.fd
cli.websocket
cli.buffer
h, new_connection_socket
debug_print("New Client: %d\n",
cli.fd);
/*
 * run_worker: body of a worker process.
 *
 * Visible steps: name the worker, initialise the client table, dispatcher
 * and static server, connect a ZMQ_SUB socket to the master endpoint, create
 * an epoll instance and register runtime->server_fd with it, then loop:
 * wait for epoll events, accept new connections or hand client events to
 * handle_http, call dispatch_responses, and poll the master socket for a
 * "terminate" message.  After the loop the ZeroMQ socket and context, the
 * client table, static server, dispatcher and epoll fd are released.
 *
 * NOTE(review): this copy of the function is incomplete.  The return type,
 * braces, several declarations, conditions and call arguments are missing,
 * so the comments below describe only what the remaining lines show.
 */
run_worker(int
id, rio_worker*
worker, rio_runtime
*runtime)
int size_epoll_events;
int rc;
ev, events[MAX_EVENTS];
// NOTE(review): sprintf is unbounded; the size of worker->name is not
// visible here
sprintf(worker->name,
"worker %d", id);
debug_print("Identifying worker as %s pid %d\n",
worker->name,
getpid());
init_clients();
init_dispatcher();
init_static_server();
// subscribe to every message published on the master's ipc endpoint
// (an empty ZMQ_SUBSCRIBE prefix is set)
worker->zmq_context
= zmq_init(1);
worker->master
= zmq_socket(worker->zmq_context,
ZMQ_SUB);
zmq_setsockopt(worker->master,
ZMQ_SUBSCRIBE, "",
strlen(""));
zmq_connect(worker->master,
"ipc:///tmp/rio_master.sock");
//create epoll
worker->epoll_fd
= epoll_create(MAX_EVENTS);
if (worker->epoll_fd
error_exit("epoll_create");
//configure epoll events and file descriptor
EPOLLPRI;
= runtime->server_fd;
//add listen socket to epoll
runtime->server_fd,
error_exit("epoll_ctl: listen_sock");
while (1)
//poll events (the last epoll_wait argument, 100, is the timeout in ms)
size_epoll_events
= epoll_wait(worker->epoll_fd,
events,
MAX_EVENTS,
100);
if (size_epoll_events
== -1
for (int
n = 0;
n < size_epoll_events;
++n)
//if event fd == server fd -> accept new connection
(events[n].data.fd
== runtime->server_fd)
accept_incoming_connection(runtime,
worker);
else { //handle in out readyness :)
//retrieve client info by fd and handle event
// NOTE(review): no check of k against kh_end(h) is visible before kh_val
k
= kh_get(clients,
h, events[n].data.fd);
cli
= kh_val(h,
k);
handle_http(worker,
events[n],
&cli);
//dispatch responses
dispatch_responses(worker);
//look for master messages (non-blocking receive)
zmq_msg_t
msg;
zmq_msg_init(&msg);
rc =
zmq_recv(worker->master,
&msg,
ZMQ_NOBLOCK);
if (rc
debug_print("Worker %d Received %s from master\n",
id,
(char
*) zmq_msg_data(&msg));
// NOTE(review): the message data is used as a C string here; confirm the
// master sends it NUL-terminated
if (strcmp((char
*) zmq_msg_data(&msg),
"terminate") ==
zmq_msg_close(&msg);
zmq_msg_close(&msg);
debug_print("\nWorker terminating gracefully\n",
// shutdown: close the ZeroMQ socket and context, then release the rest
rc =
zmq_close(worker->master);
debug_print("Worker ZMQ Socket close return %d\n",
rc);
zmq_term(worker->zmq_context);
debug_print("Worker ZMQ Context termination return :%d\n",
free_clients();
destroy_static_server();
destroy_dispatcher();
close(worker->epoll_fd);