Kafka、Apache Avro、Confluent Schema Registryを使用したSpring Cloud Streamのガイド

1. 前書き

  • Apache Kafkaはメッセージングプラットフォームです。*これにより、さまざまなアプリケーション間で大規模にデータを交換できます。

    Spring Cloud Streamは、メッセージ駆動型アプリケーションを構築するためのフレームワークです。 * Kafkaのサービスへの統合を簡素化できます。*
    従来、KafkaはスキーマレジストリでサポートされているAvroメッセージ形式で使用されていました。 このチュートリアルでは、Confluent Schema Registryを使用します。 SpringのConfluent Schema Registryとの統合の実装とConfluentネイティブライブラリの両方を試します。

2. コンフルエントスキーマレジストリ

Kafkaはすべてのデータをバイトとして表します。そのため、外部スキーマを使用し、そのスキーマに従ってバイトにシリアライズおよびデシリアライズするのが一般的です。 スキーマのコピーを各メッセージで提供するのではなく、オーバーヘッドが高くなりますが、レジストリにスキーマを保持し、各メッセージでIDのみを提供することも一般的です。
Confluent Schema Registryは、スキーマを保存、取得、管理する簡単な方法を提供します。 いくつかの便利なhttps://docs.confluent.io/current/schema-registry/develop/api.html[RESTful API]を公開しています。
スキーマはサブジェクトごとに格納され、デフォルトでは、レジストリはサブジェクトに対して新しいスキーマをアップロードする前に互換性チェックを行います。
各プロデューサーは、それが作成しているスキーマを知っており、各コンシューマーは、任意の形式のデータを消費するか、読み込むことを好む特定のスキーマを持つ必要があります。 *プロデューサーはレジストリを参照して、メッセージを送信するときに使用する正しいIDを確立します。 コンシューマはレジストリを使用して、送信者のスキーマを取得します。 *
コンシューマーが送信者のスキーマと独自の望ましいメッセージ形式の両方を知っている場合、Avroライブラリはデータをコンシューマーの望ましい形式に変換できます。

3. アパッチアブロ

  • https://www.baeldung.com/java-apache-avro [Apache Avro]はデータシリアル化システム*です。

    JSON構造を使用してスキーマを定義し、バイトと構造化データ間のシリアル化を提供します。
    Avroの強みの1つは、あるバージョンのスキーマで、互換性のある代替スキーマで定義された形式に書き込まれたメッセージをサポートすることです。
    Avroツールセットは、これらのスキーマのデータ構造を表すクラスを生成することもできるため、POJOの内外でのシリアル化が容易になります。

4. プロジェクトのセットアップ

https://search.maven.org/search?q=g:org.springframework.cloud%20AND%20a:spring-cloud-dependenciesでスキーマレジストリを使用するには
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
</dependency>
https://docs.confluent.io/1.0/installation.html?highlight=maven#installation-maven[Confluent's serializer]の場合、次のものが必要です。
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>4.0.0</version>
</dependency>
ConfluentのSerializerがレポにあります:
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
また、https://search.maven.org/search?q = g:org.apache.avro%20AND%20a:avro-maven-pluginを使用してみましょう
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <id>schemas</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                        <goal>protocol</goal>
                        <goal>idl-protocol</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
テストには、既存のKafkaおよびスキーマレジストリのセットアップを使用するか、https://hub.docker.com/r/confluent/kafka [dockerized Confluent and Kafka。]を使用できます。

5. 春のクラウドストリーム

プロジェクトのセットアップが完了したので、次にlink:/spring-cloud-stream[Spring Cloud Stream]を使用してプロデューサーを作成しましょう。 トピックに関する従業員の詳細を公開します。
次に、トピックからイベントを読み取り、それらをログステートメントに書き込むコンシューマーを作成します。

5.1. スキーマ

まず、従業員の詳細のスキーマを定義しましょう。 _employee-schema.avsc_という名前を付けることができます。
スキーマファイルを_src / main / resources:_に保存できます
{
    "type": "record",
    "name": "Employee",
    "namespace": "com.baeldung.schema",
    "fields": [
    {
        "name": "id",
        "type": "int"
    },
    {
        "name": "firstName",
        "type": "string"
    },
    {
        "name": "lastName",
        "type": "string"
    }]
}
上記のスキーマを作成したら、プロジェクトをビルドする必要があります。 次に、Apache Avroコードジェネレーターは、_com.baeldung.schema_パッケージの下に_Employee_という名前のPOJOを作成します。

5.2. プロデューサー

