Client Stub
1 | ServiceGrpc.ServiceFutureStub stub; |
异步调用
Client1
2
3ListenableFuture<Response> response = stub.describe(Request.newBuilder()
.setType(1);
response.get().getData().getStringValue();
Server1
2
3
4
5public void execute(Request request, StreamObserver<Response> responseObserver) {
Response.Builder responseBuilder = Response.newBuilder();
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}
Client流
Client1
2
3
4
5
6
7
8
9
10
11
12
13
14
15StreamObserver<Request> responseStream = streamStub.fetchStream(new StreamObserver<Response>() {
public void onNext(Response response) {
latch.countDown();
}
public void onError(Throwable throwable) {
log.error("fetchStream client", throwable);
}
public void onCompleted() {
}
});
Server1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public StreamObserver<Request> execute(StreamObserver<Response> responseObserver) {
return new StreamObserver<Request>() {
Response.Builder responseBuilder;
public void onNext(Request request) {
responseBuilder = Response.newBuilder();
}
public void onError(Throwable t) {
log.error("error", t);
responseObserver.onError(t);
}
public void onCompleted() {
responseObserver.onNext(responseBuilder.build());
}
};
}
双流
Client1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 CountDownLatch latch = new CountDownLatch(1);
StreamObserver<Request> responseStream = streamStub.fetch2Stream(new StreamObserver<Response>() {
AtomicInteger i = new AtomicInteger(0);
public void onNext(Response response) { response.getData().getStructValue();
}
public void onError(Throwable throwable) {
}
public void onCompleted() {
latch.countDown();
}
});
//onNext
for (int i = 0; i < 1000; i++) {
responseStream.onNext(requestBuilder.build());
}
responseStream.onCompleted();
latch.await();
Server1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 public StreamObserver<Request> execute(StreamObserver<Response> responseObserver) {
return new StreamObserver<Request>() {
AtomicInteger times = new AtomicInteger(0);
public void onNext(Request request) {
log.info("fetch2Stream server receive:{}", times.incrementAndGet());
responseObserver.onNext(null);
}
public void onError(Throwable t) {
log.error("error", t);
responseObserver.onError(t);
}
public void onCompleted() {
log.info("fetch2Stream server send,total:{}", times.get());
responseObserver.onCompleted();
}
};
}