1. 序章

Apache Kafkaはメッセージングプラットフォームです。これを使用すると、さまざまなアプリケーション間で大規模なデータ交換を行うことができます。

Spring Cloud Streamは、メッセージ駆動型アプリケーションを構築するためのフレームワークです。 Kafkaのサービスへの統合を簡素化できます。

従来、KafkaはスキーマレジストリでサポートされているAvroメッセージ形式で使用されていました。 このチュートリアルでは、ConfluentSchemaRegistryを使用します。 SpringによるConfluentスキーマレジストリとの統合の実装と、Confluentネイティブライブラリの両方を試してみます。

2. コンフルエントなスキーマレジストリ

Kafkaはすべてのデータをバイトとして表すため、外部スキーマを使用し、そのスキーマに従ってバイトにシリアル化および逆シリアル化するのが一般的です。 コストのかかるオーバーヘッドとなる各メッセージでそのスキーマのコピーを提供するのではなく、スキーマをレジストリに保持し、各メッセージでIDのみを提供することも一般的です。

Confluent Schema Registryは、スキーマを保存、取得、および管理するための簡単な方法を提供します。 いくつかの便利なRESTfulAPIを公開しています。

スキーマはサブジェクトごとに保存され、デフォルトでは、レジストリは、サブジェクトに対して新しいスキーマをアップロードできるようにする前に、互換性チェックを実行します。

各プロデューサーは、それが作成しているスキーマを知っているはずであり、各コンシューマーは、任意の形式のデータを消費できるか、または読み込むことを好む特定のスキーマを持っている必要があります。 プロデューサーはレジストリを参照して、メッセージの送信時に使用する正しいIDを確立します。 コンシューマーはレジストリを使用して送信者のスキーマをフェッチします。 

コンシューマーが送信者のスキーマと独自の目的のメッセージ形式の両方を知っている場合、Avroライブラリはデータを消費者の目的の形式に変換できます。

3. Apache Avro

ApacheAvroはデータシリアル化システムです。

JSON構造を使用してスキーマを定義し、バイトと構造化データ間のシリアル化を提供します。

Avroの強みの1つは、あるバージョンのスキーマで記述されたメッセージを、互換性のある代替スキーマで定義された形式に進化させることをサポートしていることです。

Avroツールセットは、これらのスキーマのデータ構造を表すクラスを生成することもできるため、POJOの内外でのシリアル化が容易になります。

4. プロジェクトの設定

Spring Cloud Stream でスキーマレジストリを使用するには、 Spring Cloud KafkaBinderschemaレジストリMavenの依存関係が必要です。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
</dependency>

Confluentのシリアライザーの場合、次のものが必要です。

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>4.0.0</version>
</dependency>

そして、Confluentのシリアライザーは彼らのリポジトリにあります:

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

また、Mavenプラグインを使用してAvroクラスを生成しましょう。

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <id>schemas</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                        <goal>protocol</goal>
                        <goal>idl-protocol</goal>
                    </goals>
                    <configuration>                        
                        <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

テストには、既存のKafkaとSchema Registryのセットアップを使用するか、dockerizedConfluentとKafkaを使用できます。

5. 春のクラウドストリーム

プロジェクトの設定が完了したので、次に Spring CloudStreamを使用してプロデューサーを作成しましょう。 トピックに関する従業員の詳細を公開します。

次に、トピックからイベントを読み取り、ログステートメントに書き出すコンシューマーを作成します。

5.1. スキーマ

まず、従業員の詳細のスキーマを定義しましょう。 employee-schema.avscという名前を付けることができます。

スキーマファイルはsrc/ main / resources:に保存できます

{
    "type": "record",
    "name": "Employee",
    "namespace": "com.baeldung.schema",
    "fields": [
    {
        "name": "id",
        "type": "int"
    },
    {
        "name": "firstName",
        "type": "string"
    },
    {
        "name": "lastName",
        "type": "string"
    }]
}

上記のスキーマを作成したら、プロジェクトをビルドする必要があります。 次に、Apache Avroコードジェネレーターは、パッケージcom.baeldung.schemaの下にEmployeeという名前のPOJOを作成します。

5.2. プロデューサー

Spring Cloud Streamは、Processorインターフェースを提供します。 これにより、出力チャネルと入力チャネルが提供されます。

これを使用して、Employeeオブジェクトをemployee-details Kafkaトピックに送信するプロデューサーを作成しましょう。

@Autowired
private Processor processor;

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    employee.setFirstName(firstName);
    employee.setLastName(lastName);

    Message<Employee> message = MessageBuilder.withPayload(employee)
                .build();

    processor.output()
        .send(message);
}

5.2. 消費者

