1. 概要

Kafkaは、分散メッセージングキューを中心に構築されたメッセージ処理システムです。 これは、アプリケーションがKafkaトピックにデータを書き込んだりKafkaトピックからデータを読み取ったりできるようにJavaライブラリを提供します。

現在、ほとんどのビジネスドメインロジックは単体テストによって検証されているため、アプリケーションは通常、JUnitのすべてのI/O操作を模倣します。 Kafkaは、プロデューサーアプリケーションをモックするためのMockProducerも提供しています。

このチュートリアルでは、最初にKafkaプロデューサーアプリケーションを実装します。 後で、MockProducerを使用して一般的なプロデューサー操作を検証する単体テストを実装します。

2. Mavenの依存関係

プロデューサーアプリケーションを実装する前に、kafka-clientsのMaven依存関係を追加します。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>

3.  MockProducer

kafka-clients ライブラリには、Kafkaでメッセージを公開および消費するためのJavaライブラリが含まれています。 プロデューサーアプリケーションは、これらのAPIを使用して、KafkaトピックにKey-Valueレコードを送信できます。

public class KafkaProducer {

    private final Producer<String, String> producer;

    public KafkaProducer(Producer<String, String> producer) {
        this.producer = producer;
    }

    public Future<RecordMetadata> send(String key, String value) {
        ProducerRecord record = new ProducerRecord("topic_sports_news", key, value);
        return producer.send(record);
    }
}

Kafkaプロデューサーは、クライアントのライブラリにProducerインターフェースを実装する必要があります。 Kafkaは、 KafkaProducer クラスも提供します。これは、Kafkaブローカーに対してI/O操作を実行する具体的な実装です。

さらに、Kafkaは、同じ Producer インターフェイスを実装し、KafkaProducerに実装されているすべてのI/O操作をモックするMockProducerを提供します。

@Test
void givenKeyValue_whenSend_thenVerifyHistory() {

    MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(), new StringSerializer());

    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("soccer", 
      "{\"site\" : \"baeldung\"}");

    assertTrue(mockProducer.history().size() == 1);
}

このようなI/O操作は、 Mockito でモックすることもできますが、 MockProducerを使用すると、モックの上に実装する必要のある多くの機能にアクセスできます。そのような機能の1つは history()メソッド。  MockProducer は、 send()が呼び出されたレコードをキャッシュします。これにより、プロデューサーの公開動作を検証できます。

さらに、トピック名、パーティション、レコードキー、値などのメタデータを検証することもできます。

assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("data"));
assertTrue(recordMetadataFuture.get().partition() == 0);

4. Kafkaクラスターのモック

これまでの模擬テストでは、パーティションが1つしかないトピックを想定しています。 ただし、プロデューサースレッドとコンシューマースレッドの間で最大の同時実行性を実現するために、Kafkaトピックは通常複数のパーティションに分割されます。

これにより、プロデューサーは複数のパーティションにデータを書き込むことができます。 これは通常、キーに基づいてレコードをパーティション化し、特定のキーを特定のパーティションにマッピングすることによって実現されます。

public class EvenOddPartitioner extends DefaultPartitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, 
      byte[] valueBytes, Cluster cluster) {
        if (((String)key).length() % 2 == 0) {
            return 0;
        }
        return 1;
    }
}

このため、すべての偶数の長さのキーはパーティション「0」に公開され、同様に、奇数の長さのキーはパーティション「1」に公開されます。

MockProducer を使用すると、Kafkaクラスターを複数のパーティションでモックすることにより、このようなパーティション割り当てアルゴリズムを検証できます。

@Test
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber() 
  throws ExecutionException, InterruptedException {
    PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null);
    PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null);
    List<PartitionInfo> list = new ArrayList<>();
    list.add(partitionInfo0);
    list.add(partitionInfo1);

    Cluster cluster = new Cluster("kafkab", new ArrayList<Node>(), list, emptySet(), emptySet());
    this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(), 
      new StringSerializer(), new StringSerializer());

    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition", 
      "{\"site\" : \"baeldung\"}");

    assertTrue(recordMetadataFuture.get().partition() == 1);
}

0と1の2つのパーティションを持つクラスターをモックしました。 次に、EvenOddPartitionerがレコードをパーティション1に公開することを確認できます。

5. MockProducerでのモッキングエラー

これまでのところ、Kafkaトピックにレコードを正常に送信するためにプロデューサーを嘲笑しただけです。 しかし、レコードの書き込み時に例外が発生した場合はどうなりますか?

アプリケーションは通常、クライアントに例外を再試行またはスローすることによって、このような例外を処理します。

MockProducer を使用すると、 send()中に例外をモックして、例外処理コードを検証できます。

@Test
void givenKeyValue_whenSend_thenReturnException() {
    MockProducer<String, String> mockProducer = new MockProducer<>(false, 
      new StringSerializer(), new StringSerializer())

    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> record = kafkaProducer.send("site", "{\"site\" : \"baeldung\"}");
    RuntimeException e = new RuntimeException();
    mockProducer.errorNext(e);

    try {
        record.get();
    } catch (ExecutionException | InterruptedException ex) {
        assertEquals(e, ex.getCause());
    }
    assertTrue(record.isDone());
}

このコードには2つの注目すべき点があります。

まず、 MockProducer コンストラクターオートコンプリートなので間違い。 これは、 MockProducer 完了する前に入力を待つ送信() 方法。

次に、 mockProducer.errorNext(e)、を呼び出して、 MockProducerが最後のsend()呼び出しの例外を返すようにします。

6. MockProducerを使用したトランザクション書き込みのモック

Kafka 0.11は、Kafkaブローカー、プロデューサー、およびコンシューマー間のトランザクションを導入しました。 これにより、KafkaでエンドツーエンドのExactly-Onceメッセージ配信セマンティックが可能になりました。 つまり、トランザクションプロデューサーは、2フェーズコミットプロトコルを使用するブローカーにのみレコードを公開できます。

MockProducer はトランザクション書き込みもサポートしており、この動作を検証できます。

@Test
void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() {
    MockProducer<String, String> mockProducer = new MockProducer<>(true, 
      new StringSerializer(), new StringSerializer())

    kafkaProducer = new KafkaProducer(mockProducer);
    kafkaProducer.initTransaction();
    kafkaProducer.beginTransaction();
    Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");

    assertTrue(mockProducer.history().isEmpty());
    kafkaProducer.commitTransaction();
    assertTrue(mockProducer.history().size() == 1);
}

MockProducerは具象KafkaProducerと同じAPIもサポートしているため、トランザクションをコミットした後にのみ履歴を更新します。このようなモック動作は、アプリケーションが commitTransaction()がすべてに対して呼び出されることを検証するのに役立ちます。取引。

7. 結論

この記事では、kafka-clientライブラリのMockProducerクラスについて説明しました。 MockProducer は、具象 KafkaProducer と同じ階層を実装しているため、Kafkaブローカーを使用してすべてのI/O操作をモックできることを説明しました。

また、いくつかの複雑なモックシナリオについても説明し、 MockProducer。を使用して、例外、パーティショニング、およびトランザクションをテストすることができました。

いつものように、すべてのコード例はGitHubから入手できます。