開始前必讀:基于grpc從零開始搭建一個準生産分布式應用(0) - quickStart
本章開始大概會有三篇文章來詳細講解下grpc原生和springGrpc框的使用。并不會所有方面全講解,隻講一下實際項目中會用到的一些重要内容。PS:為了友善了解,筆者會重寫一些DEMO例子(本文檔中的例子與之前的代碼無關),後續開始第5篇對象傳輸和mybastis時會再優化。
本章代碼任務:以一個例子掌握原生GRPC的用法。
一、proto定義
syntax = "proto3";
package ecommerce;
service ProductInfo {
rpc addProduct(Product) returns (ProductID);
rpc getProduct(ProductID) returns (Product);
}
message Product {
string id = 1;
string name = 2;
string description = 3;
float price = 4;
}
message ProductID {
string value = 1;
}
二、java實作
2.1、server端實作
public class ProductInfoImpl extends ProductInfoGrpc.ProductInfoImplBase {
private Map productMap = new HashMap<String, ProductInfoOuterClass.Product>();
@Override
public void addProduct(ProductInfoOuterClass.Product request,
io.grpc.stub.StreamObserver<ProductInfoOuterClass.ProductID> responseObserver) {
UUID uuid = UUID.randomUUID();
String randomUUIDString = uuid.toString();
request = request.toBuilder().setId(randomUUIDString).build();
productMap.put(randomUUIDString, request);
ProductInfoOuterClass.ProductID id
= ProductInfoOuterClass.ProductID.newBuilder().setValue(randomUUIDString).build();
responseObserver.onNext(id);
responseObserver.onCompleted();
}
@Override
public void getProduct(ProductInfoOuterClass.ProductID request,
io.grpc.stub.StreamObserver<ProductInfoOuterClass.Product> responseObserver) {
String id = request.getValue();
if (productMap.containsKey(id)) {
responseObserver.onNext((ProductInfoOuterClass.Product) productMap.get(id));
responseObserver.onCompleted();
} else {
responseObserver.onError(new StatusException(Status.NOT_FOUND));
}
}
}
2.2、client端實作
public class ProductInfoClient {
private static final Logger logger = Logger.getLogger(ProductInfoClient.class.getName());
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
ProductInfoGrpc.ProductInfoBlockingStub stub = ProductInfoGrpc.newBlockingStub(channel);
ProductInfoOuterClass.ProductID productID = stub.addProduct(
ProductInfoOuterClass.Product.newBuilder()
.setName("Samsung S10")
.setDescription("Samsung Galaxy S10 is the latest smart phone, " +
"launched in February 2019")
.setPrice(700.0f)
.build());
logger.info("Product ID: " + productID.getValue() + " added successfully.");
ProductInfoOuterClass.Product product = stub.getProduct(productID);
logger.info("Product: " + product.toString());
channel.shutdown();
}
}
2.3、測試程式
public class ProductInfoServer {
private static final Logger logger = Logger.getLogger(ProductInfoServer.class.getName());
private Server server;
private void start() throws IOException {
/* The port on which the server should run */
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new ProductInfoImpl())//注冊服務
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
logger.info("*** shutting down gRPC server since JVM is shutting down");
ProductInfoServer.this.stop();
logger.info("*** server shut down");
}));
}
private void stop() {
if (server != null) {
server.shutdown();
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
final ProductInfoServer server = new ProductInfoServer();
server.start();
server.blockUntilShutdown();
}
}
三、python實作
3.1、server端實作
from concurrent import futures
import logging
import uuid
import grpc
import time
import product_info_pb2
import product_info_pb2_grpc
class ProductInfoServicer(product_info_pb2_grpc.ProductInfoServicer):
def __init__(self):
self.productMap = {}
def addProduct(self, request, context):
id = uuid.uuid1()
request.id = str(id)
print("addProduct:request", request)
self.productMap[str(id)] = request
response = product_info_pb2.ProductID(value = str(id))
print("addProduct:response", response)
return response
def getProduct(self, request, context):
print("getProduct:request", request)
id = request.value
response = self.productMap[str(id)]
print("getProduct:response", response)
return response
# create a gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
# use the generated function `add_CalculatorServicer_to_server`
# to add the defined class to the server
product_info_pb2_grpc.add_ProductInfoServicer_to_server(
ProductInfoServicer(), server)
# listen on port 50051
print('Starting server. Listening on port 50051.')
server.add_insecure_port('[::]:50051')
server.start()
# since server.start() will not block,
# a sleep-loop is added to keep alive
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
server.stop(0)
3.2、client端實作
import grpc
import product_info_pb2
import product_info_pb2_grpc
import time;
def run():
# open a gRPC channel
channel = grpc.insecure_channel('localhost:50051')
# create a stub (client)
stub = product_info_pb2_grpc.ProductInfoStub(channel)
response = stub.addProduct(product_info_pb2.Product(name = "Apple iPhone 11", description = "Meet Apple iPhone 11. All-new dual-camera system with Ultra Wide and Night mode.", price = 699.0 ))
print("add product: response", response)
productInfo = stub.getProduct(product_info_pb2.ProductID(value = response.value))
print("get product: response", productInfo)
run()