1. 概要

gRPCは、プロセス間リモートプロシージャコール(RPC)を実行するためのプラットフォームです。 これはクライアントサーバーモデルに従い、パフォーマンスが高く、最も重要なコンピューター言語をサポートします。 良いレビューのために私たちの記事gRPCの紹介をチェックしてください。

このチュートリアルでは、gRPCストリームに焦点を当てます。 ストリーミングにより、サーバーとクライアント間のメッセージの多重化が可能になり、非常に効率的で柔軟なプロセス間通信が作成されます

2. gRPCストリーミングの基本

gRPCはHTTP/2ネットワークプロトコルを使用してサービス間通信を行いますOne HTTP / 2の主な利点は、ストリームをサポートすることです。[ X161X]各ストリームは、単一の接続を共有する複数の双方向メッセージを多重化できます。

gRPCでは、次の3つの機能的な呼び出しタイプでストリーミングを行うことができます。

  1. サーバーストリーミングRPC:クライアントはサーバーに単一の要求を送信し、順次読み取る複数のメッセージを返します。
  2. クライアントストリーミングRPC:クライアントは一連のメッセージをサーバーに送信します。 クライアントは、サーバーがメッセージを処理するのを待ち、返された応答を読み取ります。
  3. 双方向ストリーミングRPC:クライアントとサーバーは複数のメッセージを送受信できます。 メッセージは、送信されたのと同じ順序で受信されます。 ただし、サーバーまたはクライアントは、受信したメッセージに選択した順序で応答できます。

これらの手続き型呼び出しの使用方法を示すために、株式証券に関する情報を交換する簡単なクライアントサーバーアプリケーションの例を記述します。

3. サービス定義

stock_quote.proto を使用して、サービスインターフェイスとペイロードメッセージの構造を定義します。

service StockQuoteProvider {
  
  rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {}

  rpc clientSideStreamingGetStatisticsOfStocks(stream Stock) returns (StockQuote) {}
  
  rpc bidirectionalStreamingGetListsStockQuotes(stream Stock) returns (stream StockQuote) {}
}
message Stock {
   string ticker_symbol = 1;
   string company_name = 2;
   string description = 3;
}
message StockQuote {
   double price = 1;
   int32 offer_number = 2;
   string description = 3;
}

StockQuoteProviderサービスには、メッセージストリーミングをサポートする3つのメソッドタイプがあります。次のセクションでは、それらの実装について説明します。

サービスのメソッドシグネチャから、クライアントがStockメッセージを送信してサーバーにクエリを実行していることがわかります。 サーバーは、StockQuoteメッセージを使用して応答を送り返します。

pom.xmlファイルで定義されているprotobuf-maven-pluginを使用して、stock-quote.protoIDLファイルからJavaコードを生成します。

プラグインは、 target /generated-sources / protobuf /javaおよび/grpc-javaディレクトリにクライアント側のスタブとサーバー側のコードを生成します。

生成されたコードを活用してサーバーとクライアントを実装します

4. サーバーの実装

StockServer コンストラクターは、gRPC Server を使用して、着信要求をリッスンしてディスパッチします。

public class StockServer {
    private int port;
    private io.grpc.Server server;

    public StockServer(int port) throws IOException {
        this.port = port;
        server = ServerBuilder.forPort(port)
          .addService(new StockService())
          .build();
    }
    //...
}

StockServiceio.grpc.Serverに追加します。 StockServiceStockQuoteProviderImplBaseを拡張します。これは、protobufプラグインがprotoファイルから生成したものです。 したがって、 StockQuoteProviderImplBaseには、3つのストリーミングサービスメソッドのスタブがあります。

StockServiceは、サービスの実際の実装を行うために、これらのスタブメソッドをオーバーライドする必要があります。

次に、3つのストリーミングケースでこれがどのように行われるかを見ていきます。

4.1. サーバーサイドストリーミング

