1. 概要

Apache Kafka は、分散型でフォールトトレラントなストリーム処理システムです。

このチュートリアルでは、KafkaのSpringサポートと、ネイティブKafkaJavaクライアントAPIを介して提供される抽象化のレベルについて説明します。

Spring Kafkaは、KafkaTemplate@KafkaListenerアノテーションを介したメッセージ駆動型POJOを備えたシンプルで典型的なSpringテンプレートプログラミングモデルをもたらします。

2. インストールとセットアップ

Kafkaをダウンロードしてインストールするには、公式ガイドこちらを参照してください。

また、spring-kafka依存関係をpom.xmlに追加する必要があります。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

このアーティファクトの最新バージョンはここにあります。

サンプルアプリケーションは、SpringBootアプリケーションになります。

この記事では、サーバーがデフォルト構成を使用して起動され、サーバーポートが変更されていないことを前提としています。

3. トピックの構成

以前は、コマンドラインツールを実行してKafkaでトピックを作成していました。

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic

しかし、Kafkaに AdminClient が導入されたことで、プログラムでトピックを作成できるようになりました。

KafkaAdmin Spring Beanを追加する必要があります。これにより、タイプNewTopicのすべてのBeanのトピックが自動的に追加されます。

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
         return new NewTopic("baeldung", 1, (short) 1);
    }
}

4. メッセージの作成

メッセージを作成するには、最初にProducerFactoryを構成する必要があります。 これにより、Kafka Producerインスタンスを作成するための戦略が設定されます。

次に、 KafkaTemplate が必要です。これは、 Producer インスタンスをラップし、Kafkaトピックにメッセージを送信するための便利なメソッドを提供します。

Producerインスタンスはスレッドセーフです。 したがって、アプリケーションコンテキスト全体で単一のインスタンスを使用すると、パフォーマンスが向上します。 したがって、 KakfaTemplate インスタンスもスレッドセーフであり、1つのインスタンスを使用することをお勧めします。

4.1. プロデューサー構成

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4.2. メッセージの公開

KafkaTemplateクラスを使用してメッセージを送信できます。

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

送信APIはListenableFutureオブジェクトを返します。送信スレッドをブロックして送信メッセージに関する結果を取得する場合は、ListenableFuture[のgetAPIを呼び出すことができます。 X199X]オブジェクト。 スレッドは結果を待ちますが、プロデューサーの速度が低下します。

Kafkaは高速ストリーム処理プラットフォームです。 したがって、後続のメッセージが前のメッセージの結果を待たないように、結果を非同期で処理することをお勧めします。

これは、コールバックを介して行うことができます。

public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = 
      kafkaTemplate.send(topicName, message);
	
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message + 
              "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" 
              + message + "] due to : " + ex.getMessage());
        }
    });
}

5. メッセージの消費

5.1. コンシューマー構成

メッセージを消費するには、ConsumerFactoryKafkaListenerContainerFactoryを構成する必要があります。 これらのBeanがSpringBeanファクトリで利用可能になると、@KafkaListenerアノテーションを使用してPOJOベースのコンシューマーを構成できます。

@EnableKafkaアノテーションは、Spring管理のBeanで@KafkaListenerアノテーションを検出できるようにするために、構成クラスに必要です

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

5.2. メッセージの消費

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

トピックに複数のリスナーを実装でき、それぞれに異なるグループIDがあります。 さらに、1人の消費者はさまざまなトピックからのメッセージを聞くことができます。

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Springは、リスナーで@Headerアノテーションを使用した1つ以上のメッセージヘッダーの取得もサポートしています。

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

5.3. 特定のパーティションからのメッセージの消費

トピックbaeldungを1つのパーティションのみで作成したことに注意してください。

ただし、複数のパーティションを持つトピックの場合、 @KafkaListener は、最初のオフセットを使用してトピックの特定のパーティションを明示的にサブスクライブできます。

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

このリスナーではinitialOffsetが0に設定されているため、このリスナーが初期化されるたびに、パーティション0および3から以前に消費されたすべてのメッセージが再消費されます。

オフセットを設定する必要がない場合は、@TopicPartitionアノテーションのpartitionsプロパティを使用して、オフセットのないパーティションのみを設定できます。

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. リスナー用のメッセージフィルターの追加

We can configure listeners to consume specific message content by adding a custom filter. これは、RecordFilterStrategyKafkaListenerContainerFactoryに設定することで実行できます。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

次に、このコンテナファクトリを使用するようにリスナーを構成できます。