それでは、コンシューマーを作成しましょう。

@StreamListener(Processor.INPUT)
public void consumeEmployeeDetails(Employee employeeDetails) {
    logger.info("Let's process employee details: {}", employeeDetails);
}

このコンシューマーは、employee-detailsトピックで公開されたイベントを読みます。 その出力をログに転送して、それが何をするかを見てみましょう。

5.3. カフカバインディング

これまでのところ、Processorオブジェクトのinputおよびoutputチャネルに対してのみ作業を行ってきました。 これらのチャネルは、正しい宛先で構成する必要があります。

application.yml を使用して、Kafkaバインディングを提供しましょう。

spring:
  cloud:
    stream: 
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro

この場合、宛先はKafkaトピックを意味することに注意してください。 この場合は入力ソースであるため、宛先と呼ばれることは少し混乱するかもしれませんが、これは消費者と生産者の間で一貫した用語です。

5.4. エントリーポイント

プロデューサーとコンシューマーができたので、APIを公開して、ユーザーから入力を受け取り、それをプロデューサーに渡します。

@Autowired
private AvroProducer avroProducer;

@PostMapping("/employees/{id}/{firstName}/{lastName}")
public String producerAvroMessage(@PathVariable int id, @PathVariable String firstName, 
  @PathVariable String lastName) {
    avroProducer.produceEmployeeDetails(id, firstName, lastName);
    return "Sent employee details to consumer";
}

5.5. Confluentスキーマレジストリとバインディングを有効にする

最後に、アプリケーションでKafkaとスキーマレジストリバインディングの両方を適用するには、構成クラスの1つに@EnableBinding@EnableSchemaRegistryClientを追加する必要があります。

@SpringBootApplication
@EnableBinding(Processor.class)
// The @EnableSchemaRegistryClient annotation needs to be uncommented to use the Spring native method.
// @EnableSchemaRegistryClient
public class AvroKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(AvroKafkaApplication.class, args);
    }

}

そして、 ConfluentSchemaRegistryClientbeanを提供する必要があります。

@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}")
private String endPoint;

@Bean
public SchemaRegistryClient schemaRegistryClient() {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endPoint);
    return client;
}

endPoint は、ConfluentSchemaRegistryのURLです。

5.6. 私たちのサービスをテストする

POSTリクエストを使用してサービスをテストしてみましょう。

curl -X POST localhost:8080/employees/1001/Harry/Potter

ログは、これが機能したことを示しています。

2019-06-11 18:45:45.343  INFO 17036 --- [container-0-C-1] com.baeldung.consumer.AvroConsumer       : Let's process employee details: {"id": 1001, "firstName": "Harry", "lastName": "Potter"}

5.7. 処理中に何が起こったのですか?

サンプルアプリケーションで何が起こったのかを正確に理解してみましょう。

  1. プロデューサーは、Employeeオブジェクトを使用してKafkaメッセージを作成しました
  2. プロデューサーは、従業員スキーマをスキーマレジストリに登録して、スキーマバージョンIDを取得します。これにより、新しいIDが作成されるか、既存のIDがその正確なスキーマに再利用されます。
  3. Avroは、スキーマを使用してEmployeeオブジェクトをシリアル化しました
  4. Spring Cloudは、schema-idをメッセージヘッダーに配置します
  5. メッセージはトピックで公開されました
  6. メッセージがコンシューマーに届くと、ヘッダーからスキーマIDが読み取られます。
  7. コンシューマーはschema-idを使用して、レジストリからEmployeeスキーマを取得しました
  8. コンシューマーは、そのオブジェクトを表すことができるローカルクラスを見つけ、メッセージをそのオブジェクトに逆シリアル化しました

6. ネイティブKafkaライブラリを使用したシリアル化/逆シリアル化

Spring Bootは、すぐに使用できるメッセージコンバーターをいくつか提供します。 デフォルトでは、SpringBootはContent-Typeヘッダーを使用して適切なメッセージコンバーターを選択します。

この例では、Content-Typeapplication/ * + avroであるため、 AvroSchemaMessageConverterを使用してAvro形式を読み書きします。 ただし、Confluentは、メッセージ変換にKafkaAvroSerializerとKafkaAvroDeserializerを使用することを推奨しています。

Spring独自の形式はうまく機能しますが、パーティショニングに関していくつかの欠点があり、Kafkaインスタンスの一部の非Springサービスである必要があるConfluent標準と相互運用できません。

application.yml を更新して、Confluentコンバーターを使用してみましょう。

