1. 概要

gRPC は、プロセス間リモートプロシージャコール(RPC)を実行するためのプラットフォームです。 パフォーマンスが高く、どのような環境でも実行できます。

このチュートリアルでは、Javaを使用したgRPCエラー処理に焦点を当てます。 gRPCはレイテンシーが非常に低く、スループットが高いため、マイクロサービスアーキテクチャなどの複雑な環境で使用するのが理想的です。 これらのシステムでは、ネットワークのさまざまなコンポーネントの状態、パフォーマンス、および障害を十分に理解することが重要です。 したがって、以前の目標を達成するには、適切なエラー処理の実装が不可欠です。

2. gRPCでのエラー処理の基本

gRPCのエラーはファーストクラスのエンティティです。つまり、 gRPCのすべての呼び出しは、ペイロードメッセージまたはステータスエラーメッセージのいずれかです。

エラーはステータスメッセージにコード化され、サポートされているすべての言語に実装されています

一般に、応答ペイロードにエラーを含めないでください。 そのためには、常に StreamObserver :: OnError、を使用して、ステータスエラーを末尾のヘッダーに内部的に追加します。 以下に示すように、唯一の例外は、ストリームを操作している場合です。

すべてのクライアントまたはサーバーのgRPCライブラリは、公式gRPCエラーモデルをサポートしています。 Javaは、このエラーモデルをクラスio.grpc.Statusでカプセル化します。 このクラスには、追加情報を提供するために、標準のエラーステータスコードとオプションの文字列エラーメッセージが必要です。 このエラーモデルには、使用されるデータエンコーディング(プロトコルバッファ、RESTなど)とは関係なくサポートされるという利点があります。 ただし、ステータスにエラーの詳細を含めることができないため、かなり制限されています。

gRPCアプリケーションがデータエンコーディング用のプロトコルバッファを実装している場合は、GoogleAPI用のより豊富なエラーモデルを使用できます。 com.google.rpc.Status クラスは、このエラーモデルをカプセル化します。 このクラスは、com.google.rpc.Code値、エラーメッセージ、および追加のエラー詳細protobufメッセージとして追加します。 さらに、 error_details.proto で定義されている、最も一般的なケースをカバーするprotobufエラーメッセージの事前定義されたセットを利用できます。  パッケージcom.google.rpcには、 RetryInfo DebugInfo QuotaFailure ErrorInfoのクラスがあります。 、 PrecondicionFailure BadRequest RequestInfo ResourceInfo、 Help は、すべてのエラーメッセージをにカプセル化します。 ]error_details.proto

2つのエラーモデルに加えて、RPCメタデータにキーと値のペアとして追加できるカスタムエラーメッセージを定義できます。

クライアントが商品名を送信し、サーバーが価格設定値を提供する価格設定サービスでこれらのエラーモデルを使用する方法を示す非常に単純なアプリケーションを作成します。

3. UnaryRPC呼び出し

commodity_price.protoで定義されている次のサービスインターフェイスの検討を始めましょう。

service CommodityPriceProvider {
    rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
}

message Commodity {
    string access_token = 1;
    string commodity_name = 2;
}

message CommodityQuote {
    string commodity_name = 1;
    string producer_name = 2;
    double price = 3;
}

message ErrorResponse {
    string commodity_name = 1;
    string access_token = 2;
    string expected_token = 3;
    string expected_value = 4;
}

サービスの入力は商品メッセージです。 リクエストでは、クライアントはaccess_tokencommodity_nameを提供する必要があります。

サーバーは、コモディティ名プロデューサー名、、およびコモディティに関連する価格を示すコモディティクォートと同期的に応答します。 ]。

説明のために、カスタムErrorResponseも定義します。  これは、メタデータとしてクライアントに送信するカスタムエラーメッセージの例です。

3.1. io.grpc.Statusを使用した応答

サーバーのサービスコールでは、有効なコモディティのリクエストを確認します。

public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {

    if (commodityLookupBasePrice.get(request.getCommodityName()) == null) {
 
        Metadata.Key<ErrorResponse> errorResponseKey = ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance());
        ErrorResponse errorResponse = ErrorResponse.newBuilder()
          .setCommodityName(request.getCommodityName())
          .setAccessToken(request.getAccessToken())
          .setExpectedValue("Only Commodity1, Commodity2 are supported")
          .build();
        Metadata metadata = new Metadata();
        metadata.put(errorResponseKey, errorResponse);
        responseObserver.onError(io.grpc.Status.INVALID_ARGUMENT.withDescription("The commodity is not supported")
          .asRuntimeException(metadata));
    } 
    // ...
}

