1. 概要

Apache Kafka は、強力で分散型のフォールトトレラントなストリーム処理システムです。 前のチュートリアルでは、SpringとKafkaの操作方法を学びました。

このチュートリアルでは、前のチュートリアルに基づいて、実行中の外部Kafkaサーバーに依存しない信頼性の高い自己完結型の統合テストを作成する方法を学習します。

まず、Kafkaの組み込みインスタンスを使用および構成する方法を確認することから始めます。

次に、人気のあるフレームワークをどのように利用できるかを見ていきます テストコンテナ 私たちのテストから。

2. 依存関係

もちろん、標準のspring-kafka依存関係pom.xmlに追加する必要があります。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

次に、特にテスト用にさらに2つの依存関係が必要になります。

まず、spring-kafka-テストアーティファクトを追加します。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.6.3.RELEASE</version>
    <scope>test</scope>
</dependency>

最後に、Testcontainers Kafka依存関係を追加します。これは、 MavenCentralでも利用できます。

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.15.3</version>
    <scope>test</scope>
</dependency>

必要な依存関係がすべて構成されたので、Kafkaを使用して簡単なSpringBootアプリケーションを作成できます。

3. シンプルなKafkaプロデューサー-コンシューマーアプリケーション

このチュートリアル全体を通して、テストの焦点は、単純なプロデューサーとコンシューマーのSpringBootKafkaアプリケーションになります。

アプリケーションのエントリポイントを定義することから始めましょう。

@SpringBootApplication
public class KafkaProducerConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerConsumerApplication.class, args);
    }
}

ご覧のとおり、これは標準のSpringBootアプリケーションです。

3.1. プロデューサーの設定

次に、特定のKafkaトピックにメッセージを送信するために使用するプロデューサーbeanについて考えてみましょう。

@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

上で定義したKafkaProducer Beanは、KafkaTemplateクラスの単なるラッパーです。 このクラスは、提供されたトピックへのデータの送信など、高レベルのスレッドセーフ操作を提供します。これは、sendメソッドで行うこととまったく同じです。

3.2. 消費者設定

同様に、Kafkaトピックをリッスンしてメッセージを受信する単純なコンシューマーbeanを定義します。

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }
}

私たちの単純なコンシューマーは、receiveメソッドの@KafkaListenerアノテーションを使用して、特定のトピックに関するメッセージをリッスンします。 テストからtest.topicを構成する方法については後で説明します。

さらに、receiveメソッドはメッセージの内容をBeanに格納し、ラッチ変数のカウントを減らします。 この変数は、メッセージを正常に受信したことを確認するために、後でテストから使用する単純なスレッドセーフカウンターフィールドです。

Spring Bootを使用した単純なKafkaアプリケーションが実装されたので、統合テストを作成する方法を見てみましょう。

4. テストに関する一言

一般に、クリーンな統合テストを作成する場合、制御できない、または突然動作を停止する可能性のある外部サービスに依存しないでください。これはテスト結果に悪影響を与える可能性があります。

同様に、外部サービス(この場合は実行中のKafkaブローカー)に依存している場合、テストから希望する方法でセットアップ、制御、および破棄できない可能性があります。

4.1. アプリケーションのプロパティ

テストから得られた非常に軽いアプリケーション構成プロパティのセットを使用します。

これらのプロパティは、 src / test / resources /application.ymlファイルで定義します。

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: baeldung
test:
  topic: embedded-test-topic

これは、Kafkaの組み込みインスタンスまたはローカルブローカーを操作するときに必要な最小限のプロパティセットです。

これらのほとんどは自明ですが、強調する必要があるのは、コンシューマープロパティauto-offset-reset:earliestです。このプロパティは、コンテナーが後に開始する可能性があるため、コンシューマーグループが送信するメッセージを確実に取得するようにします。送信が完了しました。

さらに、値 embedded-test-topic を使用してトピックプロパティを構成します。これは、テストで使用するトピックです。

5. EmbeddedKafkaを使用したテスト

このセクションでは、メモリ内のKafkaインスタンスを使用してテストを実行する方法を見ていきます。 これは、EmbeddedKafkaとも呼ばれます。

以前に追加した依存関係spring-kafka-testには、アプリケーションのテストに役立ついくつかの便利なユーティリティが含まれています。 特に、EmbeddedKafkaBrokerクラスが含まれています。

それを念頭に置いて、先に進み、最初の統合テストを作成しましょう。

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaIntegrationTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {
        producer.send(topic, "Sending with own simple KafkaProducer");
        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        
        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
    }
}

テストの重要な部分を見ていきましょう。

まず、テストクラスを2つのかなり標準的なSpringアノテーションで装飾することから始めます。

  • @ Spring BootTest アノテーションは、テストがSpringアプリケーションコンテキストをブートストラップすることを保証します。
  • また、 @DirtiesContext アノテーションを使用して、このコンテキストがクリーンアップされ、異なるテスト間でリセットされるようにします。