クライアントは見積もりの単一の要求を送信し、商品に対して提供される価格がそれぞれ異なる複数の応答を返します。

@Override
public void serverSideStreamingGetListStockQuotes(Stock request, StreamObserver<StockQuote> responseObserver) {
    for (int i = 1; i <= 5; i++) {
        StockQuote stockQuote = StockQuote.newBuilder()
          .setPrice(fetchStockPriceBid(request))
          .setOfferNumber(i)
          .setDescription("Price for stock:" + request.getTickerSymbol())
          .build();
        responseObserver.onNext(stockQuote);
    }
    responseObserver.onCompleted();
}

このメソッドは、 StockQuote を作成し、価格を取得して、オファー番号をマークします。 オファーごとに、 responseObserver ::onNextを呼び出すメッセージをクライアントに送信します。 reponseObserver :: onCompleted を使用して、RPCで完了したことを通知します。

4.2. クライアント側のストリーミング

クライアントは複数の株を送信し、サーバーは単一のStockQuoteを返します。

@Override
public StreamObserver<Stock> clientSideStreamingGetStatisticsOfStocks(StreamObserver<StockQuote> responseObserver) {
    return new StreamObserver<Stock>() {
        int count;
        double price = 0.0;
        StringBuffer sb = new StringBuffer();

        @Override
        public void onNext(Stock stock) {
            count++;
            price = +fetchStockPriceBid(stock);
            sb.append(":")
                .append(stock.getTickerSymbol());
        }

        @Override
        public void onCompleted() {
            responseObserver.onNext(StockQuote.newBuilder()
                .setPrice(price / count)
                .setDescription("Statistics-" + sb.toString())
                .build());
            responseObserver.onCompleted();
        }

        // handle onError() ...
    };
}

メソッドは StreamObserver クライアントに応答するためのパラメータとして。 それは StreamObserver 、クライアント要求メッセージを処理します。

返された StreamObserver オーバーライド onNext() クライアントがリクエストを送信するたびに通知を受け取るため。

方法 StreamObserver .onCompleted() クライアントがすべてのメッセージの送信を終了したときに呼び出されます。 受信したすべてのStockメッセージを使用して、取得した株価の平均を求め、 StockQuote を作成し、 responseObserver ::onNextを呼び出して配信します。クライアントへの結果。

最後に、オーバーライドします StreamObserver .onError() 異常終了を処理します。

4.3. 双方向ストリーミング

クライアントは複数の株を送信し、サーバーはリクエストごとに一連の価格を返します

@Override
public StreamObserver<Stock> bidirectionalStreamingGetListsStockQuotes(StreamObserver<StockQuote> responseObserver) {
    return new StreamObserver<Stock>() {
        @Override
        public void onNext(Stock request) {
            for (int i = 1; i <= 5; i++) {
                StockQuote stockQuote = StockQuote.newBuilder()
                  .setPrice(fetchStockPriceBid(request))
                  .setOfferNumber(i)
                  .setDescription("Price for stock:" + request.getTickerSymbol())
                  .build();
                responseObserver.onNext(stockQuote);
            }
        }

        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }

        //handle OnError() ...
    };
}

前の例と同じメソッドシグネチャがあります。 実装の変更点:クライアントがすべてのメッセージを送信するのを待たずに応答します。

この場合、各着信メッセージを受信した直後に、受信したのと同じ順序で responseObserver ::onNextを呼び出します。

必要に応じて、応答の順序を簡単に変更できたことに注意することが重要です。

5. クライアントの実装

StockClient のコンストラクターは、gRPCチャネルを取得し、gRPCMavenプラグインによって生成されたスタブクラスをインスタンス化します。

public class StockClient {
    private StockQuoteProviderBlockingStub blockingStub;
    private StockQuoteProviderStub nonBlockingStub;

    public StockClient(Channel channel) {
        blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel);
        nonBlockingStub = StockQuoteProviderGrpc.newStub(channel);
    }
    // ...
}

