在上一篇博客中我使用grpc实现了一个同步的回射服务器,点击查看
废话不多说,直接上代码
echo_async_sample_server.cc
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <grpcpp/grpcpp.h>
#ifdef BAZEL_BUILD
#include "examples/protos/echosample.grpc.pb.h"
#else
#include "echo_sample.grpc.pb.h"
#endif
using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerCompletionQueue;
using grpc::Status;
using echosample::EchoRequest;
using echosample::EchoReply;
using echosample::Reflecter;
// Asynchronous echo server: a single completion queue is drained by a small
// pool of worker threads, each of which drives CallData state machines.
class EchoAnsyServiceImpl final {
 public:
  // Joins the worker threads, then shuts down the server and the completion
  // queue (in that order).
  //
  // The workers loop until a GPR_ASSERT fails, so in practice this join is
  // what keeps the process serving after Run() has returned.
  // NOTE(review): a graceful shutdown would need the workers to leave their
  // loop when Next() returns false; that is intentionally not attempted here.
  ~EchoAnsyServiceImpl() {
    // Only join threads that were actually started: the object may be
    // destroyed without Run()/Start() ever having been called.
    for (std::thread& worker : thread_pool_) {
      if (worker.joinable()) {
        worker.join();
      }
    }
    if (server_) {
      server_->Shutdown();
    }
    // Always shutdown the completion queue after the server.
    if (cq_) {
      cq_->Shutdown();
    }
  }

  // Spawns kWorkerCount threads that each run RunInThread().
  // Run() calls this after it has created cq_; the workers dereference cq_,
  // so it must not be called before that.
  void Start() {
    for (int i = 0; i < kWorkerCount; ++i) {
      thread_pool_.emplace_back(&EchoAnsyServiceImpl::RunInThread, this, i);
    }
  }

  // Worker loop: registers one pending request and then processes completion
  // queue events forever.
  //
  // `i` is the index of the worker inside the pool. It is no longer used to
  // look the thread up in thread_pool_, because that vector may still be
  // growing (and reallocating) in Start() while the first workers already run.
  void RunInThread(int i) {
    (void)i;
    // Register this worker's first pending request.
    new CallData(&service_, cq_.get());
    void* tag = nullptr;  // uniquely identifies a request.
    bool ok = false;
    while (true) {
      std::cout << std::this_thread::get_id() << std::endl;
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      // No lock is taken here: several workers poll the same queue
      // concurrently, and holding a mutex across the blocking call would let
      // only one of them wait at a time.
      // The return value of Next should always be checked. This return value
      // tells us whether there is any kind of event or cq_ is shutting down.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }

  // Builds the server on 0.0.0.0:50051, starts it and spawns the workers.
  // Returns as soon as the workers are running.
  // There is no shutdown handling in this code.
  void Run() {
    std::string server_address("0.0.0.0:50051");
    ServerBuilder builder;
    // Listen on the given address without any authentication mechanism.
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    // Register "service_" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService(&service_);
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    cq_ = builder.AddCompletionQueue();
    // Finally assemble the server.
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;
    // Hand the completion queue over to the worker threads.
    Start();
  }

 private:
  // Number of worker threads spawned by Start().
  enum { kWorkerCount = 10 };

  // Class encompassing the state and logic needed to serve a request.
  class CallData {
   public:
    // Take in the "service" instance (in this case representing an
    // asynchronous server) and the completion queue "cq" used for asynchronous
    // communication with the gRPC runtime.
    CallData(Reflecter::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
      // Invoke the serving logic right away.
      Proceed();
    }

    // Advances the state machine by one step: CREATE -> PROCESS -> FINISH.
    void Proceed() {
      if (status_ == CREATE) {
        // Make this instance progress to the PROCESS state.
        status_ = PROCESS;
        // As part of the initial CREATE state, we *request* that the system
        // start processing Refect requests. In this request, "this" acts as
        // the tag uniquely identifying the request (so that different CallData
        // instances can serve different requests concurrently), in this case
        // the memory address of this CallData instance.
        service_->RequestRefect(&ctx_, &request_, &responder_, cq_, cq_, this);
      } else if (status_ == PROCESS) {
        // Spawn a new CallData instance to serve new clients while we process
        // the one for this CallData. This must happen before Finish(): once
        // Finish() has been called another worker may pick up our tag and
        // delete this object, so no member may be touched afterwards.
        new CallData(service_, cq_);
        // The actual processing: echo the request back.
        reply_.set_reply_msg(request_.request_msg());
        // And we are done! Let the gRPC runtime know we've finished, using the
        // memory address of this instance as the uniquely identifying tag for
        // the event. The instance deallocates itself in its FINISH state.
        status_ = FINISH;
        responder_.Finish(reply_, Status::OK, this);
      } else {
        GPR_ASSERT(status_ == FINISH);
        // Once in the FINISH state, deallocate ourselves (CallData).
        delete this;
      }
    }

   private:
    // The means of communication with the gRPC runtime for an asynchronous
    // server.
    Reflecter::AsyncService* service_;
    // The producer-consumer queue for asynchronous server notifications.
    ServerCompletionQueue* cq_;
    // Context for the rpc, allowing to tweak aspects of it such as the use
    // of compression, authentication, as well as to send metadata back to the
    // client.
    ServerContext ctx_;
    // What we get from the client.
    EchoRequest request_;
    // What we send back to the client.
    EchoReply reply_;
    // The means to get back to the client.
    ServerAsyncResponseWriter<EchoReply> responder_;
    enum CallStatus { CREATE, PROCESS, FINISH };
    CallStatus status_;  // The current serving state.
  };

  // Single-threaded variant of the worker loop with debug tracing. Nothing in
  // this class calls it.
  void HandleRpcs() {
    new CallData(&service_, cq_.get());
    void* tag = nullptr;  // uniquely identifies a request.
    bool ok = false;
    while (true) {
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      // The return value of Next should always be checked. This return value
      // tells us whether there is any kind of event or cq_ is shutting down.
      std::cout << "22222" << std::endl;
      GPR_ASSERT(cq_->Next(&tag, &ok));
      std::cout << "333333" << std::endl;
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }

  std::unique_ptr<ServerCompletionQueue> cq_;
  Reflecter::AsyncService service_;
  std::unique_ptr<Server> server_;
  std::vector<std::thread> thread_pool_;
};
// Logic and data behind the server's behavior.
// Synchronous implementation of the Reflecter service: the reply carries the
// request text back unchanged.
class GreeterServiceImpl final : public Reflecter::Service {
  // Copies request_msg from the request into reply_msg of the reply and
  // reports success. The server context is not consulted.
  Status Refect(ServerContext* context, const EchoRequest* request,
                EchoReply* reply) override {
    const std::string& incoming = request->request_msg();
    reply->set_reply_msg(incoming);
    return Status::OK;
  }
};
void RunServer() {
std::string server_address("0.0.0.0:50051");
GreeterServiceImpl service;
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
builder.RegisterService(&service);
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
server->Wait();
}
int main(int argc, char** argv) {
EchoAnsyServiceImpl server;
server.Run();
return 0;
}
echo_async_sample_client.cc的代码实现
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#ifdef BAZEL_BUILD
#include "examples/protos/echo_sample.grpc.pb.h"
#else
#include "echo_sample.grpc.pb.h"
#endif
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using echosample::EchoRequest;
using echosample::EchoReply;
using echosample::Reflecter;
class GreeterClient {
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Reflecter::NewStub(channel)) {}
// Assembles the client's payload, sends it and presents the response back
// from the server.
std::string Refect(const std::string& msg) {
// Data we are sending to the server.
EchoRequest request;
request.set_request_msg(msg);
// Container for the data we expect from the server.
EchoReply reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// The actual RPC.
Status status = stub_->Refect(&context, request, &reply);
// Act upon its status.
if (status.ok()) {
return reply.reply_msg();
} else {
std::cout << status.error_code() << ": " << status.error_message()
<< std::endl;
return "RPC failed";
}
}
private:
std::unique_ptr<Reflecter::Stub> stub_;
};
// Entry point of the echo client: reads standard input line by line, sends
// each line to the server and prints what comes back. The command-line
// arguments are not used.
int main(int argc, char** argv) {
  (void)argc;
  (void)argv;
  // Instantiate the client. It requires a channel, out of which the actual RPCs
  // are created. This channel models a connection to an endpoint (in this case,
  // localhost at port 50051). We indicate that the channel isn't authenticated
  // (use of InsecureChannelCredentials()).
  GreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));

  // std::getline handles lines of any length and needs only <iostream> and
  // <string>. It also drops the line terminator, so the newline typed by the
  // user is not sent as part of the message.
  std::string line;
  while (std::getline(std::cin, line)) {
    const std::string reply = greeter.Refect(line);
    std::cout << "Greeter received: " << reply << std::endl;
  }
  return 0;
}
proto文件在我上篇同步echo demo中获取
Makefile文件
#
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Build configuration: host detection, compiler and linker flags, and the
# locations of the protobuf tools and .proto files.

# Output of `uname`, cut at the first underscore.
HOST_SYSTEM = $(shell uname | cut -f 1 -d_)
# Target system; defaults to the host unless already set by the caller.
SYSTEM ?= $(HOST_SYSTEM)
CXX = g++
# Include paths and defines reported by pkg-config for protobuf and grpc.
CPPFLAGS += `pkg-config --cflags protobuf grpc`
CXXFLAGS += -std=c++11
# Link flags. The two branches differ only in how grpc++_reflection is linked:
# the non-Darwin branch wraps it in --no-as-needed / --as-needed.
ifeq ($(SYSTEM),Darwin)
LDFLAGS += -L/usr/local/lib `pkg-config --libs protobuf grpc++`\
-pthread\
-lgrpc++_reflection\
-ldl
else
LDFLAGS += -L/usr/local/lib `pkg-config --libs protobuf grpc++`\
-pthread\
-Wl,--no-as-needed -lgrpc++_reflection -Wl,--as-needed\
-ldl
endif
# Protocol buffer compiler and the gRPC C++ code generator plugin.
PROTOC = protoc
GRPC_CPP_PLUGIN = grpc_cpp_plugin
# Plugin path resolved with `which` when the recipe runs, unless already set.
GRPC_CPP_PLUGIN_PATH ?= `which $(GRPC_CPP_PLUGIN)`
# Directory searched for .proto prerequisites.
PROTOS_PATH = ../../protos
vpath %.proto $(PROTOS_PATH)
# Default target: check the toolchain, then build the four sample binaries.
all: system-check echo_sample_client echo_sample_server echo_async_sample_server echo_async_sample_client

# Each binary links the generated protobuf and gRPC objects with its own
# object file. $^ expands to all prerequisites, $@ to the target being built.
echo_sample_client: echo_sample.pb.o echo_sample.grpc.pb.o echo_sample_client.o
	$(CXX) $^ $(LDFLAGS) -o $@

echo_sample_server: echo_sample.pb.o echo_sample.grpc.pb.o echo_sample_server.o
	$(CXX) $^ $(LDFLAGS) -o $@

echo_async_sample_server: echo_sample.pb.o echo_sample.grpc.pb.o echo_async_sample_server.o
	$(CXX) $^ $(LDFLAGS) -o $@

echo_async_sample_client: echo_sample.pb.o echo_sample.grpc.pb.o echo_async_sample_client.o
	$(CXX) $^ $(LDFLAGS) -o $@
# Generate the gRPC service stubs (*.grpc.pb.cc) from a .proto file.
# .PRECIOUS keeps the generated source even though it is an intermediate file.
.PRECIOUS: %.grpc.pb.cc
%.grpc.pb.cc: %.proto
	$(PROTOC) -I $(PROTOS_PATH) --grpc_out=. --plugin=protoc-gen-grpc=$(GRPC_CPP_PLUGIN_PATH) $<

# Generate the protobuf message classes (*.pb.cc) from a .proto file.
.PRECIOUS: %.pb.cc
%.pb.cc: %.proto
	$(PROTOC) -I $(PROTOS_PATH) --cpp_out=. $<
# Remove objects, generated protobuf sources/headers and the sample binaries.
# Declared phony so a file named "clean" cannot mask the target.
.PHONY: clean
clean:
	rm -f *.o *.pb.cc *.pb.h echo_sample_client echo_sample_server echo_async_sample_server echo_async_sample_client
# The following is to test your system and ensure a smoother experience.
# They are by no means necessary to actually compile a grpc-enabled software.
# Shell commands used to probe for protoc and the gRPC plugin.
PROTOC_CMD = which $(PROTOC)
# Succeeds when `protoc --version` prints "libprotoc", any one character, "3".
PROTOC_CHECK_CMD = $(PROTOC) --version | grep -q libprotoc.3
PLUGIN_CHECK_CMD = which $(GRPC_CPP_PLUGIN)
# "true" when protoc is found on PATH, "false" otherwise.
HAS_PROTOC = $(shell $(PROTOC_CMD) > /dev/null && echo true || echo false)
# The version check only runs when protoc exists; otherwise HAS_VALID_PROTOC
# stays undefined, which the comparisons below treat as "not true".
ifeq ($(HAS_PROTOC),true)
HAS_VALID_PROTOC = $(shell $(PROTOC_CHECK_CMD) 2> /dev/null && echo true || echo false)
endif
# "true" when the gRPC C++ plugin is found on PATH, "false" otherwise.
HAS_PLUGIN = $(shell $(PLUGIN_CHECK_CMD) > /dev/null && echo true || echo false)
# SYSTEM_OK becomes true only when both probes succeeded.
SYSTEM_OK = false
ifeq ($(HAS_VALID_PROTOC),true)
ifeq ($(HAS_PLUGIN),true)
SYSTEM_OK = true
endif
endif
# Prints a diagnostic for each missing dependency (protoc, gRPC C++ plugin)
# and fails the build when SYSTEM_OK is not true. The conditionals are
# evaluated when the Makefile is read, so only the applicable recipe lines
# end up in the rule.
system-check:
ifneq ($(HAS_VALID_PROTOC),true)
	@echo " DEPENDENCY ERROR"
	@echo
	@echo "You don't have protoc 3.0.0 installed in your path."
	@echo "Please install Google protocol buffers 3.0.0 and its compiler."
	@echo "You can find it here:"
	@echo
	@echo " https://github.com/google/protobuf/releases/tag/v3.0.0"
	@echo
	@echo "Here is what I get when trying to evaluate your version of protoc:"
	@echo
	-$(PROTOC) --version
	@echo
	@echo
endif
ifneq ($(HAS_PLUGIN),true)
	@echo " DEPENDENCY ERROR"
	@echo
	@echo "You don't have the grpc c++ protobuf plugin installed in your path."
	@echo "Please install grpc. You can find it here:"
	@echo
	@echo " https://github.com/grpc/grpc"
	@echo
	@echo "Here is what I get when trying to detect if you have the plugin:"
	@echo
	-which $(GRPC_CPP_PLUGIN)
	@echo
	@echo
endif
ifneq ($(SYSTEM_OK),true)
	@false
endif
服务端关键点
- 如果是异步的话,个人猜测服务端和客户端grpc维护了至少两个消息队列,一个收,一个发
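- RequestRefect 并不是直接从队列中取出消息,而是向 gRPC 运行时登记“准备接收下一个 Refect 请求”;请求到达后,对应的 tag(CallData 对象的地址)才会从完成队列的 cq_->Next() 中返回,此时请求消息体 request_ 已经被填充
- responder_.Finish(reply_, Status::OK, this) 将应答交给 gRPC 运行时发送,发送完成后同一个 tag 会再次从完成队列中返回;发送过程对我们不可见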
客户端关键点
- PrepareAsyncRefect 函数负责创建一个尚未启动的异步 RPC 对象,并把它绑定到完成队列上
- rpc->StartCall() 真正发起这次 RPC 调用
- rpc->Finish(&reply, &status, (void*)1) 登记应答和状态的存放位置,并以 (void*)1 作为 tag;应答到达后,该 tag 会从完成队列的 Next() 中返回
- 对状态进行判断并取出应答的消息体
总结
其实异步说白了就是对消息队列的处理,不管客户端还是服务端收到消息和发送都是从队列中取或者放