1. 概要

このチュートリアルでは、KafkaConsumer実装の1つであるMockConsumerについて説明します。

最初に、Kafka Consumerをテストするときに考慮すべき主な事項について説明します。 次に、MockConsumerを使用してテストを実装する方法を説明します。

2. Kafkaコンシューマーのテスト

Kafkaからのデータの消費は、2つの主要なステップで構成されています。 まず、トピックをサブスクライブするか、トピックパーティションを手動で割り当てる必要があります。 次に、pollメソッドを使用してレコードのバッチをポーリングします。

ポーリングは通常、無限ループで実行されます。 これは、通常、データを継続的に消費したいためです。

たとえば、サブスクリプションとポーリングループだけで構成される単純な消費ロジックについて考えてみましょう。

void consume() {
    try {
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> processRecord(record));
        }
    } catch (WakeupException ex) {
        // ignore for shutdown
    } catch (RuntimeException ex) {
        // exception handling
    } finally {
        consumer.close();
    }
}

上記のコードを見ると、テストできることがいくつかあることがわかります。

  • サブスクリプション
  • ポーリングループ
  • 例外処理
  • コンシューマーが正しく閉じられた場合

消費ロジックをテストするための複数のオプションがあります。

メモリ内のKafkaインスタンスを使用できます。 ただし、このアプローチにはいくつかの欠点があります。 一般に、メモリ内のKafkaインスタンスは、テストを非常に重く、遅くします。 さらに、設定は簡単な作業ではなく、不安定なテストにつながる可能性があります。

または、モックフレームワークを使用してモックを作成することもできます。 消費者。 このアプローチを使用するとテストが軽量になりますが、設定には多少注意が必要です。

最後のオプションであり、おそらく最良の方法は、テスト用のConsumer実装であるMockConsumerを使用することです。 軽量テストの構築に役立つだけでなく、セットアップも簡単です

それが提供する機能を見てみましょう。

3. MockConsumerを使用する

MockConsumerは、 kafka-クライアントライブラリは提供しますしたがって、 多くのコードを記述しなくても、実際のコンシューマーの動作全体を模倣します。

MockConsumerの使用例を見てみましょう。 特に、コンシューマーアプリケーションのテスト中に遭遇する可能性のあるいくつかの一般的なシナリオを取り上げ、MockConsumerを使用してそれらを実装します。

この例では、Kafkaトピックからの国の人口の更新を消費するアプリケーションについて考えてみましょう。 更新には、国の名前とその現在の人口のみが含まれます。

class CountryPopulation {

    private String country;
    private Integer population;

    // standard constructor, getters and setters
}

私たちのコンシューマーは、Kafka Consumer インスタンスを使用して更新をポーリングし、それらを処理し、最後にcommitSyncメソッドを使用してオフセットをコミットします。

public class CountryPopulationConsumer {

    private Consumer<String, Integer> consumer;
    private java.util.function.Consumer<Throwable> exceptionConsumer;
    private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;

    // standard constructor

    void startBySubscribing(String topic) {
        consume(() -> consumer.subscribe(Collections.singleton(topic)));
    }

    void startByAssigning(String topic, int partition) {
        consume(() -> consumer.assign(Collections.singleton(new TopicPartition(topic, partition))));
    }

    private void consume(Runnable beforePollingTask) {
        try {
            beforePollingTask.run();
            while (true) {
                ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(1000));
                StreamSupport.stream(records.spliterator(), false)
                    .map(record -> new CountryPopulation(record.key(), record.value()))
                    .forEach(countryPopulationConsumer);
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            System.out.println("Shutting down...");
        } catch (RuntimeException ex) {
            exceptionConsumer.accept(ex);
        } finally {
            consumer.close();
        }
    }

    public void stop() {
        consumer.wakeup();
    }
}

3.1. MockConsumerインスタンスの作成

次に、MockConsumerのインスタンスを作成する方法を見てみましょう。

@BeforeEach
void setUp() {
    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    updates = new ArrayList<>();
    countryPopulationConsumer = new CountryPopulationConsumer(consumer, 
      ex -> this.pollException = ex, updates::add);
}

基本的に、提供する必要があるのはオフセットリセット戦略だけです。

アップデートを使用して、countryPopulationConsumerが受信するレコードを収集することに注意してください。 これは、期待される結果を主張するのに役立ちます。

同様に、 pollException を使用して、例外を収集してアサートします。

すべてのテストケースで、上記の設定方法を使用します。 それでは、コンシューマーアプリケーションのいくつかのテストケースを見てみましょう。

3.2. トピックパーティションの割り当て

