1. 序章

非同期メッセージングは、イベント駆動型アーキテクチャを実装するためにますます普及している、緩く結合された分散通信の一種です。 幸い、 SpringFrameworkSpringAMQP プロジェクトを提供し、AMQPベースのメッセージングソリューションを構築できるようにします。

一方、このような環境でのエラーの処理は、簡単な作業ではない可能性があります。 したがって、このチュートリアルでは、エラーを処理するためのさまざまな戦略について説明します。

2. 環境設定

このチュートリアルでは、 AMQP標準を実装するRabbitMQを使用します。また、Spring AMQPは、統合を非常に簡単にするspring-rabbitモジュールを提供します。

RabbitMQをスタンドアロンサーバーとして実行してみましょう。 次のコマンドを実行して、Dockerコンテナで実行します。

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

構成とプロジェクトの依存関係の設定の詳細については、SpringAMQPの記事を参照してください。

3. 失敗シナリオ

通常、モノリスまたは単一パッケージのアプリケーションと比較して、メッセージングベースのシステムで発生する可能性のあるエラーの種類は分散しているため、より多くの種類があります。

例外の種類のいくつかを指摘することができます:

  • ネットワーク-またはI/O関連–ネットワーク接続およびI/O操作の一般的な障害
  • プロトコル-またはインフラストラクチャ関連–通常はメッセージングインフラストラクチャの設定ミスを表すエラー
  • ブローカー関連–クライアントとAMQPブローカー間の不適切な構成について警告する障害。 たとえば、定義された制限またはしきい値への到達、認証または無効なポリシー構成
  • アプリケーション-およびメッセージ関連–通常、一部のビジネスまたはアプリケーションルールの違反を示す例外

確かに、この障害のリストは網羅的ではありませんが、最も一般的なタイプのエラーが含まれています。

Spring AMQPは、たとえば再試行または再キューイングポリシーを適用することにより、接続関連の低レベルの問題をすぐに処理できることに注意してください。 さらに、ほとんどの障害と障害は、AmqpExceptionまたはそのサブクラスの1つに変換されます。

次のセクションでは、主にアプリケーション固有の高レベルのエラーに焦点を当て、次にグローバルなエラー処理戦略について説明します。

4. プロジェクトの設定

それでは、開始する単純なキューと交換構成を定義しましょう。

public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .build();
}
 
@Bean
DirectExchange messagesExchange() {
    return new DirectExchange(EXCHANGE_MESSAGES);
}
 
@Bean
Binding bindingMessages() {
    return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}

次に、簡単なプロデューサーを作成しましょう。

public void sendMessage() {
    rabbitTemplate
      .convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES,
        SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);
}

そして最後に、例外をスローするコンシューマー:

@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
    throw new BusinessException();
}

デフォルトでは、失敗したすべてのメッセージは、ターゲットキューの先頭に何度も何度もすぐに再キューイングされます。

次のMavenコマンドを実行して、サンプルアプリケーションを実行してみましょう。

mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.errorhandling.ErrorHandlingApp

これで、同様の結果の出力が表示されるはずです。

WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
Caused by: com.baeldung.springamqp.errorhandling.errorhandler.BusinessException: null

その結果、デフォルトでは、出力にそのようなメッセージが無数に表示されます。

この動作を変更するには、次の2つのオプションがあります。

  • リスナー側でdefault-requeue-rejectedオプションをfalseに設定します– spring.rabbitmq.listener.simple.default-requeue-rejected = false
  • AmqpRejectAndDontRequeueExceptionをスローします– t は、将来意味をなさないメッセージに役立つ可能性があるため、破棄できます。

それでは、失敗したメッセージをよりインテリジェントな方法で処理する方法を見つけましょう。

5. デッドレターキュー

Dead Letter Queue(DLQ)は、未配信または失敗したメッセージを保持するキューです。 DLQを使用すると、障害のあるメッセージや不良メッセージを処理し、障害パターンを監視し、システムの例外から回復できます。

さらに重要なことに、これは、不良メッセージを絶えず処理し、システムパフォーマンスを低下させるキュー内の無限ループを防ぐのに役立ちます。

全体として、Dead Letter Exchange(DLX)とDead Letter Queue(DLQ)自体の2つの主要な概念があります。 実際、 DLXは通常の交換であり、一般的なタイプの1つとして定義できます:直接トピックまたはファンアウト

それを理解することは非常に重要ですプロデューサーはキューについて何も知りません。 交換のみを認識し、生成されたすべてのメッセージは交換構成とメッセージルーティングキーに従ってルーティングされます

