1. 序章

Apache Kafkaでのメッセージの送信中に、クライアントとサーバーは共通の構文形式の使用に同意します。 Apache Kafkaは、デフォルトのコンバーター(StringLongなど)を提供しますが、特定のユースケース向けのカスタムシリアライザーもサポートします。 このチュートリアルでは、それらを実装する方法を説明します。

2. ApacheKafkaのシリアライザー

シリアル化は、オブジェクトをバイトに変換するプロセスです。 デシリアライズは逆のプロセスであり、バイトのストリームをオブジェクトに変換します。 一言で言えば、それはコンテンツを読み取り可能で解釈可能な情報に変換します。

前述したように、Apache Kafkaは、いくつかの基本的なタイプのデフォルトのシリアライザーを提供し、カスタムシリアライザーを実装できるようにします。

 

上の図は、ネットワークを介してKafkaトピックにメッセージを送信するプロセスを示しています。 このプロセスでは、プロデューサーがメッセージをトピックに送信する前に、カスタムシリアライザーがオブジェクトをバイトに変換します。 同様に、デシリアライザーがバイトをオブジェクトに変換して戻し、コンシューマーが適切に処理できるようにする方法も示しています。

2.1. カスタムシリアライザー

Apache Kafkaは、いくつかの基本的なタイプに対応するビルド済みのシリアライザーとデシリアライザーを提供します。

ただし、カスタム(デ)シリアライザーを実装する機能も提供します。 独自のオブジェクトをシリアル化するために、 Serializer インターフェイスを実装します。 同様に、カスタムデシリアライザーを作成するには、デシリアライザーインターフェイスを実装します。

両方のインターフェイスでオーバーライドできるメソッドがあります。

  • configure :構成の詳細を実装するために使用されます
  • シリアル化/逆シリアル化これらのメソッドには、カスタムのシリアル化と逆シリアル化の実際の実装が含まれます。
  • close :このメソッドを使用してKafkaセッションを閉じます

3. ApacheKafkaでのカスタムシリアライザーの実装

Apache Kafkaは、シリアライザーをカスタマイズする機能を提供します。 メッセージ値だけでなく、キーに対しても特定のコンバーターを実装することが可能です。

3.1. 依存関係

例を実装するには、 Kafka ConsumerAPIの依存関係をpom.xmlに追加するだけです。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

3.2. カスタムシリアライザー

まず、 Lombok を使用して、Kafkaを介して送信するカスタムオブジェクトを指定します。

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDto {
    private String message;
    private String version;
}

次に、プロデューサーがメッセージを送信するためにKafkaが提供するSerializerインターフェイスを実装します。

@Slf4j
public class CustomSerializer implements Serializer {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, MessageDto data) {
        try {
            if (data == null){
                System.out.println("Null received at serializing");
                return null;
            }
            System.out.println("Serializing...");
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing MessageDto to byte[]");
        }
    }

    @Override
    public void close() {
    }
}

インターフェースのシリアル化メソッドをオーバーライドします。 したがって、この実装では、 JacksonObjectMapperを使用してカスタムオブジェクトを変換します。 次に、バイトのストリームを返し、メッセージをネットワークに適切に送信します。

3.3. カスタムデシリアライザー

同様に、コンシューマー用にDeserializerインターフェースを実装します。

@Slf4j
public class CustomDeserializer implements Deserializer<MessageDto> {
    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public MessageDto deserialize(String topic, byte[] data) {
        try {
            if (data == null){
                System.out.println("Null received at deserializing");
                return null;
            }
            System.out.println("Deserializing...");
            return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class);
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to MessageDto");
        }
    }

    @Override
    public void close() {
    }
}

前のセクションと同様に、インターフェイスの逆シリアル化メソッドをオーバーライドします。 したがって、同じ Jackson ObjectMapper を使用して、バイトのストリームをカスタムオブジェクトに変換します。

3.4. サンプルメッセージの消費

カスタムシリアライザーとデシリアライザーを使用してサンプルメッセージを送受信する実際の例を見てみましょう。

まず、Kafkaプロデューサーを作成して構成します。

private static KafkaProducer<String, MessageDto> createKafkaProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomSerializer");

    return new KafkaProducer(props);
}

カスタムクラスを使用して値シリアライザープロパティを構成し、デフォルトStringSerializerを使用してキーシリアライザーを構成します。

次に、Kafkaコンシューマーを作成します。

private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomDeserializer");

    return new KafkaConsumer<>(props);
}

カスタムクラスを使用したキーと値のデシリアライザーに加えて、グループIDを含める必要があります。 それとは別に、コンシューマーが開始する前にプロデューサーがすべてのメッセージを送信したことを確認するために、自動オフセットリセット構成を最も早いに設定しました。

プロデューサークライアントとコンシューマークライアントを作成したら、次のメッセージの例を送信します。

MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();

KafkaProducer<String, MessageDto> producer = createKafkaProducer();
producer.send(new ProducerRecord<String, MessageDto>(TOPIC, "1", msgProd));
System.out.println("Message sent " + msgProd);
producer.close();

そして、トピックを購読することで、消費者とのメッセージを受け取ることができます。

AtomicReference<MessageDto> msgCons = new AtomicReference<>();

KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
consumer.subscribe(Arrays.asList(TOPIC));

ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
    msgCons.set(record.value());
    System.out.println("Message received " + record.value());
});

consumer.close();

コンソールでの結果は次のとおりです。

Serializing...
Message sent MessageDto(message=test, version=1.0)
Deserializing...
Message received MessageDto(message=test, version=1.0)

4. 結論

このチュートリアルでは、プロデューサーがApacheKafkaのシリアライザーを使用してネットワーク経由でメッセージを送信する方法を示しました。 同様に、消費者が受信したメッセージを解釈するためにデシリアライザーを使用する方法も示しました。

さらに、利用可能なデフォルトのシリアライザー、そして最も重要なこととして、カスタムシリアライザーとデシリアライザーを実装する機能について学びました。

いつものように、コードはGitHubから入手できます。