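gRPC asynchronous bidirectional streaming server: a code example

gRPC C++ examples

Official examples: https://github.com/grpc/grpc/tree/master/examples/cpp

In particular, https://github.com/grpc/grpc/blob/master/examples/cpp/helloworld/greeter_async_server.cc is an example of an asynchronous unary server.

The official examples do not, however, include an asynchronous bidirectional streaming server.

So we have to work one out ourselves.

Asynchronous bidirectional streaming server example

Async bidirectional streaming for gRPC C++

Here is the code: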

/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>

#ifdef BAZEL_BUILD
#include "examples/protos/helloworld.grpc.pb.h"
#else
#include "helloworld.grpc.pb.h"
#endif

using grpc::Server;
using grpc::ServerAsyncReaderWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;
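// These names assume a .proto that declares "package rpc;" and defines
// SayHello as a bidirectional stream of HelloRequest / HelloReply messages.
using rpc::Greeter;
using rpc::HelloReply;
using rpc::HelloRequest;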

class ServerImpl final
{
public:
    ~ServerImpl()
    {
        server_->Shutdown();
        // Always shutdown the completion queue after the server.
        cq_->Shutdown();
    }

    // There is no shutdown handling in this code.
    void Run()
    {
        std::string server_address("0.0.0.0:12121");

        ServerBuilder builder;
        // Listen on the given address without any authentication mechanism.
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        // Register "service_" as the instance through which we'll communicate with
        // clients. In this case it corresponds to an *asynchronous* service.
        builder.RegisterService(&service_);
        // Get hold of the completion queue used for the asynchronous communication
        // with the gRPC runtime.
        cq_ = builder.AddCompletionQueue();
        // Finally assemble the server.
        server_ = builder.BuildAndStart();
        std::cout << "Server listening on " << server_address << std::endl;

        // Proceed to the server's main loop.
        HandleRpcs();
    }

private:
    // Class encompassing the state and logic needed to serve a request.
    class CallData
    {
    public:
        // Take in the "service" instance (in this case representing an asynchronous
        // server) and the completion queue "cq" used for asynchronous communication
        // with the gRPC runtime.
        CallData(Greeter::AsyncService *service, ServerCompletionQueue *cq)
            : service_(service), cq_(cq), stream_(&ctx_), status_(CREATE)
        {
            // Invoke the serving logic right away.
            Proceed();
        }

        // "ok" is the flag that CompletionQueue::Next returned together with
        // this instance's tag. It defaults to true for the initial call made
        // from the constructor.
        void Proceed(bool ok = true)
        {
            if (status_ == CREATE)
            {
                // Make this instance progress to the INIT_READ state.
                status_ = INIT_READ;

                // As part of the initial CREATE state, we *request* that the system
                // start processing SayHello requests. In this request, "this" acts as
                // the tag uniquely identifying the request (so that different CallData
                // instances can serve different requests concurrently), in this case
                // the memory address of this CallData instance.
                service_->RequestSayHello(&ctx_, &stream_, cq_, cq_, this);
            }
            else if (status_ == INIT_READ)
            {
                if (!ok)
                {
                    // No call was started (the server is shutting down).
                    delete this;
                    return;
                }

                // Spawn a new CallData instance to serve new clients while we process
                // the one for this CallData. The instance will deallocate itself as
                // part of its FINISH state.
                new CallData(service_, cq_);

                // Set the next state before starting the operation, so that the
                // state is already correct when the tag comes back.
                status_ = WRITE;
                stream_.Read(&request_, this);
            }
            else if (status_ == READ)
            {
                // The previous Write has completed.
                if (!ok)
                {
                    Finish();
                    return;
                }
                status_ = WRITE;
                stream_.Read(&request_, this);
            }
            else if (status_ == WRITE)
            {
                // The previous Read has completed. A failed read means the client
                // has finished sending (or the call is dead), so end the call.
                if (!ok)
                {
                    Finish();
                    return;
                }
                // Echo the name from the request back to the client.
                HelloReply rsp;
                rsp.set_message(request_.name());
                status_ = READ;
                stream_.Write(rsp, this);
            }
            else
            {
                GPR_ASSERT(status_ == FINISH);
                // The Finish tag has come back, so the runtime no longer refers to
                // this instance: deallocate ourselves (CallData).
                delete this;
            }
        }

    private:
        // End the call. The instance is deleted once the Finish tag comes back;
        // deleting it right after calling stream_.Finish() would free memory
        // that the pending operation still uses.
        void Finish()
        {
            status_ = FINISH;
            stream_.Finish(Status::OK, this);
        }

        // The means of communication with the gRPC runtime for an asynchronous
        // server.
        Greeter::AsyncService *service_;
        // The producer-consumer queue used for asynchronous server notifications.
        ServerCompletionQueue *cq_;
        // Context for the rpc, allowing to tweak aspects of it such as the use
        // of compression, authentication, as well as to send metadata back to the
        // client.
        ServerContext ctx_;

        // What we get from the client.
        HelloRequest request_;

        // The means to get back to the client.
        ServerAsyncReaderWriter<HelloReply, HelloRequest> stream_;

        // A tiny state machine. Each state names the action to take when the
        // next event for this instance arrives.
        enum CallStatus
        {
            CREATE,
            INIT_READ,
            READ,
            WRITE,
            FINISH
        };
        CallStatus status_; // The current serving state.
    };

    // This can be run in multiple threads if needed.
    void HandleRpcs()
    {
        // Spawn a new CallData instance to serve new clients.
        new CallData(&service_, cq_.get());
        void *tag; // uniquely identifies a request.
        bool ok;
        while (true)
        {
            // Block waiting to read the next event from the completion queue. The
            // event is uniquely identified by its tag, which in this case is the
            // memory address of a CallData instance.
            // The return value of Next should always be checked. This return value
            // tells us whether there is any kind of event or cq_ is shutting down.
            GPR_ASSERT(cq_->Next(&tag, &ok));
            // Forward "ok" instead of dropping failed events: a failed Read is how
            // the server learns that the client has finished sending.
            static_cast<CallData *>(tag)->Proceed(ok);
        }
    }

    std::unique_ptr<ServerCompletionQueue> cq_;
    Greeter::AsyncService service_;
    std::unique_ptr<Server> server_;
};

int main(int argc, char **argv)
{
    ServerImpl server;
    server.Run();

    return 0;
}
           

Notes

  • When building gRPC C++ with CMake, you must pass -DCMAKE_BUILD_TYPE=Release so that a Release build is produced.