次に、DeadLetterQueueアプローチを適用して例外を処理する方法を見てみましょう。

5.1. 基本構成

DLQを構成するには、キューを定義するときに追加の引数を指定する必要があります。

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", "")
      .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
      .build();
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}

上記の例では、x-dead-letter-exchangex-dead-letter-routing-keyの2つの追加引数を使用しています。 x-dead-letter-exchangeオプションの空の文字列値は、ブローカーにデフォルトの交換を使用するように指示します。

2番目の引数は、単純なメッセージのルーティングキーを設定するのと同じくらい重要です。 このオプションは、メッセージの初期ルーティングキーを変更して、DLXでさらにルーティングできるようにします。

5.2. 失敗したメッセージルーティング

したがって、メッセージの配信に失敗すると、メッセージはDeadLetterExchangeにルーティングされます。 しかし、すでに述べたように、 DLXは通常の交換です。 したがって、失敗したメッセージルーティングキーが交換と一致しない場合、DLQに配信されません。

Exchange: (AMQP default)
Routing Key: baeldung-messages-queue.dlq

したがって、この例で x-dead-letter-routing-key 引数を省略すると、失敗したメッセージは無限の再試行ループでスタックします。

さらに、メッセージの元のメタ情報は、x-deathヘッダーで利用できます。

x-death:
  count: 1
  exchange: baeldung-messages-exchange
  queue: baeldung-messages-queue 
  reason: rejected
  routing-keys: baeldung-messages-queue 
  time: 1571232954

上記の情報は、通常ポート15672でローカルに実行されているRabbitMQ管理コンソールで利用できます。

この構成に加えて、 Spring Cloud Stream を使用している場合は、構成プロパティrepublishToDlqおよびautoBindDlqを利用して、構成プロセスを簡素化することもできます。

5.3. デッドレターエクスチェンジ

前のセクションでは、メッセージがデッドレターエクスチェンジにルーティングされるときにルーティングキーが変更されることを確認しました。 ただし、この動作が常に望ましいとは限りません。 DLXを自分で構成し、ファンアウトタイプを使用して定義することで変更できます。

public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
 
@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
      .build();
}
 
@Bean
FanoutExchange deadLetterExchange() {
    return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
 
@Bean
Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}

今回はファンアウトタイプのカスタム交換を定義したので、メッセージはすべての制限付きキューに送信されます。 さらに、x-dead-letter-exchange引数の値をDLXの名前に設定しました。 同時に、x-dead-letter-routing-key引数を削除しました。

ここで、この例を実行すると、失敗したメッセージがDLQに配信されますが、最初のルーティングキーは変更されません。

Exchange: baeldung-messages-queue.dlx
Routing Key: baeldung-messages-queue

5.4. デッドレターキューメッセージの処理

もちろん、それらをDead Letter Queueに移動した理由は、別のときに再処理できるようにするためです。

DeadLetterQueueのリスナーを定義しましょう。

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
    log.info("Received failed message: {}", message.toString());
}

ここでコード例を実行すると、ログ出力が表示されます。

WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer  : 
  Received failed message:

失敗したメッセージが表示されましたが、次に何をすべきですか?答えは、特定のシステム要件、例外の種類、またはメッセージの種類によって異なります。

たとえば、メッセージを元の宛先に再キューイングするだけです。

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
    log.info("Received failed message, requeueing: {}", failedMessage.toString());
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

ただし、このような例外ロジックは、デフォルトの再試行ポリシーと同じです。

