天天看點

Google protocol buffer(4)—Google protocol buffer + socket實作message的連續傳輸

背景介紹:

  • 将連續到達的HTTP包資訊(URL,Cookie,Referer,Source_ip etc)通過Google protocol buffer + socket發送至server 端處理;
  • 要求向後相容:可以增加filed;

http. proto檔案:即message的定義:

message http_packet
{
	optional string url=1;
	optional string cookie=2;
	optional string src_ip=3;
	optional string dst_ip=4;
	optional string cont_type=5;
	optional string cont_length=6;
	optional string location=7;
	optional string referer=8;
	optional int64  pid=9;
	repeated string other_field=10;
}
           

client.cc

#include 
   
    
#include "http_packet.pb.h"
#include 
    
     
#include
     
      
#include
      
       
#include
       
         #include
        
          #include
         
           #include
          
            #include
           
             #include 
            
              #include 
             
               #include 
              
                #include 
               
                 #include 
                
                  #include "http_session_format_info.h" using namespace google::protobuf::io; using namespace std; int protobuf_send(http_session_format_info* http_info) { /* Coded output stram */ http_packet a_http; a_http.set_url(http_info->url); a_http.set_cookie(http_info->cookie); a_http.set_src_ip(http_info->src_ip); a_http.set_dst_ip(http_info->dst_ip); a_http.set_cont_type(http_info->content_type); a_http.set_cont_length(http_info->content_length); a_http.set_location(http_info->location); a_http.set_referer(http_info->referer); a_http.set_pid(http_info->pid); cout<<"size after serilizing is "<
                 
                  <
                  
                   WriteVarint32(a_http.ByteSize()); a_http.SerializeToCodedStream(coded_output); int host_port= 1101; char* host_name="10.0.6.227"; struct sockaddr_in my_addr; int bytecount; int hsock; int * p_int; int err; hsock = socket(AF_INET, SOCK_STREAM, 0); if(hsock == -1) { printf("Error initializing socket %d\n",errno); close(hsock); return 0; } //setting socket; p_int = (int*)malloc(sizeof(int)); *p_int = 1; if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )|| (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ) { printf("Error setting options %d\n",errno); free(p_int); close(hsock); return 0 ; } free(p_int); //setting ok; my_addr.sin_family = AF_INET ; my_addr.sin_port = htons(host_port); memset(&(my_addr.sin_zero), 0, 8); my_addr.sin_addr.s_addr = inet_addr(host_name); if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ) { if((err = errno) != EINPROGRESS) { fprintf(stderr, "Error connecting socket %d\n", errno); close(hsock); return 0; } } //send (); if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) { fprintf(stderr, "Error sending data %d\n", errno); } else { printf("Sent bytes %d\n", bytecount); } close(hsock); usleep(1); delete pkt; return 0; } 
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
   
           

server.cc

//author:LUO Pan 
//date:2015/3/27
//version:0

#include 
   
    
#include 
    
     
#include 
     
      
#include 
      
       
#include 
       
         #include 
        
          #include 
         
           #include 
          
            #include 
           
             #include 
            
              #include 
             
               #include "http_packet.pb.h" #include 
              
                #include 
               
                 #include 
                
                  #include "server.h" using namespace std; using namespace google::protobuf::io; void* SocketHandler(void*); void* recv_thread(void*); int main(int argv, char** argc){ int host_port= 1101; struct sockaddr_in my_addr; int hsock; socklen_t addr_size = 0; pthread_t thread_id=0; sockaddr_in sadr; hsock = socket(AF_INET, SOCK_STREAM, 0); if(hsock == -1) { printf("Error initializing socket %d\n", errno); return 0; } //set socket; int* p_int; p_int = (int*)malloc(sizeof(int)); *p_int = 1; if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )|| (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){ printf("Error setting options %d\n", errno); free(p_int); return 0; } free(p_int); //setting ok; my_addr.sin_family = AF_INET ; my_addr.sin_port = htons(host_port); memset(&(my_addr.sin_zero), 0, 8); my_addr.sin_addr.s_addr = INADDR_ANY ; if( bind(hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 ) { fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno); return 0; } if(listen(hsock, 10) == -1 ) { fprintf(stderr, "Error listening %d\n",errno); return 0; } acsock_argv arg; acsock_argv* p_arg=&arg; p_arg->hsock=hsock; p_arg->sadr=sadr; p_arg->addr_size=addr_size; //Now lets do the server stuff addr_size = sizeof(sockaddr_in); pthread_create(&thread_id,0,&recv_thread,(void*)p_arg); pthread_join(thread_id,NULL); /* while(true) { printf("waiting for a connection\n"); csock = (int*)malloc(sizeof(int)); if((*csock = accept(hsock, (sockaddr*)&sadr, &addr_size))!= -1) { printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr)); pthread_create(&thread_id,0,&SocketHandler, (void*)csock ); pthread_detach(thread_id); } else{ fprintf(stderr, "Error accepting %d\n", errno); } } */ return 1 ; //oops } void* recv_thread(void* p_argv) { acsock_argv* p_arg=(acsock_argv*) (p_argv); int hsock=p_arg->hsock; sockaddr_in sadr=p_arg->sadr; int* csock; socklen_t addr_size=p_arg->addr_size; pthread_t thread_id=0; while(true) { printf("waiting for a connection\n"); csock = (int*)malloc(sizeof(int)); if((*csock = accept(hsock, (sockaddr*)&sadr, &addr_size))!= -1) { printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr)); pthread_create(&thread_id,0,&SocketHandler, (void*)csock ); pthread_detach(thread_id); } else { fprintf(stderr, "Error accepting %d\n", errno); } } return NULL; } google::protobuf::uint32 readHdr(char *buf) { google::protobuf::uint32 size; google::protobuf::io::ArrayInputStream ais(buf,4); CodedInputStream coded_input(&ais); coded_input.ReadVarint32(&size);//Decode the HDR and get the size cout<<"size of payload is "<
                 
                  <
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
   
           

繼續閱讀