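I. Introduction to gRPC

  1. gRPC is a high-performance, open-source, general-purpose RPC framework designed for mobile and HTTP/2. It currently provides C, Java and Go implementations: grpc, grpc-java and grpc-go. The C implementation supports C, C++, Node.js, Python, Ruby, Objective-C, PHP and C#.

  2. gRPC is built on the HTTP/2 standard, which brings features such as bidirectional streaming, flow control, header compression, and multiplexing of requests over a single TCP connection. These features make it perform better on mobile devices, saving both power and space.

  3. Processes can call one another through gRPC, as shown in the figure below:

    [Figure: processes calling one another through gRPC]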

II. Server-side implementation of the four interface types (Java)

1. Unary (standard) gRPC interface

// Unary (standard) gRPC interface: one request, one response
@Override
public void queryById(QueryByIdRequest request, StreamObserver<QueryByIdResponse> response) {
    response.onNext(QueryByIdResponse.newBuilder()
            .setUser(User.newBuilder()
                    .setId(request.getId())
                    .setName("测试用户" + request.getId())
                    .build())
            .build());
    response.onCompleted();
}
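A handler like queryById can be checked without starting a server by passing it an observer that records every callback. The following is only a minimal sketch of that idea: the nested `Observer` interface is a hypothetical stand-in with the same three callbacks as gRPC's `StreamObserver`, and a plain `String` replaces the generated `QueryByIdResponse`. None of these names come from the original project.

```java
import java.util.ArrayList;
import java.util.List;

final class UnaryHandlerSketch {
    /** Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks). */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Mirrors queryById: emit exactly one response, then complete. */
    static void queryById(int id, Observer<String> response) {
        response.onNext("user-" + id);
        response.onCompleted();
    }

    /** Runs the handler against a recording observer and returns the recorded events. */
    static List<String> record(int id) {
        final List<String> events = new ArrayList<>();
        queryById(id, new Observer<String>() {
            @Override public void onNext(String value) { events.add("onNext:" + value); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(1)); // prints [onNext:user-1, onCompleted]
    }
}
```

The recorded list makes the unary contract visible: one onNext followed by one onCompleted.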

2. Server-streaming interface

// Server-streaming interface: one request, one response message per requested id
@Override
public void queryByIdList(QueryByIdListRequest request, StreamObserver<QueryByIdListResponse> response) {
    List<Integer> requestIdList = request.getIdList();
    requestIdList.forEach(
            id -> response.onNext(QueryByIdListResponse.newBuilder()
                    .setUser(User.newBuilder()
                            .setId(id)
                            .setName("测试用户" + id)
                            .build())
                    .build()));
    response.onCompleted();
}

3. Client-streaming interface

// Client-streaming interface: many requests, a single response
@Override
public StreamObserver<BatchInsertUserRequest> batchInsertUser(StreamObserver<BatchInsertUserResponse> response) {
    AtomicInteger count = new AtomicInteger(0);
    return new StreamObserver<BatchInsertUserRequest>() {

        @Override
        public void onNext(BatchInsertUserRequest value) {
            log.info("value : {}", value);
            // Each request carries a batch of users, so add the batch size
            // rather than counting messages
            count.addAndGet(value.getUserList().size());
        }

        @Override
        public void onError(Throwable t) {
            log.info("batchInsertUser throwable", t);
        }

        @Override
        public void onCompleted() {
            // The client has finished sending: reply once with the total, then close
            response.onNext(BatchInsertUserResponse.newBuilder()
                    .setInsertCount(count.get())
                    .build());
            response.onCompleted();
        }
    };
}
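Because the client in this article sends users in batches (addAllUser), a per-user total has to add each batch's size rather than count messages. The sketch below illustrates that with stand-in types only: the nested `Observer` interface is a hypothetical substitute for gRPC's `StreamObserver`, and a `List<String>` plays the role of one BatchInsertUserRequest.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class ClientStreamCountSketch {
    /** Hypothetical stand-in for io.grpc.stub.StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Mirrors batchInsertUser: accumulate per batch, reply once when the client completes. */
    static Observer<List<String>> batchInsertUser(final Observer<Integer> response) {
        final AtomicInteger count = new AtomicInteger(0);
        return new Observer<List<String>>() {
            @Override public void onNext(List<String> batch) { count.addAndGet(batch.size()); }
            @Override public void onError(Throwable t) { /* client aborted; nothing to send */ }
            @Override public void onCompleted() {
                response.onNext(count.get());
                response.onCompleted();
            }
        };
    }

    /** Sends two batches (2 users, then 1 user) and returns the single reply. */
    static int demo() {
        final int[] reply = {-1};
        Observer<List<String>> request = batchInsertUser(new Observer<Integer>() {
            @Override public void onNext(Integer value) { reply[0] = value; }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { }
        });
        request.onNext(Arrays.asList("sys1", "sys2"));
        request.onNext(Arrays.asList("sys3"));
        request.onCompleted();
        return reply[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
```

Counting messages instead would report 2 for the same input, which is why the batch size is the safer quantity to accumulate.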

4. Bidirectional-streaming interface

// Bidirectional-streaming interface: one response is sent for every request received
@Override
public StreamObserver<BatchCheckUserRequest> batchCheckUser(StreamObserver<BatchCheckUserResponse> response) {
    return new StreamObserver<BatchCheckUserRequest>() {

        @Override
        public void onNext(BatchCheckUserRequest value) {
            User user = value.getUser();
            log.info("batchCheckUser server receive user:{}", JsonUtil.toJson(user));
            response.onNext(BatchCheckUserResponse.newBuilder()
                    .setUser(User.newBuilder()
                            .setId(user.getId())
                            .setName("双向流接口测试对象" + user.getId())
                            .build())
                    .build());
        }

        @Override
        public void onError(Throwable t) {
            log.info("batchCheckUser server throwable", t);
        }

        @Override
        public void onCompleted() {
            log.info("batchCheckUser server dispose over");
            response.onCompleted();
        }
    };
}

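III. Client-side implementation of the four interface types (Java)

The client calls a service through a stub for that service; the stub classes are generated from the proto file.

Three types of stub are generated for each service, and the client can use any of them.

  • stub supports all four kinds of gRPC call.
  • futurestub supports only unary gRPC calls, not streaming.
  • blockingstub supports unary and server-streaming calls.

// stub supports all four kinds of gRPC call
private GRpcInterfaceTestServiceGrpc.GRpcInterfaceTestServiceStub gRpcInterfaceTestServiceStub;
// blockingstub supports unary and server-streaming calls
private GRpcInterfaceTestServiceGrpc.GRpcInterfaceTestServiceBlockingStub gRpcInterfaceTestServiceBlockingStub;
// futurestub supports only unary gRPC calls, not streaming
private GRpcInterfaceTestServiceGrpc.GRpcInterfaceTestServiceFutureStub gRpcInterfaceTestServiceFutureStub;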
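The support matrix described above can be written down as a small lookup, which is handy when deciding which stub a test needs. This is only an illustrative model: `StubKind` and `CallType` are hypothetical names invented for the sketch and are not part of grpc-java or the generated code.

```java
import java.util.EnumSet;
import java.util.Set;

final class StubSupportSketch {
    /** The four kinds of gRPC call covered in this article. */
    enum CallType { UNARY, SERVER_STREAMING, CLIENT_STREAMING, BIDI_STREAMING }

    /** The three generated stub flavours and the call types each one supports. */
    enum StubKind {
        ASYNC(EnumSet.allOf(CallType.class)),                           // "stub"
        BLOCKING(EnumSet.of(CallType.UNARY, CallType.SERVER_STREAMING)), // "blockingstub"
        FUTURE(EnumSet.of(CallType.UNARY));                             // "futurestub"

        private final Set<CallType> supported;

        StubKind(Set<CallType> supported) { this.supported = supported; }

        boolean supports(CallType type) { return supported.contains(type); }
    }

    public static void main(String[] args) {
        // Print the support matrix, one stub flavour per line
        for (StubKind kind : StubKind.values()) {
            System.out.println(kind + " -> " + kind.supported);
        }
    }
}
```

For example, `StubKind.BLOCKING.supports(CallType.CLIENT_STREAMING)` is false, which matches the tests later in this section using the async stub for both client-streaming and bidirectional calls.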

1. Unary (standard) gRPC interface

@Test
public void clientQueryById() throws ExecutionException, InterruptedException {
    QueryByIdRequest request = QueryByIdRequest.newBuilder()
            .setId(1)
            .build();
    // The future stub returns immediately; get() blocks until the response arrives
    ListenableFuture<QueryByIdResponse> queryByIdResponse = gRpcInterfaceTestServiceFutureStub.queryById(request);
    log.info("gRpcInterfaceTestServiceFutureStub.clientQueryById response:{}", JsonUtil.toJson(queryByIdResponse.get().getUser()));
}

2. Server-streaming interface

@Test
public void clientQueryByIdList() {
    QueryByIdListRequest request = QueryByIdListRequest.newBuilder()
            .addAllId(Lists.newArrayList(1, 2, 3))
            .build();
    // The blocking stub exposes the response stream as an Iterator
    Iterator<QueryByIdListResponse> responseIterator = gRpcInterfaceTestServiceBlockingStub.queryByIdList(request);
    while (responseIterator.hasNext()) {
        log.info("gRpcInterfaceTestServiceBlockingStub.clientQueryByIdList response:{}", JsonUtil.toJson(responseIterator.next()));
    }
}

3. Client-streaming interface

@Test
public void clientBatchInsertUser() throws InterruptedException, ExecutionException {
    SettableFuture<Integer> countFuture = SettableFuture.create();
    final Integer[] returnValue = new Integer[1];
    StreamObserver<GRpcInterfaceTestServiceOuterClass.BatchInsertUserResponse> responseStreamObserver =
            new StreamObserver<GRpcInterfaceTestServiceOuterClass.BatchInsertUserResponse>() {
        @Override
        public void onNext(GRpcInterfaceTestServiceOuterClass.BatchInsertUserResponse value) {
            log.info("server return insert user count :{}", value.getInsertCount());
            returnValue[0] = value.getInsertCount();
        }

        @Override
        public void onError(Throwable t) {
            log.warn("batchInsertUser#onError", t);
            // Fail the future so that countFuture.get() below does not block forever
            countFuture.setException(t);
        }

        @Override
        public void onCompleted() {
            countFuture.set(returnValue[0]);
            log.info("server dispose over");
        }
    };
    StreamObserver<GRpcInterfaceTestServiceOuterClass.BatchInsertUserRequest> requestStreamObserver =
            gRpcInterfaceTestServiceStub.batchInsertUser(responseStreamObserver);
    List<GRpcInterfaceTestServiceOuterClass.User> firstBatch = Lists.newArrayList(
            GRpcInterfaceTestServiceOuterClass.User.newBuilder()
                    .setId(1)
                    .setName("sys1")
                    .build(),
            GRpcInterfaceTestServiceOuterClass.User.newBuilder()
                    .setId(2)
                    .setName("sys2")
                    .build());
    List<GRpcInterfaceTestServiceOuterClass.User> secondBatch = Lists.newArrayList(
            GRpcInterfaceTestServiceOuterClass.User.newBuilder()
                    .setId(3)
                    .setName("sys3")
                    .build());
    // Send two request messages on the same stream, then signal the end of the stream
    requestStreamObserver.onNext(GRpcInterfaceTestServiceOuterClass.BatchInsertUserRequest.newBuilder()
            .addAllUser(firstBatch)
            .build());
    requestStreamObserver.onNext(GRpcInterfaceTestServiceOuterClass.BatchInsertUserRequest.newBuilder()
            .addAllUser(secondBatch)
            .build());
    requestStreamObserver.onCompleted();
    log.info("clientBatchInsertUser batch insert {} user ", countFuture.get());
}
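The test above turns asynchronous callbacks into a value the test thread can wait on. A minimal sketch of that bridge follows, assuming the JDK's `CompletableFuture` in place of Guava's `SettableFuture` and a hypothetical nested `Observer` interface in place of gRPC's `StreamObserver`. It also forwards errors to the future and waits with a timeout, so a failed or stalled call cannot hang the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

final class ObserverFutureBridgeSketch {
    /** Hypothetical stand-in for io.grpc.stub.StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Returns an observer that completes the future with the last value seen, or with the error. */
    static <T> Observer<T> bridge(final CompletableFuture<T> future) {
        return new Observer<T>() {
            private T last;
            @Override public void onNext(T value) { last = value; }
            @Override public void onError(Throwable t) { future.completeExceptionally(t); }
            @Override public void onCompleted() { future.complete(last); }
        };
    }

    public static void main(String[] args) throws Exception {
        // Successful stream: one value, then completion
        CompletableFuture<Integer> ok = new CompletableFuture<>();
        Observer<Integer> okObserver = bridge(ok);
        okObserver.onNext(3);
        okObserver.onCompleted();
        System.out.println(ok.get(5, TimeUnit.SECONDS)); // prints 3

        // Failed stream: the error surfaces through the future instead of blocking forever
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        bridge(failed).onError(new IllegalStateException("stream broke"));
        try {
            failed.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getMessage()); // prints stream broke
        }
    }
}
```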

4. Bidirectional-streaming interface

@Test
public void batchCheckUser() throws ExecutionException, InterruptedException {
    SettableFuture<List<User>> future = SettableFuture.create();
    List<User> userList = Lists.newArrayList();
    StreamObserver<BatchCheckUserRequest> requestStreamObserver = gRpcInterfaceTestServiceStub.batchCheckUser(new StreamObserver<BatchCheckUserResponse>() {
        @Override
        public void onNext(BatchCheckUserResponse value) {
            User user = value.getUser();
            log.info("batchCheckUser client receive server dispose User : {}", JsonUtil.toJson(user));
            userList.add(user);
        }

        @Override
        public void onError(Throwable t) {
            log.info("batchCheckUser onError", t);
            // Fail the future so that future.get() below does not block forever
            future.setException(t);
        }

        @Override
        public void onCompleted() {
            future.set(userList);
            log.info("batchCheckUser client over");
        }
    });
    // Send two requests; the server answers each one as it arrives
    requestStreamObserver.onNext(BatchCheckUserRequest.newBuilder()
            .setUser(User.newBuilder()
                    .setId(1)
                    .build())
            .build());
    requestStreamObserver.onNext(BatchCheckUserRequest.newBuilder()
            .setUser(User.newBuilder()
                    .setId(2)
                    .build())
            .build());
    requestStreamObserver.onCompleted();
    log.info("batchCheckUser client receive server return userList : {}", JsonUtil.toJson(future.get()));
}

5. Summary

The client-streaming and bidirectional-streaming interfaces look very similar on the server side. The difference lies in how many times, and at which point, response.onNext is called, and that depends on the interface type.

For a client-streaming interface the server may call response.onNext only once, so we usually call it inside the request observer's onCompleted method. For a bidirectional-streaming interface, response.onNext may be called many times, for example once for every request received in onNext.

// Client-streaming interface: the single response.onNext call sits in onCompleted
@Override
public StreamObserver<BatchInsertUserRequest> batchInsertUser(StreamObserver<BatchInsertUserResponse> response) {
    AtomicInteger count = new AtomicInteger(0);
    return new StreamObserver<BatchInsertUserRequest>() {

        @Override
        public void onNext(BatchInsertUserRequest value) {
            log.info("value : {}", value);
            count.addAndGet(value.getUserList().size());
        }

        @Override
        public void onError(Throwable t) {
            log.info("batchInsertUser throwable", t);
        }

        @Override
        public void onCompleted() {
            response.onNext(BatchInsertUserResponse.newBuilder()
                    .setInsertCount(count.get())
                    .build());
            response.onCompleted();
        }
    };
}
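The contrast between the two server-side patterns can be seen by feeding the same requests to both. The sketch below is an illustration under assumptions, not the project's code: the nested `Observer` interface stands in for gRPC's `StreamObserver`, integers stand in for request messages, and strings stand in for responses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ResponseCountSketch {
    /** Hypothetical stand-in for io.grpc.stub.StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Client streaming: collect everything, answer once in onCompleted. */
    static Observer<Integer> clientStreaming(final Observer<String> response) {
        final List<Integer> seen = new ArrayList<>();
        return new Observer<Integer>() {
            @Override public void onNext(Integer id) { seen.add(id); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() {
                response.onNext("count=" + seen.size());
                response.onCompleted();
            }
        };
    }

    /** Bidirectional streaming: answer every request as it arrives. */
    static Observer<Integer> bidirectional(final Observer<String> response) {
        return new Observer<Integer>() {
            @Override public void onNext(Integer id) { response.onNext("user-" + id); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { response.onCompleted(); }
        };
    }

    /** Sends the ids through the chosen handler and returns every response message. */
    static List<String> run(boolean bidi, List<Integer> ids) {
        final List<String> out = new ArrayList<>();
        Observer<String> response = new Observer<String>() {
            @Override public void onNext(String value) { out.add(value); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { }
        };
        Observer<Integer> request = bidi ? bidirectional(response) : clientStreaming(response);
        for (Integer id : ids) {
            request.onNext(id);
        }
        request.onCompleted();
        return out;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2);
        System.out.println(run(false, ids)); // prints [count=2]
        System.out.println(run(true, ids));  // prints [user-1, user-2]
    }
}
```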

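IV. Using the grpcurl tool

1. List services

The -plaintext flag makes grpcurl connect over plain-text HTTP/2 without TLS (to use TLS but skip certificate verification, use -insecure instead). The list and describe commands below rely on the server having gRPC server reflection enabled, unless proto sources are supplied to grpcurl.

grpcurl -plaintext localhost:8506 list

Output:

com.lwj.item.proto.GRpcInterfaceTestService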

2. List the methods of a service

grpcurl -plaintext localhost:8506 list com.lwj.item.proto.GRpcInterfaceTestService

Output:

com.lwj.item.proto.GRpcInterfaceTestService.batchCheckUser
com.lwj.item.proto.GRpcInterfaceTestService.batchInsertUser
com.lwj.item.proto.GRpcInterfaceTestService.queryById
com.lwj.item.proto.GRpcInterfaceTestService.queryByIdList

3. Describe the methods of a service in detail

grpcurl -plaintext localhost:8506 describe com.lwj.item.proto.GRpcInterfaceTestService

Output:

com.lwj.item.proto.GRpcInterfaceTestService is a service:
service GRpcInterfaceTestService {
  rpc batchCheckUser ( stream .com.lwj.item.proto.BatchCheckUserRequest ) returns ( stream .com.lwj.item.proto.BatchCheckUserResponse );
  rpc batchInsertUser ( stream .com.lwj.item.proto.BatchInsertUserRequest ) returns ( .com.lwj.item.proto.BatchInsertUserResponse );
  rpc queryById ( .com.lwj.item.proto.QueryByIdRequest ) returns ( .com.lwj.item.proto.QueryByIdResponse );
  rpc queryByIdList ( .com.lwj.item.proto.QueryByIdListRequest ) returns ( stream .com.lwj.item.proto.QueryByIdListResponse );
}

4. Describe a message type

grpcurl -plaintext localhost:8506 describe com.lwj.item.proto.QueryByIdRequest

Output:

com.lwj.item.proto.QueryByIdRequest is a message:
message QueryByIdRequest {
  int32 id = 1;
}

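5. Call a method

Once you have the details of a gRPC service, you can call its methods with JSON input.

The following command passes a JSON string as the request through the -d flag and calls the queryById method of the GRpcInterfaceTestService service:

grpcurl -plaintext -d '{"id": 1}' localhost:8506 com.lwj.item.proto.GRpcInterfaceTestService/queryById

Output:

{
  "user": {
    "id": 1,
    "name": "测试用户1"
  }
}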

This approach works not only for unary gRPC interfaces but also for server-streaming ones. For example, queryByIdList is a server-streaming interface, so it can return its data as several separate messages.

grpcurl -plaintext -d '{"id": 1}' localhost:8506 com.lwj.item.proto.GRpcInterfaceTestService/queryByIdList

Output:

{
  "user": {
    "id": 1,
    "name": "测试用户1"
  }
}

grpcurl -plaintext -d '{"id": [1,2]}' localhost:8506 com.lwj.item.proto.GRpcInterfaceTestService/queryByIdList

Output:

{
  "user": {
    "id": 1,
    "name": "测试用户1"
  }
}
{
  "user": {
    "id": 2,
    "name": "测试用户2"
  }
}

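If the -d argument is @, the JSON request is read from standard input. This is typically used for larger or more complex JSON input, and it can also be used to test streaming methods.

The following command calls the client-streaming method batchInsertUser, reading the request stream from standard input:

grpcurl -plaintext -d @ localhost:8506 com.lwj.item.proto.GRpcInterfaceTestService/batchInsertUser
{
  "user": {
    "id": 1,
    "name": "lwj"
  }
}
{
  "user": {
    "id": 2,
    "name": "lws"
  }
}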

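When a client-streaming interface is called this way, the terminal stays connected to the server. As long as the input stream has not been closed (for example by sending end-of-file on standard input), the call does not return, so we can check the server's log output instead.

[grpc-default-executor-5] INFO grpc.GRpcInterfaceTestServiceGrpcGrpcImpl - value : user {
  id: 1
  name: "lwj"
}

[grpc-default-executor-5] INFO grpc.GRpcInterfaceTestServiceGrpcGrpcImpl - value : user {
  id: 2
  name: "lws"
}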

Calling the bidirectional-streaming interface looks like this:

grpcurl -plaintext -d @ 172.16.108.197:8506 com.lwj.item.proto.GRpcInterfaceTestService/batchCheckUser
// client input
{
  "user": {
    "id": 1
  }
}
// input finished, press Enter
// server response
{
  "user": {
    "id": 1,
    "name": "双向流接口测试对象1"
  }
}
// client input
{
  "user": {
    "id": 2
  }
}
// input finished, press Enter
// server response
{
  "user": {
    "id": 2,
    "name": "双向流接口测试对象2"
  }
}