Java实现gRPC的单和双流等服务调用

Client Stub

1
2
3
4
5
ServiceGrpc.ServiceFutureStub stub;
ServiceGrpc.ServiceStub streamStub;
ManagedChannel channel = ManagedChannelBuilder.forAddress("192.168.1.100", 20000).usePlaintext().build();
stub = ServiceGrpc.newFutureStub(channel).withExecutor(executor);
streamStub = ServiceGrpc.newStub(channel);

异步调用

Client

1
2
3
ListenableFuture<Response> response = stub.describe(Request.newBuilder()
.setType(1);
response.get().getData().getStringValue();

Server

1
2
3
4
5
public void execute(Request request, StreamObserver<Response> responseObserver) {
Response.Builder responseBuilder = Response.newBuilder();
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}

Client流

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
StreamObserver<Request> responseStream = streamStub.fetchStream(new StreamObserver<Response>() {
@Override
public void onNext(Response response) {
latch.countDown();
}

@Override
public void onError(Throwable throwable) {
log.error("fetchStream client", throwable);
}

@Override
public void onCompleted() {
}
});

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public StreamObserver<Request> execute(StreamObserver<Response> responseObserver) {
return new StreamObserver<Request>() {
Response.Builder responseBuilder;

@Override
public void onNext(Request request) {
responseBuilder = Response.newBuilder();
}

@Override
public void onError(Throwable t) {
log.error("error", t);
responseObserver.onError(t);
}

@Override
public void onCompleted() {
responseObserver.onNext(responseBuilder.build());
}
};
}

双流

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
        CountDownLatch latch = new CountDownLatch(1);

StreamObserver<Request> responseStream = streamStub.fetch2Stream(new StreamObserver<Response>() {
AtomicInteger i = new AtomicInteger(0);

@Override
public void onNext(Response response) { response.getData().getStructValue();
}

@Override
public void onError(Throwable throwable) {
}

@Override
public void onCompleted() {
latch.countDown();
}
});

//onNext
for (int i = 0; i < 1000; i++) {
responseStream.onNext(requestBuilder.build());
}

responseStream.onCompleted();

latch.await();

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  public StreamObserver<Request> execute(StreamObserver<Response> responseObserver) {
return new StreamObserver<Request>() {
AtomicInteger times = new AtomicInteger(0);

@Override
public void onNext(Request request) {
log.info("fetch2Stream server receive:{}", times.incrementAndGet());
responseObserver.onNext(null);
}

@Override
public void onError(Throwable t) {
log.error("error", t);
responseObserver.onError(t);
}

@Override
public void onCompleted() {
log.info("fetch2Stream server send,total:{}", times.get());
responseObserver.onCompleted();
}
};
}

------ 本文结束------

本文标题:Java实现gRPC的单和双流等服务调用

文章作者:Perkins

发布时间:2020年08月17日

原始链接:https://perkins4j2.github.io/posts/43712/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。