Springを使ったApache Kafkaの紹介
1概要
Apache Kafka
は分散型でフォールトトレラントなストリーム処理システムです。
この記事では、Kafkaに対するSpringのサポートと、ネイティブのKafka JavaクライアントAPIに対して提供される抽象化のレベルについて説明します。
Spring Kafkaは、
@ KafkaListener
アノテーションを介して、
KafkaTemplate
とMessage-driven POJOで、シンプルで典型的なSpringテンプレートプログラミングモデルをもたらします。
2インストールと設定
Kafkaをダウンロードしてインストールするには、公式ガイドhttps://kafka.apache.org/quickstart[here]を参照してください。 Kafkaサーバーが起動したら、次のコマンドを使用してトピック
baeldung
を作成しましょう。
$ bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic baeldung
この記事では、サーバーがデフォルトの設定で起動され、サーバーポートが変更されていないことを前提としています。
3 Mavenの依存関係
まず、
pom.xmlに
spring-kafka__依存関係を追加する必要があります。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.3.RELEASE</version>
</dependency>
このアーティファクトの最新バージョンはhttps://search.maven.org/classic/#search%7Cga%7C1%7Cg%3A%22org.springframework.kafka%22%20AND%20a%3A%22spring-kafka%にあります。 22[ここ]
4メッセージを作成する
メッセージを作成するには、まず、Kafka httpsを作成するための戦略を設定するhttp://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/ProducerFactory.html[
ProducerFactory
]を設定する必要があります。//kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Producer.html[
Producer
]インスタンス。
それから
Producer
インスタンスをラップし、Kafkaトピックにメッセージを送信するための便利なメソッドを提供するhttp://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html[
KafkaTemplate
]が必要です。
Producer
インスタンスはスレッドセーフであるため、アプリケーションコンテキスト全体で単一のインスタンスを使用するとパフォーマンスが向上します。
したがって、
KakfaTemplate
インスタンスもスレッドセーフであり、1つのインスタンスの使用をお勧めします。
4.1. プロデューサー構成
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP__SERVERS__CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY__SERIALIZER__CLASS__CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE__SERIALIZER__CLASS__CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
4.2. 公開メッセージ
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {
kafkaTemplate.send(topicName, msg);
}
5メッセージの消費
メッセージを消費するには、
ConsumerFactory
を設定する必要があります。]
およびhttp://docs.spring.io/autorepo/docs/spring-kafka-dist/1.1.3.RELEASE/api/org/springframework/kafka/config/KafkaListenerContainerFactory.html[
KafkaListenerContainerFactory
]。
これらのBeanがSpring Beanファクトリーで使用可能になると、POJOベースのコンシューマーはhttp://docs.spring.io/autorepo/docs/spring-kafka-dist/1.1.3.RELEASE/api/org/springframework/kafkaを使用して構成できます/annotation/KafkaListener.html[
@KafkaListener
]注釈。
@EnableKafka
注釈は設定に必要ですSpringマネージドBeanの
@ KafkaListener
アノテーションの検出を有効にするクラス。
5.1. コンシューマ構成
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP__SERVERS__CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP__ID__CONFIG,
groupId);
props.put(
ConsumerConfig.KEY__DESERIALIZER__CLASS__CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE__DESERIALIZER__CLASS__CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
5.2. メッセージの消費
@KafkaListener(topics = "topicName", group = "foo")
public void listen(String message) {
System.out.println("Received Messasge in group foo: " + message);
}
-
トピックごとに複数のリスナーを実装でき、それぞれが異なるグループIDを持ちます。さらに、1人の消費者がさまざまなトピックからのメッセージを聞くことができます。
@KafkaListener(topics = "topic1, topic2", group = "foo")
Springはhttp://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/messaging/handler/annotation/Header.html[
@Header
]を使用した1つ以上のメッセージヘッダーの取得もサポートしています。リスナーのアノテーション:
@KafkaListener(topics = "topicName")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED__PARTITION__ID) int partition) {
System.out.println(
"Received Message: " + message"
+ "from partition: " + partition);
}
** 5.3. 特定のパーティションからのメッセージの消費
お気づきかもしれませんが、トピック「
baeldung
」を作成したパーティションは1つだけです。ただし、複数のパーティションを持つトピックの場合、
@ KafkaListener
は、初期オフセットを使用してトピックの特定のパーティションに明示的に登録できます。
@KafkaListener(
topicPartitions = @TopicPartition(topic = "topicName",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")
}))
public void listenToParition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED__PARTITION__ID) int partition) {
System.out.println(
"Received Messasge: " + message"
+ "from partition: " + partition);
}
このリスナーでは
initialOffset
が0に送信されているので、パーティション0と3からの以前に消費されたメッセージはすべて、このリスナーが初期化されるたびに再消費されます。オフセットを設定する必要がない場合は、
@ TopicPartition
アノテーションの
partitions
プロパティを使用して、オフセットのないパーティションのみを設定できます。
@KafkaListener(topicPartitions
= @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
5.4. リスナー用のメッセージフィルタの追加
カスタムフィルタを追加することで、リスナーは特定の種類のメッセージを消費するように設定できます。これは
RecordFilterStrategy
を
KafkaListenerContainerFactory
に設定することで実行できます。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(
record -> record.value().contains("World"));
return factory;
}
このコンテナファクトリを使用するようにリスナを設定できます。
@KafkaListener(
topics = "topicName",
containerFactory = "filterKafkaListenerContainerFactory")
public void listen(String message) {
//handle message
}
このリスナーでは、フィルタに一致するすべてのメッセージは破棄されます。
6. カスタムメッセージコンバータ
これまでのところ、メッセージとしての文字列の送受信についてのみ説明しました。
ただし、カスタムJavaオブジェクトを送受信することもできます。これには、
ProducerFactory
に適切なシリアライザを設定し、
ConsumerFactory
にデシリアライザを設定する必要があります。
メッセージとして送信する単純なBeanクラス
__、
__を見てみましょう。
public class Greeting {
private String msg;
private String name;
//standard getters, setters and constructor
}
6.1. カスタムメッセージの作成
この例では、
JsonSerializer
を使用します。
ProducerFactory
と
KafkaTemplate
のコードを見てみましょう。
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
//...
configProps.put(
ProducerConfig.VALUE__SERIALIZER__CLASS__CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
return new KafkaTemplate<>(greetingProducerFactory());
}
この新しい
KafkaTemplate
を使用して
Greeting
メッセージを送信できます。
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
6.2. カスタムメッセージの消費
同様に、Greetingメッセージを正しくシリアル化解除するために
ConsumerFactory
と
KafkaListenerContainerFactory
を修正しましょう:
@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
//...
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting>
greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
spring-kafka JSONシリアライザおよびデシリアライザは、link-/jackson[Jackson]ライブラリを使用します。これも、spring-kafkaプロジェクトのオプションのMaven依存関係です。それでは、それを
pom.xml
に追加しましょう。
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.7</version>
</dependency>
最新バージョンのJacksonを使用する代わりに、spring-kafkaの__pom.xmlに追加されたバージョンを使用することをお勧めします。
最後に、
Greeting
メッセージを消費するリスナーを書く必要があります。
@KafkaListener(
topics = "topicName",
containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
//process greeting message
}
7. 結論
この記事では、Apache Kafkaに対するSpringサポートの基本について説明しました。メッセージの送受信に使用されるクラスについて簡単に説明しました。
この記事の完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/spring-kafka[over on GitHub]にあります。コードを実行する前に、Kafkaサーバーが実行中で、トピックが手動で作成されていることを確認してください。