@KafkaListener(
  topics = "topicName", 
  containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

このリスナーでは、フィルターに一致するすべてのメッセージが破棄されます。

6. カスタムメッセージコンバータ

これまでは、文字列をメッセージとして送受信する方法についてのみ説明してきました。 ただし、カスタムJavaオブジェクトを送受信することもできます。 これには、 ProducerFactory で適切なシリアライザーを構成し、ConsumerFactoryでデシリアライザーを構成する必要があります。

メッセージとして送信する単純なBeanクラスを見てみましょう。

public class Greeting {

    private String msg;
    private String name;

    // standard getters, setters and constructor
}

6.1. カスタムメッセージの作成

この例では、JsonSerializerを使用します。

ProducerFactoryKafkaTemplateのコードを見てみましょう。

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

この新しいKafkaTemplateを使用して、Greetingメッセージを送信できます。

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. カスタムメッセージの消費

同様に、ConsumerFactoryKafkaListenerContainerFactoryを変更して、グリーティングメッセージを正しく逆シリアル化します。

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

spring-kafka JSONシリアライザーおよびデシリアライザーは、 Jackson ライブラリーを使用します。これは、spring-kafkaプロジェクトのオプションのMaven依存関係でもあります。

それでは、それをpom.xmlに追加しましょう。

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

最新バージョンのJacksonを使用する代わりに、spring-kafkaのpom.xmlに追加されたバージョンを使用することをお勧めします。

最後に、Greetingメッセージを消費するリスナーを作成する必要があります。

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

7. Multi-Method Listeners

Let’s now see how we can configure our application to send various kinds of objects to the same topic and then consume them.

First, we’ll add a new class, Farewell:

public class Farewell {

    private String message;
    private Integer remainingMinutes;

    // standard getters, setters and constructor
}

We’ll need some extra configuration to be able to send both Greeting and Farewell objects to the same topic.

7.1. Set Mapping Types in the Producer

In the producer, we have to configure the JSON type mapping:

configProps.put(JsonSerializer.TYPE_MAPPINGS, "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");

This way, the library will fill in the type header with the corresponding class name.

As a result, the ProducerFactory and KafkaTemplate look like this:

@Bean
public ProducerFactory<String, Object> multiTypeProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(JsonSerializer.TYPE_MAPPINGS, 
      "greeting:com.baeldung.spring.kafka.Greeting, farewell:com.baeldung.spring.kafka.Farewell");
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Object> multiTypeKafkaTemplate() {
    return new KafkaTemplate<>(multiTypeProducerFactory());
}

We can use this KafkaTemplate to send a Greeting, Farewell, or any Object to the topic:

multiTypeKafkaTemplate.send(multiTypeTopicName, new Greeting("Greetings", "World!"));
multiTypeKafkaTemplate.send(multiTypeTopicName, new Farewell("Farewell", 25));
multiTypeKafkaTemplate.send(multiTypeTopicName, "Simple string message");

7.2. Use a Custom MessageConverter in the Consumer

To be able to deserialize the incoming message, we’ll need to provide our Consumer with a custom MessageConverter.

Behind the scene, the MessageConverter relies on a Jackson2JavaTypeMapper. By default, the mapper infers the type of the received objects: on the contrary, we need to tell it explicitly to use the type header to determine the target class for deserialization:

typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);

We also need to provide the reverse mapping information. Finding “greeting” in the type header identifies a Greeting object, whereas “farewell” corresponds to a Farewell object:

Map<String, Class<?>> mappings = new HashMap<>(); 
mappings.put("greeting", Greeting.class);
mappings.put("farewell", Farewell.class);
typeMapper.setIdClassMapping(mappings);

Lastly, we need to configure the packages trusted by the mapper. We have to make sure that it contains the location of the target classes:

typeMapper.addTrustedPackages("com.baeldung.spring.kafka");

As a result, here is the final definition of this MessageConverter:

@Bean
public RecordMessageConverter multiTypeConverter() {
    StringJsonMessageConverter converter = new StringJsonMessageConverter();
    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
    typeMapper.addTrustedPackages("com.baeldung.spring.kafka");
    Map<String, Class<?>> mappings = new HashMap<>();
    mappings.put("greeting", Greeting.class);
    mappings.put("farewell", Farewell.class);
    typeMapper.setIdClassMapping(mappings);
    converter.setTypeMapper(typeMapper);
    return converter;
}

We now need to tell our ConcurrentKafkaListenerContainerFactory to use the MessageConverter and a rather basic ConsumerFactory:

@Bean
public ConsumerFactory<String, Object> multiTypeConsumerFactory() {
    HashMap<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(multiTypeConsumerFactory());
    factory.setMessageConverter(multiTypeConverter());
    return factory;
}

7.3. Use @KafkaHandler in the Listener

Last but not least, in our KafkaListener, we’ll create a handler method to retrieve every possible kind of object. Each handler will need to be annotated with @KafkaHandler.

As a final note, let’s point out that we can also define a default handler for objects that can’t be bound to one of the Greeting or Farewell class:

@Component
@KafkaListener(id = "multiGroup", topics = "multitype")
public class MultiTypeKafkaListener {

    @KafkaHandler
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }

    @KafkaHandler
    public void handleF(Farewell farewell) {
        System.out.println("Farewell received: " + farewell);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Unkown type received: " + object);
    }
}

8. 結論

この記事では、ApacheKafkaのSpringサポートの基本について説明しました。 メッセージの送受信に使用されるクラスについて簡単に説明しました。

この記事の完全なソースコードは、GitHubにあります。

コードを実行する前に、Kafkaサーバーが実行されていること、およびトピックが手動で作成されていることを確認してください。