INFO 23476 --- [ntContainer#0-1] c.b.s.e.c.RoutingDLQAmqpContainer        :
  Received message: 
WARN 23476 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 23476 --- [ntContainer#1-1] c.b.s.e.c.RoutingDLQAmqpContainer        : 
  Received failed message, requeueing:

一般的な戦略では、メッセージの処理を n 回再試行してから、拒否する必要があります。 メッセージヘッダーを活用して、この戦略を実装しましょう。

public void processFailedMessagesRetryHeaders(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Discarding message");
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

最初に、 x-retries-count ヘッダーの値を取得してから、この値を最大許容値と比較します。 その後、カウンターが試行制限数に達すると、メッセージは破棄されます。

WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 1 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 2 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Discarding message

x-message-ttl ヘッダーを使用して、メッセージを破棄するまでの時間を設定することもできることを追加する必要があります。 これは、キューが無限に大きくなるのを防ぐのに役立つ場合があります。

5.5. 駐車場キュー

一方、メッセージを破棄するだけでは不十分な状況を考えてみましょう。たとえば、銀行のドメインでのトランザクションである可能性があります。 または、メッセージを手動で処理する必要がある場合や、n回以上失敗したメッセージを記録する必要がある場合もあります。

このような状況では、駐車場キューの概念があります。 許可された回数を超えて失敗したDLQからのすべてのメッセージを転送して、さらに処理するために駐車場キューに送ることができます

このアイデアを実装しましょう:

public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";
 
@Bean
FanoutExchange parkingLotExchange() {
    return new FanoutExchange(EXCHANGE_PARKING_LOT);
}
 
@Bean
Queue parkingLotQueue() {
    return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
}
 
@Bean
Binding parkingLotBinding() {
    return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
}

次に、リスナーロジックをリファクタリングして、駐車場のキューにメッセージを送信しましょう。

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Sending message to the parking lot queue");
        rabbitTemplate.send(EXCHANGE_PARKING_LOT, 
          failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

最終的には、駐車場のキューに到着したメッセージも処理する必要があります。

@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
    log.info("Received message in parking lot queue");
    // Save to DB or send a notification.
}

これで、失敗したメッセージをデータベースに保存したり、電子メール通知を送信したりできます。

アプリケーションを実行して、このロジックをテストしてみましょう。

WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 1 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 2 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Sending message to the parking lot queue
INFO 14768 --- [ntContainer#2-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Received message in parking lot queue

出力からわかるように、何度か試行に失敗した後、メッセージは駐車場キューに送信されました。

6. カスタムエラー処理

前のセクションでは、専用のキューと交換を使用して障害を処理する方法を説明しました。 ただし、データベースへのログ記録や永続化など、すべてのエラーをキャッチする必要がある場合があります。

6.1. グローバルErrorHandler

これまで、デフォルトの SimpleRabbitListenerContainerFactory を使用してきましたが、このファクトリはデフォルトでConditionalRejectingErrorHandlerを使用します。 このハンドラーはさまざまな例外をキャッチし、それらをAmqpException階層内の例外の1つに変換します。

接続エラーを処理する必要がある場合は、ApplicationListenerインターフェイスを実装する必要があることに注意してください。

簡単に言うと、 ConditionalRejectingErrorHandlerは、特定のメッセージを拒否するかどうかを決定します。例外の原因となったメッセージが拒否された場合、メッセージは再キューイングされません。

BusinessExceptionのみを再キューイングするカスタムErrorHandlerを定義しましょう。

public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        if (!(t.getCause() instanceof BusinessException)) {
            throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
        }
    }
}

さらに、リスナーメソッド内で例外をスローしているため、ListenerExecutionFailedExceptionでラップされています。 したがって、ソース例外を取得するには、getCauseメソッドを呼び出す必要があります。

6.2. FatalExceptionStrategy

内部的には、このハンドラーは FatalExceptionStrategy を使用して、例外を致命的と見なす必要があるかどうかを確認します。 その場合、失敗したメッセージは拒否されます。

デフォルトでは、これらの例外は致命的です。

  • MessageConversionException
  • MessageConversionException
  • MethodArgumentNotValidException
  • MethodArgumentTypeMismatchException
  • NoSuchMethodException
  • ClassCastException

ErrorHandlerインターフェースを実装する代わりに、FatalExceptionStrategyを提供するだけです。

public class CustomFatalExceptionStrategy 
      extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
    @Override
    public boolean isFatal(Throwable t) {
        return !(t.getCause() instanceof BusinessException);
    }
}

最後に、カスタム戦略をConditionalRejectingErrorHandlerコンストラクターに渡す必要があります。

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
  ConnectionFactory connectionFactory,
  SimpleRabbitListenerContainerFactoryConfigurer configurer) {
      SimpleRabbitListenerContainerFactory factory = 
        new SimpleRabbitListenerContainerFactory();
      configurer.configure(factory, connectionFactory);
      factory.setErrorHandler(errorHandler());
      return factory;
}
 
@Bean
public ErrorHandler errorHandler() {
    return new ConditionalRejectingErrorHandler(customExceptionStrategy());
}
 
@Bean
FatalExceptionStrategy customExceptionStrategy() {
    return new CustomFatalExceptionStrategy();
}

7. 結論

このチュートリアルでは、Spring AMQP、特にRabbitMQを使用しているときにエラーを処理するさまざまな方法について説明しました。

すべてのシステムには、特定のエラー処理戦略が必要です。 イベント駆動型アーキテクチャでのエラー処理の最も一般的な方法について説明しました。 さらに、複数の戦略を組み合わせて、より包括的で堅牢なソリューションを構築できることもわかりました。

いつものように、記事の完全なソースコードは、GitHubから入手できます。