1. 序章

デフォルトでは、 Spring AMQP では、失敗したメッセージは次の消費ラウンドのために再キューイングされます。 その結果、無限の消費ループが発生し、不安定な状況とリソースの浪費を引き起こす可能性があります。

デッドレターキューの使用は失敗したメッセージを処理する標準的な方法ですが、メッセージの消費を再試行してシステムを通常の状態に戻したい場合があります。

このチュートリアルでは、 ExponentialBackoffという名前の再試行戦略を実装する2つの異なる方法を紹介します。

2. 前提条件

このチュートリアル全体を通して、人気のあるAMQP実装であるRabbitMQを使用します。 したがって、SpringでRabbitMQを構成および使用する方法の詳細については、この SpringAMQPの記事を参照してください。

簡単にするために、RabbitMQインスタンスにもdockerイメージを使用しますが、ポート5672でリッスンしているRabbitMQインスタンスならどれでも使用できます。

RabbitMQdockerコンテナを起動しましょう。

docker run -p 5672:5672 -p 15672:15672 --name rabbit rabbitmq:3-management

例を実装するには、spring-boot-starter-amqpへの依存関係を追加する必要があります。 最新バージョンはMavenCentralで入手できます。

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.2.4.RELEASE</version>
    </dependency>
</dependencies>

3. ブロッキングウェイ

最初の方法では、 SpringRetryフィクスチャを使用します。 単純なキューと、失敗したメッセージの再試行の間にしばらく待機するように構成されたコンシューマーを作成します。

まず、キューを作成しましょう。

@Bean
public Queue blockingQueue() {
    return QueueBuilder.nonDurable("blocking-queue").build();
}

次に、 RetryOperationsInterceptor でバックオフ戦略を構成し、カスタムRabbitListenerContainerFactoryに配線します。

@Bean
public RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
      .backOffOptions(1000, 3.0, 10000)
      .maxAttempts(5)
      .recoverer(observableRecoverer())
      .build();
}

@Bean
public SimpleRabbitListenerContainerFactory retryContainerFactory(
  ConnectionFactory connectionFactory, RetryOperationsInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    Advice[] adviceChain = { retryInterceptor };
    factory.setAdviceChain(adviceChain);

    return factory;
}

上に示したように、1000msの初期間隔と3.0の乗数を構成し、最大待機時間は10000msです。 さらに、5回試行すると、メッセージはドロップされます。

コンシューマーを追加し、例外をスローして失敗したメッセージを強制しましょう。

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory")
public void consumeBlocking(String payload) throws Exception {
    logger.info("Processing message from blocking-queue: {}", payload);

    throw new Exception("exception occured!");
}

最後に、テストを作成して、2つのメッセージをキューに送信しましょう。

@Test
public void whenSendToBlockingQueue_thenAllMessagesProcessed() throws Exception {
    int nb = 2;

    CountDownLatch latch = new CountDownLatch(nb);
    observableRecoverer.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("blocking-queue", "blocking message " + i);
    }

    latch.await();
}

CountdownLatchはテストフィクスチャとしてのみ使用されることに注意してください。

テストを実行して、ログ出力を確認しましょう。

2020-02-18 21:17:55.638  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:56.641  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:17:59.644  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:08.654  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.657  INFO : Processing message from blocking-queue: blocking message 1
2020-02-18 21:18:18.875  ERROR : java.lang.Exception: exception occured!
2020-02-18 21:18:18.858  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:19.860  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:22.863  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:31.867  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.871  INFO : Processing message from blocking-queue: blocking message 2
2020-02-18 21:18:41.875 ERROR : java.lang.Exception: exception occured!

ご覧のとおり、このログには、各再試行間の指数関数的な待機時間が正しく表示されます。 バックオフ戦略が機能している間、再試行が完了するまでコンシューマーはブロックされます。ささいな改善は、@RabbitListenerの同時実行属性を設定することでコンシューマーを同時に実行させることです。

@RabbitListener(queues = "blocking-queue", containerFactory = "retryContainerFactory", concurrency = "2")

でも、 再試行しましたメッセージは引き続きコンシューマーインスタンスをブロックします。 したがって、アプリケーションは遅延の問題に悩まされる可能性があります。

次のセクションでは、同様の戦略を実装するためのノンブロッキングの方法を紹介します。

4. ノンブロッキングの方法

別の方法には、メッセージの有効期限と組み合わせた多数の再試行キューが含まれます。 実際のところ、メッセージの有効期限が切れると、メッセージはデッドレターキューに入れられます。 つまり、 DLQコンシューマーがメッセージを元のキューに送り返す場合、基本的に再試行ループを実行しています。

その結果、使用される再試行キューの数は、発生する試行の数になります。

まず、再試行キューのデッドレターキューを作成しましょう。

@Bean
public Queue retryWaitEndedQueue() {
    return QueueBuilder.nonDurable("retry-wait-ended-queue").build();
}

再試行デッドレターキューにコンシューマーを追加しましょう。 このコンシューマーの唯一の責任は、メッセージを元のキューに送り返すことです

@RabbitListener(queues = "retry-wait-ended-queue", containerFactory = "defaultContainerFactory")
public void consumeRetryWaitEndedMessage(String payload, Message message, Channel channel) throws Exception{
    MessageProperties props = message.getMessageProperties();

    rabbitTemplate().convertAndSend(props.getHeader("x-original-exchange"), 
      props.getHeader("x-original-routing-key"), message);
}

