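I. Introduction to gRPC
gRPC is a high-performance, open-source, general-purpose RPC framework designed for mobile and HTTP/2. At present it provides C, Java, and Go implementations, named grpc, grpc-java, and grpc-go respectively. The C implementation in turn supports C, C++, Node.js, Python, Ruby, Objective-C, PHP, and C#.
Because gRPC is built on the HTTP/2 standard, it gains features such as bidirectional streaming, flow control, header compression, and multiplexing of requests over a single TCP connection. These features let it perform better on mobile devices, saving both power and space.
Processes can call one another through gRPC, as shown in the figure below: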

II. Server-Side Implementations of the Four Interface Types (Java)
1. Unary (standard) gRPC interface
```java
@Override
public void queryById(QueryByIdRequest request, StreamObserver<QueryByIdResponse> response) {
    // Unary call: send exactly one response, then complete the call.
    response.onNext(QueryByIdResponse.newBuilder()
            .setUser(User.newBuilder()
                    .setId(request.getId())
                    .setName("测试用户" + request.getId())
                    .build())
            .build());
    response.onCompleted();
}
```
2. Server-streaming interface
```java
@Override
public void queryByIdList(QueryByIdListRequest request, StreamObserver<QueryByIdListResponse> response) {
    List<Integer> requestIdList = request.getIdList();
    // Server streaming: emit one response per requested id, then complete.
    requestIdList.forEach(
            id -> response.onNext(QueryByIdListResponse.newBuilder()
                    .setUser(User.newBuilder()
                            .setId(id)
                            .setName("测试用户" + id)
                            .build())
                    .build()));
    response.onCompleted();
}
```
3. Client-streaming interface
```java
@Override
public StreamObserver<BatchInsertUserRequest> batchInsertUser(StreamObserver<BatchInsertUserResponse> response) {
    AtomicInteger count = new AtomicInteger(0);
    return new StreamObserver<BatchInsertUserRequest>() {

        @Override
        public void onNext(BatchInsertUserRequest value) {
            log.info("value : {}", value);
            // Each request carries a batch of users, so count users rather than messages.
            count.addAndGet(value.getUserList().size());
        }

        @Override
        public void onError(Throwable t) {
            log.info("batchInsertUser throwable", t);
        }

        @Override
        public void onCompleted() {
            // Client streaming: reply exactly once, after the client has finished sending.
            response.onNext(BatchInsertUserResponse.newBuilder()
                    .setInsertCount(count.get())
                    .build());
            response.onCompleted();
        }
    };
}
```
4. Bidirectional-streaming interface
```java
@Override
public StreamObserver<BatchCheckUserRequest> batchCheckUser(StreamObserver<BatchCheckUserResponse> response) {
    return new StreamObserver<BatchCheckUserRequest>() {

        @Override
        public void onNext(BatchCheckUserRequest value) {
            User user = value.getUser();
            log.info("batchCheckUser server receive user:{}", JsonUtil.toJson(user));
            // Bidirectional streaming: answer each request as soon as it arrives.
            response.onNext(BatchCheckUserResponse.newBuilder()
                    .setUser(User.newBuilder()
                            .setId(user.getId())
                            .setName("双向流接口测试对象" + user.getId())
                            .build())
                    .build());
        }

        @Override
        public void onError(Throwable t) {
            log.info("batchCheckUser server throwable", t);
        }

        @Override
        public void onCompleted() {
            log.info("batchCheckUser server dispose over");
            response.onCompleted();
        }
    };
}
```
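III. Client-Side Implementations of the Four Interface Types (Java)
The client calls a service through that service's stub, which is generated from the proto file.
For each service, three kinds of stub are generated for the client to use.
- stub (asynchronous) supports all four kinds of gRPC call.
- futurestub supports only unary calls, not streaming.
- blockingstub supports unary and server-streaming calls.
```java
// Async stub: supports all four call types.
private GRpcInterfaceTestServiceGrpc.GRpcInterfaceTestServiceStub gRpcInterfaceTestServiceStub;

// Blocking stub: unary and server-streaming calls.
private GRpcInterfaceTestServiceGrpc.GRpcInterfaceTestServiceBlockingStub gRpcInterfaceTestServiceBlockingStub;

// Future stub: unary calls only.
private GRpcInterfaceTestServiceGrpc.GRpcInterfaceTestServiceFutureStub gRpcInterfaceTestServiceFutureStub;
```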
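To make the difference between the three stub flavors concrete, here is a minimal sketch that mimics their call shapes using only the JDK. Everything in it is a hypothetical stand-in: `StubFlavorsSketch`, `lookup`, and the method names are invented for illustration, and `CompletableFuture` takes the place of the Guava `ListenableFuture` that the generated future stub returns. It is not generated gRPC code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical stand-in that mimics the three stub call shapes; not generated gRPC code. */
final class StubFlavorsSketch {

    // Pretend "remote" logic: one name per id.
    static String lookup(int id) {
        return "user-" + id;
    }

    // Blocking style, unary: the calling thread waits and receives the value directly.
    static String blockingUnary(int id) {
        return lookup(id);
    }

    // Blocking style, server streaming: results are pulled through an Iterator.
    static Iterator<String> blockingServerStream(List<Integer> ids) {
        List<String> out = new ArrayList<>();
        for (int id : ids) {
            out.add(lookup(id));
        }
        return out.iterator();
    }

    // Future style, unary only: the caller gets a handle and decides when to wait.
    static CompletableFuture<String> futureUnary(int id) {
        return CompletableFuture.supplyAsync(() -> lookup(id));
    }

    // Async style: results are pushed into callbacks, the shape that streaming calls rely on.
    static void asyncServerStream(List<Integer> ids, Consumer<String> onNext, Runnable onCompleted) {
        for (int id : ids) {
            onNext.accept(lookup(id));
        }
        onCompleted.run();
    }

    public static void main(String[] args) {
        System.out.println(blockingUnary(1));
        blockingServerStream(Arrays.asList(1, 2)).forEachRemaining(System.out::println);
        System.out.println(futureUnary(3).join());
        asyncServerStream(Arrays.asList(4, 5), System.out::println,
                () -> System.out.println("completed"));
    }
}
```

The callback shape is the only one of the three that lets the caller keep sending while results arrive, which is presumably why client-streaming and bidirectional calls are offered only on the async stub.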
1. Unary (standard) gRPC interface
```java
@Test
public void clientQueryById() throws ExecutionException, InterruptedException {
    QueryByIdRequest request = QueryByIdRequest.newBuilder()
            .setId(1)
            .build();
    // The future stub returns immediately; get() blocks until the response arrives.
    ListenableFuture<QueryByIdResponse> queryByIdResponse = gRpcInterfaceTestServiceFutureStub.queryById(request);
    log.info("gRpcInterfaceTestServiceFutureStub.clientQueryById response:{}", JsonUtil.toJson(queryByIdResponse.get().getUser()));
}
```
2. Server-streaming interface
```java
@Test
public void clientQueryByIdList() {
    QueryByIdListRequest request = QueryByIdListRequest.newBuilder()
            .addAllId(Lists.newArrayList(1, 2, 3))
            .build();
    // The blocking stub exposes the response stream as an Iterator.
    Iterator<QueryByIdListResponse> responseIterator = gRpcInterfaceTestServiceBlockingStub.queryByIdList(request);
    while (responseIterator.hasNext()) {
        log.info("gRpcInterfaceTestServiceBlockingStub.clientQueryByIdList response:{}", JsonUtil.toJson(responseIterator.next()));
    }
}
```
3. Client-streaming interface
```java
@Test
public void clientBatchInsertUser() throws InterruptedException, ExecutionException {
    SettableFuture<Integer> countFuture = SettableFuture.create();
    final Integer[] returnValue = new Integer[1];
    StreamObserver<GRpcInterfaceTestServiceOuterClass.BatchInsertUserResponse> responseStreamObserver =
            new StreamObserver<GRpcInterfaceTestServiceOuterClass.BatchInsertUserResponse>() {
        @Override
        public void onNext(GRpcInterfaceTestServiceOuterClass.BatchInsertUserResponse value) {
            log.info("server return insert user count :{}", value.getInsertCount());
            returnValue[0] = value.getInsertCount();
        }

        @Override
        public void onError(Throwable t) {
            log.warn("batchInsertUser#onError", t);
            // Fail the future so that countFuture.get() below does not block forever.
            countFuture.setException(t);
        }

        @Override
        public void onCompleted() {
            countFuture.set(returnValue[0]);
            log.info("server dispose over");
        }
    };
    StreamObserver<GRpcInterfaceTestServiceOuterClass.BatchInsertUserRequest> requestStreamObserver =
            gRpcInterfaceTestServiceStub.batchInsertUser(responseStreamObserver);
    List<GRpcInterfaceTestServiceOuterClass.User> firstBatch = Lists.newArrayList(
            GRpcInterfaceTestServiceOuterClass.User.newBuilder()
                    .setId(1)
                    .setName("sys1")
                    .build(),
            GRpcInterfaceTestServiceOuterClass.User.newBuilder()
                    .setId(2)
                    .setName("sys2")
                    .build());
    List<GRpcInterfaceTestServiceOuterClass.User> secondBatch = Lists.newArrayList(
            GRpcInterfaceTestServiceOuterClass.User.newBuilder()
                    .setId(3)
                    .setName("sys3")
                    .build());
    // Send two request messages on the same stream, then half-close it.
    requestStreamObserver.onNext(GRpcInterfaceTestServiceOuterClass.BatchInsertUserRequest.newBuilder()
            .addAllUser(firstBatch)
            .build());
    requestStreamObserver.onNext(GRpcInterfaceTestServiceOuterClass.BatchInsertUserRequest.newBuilder()
            .addAllUser(secondBatch)
            .build());
    requestStreamObserver.onCompleted();
    log.info("clientBatchInsertUser batch insert {} user ", countFuture.get());
}
```
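The test above turns an asynchronous response observer into something the test thread can wait on by settling a future from inside the callbacks. The following is a minimal sketch of that bridging pattern using only the JDK. The `Observer` interface and the `bridging` helper are hypothetical names introduced here, and `CompletableFuture` stands in for Guava's `SettableFuture`.

```java
import java.util.concurrent.CompletableFuture;

/** Sketch of bridging observer callbacks to a future; Observer is a local stand-in type. */
final class ObserverToFutureSketch {

    // Minimal local mirror of the three-callback observer shape.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Returns an observer that remembers the last value and settles the future when the stream ends.
    static <T> Observer<T> bridging(CompletableFuture<T> result) {
        return new Observer<T>() {
            private T last;

            @Override
            public void onNext(T value) {
                last = value;
            }

            @Override
            public void onError(Throwable t) {
                // Settling the future on failure keeps a waiting caller from blocking forever.
                result.completeExceptionally(t);
            }

            @Override
            public void onCompleted() {
                result.complete(last);
            }
        };
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = new CompletableFuture<>();
        Observer<Integer> observer = bridging(ok);
        observer.onNext(3);
        observer.onCompleted();
        System.out.println(ok.join()); // prints 3

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        bridging(failed).onError(new IllegalStateException("stream broke"));
        System.out.println(failed.isCompletedExceptionally()); // prints true
    }
}
```

Whichever future type is used, the point of the pattern is that every terminal callback, success or failure, settles the future exactly once.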
4. Bidirectional-streaming interface
```java
@Test
public void batchCheckUser() throws ExecutionException, InterruptedException {
    SettableFuture<List<User>> future = SettableFuture.create();
    List<User> userList = Lists.newArrayList();
    StreamObserver<BatchCheckUserRequest> requestStreamObserver =
            gRpcInterfaceTestServiceStub.batchCheckUser(new StreamObserver<BatchCheckUserResponse>() {
        @Override
        public void onNext(BatchCheckUserResponse value) {
            User user = value.getUser();
            log.info("batchCheckUser client receive server dispose User : {}", JsonUtil.toJson(user));
            userList.add(user);
        }

        @Override
        public void onError(Throwable t) {
            log.info("batchCheckUser onError", t);
            // Fail the future so that future.get() below does not block forever.
            future.setException(t);
        }

        @Override
        public void onCompleted() {
            future.set(userList);
            log.info("batchCheckUser client over");
        }
    });
    // Each request may be answered by the server before the next one is sent.
    requestStreamObserver.onNext(BatchCheckUserRequest.newBuilder()
            .setUser(User.newBuilder()
                    .setId(1)
                    .build())
            .build());
    requestStreamObserver.onNext(BatchCheckUserRequest.newBuilder()
            .setUser(User.newBuilder()
                    .setId(2)
                    .build())
            .build());
    requestStreamObserver.onCompleted();
    log.info("batchCheckUser client receive server return userList : {}", JsonUtil.toJson(future.get()));
}
```
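5. Summary
Client-streaming and bidirectional-streaming interfaces are in fact very similar. The difference lies in how many times, and at what point, the server calls response.onNext, which depends on the interface type.
In a client-streaming interface the server may call response.onNext only once, so we usually call it inside the request observer's onCompleted method. In a bidirectional-streaming interface, response.onNext can be called many times, typically from inside the request observer's onNext method.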
```java
@Override
public StreamObserver<BatchInsertUserRequest> batchInsertUser(StreamObserver<BatchInsertUserResponse> response) {
    AtomicInteger count = new AtomicInteger(0);
    return new StreamObserver<BatchInsertUserRequest>() {

        @Override
        public void onNext(BatchInsertUserRequest value) {
            log.info("value : {}", value);
            count.addAndGet(value.getUserList().size());
        }

        @Override
        public void onError(Throwable t) {
            log.info("batchInsertUser throwable", t);
        }

        @Override
        public void onCompleted() {
            response.onNext(BatchInsertUserResponse.newBuilder()
                    .setInsertCount(count.get())
                    .build());
            response.onCompleted();
        }
    };
}
```
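The contrast described in this summary can be sketched without any gRPC dependency. In the example below, `Observer`, `countingHandler`, `echoHandler`, and `recorder` are hypothetical names made up for illustration; the local `Observer` interface only imitates the shape of `StreamObserver` and leaves out `onError` for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Contrasts the two response patterns using a local Observer stand-in, not io.grpc types. */
final class ResponseTimingSketch {

    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    // Client-streaming shape: accumulate in onNext, answer exactly once in onCompleted.
    static Observer<List<String>> countingHandler(Observer<Integer> response) {
        return new Observer<List<String>>() {
            private int count;

            @Override
            public void onNext(List<String> batch) {
                count += batch.size();
            }

            @Override
            public void onCompleted() {
                response.onNext(count);
                response.onCompleted();
            }
        };
    }

    // Bidirectional shape: answer every request as it arrives, then simply close.
    static Observer<String> echoHandler(Observer<String> response) {
        return new Observer<String>() {
            @Override
            public void onNext(String name) {
                response.onNext("checked:" + name);
            }

            @Override
            public void onCompleted() {
                response.onCompleted();
            }
        };
    }

    // Helper that records every response it receives.
    static <T> Observer<T> recorder(List<T> sink) {
        return new Observer<T>() {
            @Override
            public void onNext(T value) {
                sink.add(value);
            }

            @Override
            public void onCompleted() {
                // Nothing to do: the sink already holds every response.
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> counts = new ArrayList<>();
        Observer<List<String>> upload = countingHandler(recorder(counts));
        upload.onNext(Arrays.asList("sys1", "sys2"));
        upload.onNext(Arrays.asList("sys3"));
        upload.onCompleted();
        System.out.println(counts); // prints [3]

        List<String> replies = new ArrayList<>();
        Observer<String> chat = echoHandler(recorder(replies));
        chat.onNext("a");
        chat.onNext("b");
        chat.onCompleted();
        System.out.println(replies); // prints [checked:a, checked:b]
    }
}
```

In the first handler no response exists until the request stream completes. In the second, responses accumulate while requests are still arriving.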
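IV. Using the grpcurl Tool
1. Listing services
The -plaintext flag tells grpcurl to connect over plain-text HTTP/2 without TLS, so no certificate verification is involved.
```bash
grpcurl -plaintext localhost:8506 list
```
```
com.lwj.item.proto.GRpcInterfaceTestService
```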
2. Listing a service's methods
```bash
grpcurl -plaintext localhost:8506 list com.lwj.item.proto.GRpcInterfaceTestService
```
```
com.lwj.item.proto.GRpcInterfaceTestService.batchCheckUser
com.lwj.item.proto.GRpcInterfaceTestService.batchInsertUser
com.lwj.item.proto.GRpcInterfaceTestService.queryById
com.lwj.item.proto.GRpcInterfaceTestService.queryByIdList
```
3. Describing a service's methods in detail
```bash
grpcurl -plaintext localhost:8506 describe com.lwj.item.proto.GRpcInterfaceTestService
```
```
com.lwj.item.proto.GRpcInterfaceTestService is a service:
service GRpcInterfaceTestService {
  rpc batchCheckUser ( stream .com.lwj.item.proto.BatchCheckUserRequest ) returns ( stream .com.lwj.item.proto.BatchCheckUserResponse );
  rpc batchInsertUser ( stream .com.lwj.item.proto.BatchInsertUserRequest ) returns ( .com.lwj.item.proto.BatchInsertUserResponse );
  rpc queryById ( .com.lwj.item.proto.QueryByIdRequest ) returns ( .com.lwj.item.proto.QueryByIdResponse );
  rpc queryByIdList ( .com.lwj.item.proto.QueryByIdListRequest ) returns ( stream .com.lwj.item.proto.QueryByIdListResponse );
}
```
4. Describing a message type
```bash
grpcurl -plaintext localhost:8506 describe com.lwj.item.proto.QueryByIdRequest
```
```
com.lwj.item.proto.QueryByIdRequest is a message:
message QueryByIdRequest {
  int32 id = 1;
}
```
5. Invoking methods
Once you have the details of a gRPC service, you can call its methods with JSON.
The following command passes a JSON string as the request through the -d flag and calls the queryById method of the GRpcInterfaceTestService service:
```bash
grpcurl -plaintext -d '{"id": 1}' localhost:8506 com.lwj.item.proto.GRpcInterfaceTestService/queryById
```
```
{
  "user": {
    "id": 1,
    "name": "测试用户1"
  }
}
```
This approach works not only for unary gRPC interfaces but also for server-streaming ones. For example, queryByIdList is a server-streaming interface, so it can return its data in several messages.
```bash
grpcurl -plaintext -d '{"id": 1}' localhost:8506 com.lwj.item.proto.GRpcInterfaceTestService/queryByIdList
```
```
{
  "user": {
    "id": 1,
    "name": "测试用户1"
  }
}
```
```bash
grpcurl -plaintext -d '{"id": [1,2]}' localhost:8506 com.lwj.item.proto.GRpcInterfaceTestService/queryByIdList
```
```
{
  "user": {
    "id": 1,
    "name": "测试用户1"
  }
}
{
  "user": {
    "id": 2,
    "name": "测试用户2"
  }
}
```
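If the argument to -d is @, the JSON input is read from standard input. This is generally used for more complex JSON input, and it can also be used to test streaming methods.
The following command calls the batchInsertUser client-streaming method, reading the request stream from standard input:
```bash
grpcurl -plaintext -d @ localhost:8506 com.lwj.item.proto.GRpcInterfaceTestService/batchInsertUser
{
  "user":{
    "id":1,
    "name":"lwj"
  }
}
{
  "user":{
    "id":2,
    "name":"lws"
  }
}
```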
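When calling a client-streaming interface, the terminal stays connected to the server. As long as standard input remains open (that is, until end-of-input is sent, for example with Ctrl-D), the request stream is not closed and the call does not return, so in the meantime we can check the log output on the server side.
```
[grpc-default-executor-5] INFO grpc.GRpcInterfaceTestServiceGrpcGrpcImpl - value : user {
  id: 1
  name: "lwj"
}

[grpc-default-executor-5] INFO grpc.GRpcInterfaceTestServiceGrpcGrpcImpl - value : user {
  id: 2
  name: "lws"
}
```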
Calling the bidirectional-streaming interface looks like this:
```bash
grpcurl -plaintext -d @ 172.16.108.197:8506 com.lwj.item.proto.GRpcInterfaceTestService/batchCheckUser
// client input
{
  "user":{
    "id":1
  }
}
// input finished, press Enter
// server returns
{
  "user": {
    "id": 1,
    "name": "双向流接口测试对象1"
  }
}
// client input
{
  "user":{
    "id":2
  }
}
// input finished, press Enter
// server returns
{
  "user": {
    "id": 2,
    "name": "双向流接口测试对象2"
  }
}
```
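In both streaming sessions above, several JSON objects are typed one after another on standard input and each one becomes a separate message on the request stream. As a rough illustration of that framing, the sketch below splits concatenated JSON text into its top-level objects. It is only an assumption-laden toy: `JsonStreamSplitSketch` and `splitTopLevelObjects` are hypothetical names, and this is not how grpcurl itself parses input.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative splitter for concatenated JSON objects; not grpcurl's actual parser. */
final class JsonStreamSplitSketch {

    // Splits text such as "{...} {...}" into one string per top-level JSON object.
    static List<String> splitTopLevelObjects(String input) {
        List<String> messages = new ArrayList<>();
        int depth = 0;
        int start = -1;
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < input.length(); i++) {
            char c = input.charAt(i);
            if (inString) {
                // Braces inside string literals must not affect the nesting depth.
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (c == '"') {
                inString = true;
            } else if (c == '{') {
                if (depth == 0) {
                    start = i;
                }
                depth++;
            } else if (c == '}' && depth > 0) {
                depth--;
                if (depth == 0) {
                    // A top-level object just closed: emit it as one message.
                    messages.add(input.substring(start, i + 1));
                    start = -1;
                }
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        String stdin = "{\"user\":{\"id\":1,\"name\":\"lwj\"}}\n{\"user\":{\"id\":2,\"name\":\"lws\"}}";
        for (String message : splitTopLevelObjects(stdin)) {
            System.out.println("message: " + message);
        }
    }
}
```

Run against the two-object input in `main`, the splitter yields two messages, one per user, which matches the two `value : user { ... }` log entries shown for batchInsertUser.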