この簡単な例では、CommoditycommodityLookupBasePrice HashTableに存在しない場合にエラーを返します。

まず、カスタム ErrorResponse を作成し、 metadata.put(errorResponseKey、errorResponse)のメタデータに追加するキーと値のペアを作成します。

io.grpc.Statusを使用してエラーステータスを指定します。 関数responseObserver:: onErrorはパラメーターとしてThrowableを受け取るため、asRuntimeException(metadata)を使用してステータスをThrowableに変換します。 asRuntimeException は、オプションでメタデータパラメーター(この場合は ErrorResponse キーと値のペア)を受け取ることができます。これにより、メッセージの予告編が追加されます。

クライアントが無効な要求を行うと、例外が返されます。

@Test
public void whenUsingInvalidCommodityName_thenReturnExceptionIoRpcStatus() throws Exception {
 
    Commodity request = Commodity.newBuilder()
      .setAccessToken("123validToken")
      .setCommodityName("Commodity5")
      .build();

    StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class, () -> blockingStub.getBestCommodityPrice(request));

    assertEquals("INVALID_ARGUMENT", thrown.getStatus().getCode().toString());
    assertEquals("INVALID_ARGUMENT: The commodity is not supported", thrown.getMessage());
    Metadata metadata = Status.trailersFromThrowable(thrown);
    ErrorResponse errorResponse = metadata.get(ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance()));
    assertEquals("Commodity5",errorResponse.getCommodityName());
    assertEquals("123validToken", errorResponse.getAccessToken());
    assertEquals("Only Commodity1, Commodity2 are supported", errorResponse.getExpectedValue());
}

blockingStub :: getBestCommodityPrice を呼び出すと、リクエストに無効な商品名が含まれているため、StatusRuntimeExeptionがスローされます。

Status ::trailerFromThrowableを使用してメタデータにアクセスします。 ProtoUtils :: keyForProto は、ErrorResponseのメタデータキーを提供します。

3.2. com.google.rpc.Statusを使用した応答

次のサーバーコードの例を考えてみましょう。

public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
    // ...
    if (request.getAccessToken().equals("123validToken") == false) {

        com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
          .setCode(com.google.rpc.Code.NOT_FOUND.getNumber())
          .setMessage("The access token not found")
          .addDetails(Any.pack(ErrorInfo.newBuilder()
            .setReason("Invalid Token")
            .setDomain("com.baeldung.grpc.errorhandling")
            .putMetadata("insertToken", "123validToken")
            .build()))
          .build();
        responseObserver.onError(StatusProto.toStatusRuntimeException(status));
    }
    // ...
}

実装では、リクエストに有効なトークンがない場合、getBestCommodityPriceはエラーを返します。

さらに、ステータスコード、メッセージ、詳細をcom.google.rpc.Statusに設定します。

この例では、カスタム ErrorDetails の代わりに、事前定義された com.google.rpc.ErrorInfo を使用しています(ただし、必要に応じて両方を使用することもできます)。 Any :: pack()を使用してErrorInfoをシリアル化します。

クラスStatusProto:: toStatusRuntimeException は、com.google.rpc.StatusThrowableに変換します。

原則として、 error_details.proto で定義されている他のメッセージを追加して、応答をさらにカスタマイズすることもできます。

クライアントの実装は簡単です。

@Test
public void whenUsingInvalidRequestToken_thenReturnExceptionGoogleRPCStatus() throws Exception {
 
    Commodity request = Commodity.newBuilder()
      .setAccessToken("invalidToken")
      .setCommodityName("Commodity1")
      .build();

    StatusRuntimeException thrown = Assertions.assertThrows(StatusRuntimeException.class,
      () -> blockingStub.getBestCommodityPrice(request));
    com.google.rpc.Status status = StatusProto.fromThrowable(thrown);
    assertNotNull(status);
    assertEquals("NOT_FOUND", Code.forNumber(status.getCode()).toString());
    assertEquals("The access token not found", status.getMessage());
    for (Any any : status.getDetailsList()) {
        if (any.is(ErrorInfo.class)) {
            ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
            assertEquals("Invalid Token", errorInfo.getReason());
            assertEquals("com.baeldung.grpc.errorhandling", errorInfo.getDomain());
            assertEquals("123validToken", errorInfo.getMetadataMap().get("insertToken"));
        }
    }
}

StatusProto.fromThrowable は、例外からcom.google.rpc.Statusを直接取得するためのユーティリティメソッドです。