spring:
  cloud:
    stream:
      default: 
        producer: 
          useNativeEncoding: true
        consumer:  
          useNativeEncoding: true     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
        output:
          destination: employee-details
          content-type: application/*+avro
      kafka:
         binder:        
           producer-properties:
             key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
             schema.registry.url: http://localhost:8081 
           consumer-properties:
             key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
             schema.registry.url: http://localhost:8081
             specific.avro.reader: true

useNativeEncodingを有効にしました。 これにより、SpringCloudStreamは提供されたクラスにシリアル化を委任します。

また、 kafka.binder.producer-propertiesおよびkafka.binder.consumer-properties。を使用して、SpringCloud内でKafkaのネイティブ設定プロパティを提供する方法も知っておく必要があります。

7. コンシューマーグループとパーティション

コンシューマーグループは、同じアプリケーションに属するコンシューマーのセットです。 同じコンシューマーグループのコンシューマーは、同じグループ名を共有します。

application.yml を更新して、コンシューマーグループ名を追加しましょう。

spring:
  cloud:
    stream:
      // ...     
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
      // ...

すべてのコンシューマーは、トピックパーティションをそれらの間で均等に分散します。 異なるパーティションのメッセージは並行して処理されます。

コンシューマーグループでは、一度にメッセージを読み取るコンシューマーの最大数はパーティションの数と同じです。したがって、パーティションとコンシューマーの数を構成して、目的の並列処理を実現できます。 一般に、サービスのすべてのレプリカ全体で、コンシューマーの総数よりも多くのパーティションを用意する必要があります。

7.1. パーティションキー

メッセージを処理するとき、メッセージが処理される順序が重要になる場合があります。 メッセージが並行して処理される場合、処理の順序を制御するのは困難です。

Kafkaは、特定のパーティションで、メッセージは常に到着した順序で処理されるというルールを提供します。 したがって、特定のメッセージが正しい順序で処理されることが重要な場合は、それらが互いに同じパーティションに配置されるようにします。

トピックにメッセージを送信するときにパーティションキーを提供できます。 同じパーティションキーを持つメッセージは常に同じパーティションに送られます。 パーティションキーが存在しない場合、メッセージはラウンドロビン方式でパーティション化されます。

例を挙げてこれを理解してみましょう。 従業員に対して複数のメッセージを受信していて、従業員のすべてのメッセージを順番に処理したいとします。 部門名と従業員IDは、従業員を一意に識別できます。

それでは、従業員のIDと部門名を使用してパーティションキーを定義しましょう。

{
    "type": "record",
    "name": "EmployeeKey",
    "namespace": "com.baeldung.schema",
    "fields": [
     {
        "name": "id",
        "type": "int"
    },
    {
        "name": "departmentName",
        "type": "string"
    }]
}

プロジェクトをビルドした後、 EmployeeKeyPOJOがパッケージcom.baeldung.schemaの下に生成されます。

EmployeeKeyをパーティションキーとして使用するようにプロデューサーを更新しましょう。

public void produceEmployeeDetails(int empId, String firstName, String lastName) {

    // creating employee details
    Employee employee = new Employee();
    employee.setId(empId);
    // ...

    // creating partition key for kafka topic
    EmployeeKey employeeKey = new EmployeeKey();
    employeeKey.setId(empId);
    employeeKey.setDepartmentName("IT");

    Message<Employee> message = MessageBuilder.withPayload(employee)
        .setHeader(KafkaHeaders.MESSAGE_KEY, employeeKey)
        .build();

    processor.output()
        .send(message);
}

ここでは、メッセージヘッダーにパーティションキーを配置しています。

これで、同じパーティションが同じ従業員IDと部門名のメッセージを受信します。

7.2. 消費者の並行性

Spring Cloud Streamを使用すると、application.ymlでコンシューマーの同時実行性を設定できます。

spring:
  cloud:
    stream:
      // ... 
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
          concurrency: 3

これで、消費者はトピックから3つのメッセージを同時に読むことになります。 つまり、Springは3つの異なるスレッドを生成して、独立して消費します。

8. 結論

この記事では、ApacheKafkaとAvroスキーマおよびConfluentSchemaRegistryに対してプロデューサーとコンシューマーを統合しました。

これは単一のアプリケーションで行いましたが、プロデューサーとコンシューマーは異なるアプリケーションにデプロイされ、レジストリを介して同期された独自のバージョンのスキーマを持つことができたはずです。

SpringのAvroおよびSchemaRegistryクライアントの実装の使用方法を確認した後、Confluent標準実装のシリアル化および逆シリアル化に切り替える方法を確認しました。相互運用性の目的。

最後に、トピックを分割し、メッセージの安全な並列処理を可能にする正しいメッセージキーがあることを確認する方法を確認しました。

この記事で使用されている完全なコードは、GitHubにあります。