Spring Cloud Streamは_Processor_インターフェースを提供します。 これにより、出力チャネルと入力チャネルが提供されます。
これを使用して、_employee-details_ Kafka topic __:__に_Employee_オブジェクトを送信するプロデューサーを作成しましょう。
@Autowired
private Processor processor;

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    employee.setFirstName(firstName);
    employee.setLastName(lastName);

    Message<Employee> message = MessageBuilder.withPayload(employee)
                .build();

    processor.output()
        .send(message);
}

5.2. 消費者

それでは、コンシューマーを作成しましょう。
@StreamListener(Processor.INPUT)
public void consumeEmployeeDetails(Employee employeeDetails) {
    logger.info("Let's process employee details: {}", employeeDetails);
}
このコンシューマは、_employee-details_トピックで公開されたイベントを読み取ります。 その出力をログに送信して、その機能を確認しましょう。

5.3. カフカバインディング

これまでは、_Processor_オブジェクトの_input_および_output_チャネルに対してのみ作業を行ってきました。 これらのチャネルは、正しい宛先で構成する必要があります。
_application.yml_を使用して、Kafkaバインディングを提供しましょう。
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro
この場合、__ destination ___はKafkaトピックを意味することに注意してください。 この場合の入力ソースであるため、_destination_と呼ばれるのは少し混乱するかもしれませんが、コンシューマとプロデューサ全体で一貫した用語です。

5.4. エントリーポイント

プロデューサーとコンシューマーができたので、ユーザーから入力を取得してプロデューサーに渡すためのAPIを公開しましょう。
@Autowired
private AvroProducer avroProducer;

@PostMapping("/employees/{id}/{firstName}/{lastName}")
public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName,
  @PathVariable String lastName) {
    avroProducer.produceEmployeeDetails(id, firstName, lastName);
    return "Sent employee details to consumer";
}

5.5. Confluent Schema Registry and Bindingsを有効にする

最後に、アプリケーションでKafkaとスキーマレジストリバインディングの両方を適用するには、構成クラスの1つに_ @ EnableBinding_と_ @ EnableSchemaRegistryClient_を追加する必要があります。
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableSchemaRegistryClient
public class AvroKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(AvroKafkaApplication.class, args);
    }

}
そして、_ConfluentSchemaRegistryClient_ Beanを提供する必要があります。
@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}")
private String endPoint;

@Bean
public SchemaRegistryClient schemaRegistryClient() {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endPoint);
    return client;
}
_endPoint_はConfluent Schema RegistryのURLです。

5.6. サービスのテスト

POSTリクエストでサービスをテストしましょう:
curl -X POST localhost:8080/employees/1001/Harry/Potter
ログから、これが機能していることがわかります。
2019-06-11 18:45:45.343  INFO 17036 --- [container-0-C-1] com.baeldung.consumer.AvroConsumer       : Let's process employee details: {"id": 1001, "firstName": "Harry", "lastName": "Potter"}

5.7. 処理中に何が起きましたか?

サンプルアプリケーションで何が起こったのかを正確に理解してみましょう。
  1. プロデューサーは_Employee_オブジェクトを使用してKafkaメッセージを作成しました

  2. プロデューサーは従業員スキーマをスキーマレジストリに登録しました
    スキーマバージョンIDを取得するには、新しいIDを作成するか、その正確なスキーマに対して既存のIDを再利用します

  3. Avroはスキーマを使用して_Employee_オブジェクトをシリアル化しました

  4. Spring Cloudはメッセージヘッダーにschema-idを入れました

  5. メッセージはトピックで公開されました

  6. メッセージがコンシューマーに届くと、スキーマIDを読み取ります
    ヘッダ

  7. コンシューマはschema-idを使用して、_Employee_スキーマを
    レジストリ

  8. 消費者は、そのオブジェクトを表すことができるローカルクラスを見つけ、
    メッセージをデシリアライズしました

6. ネイティブKafkaライブラリを使用したシリアライゼーション/デシリアライゼーション

Spring Bootは、いくつかのすぐに使えるメッセージコンバータを提供します。 *デフォルトでは、Spring Bootは_Content-Type_ヘッダーを使用して適切なメッセージコンバーターを選択します。*
この例では、_Content-Type_は_application / * avro、_です。したがって、__ AvroSchemaMessageConverter __を使用してAvro形式の読み取りと書き込みを行いました。 ただし、Confluentでは、メッセージ変換に_KafkaAvroSerializer_および_KafkaAvroDeserializer_を使用することをお勧めします*。
Springの独自のフォーマットはうまく機能しますが、パーティショニングの点でいくつかの欠点があり、Kafkaインスタンスの一部の非Springサービスが必要とするConfluent標準と相互運用できません。
Confluentコンバーターを使用するように_application.yml_を更新しましょう:
spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro
      kafka:
         binder:
           producer-properties:
             key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             schema.registry.url: http://localhost:8081
           consumer-properties:
             key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             schema.registry.url: http://localhost:8081
             specific.avro.reader: true