StockQuoteProviderBlockingStubおよびStockQuoteProviderStubは、同期および非同期のクライアントメソッド要求の作成をサポートします

次に、3つのストリーミングRPCのクライアント実装を確認します。

5.1. サーバーサイドストリーミングを使用したクライアントRPC

クライアントはサーバーに1回呼び出して株価を要求し、見積もりのリストを返します。

public void serverSideStreamingListOfStockPrices() {
    Stock request = Stock.newBuilder()
      .setTickerSymbol("AU")
      .setCompanyName("Austich")
      .setDescription("server streaming example")
      .build();
    Iterator<StockQuote> stockQuotes;
    try {
        logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());
        stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
        for (int i = 1; stockQuotes.hasNext(); i++) {
            StockQuote stockQuote = stockQuotes.next();
            logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
        }
    } catch (StatusRuntimeException e) {
        logInfo("RPC failed: {0}", e.getStatus());
    }
}

blockingStub ::serverSideStreamingGetListStockを使用して同期リクエストを作成します。 StockQuotesIteratorのリストが返されます。

5.2. クライアント側ストリーミングを使用したクライアントRPC

クライアントはStockのストリームをサーバーに送信し、StockQuoteをいくつかの統計とともに返します。

public void clientSideStreamingGetStatisticsOfStocks() throws InterruptedException {
    StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
        @Override
        public void onNext(StockQuote summary) {
            logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}", summary.getPrice(), summary.getDescription());
        }

        @Override
        public void onCompleted() {
            logInfo("Finished clientSideStreamingGetStatisticsOfStocks");
        }

        // Override OnError ...
    };
    
    StreamObserver<Stock> requestObserver = nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver);
    try {
        for (Stock stock : stocks) {
            logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
            requestObserver.onNext(stock);
        }
    } catch (RuntimeException e) {
        requestObserver.onError(e);
        throw e;
    }
    requestObserver.onCompleted();
}

サーバーの例で行ったように、StreamObserversを使用してメッセージを送受信します。

requestObserver は、非ブロッキングスタブを使用してStockのリストをサーバーに送信します。

responseObserver を使用すると、StockQuoteにいくつかの統計が返されます。

5.3. 双方向ストリーミングを使用するクライアントRPC

クライアントはStockのストリームを送信し、各Stockの価格のリストを返します。

public void bidirectionalStreamingGetListsStockQuotes() throws InterruptedException{
    StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
        @Override
        public void onNext(StockQuote stockQuote) {
            logInfo("RESPONSE price#{0} : {1}, description:{2}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription());
        }

        @Override
        public void onCompleted() {
            logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
        }

        //Override onError() ...
    };
    
    StreamObserver<Stock> requestObserver = nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);
    try {
        for (Stock stock : stocks) {
            logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
            requestObserver.onNext(stock);
            Thread.sleep(200);
        }
    } catch (RuntimeException e) {
        requestObserver.onError(e);
        throw e;
    }
    requestObserver.onCompleted();
}

実装は、クライアント側のストリーミングの場合と非常によく似ています。 StockrequestObserverで送信します。唯一の違いは、responseObserverで複数の応答を取得することです。 応答は要求から切り離されています—それらは任意の順序で到着できます。

6. サーバーとクライアントの実行

Mavenを使用してコードをコンパイルした後、2つのコマンドウィンドウを開く必要があります。

サーバーを実行するには:

mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockServer

クライアントを実行するには:

mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockClient

7. 結論

この記事では、gRPCでストリーミングを使用する方法を見てきました。 ストリーミングは、単一の接続を介して複数のメッセージを送信することにより、クライアントとサーバーが通信できるようにする強力な機能です。 さらに、メッセージは送信されたのと同じ順序で受信されますが、どちらの側もメッセージを任意の順序で読み書きできます

例のソースコードは、GitHubにあります。