次に、再試行キューのラッパーオブジェクトを作成しましょう。 このオブジェクトは、指数バックオフ構成を保持します。

public class RetryQueues {
    private Queue[] queues;
    private long initialInterval;
    private double factor;
    private long maxWait;

    // constructor, getters and setters

第三に、3つの再試行キューを定義しましょう。

@Bean
public Queue retryQueue1() {
    return QueueBuilder.nonDurable("retry-queue-1")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue2() {
    return QueueBuilder.nonDurable("retry-queue-2")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public Queue retryQueue3() {
    return QueueBuilder.nonDurable("retry-queue-3")
      .deadLetterExchange("")
      .deadLetterRoutingKey("retry-wait-ended-queue")
      .build();
}

@Bean
public RetryQueues retryQueues() {
    return new RetryQueues(1000, 3.0, 10000, retryQueue1(), retryQueue2(), retryQueue3());
}

次に、メッセージの消費を処理するためのインターセプターが必要です。

public class RetryQueuesInterceptor implements MethodInterceptor {

    // fields and constructor

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        return tryConsume(invocation, this::ack, (messageAndChannel, e) -> {
            try {
                int retryCount = tryGetRetryCountOrFail(messageAndChannel, e);
                sendToNextRetryQueue(messageAndChannel, retryCount);
            } catch (Throwable t) {
                // ...
                throw new RuntimeException(t);
            }
        });
    }

消費者が正常に戻ってきた場合、私たちは単にメッセージを確認します。

ただし、コンシューマーが例外をスローし、試行が残っている場合は、次の再試行キューにメッセージを送信します。

private void sendToNextRetryQueue(MessageAndChannel mac, int retryCount) throws Exception {
    String retryQueueName = retryQueues.getQueueName(retryCount);

    rabbitTemplate.convertAndSend(retryQueueName, mac.message, m -> {
        MessageProperties props = m.getMessageProperties();
        props.setExpiration(String.valueOf(retryQueues.getTimeToWait(retryCount)));
        props.setHeader("x-retried-count", String.valueOf(retryCount + 1));
        props.setHeader("x-original-exchange", props.getReceivedExchange());
        props.setHeader("x-original-routing-key", props.getReceivedRoutingKey());

        return m;
    });

    mac.channel.basicReject(mac.message.getMessageProperties()
      .getDeliveryTag(), false);
}

繰り返しになりますが、インターセプターをカスタムRabbitListenerContainerFactoryに配線しましょう。

@Bean
public SimpleRabbitListenerContainerFactory retryQueuesContainerFactory(
  ConnectionFactory connectionFactory, RetryQueuesInterceptor retryInterceptor) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);

    Advice[] adviceChain = { retryInterceptor };
    factory.setAdviceChain(adviceChain);

    return factory;
}

最後に、メインキューと、失敗したメッセージをシミュレートするコンシューマーを定義します。

@Bean
public Queue nonBlockingQueue() {
    return QueueBuilder.nonDurable("non-blocking-queue")
      .build();
}

@RabbitListener(queues = "non-blocking-queue", containerFactory = "retryQueuesContainerFactory", 
  ackMode = "MANUAL")
public void consumeNonBlocking(String payload) throws Exception {
    logger.info("Processing message from non-blocking-queue: {}", payload);

    throw new Exception("Error occured!");
}

別のテストを作成して、2つのメッセージを送信しましょう。

@Test
public void whenSendToNonBlockingQueue_thenAllMessageProcessed() throws Exception {
    int nb = 2;

    CountDownLatch latch = new CountDownLatch(nb);
    retryQueues.setObserver(() -> latch.countDown());

    for (int i = 1; i <= nb; i++) {
        rabbitTemplate.convertAndSend("non-blocking-queue", "non-blocking message " + i);
    }

    latch.await();
}

次に、テストを開始してログを確認しましょう。

2020-02-19 10:31:40.640  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:40.656  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:41.620  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:41.623  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:44.415  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:44.420  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.751  INFO : Processing message from non-blocking-queue: non blocking message 1
2020-02-19 10:31:52.774 ERROR : java.lang.Exception: Error occured!
2020-02-19 10:31:52.829  INFO : Processing message from non-blocking-queue: non blocking message 2
2020-02-19 10:31:52.841 ERROR : java.lang.Exception: Error occured!

ここでも、各再試行の間に指数関数的な待機時間があります。 ただし、すべての試行が行われるまでブロックする代わりに、メッセージは同時に処理されます

この設定は非常に柔軟性があり、遅延の問題を軽減するのに役立ちますが、一般的な落とし穴があります。 実際、 RabbitMQは、キューの先頭に到達したときにのみ期限切れのメッセージを削除します。 したがって、メッセージの有効期限が長い場合、キュー内の他のすべてのメッセージがブロックされます。 このため、応答キューには、同じ有効期限値を持つメッセージのみを含める必要があります。

4. 結論

上に示したように、イベントベースのシステムは、回復力を向上させるために指数バックオフ戦略を実装できます。 このようなソリューションの実装は簡単ですが、特定のソリューションは小規模なシステムにうまく適応できるが、高スループットのエコシステムでは遅延の問題を引き起こす可能性があることを理解することが重要です。

ソースコードは、GitHubから入手できます。