_useNativeEncoding_を有効にしました。 Spring Cloud Streamは、シリアル化を提供されたクラスに委任します。
また、_kafka.binder.producer-properties_および_kafka.binder.consumer-properties._を使用して、Spring Cloud内でKafkaのネイティブ設定プロパティを提供する方法を知っている必要があります。

7. 消費者グループとパーティション

*消費者グループは、同じアプリケーションに属する消費者のセットです*。 同じ消費者グループの消費者は同じグループ名を共有します。
_application.yml_を更新して、コンシューマグループ名を追加しましょう。
spring:
  cloud:
    stream:
      // ...
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
      // ...
すべてのコンシューマは、それらの間でトピックパーティションを均等に分散します。 異なるパーティションのメッセージは並行して処理されます。
*コンシューマーグループでは、一度にメッセージを読むコンシューマーの最大数はパーティションの数に等しくなります。*パーティションとコンシューマーの数を設定して、目的の並列処理を実現できます。 一般に、サービスのすべてのレプリカにあるコンシューマの総数よりも多くのパーティションが必要です。

7.1. パーティションキー

メッセージを処理する場合、メッセージの処理順序が重要になる場合があります。 メッセージが並行して処理される場合、処理のシーケンスを制御するのは困難です。
Kafkaは、*特定のパーティションで、メッセージは常に到着した順序で処理される*というルールを提供します。 そのため、特定のメッセージが正しい順序で処理されることが重要な場合、メッセージが互いに同じパーティションに到着するようにします。
トピックにメッセージを送信するときにパーティションキーを提供できます。 *同じパーティションキーを持つメッセージは常に同じパーティションに送られます*。 パーティションキーが存在しない場合、メッセージはラウンドロビン方式でパーティション分割されます。
これを例で理解してみましょう。 従業員に対して複数のメッセージを受信して​​おり、従業員のすべてのメッセージを順番に処理したいとします。 部門名と従業員IDは、従業員を一意に識別できます。
それでは、従業員のIDと部門名でパーティションキーを定義しましょう:
{
    "type": "record",
    "name": "EmployeeKey",
    "namespace": "com.baeldung.schema",
    "fields": [
     {
        "name": "id",
        "type": "int"
    },
    {
        "name": "departmentName",
        "type": "string"
    }]
}
プロジェクトをビルドすると、_EmployeeKey_ POJOが_com.baeldung.schema_パッケージの下に生成されます。
_EmployeeKey_をパーティションキーとして使用するようにプロデューサーを更新しましょう。
public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    // ...

    // creating partition key for kafka topic
    EmployeeKey employeeKey = new EmployeeKey();
    employeeKey.setId(empId);
    employeeKey.setDepartmentName("IT");

    Message<Employee> message = MessageBuilder.withPayload(employee)
        .setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
        .build();

    processor.output()
        .send(message);
}
ここでは、メッセージヘッダーにパーティションキーを配置しています。
これで、同じパーティションが同じ従業員IDと部門名を持つメッセージを受信します。

7.2消費者の並行性

Spring Cloud Streamにより、_application.yml_でコンシューマーの同時実行性を設定できます。
spring:
  cloud:
    stream:
      // ...
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
          concurrency: 3
これで、消費者はトピックから3つのメッセージを同時に読み取ります。 つまり、Springは3つの異なるスレッドを生成して、独立して消費します。

8. 結論

この記事では、AvroスキーマとConfluent Schema Registry *を使用して、* Apache Kafkaに対してプロデューサーとコンシューマーを統合しました。
これを単一のアプリケーションで実行しましたが、プロデューサーとコンシューマーは異なるアプリケーションにデプロイでき、独自のバージョンのスキーマを持つことができ、レジストリを介して同期を維持できました。
  • SpringのAvroおよびSchema Registryクライアントの実装を使用する方法を調べ、次に相互運用性の目的でシリアライズおよびデシリアライズの* Confluent標準実装*に切り替える方法を確認しました。

    最後に、トピックを分割し、メッセージの安全な並列処理を可能にする正しいメッセージキーを確保する方法を検討しました。
    この記事で使用する完全なコードは、https://github.com/eugenp/tutorials/tree/master/spring-cloud/spring-cloud-stream/spring-cloud-stream-kafka [GitHub]で見つけることができます。