まず、startByAssigningメソッドのテストを作成しましょう。

@Test
void whenStartingByAssigningTopicPartition_thenExpectUpdatesAreConsumedCorrectly() {
    // GIVEN
    consumer.schedulePollTask(() -> consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000)));
    consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
    TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
    startOffsets.put(tp, 0L);
    consumer.updateBeginningOffsets(startOffsets);

    // WHEN
    countryPopulationConsumer.startByAssigning(TOPIC, PARTITION);

    // THEN
    assertThat(updates).hasSize(1);
    assertThat(consumer.closed()).isTrue();
}

まず、 MockConsumer。 まず、を使用して消費者にレコードを追加します addRecord 方法

最初に覚えておくべきことは、トピックを割り当てたりサブスクライブしたりする前にレコードを追加することはできないということです。 そのため、 schedulePollTask method を使用してポーリングタスクをスケジュールします。スケジュールするタスクは、レコードがフェッチされる前の最初のポーリングで実行されます。 したがって、レコードの追加は、割り当てが行われた後に行われます。

同様に重要なのは、トピックとそれに割り当てられたパーティションに属していないMockConsumerレコードに追加できないことです

次に、コンシューマーが無期限に実行されないようにするために、2回目のポーリングでシャットダウンするように構成します。

さらに、開始オフセットを設定する必要があります。 これは、updateBeginningOffsetsメソッドを使用して行います。

最後に、アップデートが正しく消費されたかどうかを確認し、コンシューマーを閉じます。

3.3. トピックの購読

それでは、startBySubscribingメソッドのテストを作成しましょう。

@Test
void whenStartingBySubscribingToTopic_thenExpectUpdatesAreConsumedCorrectly() {
    // GIVEN
    consumer.schedulePollTask(() -> {
        consumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, 0)));
        consumer.addRecord(record("Romania", 1000, TOPIC, 0));
    });
    consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
    TopicPartition tp = new TopicPartition(TOPIC, 0);
    startOffsets.put(tp, 0L);
    consumer.updateBeginningOffsets(startOffsets);

    // WHEN
    countryPopulationConsumer.startBySubscribing(TOPIC);

    // THEN
    assertThat(updates).hasSize(1);
    assertThat(consumer.closed()).isTrue();
}

この場合、レコードを追加する前に最初に行うことは、リバランスです。 これを行うには、リバランスをシミュレートするリバランスメソッドを呼び出します。

残りはstartByAssigningテストケースと同じです。

3.4. ポーリングループの制御

ポーリングループは複数の方法で制御できます。

最初のオプションは、上記のテストで行ったように、ポーリングタスクをスケジュールすることです。 これは、 schedulePollTask、を介して行います。これは、Runnableをパラメーターとして受け取ります。 私たちがスケジュールする各タスクは、pollメソッドを呼び出すときに実行されます。

2番目のオプションは、ウェイクアップメソッドを呼び出すことです。 通常、これは長いポーリング呼び出しを中断する方法です。 実際、これがCountryPopulationConsumer。stopメソッドを実装した方法です。

最後に、setPollException メソッドを使用して、スローされる例外を設定できます。

@Test
void whenStartingBySubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
    // GIVEN
    consumer.schedulePollTask(() -> consumer.setPollException(new KafkaException("poll exception")));
    consumer.schedulePollTask(() -> countryPopulationConsumer.stop());

    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
    TopicPartition tp = new TopicPartition(TOPIC, 0);
    startOffsets.put(tp, 0L);
    consumer.updateBeginningOffsets(startOffsets);

    // WHEN
    countryPopulationConsumer.startBySubscribing(TOPIC);

    // THEN
    assertThat(pollException).isInstanceOf(KafkaException.class).hasMessage("poll exception");
    assertThat(consumer.closed()).isTrue();
}

3.5. モッキングエンドオフセットとパーティション情報

消費ロジックがエンドオフセットまたはパーティション情報に基づいている場合は、MockConsumerを使用してこれらをモックすることもできます。

終了オフセットをモックする場合は、addEndOffsetsおよびupdateEndOffsetsメソッドを使用できます。

また、パーティション情報をモックしたい場合は、updatePartitionsメソッドを使用できます。

4. 結論

この記事では、MockConsumerを使用してKafkaコンシューマーアプリケーションをテストする方法について説明しました。

最初に、消費者ロジックの例を見てきました。これはテストに不可欠な部分です。 次に、MockConsumerを使用して単純なKafkaコンシューマーアプリケーションをテストしました。

その過程で、MockConsumerの機能とその使用方法を確認しました。

いつものように、これらのコードサンプルはすべてGitHub利用できます。