
Implementing an Asynchronous Echo Server with gRPC in C++

In my previous post, I implemented a synchronous echo server with gRPC; see that post for the background.

Without further ado, here is the code.
echo_async_sample_server.cc
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <grpcpp/grpcpp.h>
#include <grpc/support/log.h>

#ifdef BAZEL_BUILD
#include "examples/protos/echosample.grpc.pb.h"
#else
#include "echo_sample.grpc.pb.h"
#endif

using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerCompletionQueue;
using grpc::Status;
using echosample::EchoRequest;
using echosample::EchoReply;
using echosample::Reflecter;



class EchoAsyncServiceImpl final {
 public:
  ~EchoAsyncServiceImpl() {
    // There is no shutdown handling in this demo: the worker threads never
    // exit, so these joins block forever, which is what keeps the process
    // alive after main() falls through. In a real server, trigger
    // server_->Shutdown() from another thread to unblock them.
    for (auto& t : thread_pool_) {
      t.join();
    }
    server_->Shutdown();
    // Always shutdown the completion queue after the server.
    cq_->Shutdown();
  }

  // Spawn a pool of worker threads that all drain the same completion queue.
  void Start() {
    for (int i = 0; i < kNumThreads; ++i) {
      thread_pool_.emplace_back(&EchoAsyncServiceImpl::RunInThread, this);
    }
  }
  
  // Worker loop: wait for completion-queue events and drive the matching
  // CallData state machine.
  void RunInThread() {
    // Each worker seeds one outstanding CallData, so up to kNumThreads
    // requests can be accepted concurrently.
    new CallData(&service_, cq_.get());
    std::cout << "worker thread " << std::this_thread::get_id()
              << " started" << std::endl;
    void* tag;  // Uniquely identifies a request.
    bool ok;
    while (true) {
      // Block waiting to read the next event from the completion queue.
      // The event is uniquely identified by its tag, which in this case is
      // the memory address of a CallData instance. The return value of
      // Next should always be checked: it tells us whether there is any
      // kind of event or cq_ is shutting down. Next() is thread-safe, so
      // no external locking is needed here.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }
  
  // There is no shutdown handling in this code.
  void Run() {
    std::string server_address("0.0.0.0:50051");

    ServerBuilder builder;
    // Listen on the given address without any authentication mechanism.
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    // Register "service_" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService(&service_);
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    cq_ = builder.AddCompletionQueue();
    // Finally assemble the server.
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Start the worker threads that drive the server's event loop.
    Start();
  }
  
 private:
  // Class encompassing the state and logic needed to serve a request.
  class CallData {
   public:
    // Take in the "service" instance (in this case representing an asynchronous
    // server) and the completion queue "cq" used for asynchronous communication
    // with the gRPC runtime.
    CallData(Reflecter::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
      // Invoke the serving logic right away.
      Proceed();
    }

    void Proceed() {
      if (status_ == CREATE) {
        // Make this instance progress to the PROCESS state.
        status_ = PROCESS;

        // As part of the initial CREATE state, we *request* that the system
        // start processing Refect requests. In this request, "this" acts as
        // the tag uniquely identifying the request (so that different
        // CallData instances can serve different requests concurrently), in
        // this case the memory address of this CallData instance.
        service_->RequestRefect(&ctx_, &request_, &responder_, cq_, cq_,
                                this);
      } else if (status_ == PROCESS) {
        // Spawn a new CallData instance to serve new clients while we
        // process the one for this CallData. The instance will deallocate
        // itself as part of its FINISH state.
        new CallData(service_, cq_);

        // Echo the request message back.
        reply_.set_reply_msg(request_.request_msg());

        // And we are done! Let the gRPC runtime know we've finished, using
        // the memory address of this instance as the uniquely identifying
        // tag for the event.
        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, this);
      } else {
        GPR_ASSERT(status_ == FINISH);
        // Once in the FINISH state, deallocate ourselves (CallData).
        delete this;
      }
    }

   private:
    // The means of communication with the gRPC runtime for an asynchronous
    // server.
    Reflecter::AsyncService* service_;
    // The producer-consumer queue for asynchronous server notifications.
    ServerCompletionQueue* cq_;
    // Context for the rpc, allowing us to tweak aspects of it such as the
    // use of compression and authentication, as well as to send metadata
    // back to the client.
    ServerContext ctx_;

    // What we get from the client.
    EchoRequest request_;
    // What we send back to the client.
    EchoReply reply_;

    // The means to get back to the client.
    ServerAsyncResponseWriter<EchoReply> responder_;
	
    enum CallStatus { CREATE, PROCESS, FINISH };
    CallStatus status_;  // The current serving state.
  };
  // Single-threaded variant of the event loop. It is unused here; the
  // multi-threaded RunInThread above is what main() drives.
  void HandleRpcs() {
    new CallData(&service_, cq_.get());
    void* tag;  // Uniquely identifies a request.
    bool ok;
    while (true) {
      // Block waiting to read the next event from the completion queue.
      // The event is uniquely identified by its tag, which in this case is
      // the memory address of a CallData instance. The return value of
      // Next should always be checked: it tells us whether there is any
      // kind of event or cq_ is shutting down.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }
  std::unique_ptr<ServerCompletionQueue> cq_;
  Reflecter::AsyncService service_;
  std::unique_ptr<Server> server_;
  std::vector<std::thread> thread_pool_;
  static constexpr int kNumThreads = 10;  // Size of the worker pool.
};

int main(int argc, char** argv) {
  EchoAsyncServiceImpl server;
  server.Run();
  return 0;
}

echo_async_sample_client.cc
#include <cstdio>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>

#include <grpcpp/grpcpp.h>
#include <grpc/support/log.h>

#ifdef BAZEL_BUILD
#include "examples/protos/echo_sample.grpc.pb.h"
#else
#include "echo_sample.grpc.pb.h"
#endif

using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using echosample::EchoRequest;
using echosample::EchoReply;
using echosample::Reflecter;


class GreeterClient {
 public:
  GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Reflecter::NewStub(channel)) {}

  // Assembles the client's payload, sends it asynchronously and presents
  // the response back from the server.
  std::string Refect(const std::string& msg) {
    // Data we are sending to the server.
    EchoRequest request;
    request.set_request_msg(msg);

    // Container for the data we expect from the server.
    EchoReply reply;

    // Context for the client. It could be used to convey extra information
    // to the server and/or tweak certain RPC behaviors.
    ClientContext context;

    // The producer-consumer queue we use to communicate asynchronously
    // with the gRPC runtime.
    CompletionQueue cq;

    // Storage for the status of the RPC upon completion.
    Status status;

    // stub_->PrepareAsyncRefect() creates an RPC object, returning an
    // instance to store in "rpc", but does not actually start the RPC.
    std::unique_ptr<ClientAsyncResponseReader<EchoReply>> rpc(
        stub_->PrepareAsyncRefect(&context, request, &cq));

    // StartCall initiates the RPC call.
    rpc->StartCall();

    // Request that, upon completion of the RPC, "reply" be updated with
    // the server's response, and "status" with whether the operation was
    // successful. Tag the request with the integer 1.
    rpc->Finish(&reply, &status, (void*)1);

    void* got_tag;
    bool ok = false;
    // Block until the next result is available in the completion queue.
    GPR_ASSERT(cq.Next(&got_tag, &ok));
    // Verify that the result corresponds to our previous request (tag 1)
    // and that it completed successfully.
    GPR_ASSERT(got_tag == (void*)1);
    GPR_ASSERT(ok);

    // Act upon the status of the actual RPC.
    if (status.ok()) {
      return reply.reply_msg();
    } else {
      std::cout << status.error_code() << ": " << status.error_message()
                << std::endl;
      return "RPC failed";
    }
  }

 private:
  std::unique_ptr<Reflecter::Stub> stub_;
};

int main(int argc, char** argv) {
  // Instantiate the client. It requires a channel, out of which the actual RPCs
  // are created. This channel models a connection to an endpoint (in this case,
  // localhost at port 50051). We indicate that the channel isn't authenticated
  // (use of InsecureChannelCredentials()).
  GreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));
  char sendline[4096] = {0};
  while(fgets(sendline, 4096, stdin) != NULL) {
    std::string reply = greeter.Refect(sendline);
    std::cout << "Greeter received: " << reply << std::endl;
    memset(sendline,0x00,4096);
  }
  return 0;
}

The .proto file is available in my previous synchronous echo demo post.
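
For reference, here is a sketch of that service definition, reconstructed from the generated identifiers used above (package echosample, service Reflecter, rpc Refect, fields request_msg and reply_msg). The field numbers are my assumption; the .proto from the previous post is the authoritative version.

// echo_sample.proto (reconstructed sketch; field numbers are assumed)
syntax = "proto3";

package echosample;

// The echo service definition.
service Reflecter {
  // Echoes the request message back to the caller.
  rpc Refect (EchoRequest) returns (EchoReply) {}
}

// The request message containing the text to echo.
message EchoRequest {
  string request_msg = 1;
}

// The reply message carrying the echoed text.
message EchoReply {
  string reply_msg = 1;
}
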
The Makefile:
#
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

HOST_SYSTEM = $(shell uname | cut -f 1 -d_)
SYSTEM ?= $(HOST_SYSTEM)
CXX = g++
CPPFLAGS += `pkg-config --cflags protobuf grpc`
CXXFLAGS += -std=c++11
ifeq ($(SYSTEM),Darwin)
LDFLAGS += -L/usr/local/lib `pkg-config --libs protobuf grpc++`\
           -pthread\
           -lgrpc++_reflection\
           -ldl
else
LDFLAGS += -L/usr/local/lib `pkg-config --libs protobuf grpc++`\
           -pthread\
           -Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed\
           -ldl
endif
PROTOC = protoc
GRPC_CPP_PLUGIN = grpc_cpp_plugin
GRPC_CPP_PLUGIN_PATH ?= `which $(GRPC_CPP_PLUGIN)`

PROTOS_PATH = ../../protos

vpath %.proto $(PROTOS_PATH)

all: system-check echo_sample_client echo_sample_server echo_async_sample_server  echo_async_sample_client

echo_sample_client: echo_sample.pb.o echo_sample.grpc.pb.o echo_sample_client.o
	$(CXX) $^ $(LDFLAGS) -o $@

echo_sample_server: echo_sample.pb.o echo_sample.grpc.pb.o echo_sample_server.o
	$(CXX) $^ $(LDFLAGS) -o $@

echo_async_sample_server: echo_sample.pb.o echo_sample.grpc.pb.o echo_async_sample_server.o
	$(CXX) $^ $(LDFLAGS) -o $@
	
echo_async_sample_client: echo_sample.pb.o echo_sample.grpc.pb.o echo_async_sample_client.o
	$(CXX) $^ $(LDFLAGS) -o $@


.PRECIOUS: %.grpc.pb.cc
%.grpc.pb.cc: %.proto
	$(PROTOC) -I $(PROTOS_PATH) --grpc_out=. --plugin=protoc-gen-grpc=$(GRPC_CPP_PLUGIN_PATH) $<

.PRECIOUS: %.pb.cc
%.pb.cc: %.proto
	$(PROTOC) -I $(PROTOS_PATH) --cpp_out=. $<

clean:
	rm -f *.o *.pb.cc *.pb.h echo_sample_client echo_sample_server echo_async_sample_server echo_async_sample_client


# The following is to test your system and ensure a smoother experience.
# They are by no means necessary to actually compile a grpc-enabled software.

PROTOC_CMD = which $(PROTOC)
PROTOC_CHECK_CMD = $(PROTOC) --version | grep -q libprotoc.3
PLUGIN_CHECK_CMD = which $(GRPC_CPP_PLUGIN)
HAS_PROTOC = $(shell $(PROTOC_CMD) > /dev/null && echo true || echo false)
ifeq ($(HAS_PROTOC),true)
HAS_VALID_PROTOC = $(shell $(PROTOC_CHECK_CMD) 2> /dev/null && echo true || echo false)
endif
HAS_PLUGIN = $(shell $(PLUGIN_CHECK_CMD) > /dev/null && echo true || echo false)

SYSTEM_OK = false
ifeq ($(HAS_VALID_PROTOC),true)
ifeq ($(HAS_PLUGIN),true)
SYSTEM_OK = true
endif
endif

system-check:
ifneq ($(HAS_VALID_PROTOC),true)
	@echo " DEPENDENCY ERROR"
	@echo
	@echo "You don't have protoc 3.0.0 installed in your path."
	@echo "Please install Google protocol buffers 3.0.0 and its compiler."
	@echo "You can find it here:"
	@echo
	@echo "   https://github.com/google/protobuf/releases/tag/v3.0.0"
	@echo
	@echo "Here is what I get when trying to evaluate your version of protoc:"
	@echo
	-$(PROTOC) --version
	@echo
	@echo
endif
ifneq ($(HAS_PLUGIN),true)
	@echo " DEPENDENCY ERROR"
	@echo
	@echo "You don't have the grpc c++ protobuf plugin installed in your path."
	@echo "Please install grpc. You can find it here:"
	@echo
	@echo "   https://github.com/grpc/grpc"
	@echo
	@echo "Here is what I get when trying to detect if you have the plugin:"
	@echo
	-which $(GRPC_CPP_PLUGIN)
	@echo
	@echo
endif
ifneq ($(SYSTEM_OK),true)
	@false
endif

Server key points

  • For the asynchronous case, my guess is that gRPC maintains at least two message queues on each of the server and the client: one for receiving and one for sending.
  • RequestRefect fetches the body of the next request from the queue (registering this CallData as the tag to be notified when one arrives).
  • responder_.Finish(reply_, Status::OK, this) puts the reply onto the outgoing queue, which is not visible to us. A condensed trace of the whole lifecycle follows this list.
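
The trace below is just a restatement, in comments, of the CallData state machine from the server code above; nothing in it is new API.

// One CallData per RPC; its address doubles as the completion-queue tag.
new CallData(&service_, cq_.get());
// CREATE:  the constructor calls
//          service_->RequestRefect(&ctx_, &request_, &responder_, cq_, cq_, this);
//          asking the runtime to deliver the next incoming Refect call here.
// ...a request arrives; cq_->Next() returns this tag...
// PROCESS: spawn a fresh CallData for the next client, copy
//          request_.request_msg() into reply_, then
//          responder_.Finish(reply_, Status::OK, this) queues the reply.
// ...the reply is sent; cq_->Next() returns the same tag once more...
// FINISH:  delete this;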

Client key points

  • PrepareAsyncRefect is responsible for putting the message to be sent into the send queue.
  • rpc->StartCall() starts the call and begins waiting for the server's message to arrive.
  • rpc->Finish(&reply, &status, (void*)1) registers where the reply should land when the message arrives.
  • Check the status and extract the reply message body; a condensed trace follows this list.
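
The same flow condensed from the Refect method of echo_async_sample_client.cc above:

// Create the RPC object; this does not start the call yet.
std::unique_ptr<ClientAsyncResponseReader<EchoReply>> rpc(
    stub_->PrepareAsyncRefect(&context, request, &cq));
rpc->StartCall();                        // actually initiate the RPC
rpc->Finish(&reply, &status, (void*)1);  // where to deposit reply/status; tag = 1
void* got_tag;
bool ok = false;
cq.Next(&got_tag, &ok);                  // block until the completion event
// got_tag == (void*)1 and ok == true: reply and status are now filled in.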

Summary

Stripped to its essence, asynchrony is about handling message queues: whether on the client or the server, receiving a message means taking it off a queue, and sending one means putting it on.