1. 序章
Apache Kafkaでのメッセージの送信中に、クライアントとサーバーは共通の構文形式の使用に同意します。 Apache Kafkaは、デフォルトのコンバーター(StringやLongなど)を提供しますが、特定のユースケース向けのカスタムシリアライザーもサポートします。 このチュートリアルでは、それらを実装する方法を説明します。
2. ApacheKafkaのシリアライザー
シリアル化は、オブジェクトをバイトに変換するプロセスです。 デシリアライズは逆のプロセスであり、バイトのストリームをオブジェクトに変換します。 一言で言えば、それはコンテンツを読み取り可能で解釈可能な情報に変換します。
前述したように、Apache Kafkaは、いくつかの基本的なタイプのデフォルトのシリアライザーを提供し、カスタムシリアライザーを実装できるようにします。
上の図は、ネットワークを介してKafkaトピックにメッセージを送信するプロセスを示しています。 このプロセスでは、プロデューサーがメッセージをトピックに送信する前に、カスタムシリアライザーがオブジェクトをバイトに変換します。 同様に、デシリアライザーがバイトをオブジェクトに変換して戻し、コンシューマーが適切に処理できるようにする方法も示しています。
2.1. カスタムシリアライザー
Apache Kafkaは、いくつかの基本的なタイプに対応するビルド済みのシリアライザーとデシリアライザーを提供します。
ただし、カスタム(デ)シリアライザーを実装する機能も提供します。 独自のオブジェクトをシリアル化するために、 Serializer インターフェイスを実装します。 同様に、カスタムデシリアライザーを作成するには、デシリアライザーインターフェイスを実装します。
両方のインターフェイスでオーバーライドできるメソッドがあります。
- configure :構成の詳細を実装するために使用されます
- シリアル化/逆シリアル化:これらのメソッドには、カスタムのシリアル化と逆シリアル化の実際の実装が含まれます。
- close :このメソッドを使用してKafkaセッションを閉じます
3. ApacheKafkaでのカスタムシリアライザーの実装
Apache Kafkaは、シリアライザーをカスタマイズする機能を提供します。 メッセージ値だけでなく、キーに対しても特定のコンバーターを実装することが可能です。
3.1. 依存関係
例を実装するには、 Kafka ConsumerAPIの依存関係をpom.xmlに追加するだけです。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
3.2. カスタムシリアライザー
まず、 Lombok を使用して、Kafkaを介して送信するカスタムオブジェクトを指定します。
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
private String message;
private String version;
}
次に、プロデューサーがメッセージを送信するためにKafkaが提供するSerializerインターフェイスを実装します。
@Slf4j
public class CustomSerializer implements Serializer {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, MessageDto data) {
try {
if (data == null){
System.out.println("Null received at serializing");
return null;
}
System.out.println("Serializing...");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
@Override
public void close() {
}
}
インターフェースのシリアル化メソッドをオーバーライドします。 したがって、この実装では、 JacksonObjectMapperを使用してカスタムオブジェクトを変換します。 次に、バイトのストリームを返し、メッセージをネットワークに適切に送信します。
3.3. カスタムデシリアライザー
同様に、コンシューマー用にDeserializerインターフェースを実装します。
@Slf4j
public class CustomDeserializer implements Deserializer<MessageDto> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public MessageDto deserialize(String topic, byte[] data) {
try {
if (data == null){
System.out.println("Null received at deserializing");
return null;
}
System.out.println("Deserializing...");
return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
} catch (Exception e) {
throw new SerializationException("Error when deserializing byte[] to MessageDto");
}
}
@Override
public void close() {
}
}
前のセクションと同様に、インターフェイスの逆シリアル化メソッドをオーバーライドします。 したがって、同じ Jackson ObjectMapper を使用して、バイトのストリームをカスタムオブジェクトに変換します。
3.4. サンプルメッセージの消費
カスタムシリアライザーとデシリアライザーを使用してサンプルメッセージを送受信する実際の例を見てみましょう。
まず、Kafkaプロデューサーを作成して構成します。
private static KafkaProducer<String, MessageDto> createKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomSerializer");
return new KafkaProducer(props);
}
カスタムクラスを使用して値シリアライザープロパティを構成し、デフォルトStringSerializerを使用してキーシリアライザーを構成します。
次に、Kafkaコンシューマーを作成します。
private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomDeserializer");
return new KafkaConsumer<>(props);
}
カスタムクラスを使用したキーと値のデシリアライザーに加えて、グループIDを含める必要があります。 それとは別に、コンシューマーが開始する前にプロデューサーがすべてのメッセージを送信したことを確認するために、自動オフセットリセット構成を最も早いに設定しました。
プロデューサークライアントとコンシューマークライアントを作成したら、次のメッセージの例を送信します。
MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();
KafkaProducer<String, MessageDto> producer = createKafkaProducer();
producer.send(new ProducerRecord<String, MessageDto>(TOPIC, "1", msgProd));
System.out.println("Message sent " + msgProd);
producer.close();
そして、トピックを購読することで、消費者とのメッセージを受け取ることができます。
AtomicReference<MessageDto> msgCons = new AtomicReference<>();
KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
consumer.subscribe(Arrays.asList(TOPIC));
ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
msgCons.set(record.value());
System.out.println("Message received " + record.value());
});
consumer.close();
コンソールでの結果は次のとおりです。
Serializing...
Message sent MessageDto(message=test, version=1.0)
Deserializing...
Message received MessageDto(message=test, version=1.0)
4. 結論
このチュートリアルでは、プロデューサーがApacheKafkaのシリアライザーを使用してネットワーク経由でメッセージを送信する方法を示しました。 同様に、消費者が受信したメッセージを解釈するためにデシリアライザーを使用する方法も示しました。
さらに、利用可能なデフォルトのシリアライザー、そして最も重要なこととして、カスタムシリアライザーとデシリアライザーを実装する機能について学びました。
いつものように、コードはGitHubでから入手できます。