ここで重要な部分があります。@EmbeddedKafkaアノテーションを使用して、EmbeddedKafkaBrokerのインスタンスをテストに挿入します。

さらに、組み込みのKafkaノードを構成するために使用できるいくつかのプロパティがあります。

  • partitions –これはトピックごとに使用されるパーティションの数です。 物事を素晴らしくシンプルに保つために、テストで使用するのは1つだけにします。
  • brokerProperties –Kafkaブローカーの追加プロパティ。 繰り返しになりますが、物事をシンプルに保ち、プレーンテキストリスナーとポート番号を指定します。

次に、Consumerクラスとproducerクラスを自動配線し、application.propertiesの値を使用するようにトピックを構成します。

パズルの最後のピースとして、テストトピックにメッセージを送信し、メッセージが受信され、テストトピックの名前が含まれていることを確認します。

テストを実行すると、冗長なSpring出力の中に次のように表示されます。

...
12:45:35.099 [main] INFO  c.b.kafka.embedded.KafkaProducer -
  sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic'
...
12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]
  INFO  c.b.kafka.embedded.KafkaConsumer - received payload=
  'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1,
  CreateTime = 1605267935099, serialized key size = -1, 
  serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),
  key = null, value = Sending with our own simple KafkaProducer)'

これにより、テストが正しく機能していることが確認されます。 素晴らしい! メモリ内のKafkaブローカーを使用して、自己完結型の独立した統合テストを作成する方法があります。

6. TestContainersを使用したKafkaのテスト

実際の外部サービスと、テスト目的で特別に提供されたサービスの組み込みメモリ内インスタンスとの間に小さな違いが見られる場合があります。 可能性は低いですが、テストで使用したポートが占有されていて、障害が発生している可能性もあります。

このことを念頭に置いて、このセクションでは、Testcontainersフレームワークを使用したテストに対する以前のアプローチのバリエーションを紹介します。 統合テストから、Dockerコンテナ内でホストされている外部ApacheKafkaブローカーをインスタンス化して管理する方法を説明します。

前のセクションで見たものと非常によく似た別の統合テストを定義しましょう。

@RunWith(SpringRunner.class)
@Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = KafkaProducerConsumerApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {

    @ClassRule
    public static KafkaContainer kafka = 
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {
        producer.send(topic, "Sending with own controller");
        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        
        assertThat(consumer.getLatch().getCount(), equalTo(0L));
        assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
    }
}

違いを見てみましょう。 標準のJUnit@ClassRuleであるkafkaフィールドを宣言しています。 このフィールドは、Kafkaを実行しているコンテナのライフサイクルを準備および管理するKafkaContainerクラスのインスタンスです。

ポートの衝突を回避するために、Testcontainersは、Dockerコンテナーの起動時にポート番号を動的に割り当てます。

このため、クラス KafkaTestContainersConfiguration を使用して、カスタムのコンシューマーおよびプロデューサーのファクトリ構成を提供します。

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
    // more standard configuration
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    // more standard configuration
    return new DefaultKafkaProducerFactory<>(configProps);
}

次に、テストの開始時に@Importアノテーションを介してこの構成を参照します。

これは、前述のように動的に生成されるサーバーアドレスをアプリケーションに挿入する方法が必要なためです。

これを実現するには、getBootstrapServers()メソッドを呼び出します。これにより、ブートストラップサーバーの場所が返されます

bootstrap.servers = [PLAINTEXT://localhost:32789]

テストを実行すると、Testcontainersがいくつかのことを実行していることがわかります。

  • ローカルのDockerセットアップを確認します
  • 必要に応じて、 confluentinc / cp-kafka:5.4.3Dockerイメージをプルします
  • 新しいコンテナを起動し、準備が整うのを待ちます
  • 最後に、テストが終了した後、シャットダウンしてコンテナを削除します

繰り返しますが、これはテスト出力を調べることによって確認されます。

13:33:10.396 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Creating container for image: confluentinc/cp-kafka:5.4.3
13:33:10.454 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3
13:33:10.785 [main] INFO  ? [confluentinc/cp-kafka:5.4.3]
  - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

プレスト! Kafkadockerコンテナを使用した統合テスト。

7. 結論

この記事では、KafkaアプリケーションをSpring Bootでテストするためのいくつかのアプローチについて学びました。

最初のアプローチでは、ローカルのメモリ内Kafkaブローカーを構成して使用する方法を確認しました。

次に、Testcontainersを使用して、テストからDockerコンテナー内で実行される外部Kafkaブローカーをセットアップする方法を確認しました。

いつものように、記事の完全なソースコードは、GitHubから入手できます。