status :: getDetailsList から、com.google.rpc.ErrorInfoの詳細を取得します。

4. gRPCストリームのエラー

gRPCストリームを使用すると、サーバーとクライアントは1回のRPC呼び出しで複数のメッセージを送信できます。

エラーの伝播に関して、これまで使用してきたアプローチは、gRPCストリームでは無効です。 理由は、onError()がRPC で呼び出される最後のメソッドである必要があるためです。これは、この呼び出しの後、フレームワークがクライアントとサーバー間の通信を切断するためです。

ストリームを使用している場合、これは望ましい動作ではありません。代わりに、RPC を介して送信される可能性のある他のメッセージに応答するために、接続を開いたままにしておきます。

この問題に対する適切な解決策は、 commodity_price.proto に示すように、メッセージ自体にエラーを追加することです。

service CommodityPriceProvider {
  
    rpc getBestCommodityPrice(Commodity) returns (CommodityQuote) {}
  
    rpc bidirectionalListOfPrices(stream Commodity) returns (stream StreamingCommodityQuote) {}
}

message Commodity {
    string access_token = 1;
    string commodity_name = 2;
}

message StreamingCommodityQuote{
    oneof message{
        CommodityQuote comodity_quote = 1;
        google.rpc.Status status = 2;
   }   
}

関数bidirectionListOfPricesは、StreamingCommodityQuoteを返します。 このメッセージには、CommodityQuoteまたはgoogle.rpc.Statusのいずれかを使用できることを示すoneofキーワードが含まれています。

次の例では、クライアントが無効なトークンを送信すると、サーバーは応答の本文にステータスエラーを追加します。

public StreamObserver<Commodity> bidirectionalListOfPrices(StreamObserver<StreamingCommodityQuote> responseObserver) {

    return new StreamObserver<Commodity>() {
        @Override
        public void onNext(Commodity request) {

            if (request.getAccessToken().equals("123validToken") == false) {

                com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
                  .setCode(Code.NOT_FOUND.getNumber())
                  .setMessage("The access token not found")
                  .addDetails(Any.pack(ErrorInfo.newBuilder()
                    .setReason("Invalid Token")
                    .setDomain("com.baeldung.grpc.errorhandling")
                    .putMetadata("insertToken", "123validToken")
                    .build()))
                  .build();
                StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
                  .setStatus(status)
                  .build();
                responseObserver.onNext(streamingCommodityQuote);
            }
            // ...
        }
    }
}

コードはcom.google.rpc.Statusのインスタンスを作成し、それをStreamingCommodityQuote応答メッセージに追加します。 onError()、を呼び出さないため、フレームワークはクライアントとの接続を中断しません。

クライアントの実装を見てみましょう。

public void onNext(StreamingCommodityQuote streamingCommodityQuote) {

    switch (streamingCommodityQuote.getMessageCase()) {
        case COMODITY_QUOTE:
            CommodityQuote commodityQuote = streamingCommodityQuote.getComodityQuote();
            logger.info("RESPONSE producer:" + commodityQuote.getCommodityName() + " price:" + commodityQuote.getPrice());
            break;
        case STATUS:
            com.google.rpc.Status status = streamingCommodityQuote.getStatus();
            logger.info("Status code:" + Code.forNumber(status.getCode()));
            logger.info("Status message:" + status.getMessage());
            for (Any any : status.getDetailsList()) {
                if (any.is(ErrorInfo.class)) {
                    ErrorInfo errorInfo;
                    try {
                        errorInfo = any.unpack(ErrorInfo.class);
                        logger.info("Reason:" + errorInfo.getReason());
                        logger.info("Domain:" + errorInfo.getDomain());
                        logger.info("Insert Token:" + errorInfo.getMetadataMap().get("insertToken"));
                    } catch (InvalidProtocolBufferException e) {
                        logger.error(e.getMessage());
                    }
                }
            }
            break;
        // ...
    }
}

クライアントはonNext(StreamingCommodityQuote)で返されたメッセージを取得し、はswitchステートメントを使用してCommodityQuoteまたはcom.google.rpc.Statusを区別します。

5. 結論

このチュートリアルでは、単項およびストリームベースのRPC呼び出しのgRPCでエラー処理を実装する方法を示しました

gRPCは、分散システムでのリモート通信に使用できる優れたフレームワークです。 これらのシステムでは、システムの監視に役立つ非常に堅牢なエラー処理の実装が重要です。 これは、マイクロサービスのような複雑なアーキテクチャではさらに重要です。

例のソースコードは、GitHubにあります。