1. 序章

このチュートリアルでは、SSL認証を使用してSpring Bootクライアントを ApacheKafkaブローカーに接続するための基本的なセットアップについて説明します。

Secure Sockets Layer(SSL)は実際には非推奨になり、2015年以降Transport Layer Security(TLS)に置き換えられました。 ただし、歴史的な理由から、Kafka(およびJava)は引き続き「SSL」を指し、この記事でもこの規則に従います。

2. SSLの概要

デフォルトでは、ApacheKafkaはすべてのデータを認証なしでクリアテキストとして送信します。

まず、ブローカーとクライアント間の暗号化用にSSLを構成できます。 これには、デフォルトで、クライアントがサーバー証明書を認証する公開鍵暗号化を使用した一方向認証が必要です。

さらに、サーバーは別のメカニズム(SSLやSASLなど)を使用してクライアントを認証することもできるため、双方向認証または相互TLS(mTLS)が可能になります。 基本的に、双方向SSL認証は、クライアントとサーバーの両方がSSL証明書を使用して互いのIDを検証し、双方向で相互に信頼することを保証します

この記事では、ブローカーはSSLを使用してクライアントを認証し、キーストアトラストストアは証明書とキーを保持するために使用されます。

各ブローカーには、秘密鍵と公開証明書を含む独自の鍵ストアが必要です。 クライアントはそのトラストストアを使用してこの証明書を認証し、サーバーを信頼します。 同様に、各クライアントには、秘密鍵と公開証明書を含む独自の鍵ストアも必要です。 サーバーはそのトラストストアを使用して、クライアントの証明書を認証および信頼し、安全な接続を確立します。

トラストストアには、証明書に署名できる認証局(CA)を含めることができます。 この場合、ブローカーまたはクライアントは、トラストストアに存在するCAによって署名された証明書を信頼します。 これにより、新しいクライアントやブローカーを追加するためにトラストストアを変更する必要がないため、証明書の認証が簡素化されます。

3. 依存関係と設定

サンプルアプリケーションは、単純なSpringBootアプリケーションになります。

Kafkaに接続するために、spring-kafka依存関係をPOMファイルに追加しましょう。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

また、 Docker Compose ファイルを使用して、Kafkaサーバーのセットアップを構成およびテストします。 最初に、SSL構成なしでこれを実行しましょう。

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:6.2.0
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

それでは、コンテナを起動しましょう。

docker-compose up

これにより、ブローカーがデフォルト構成で起動します。

4. ブローカーの構成

安全な接続を確立するためにブローカーに必要な最小構成を確認することから始めましょう。

4.1. スタンドアロンブローカー

この例ではブローカーのスタンドアロンインスタンスを使用していませんが、SSL認証を有効にするために必要な構成の変更を知っておくと便利です。

まず、server.propertiesのポート9093でSSL接続をリッスンするようにブローカーを構成する必要があります。

listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

次に、キーストアとトラストストアに関連するプロパティを証明書の場所と資格情報で構成する必要があります。

ssl.keystore.location=/certs/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.truststore.location=/certs/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.key.password=password

最後に、双方向認証を実現するには、ブローカーをクライアントを認証するように構成する必要があります。

ssl.client.auth=required

4.2. DockerCompose

Composeを使用してブローカー環境を管理しているので、上記のすべてのプロパティをdocker-compose.ymlファイルに追加しましょう。

kafka:
  image: confluentinc/cp-kafka:6.2.0
  depends_on:
    - zookeeper
  ports:
    - 9092:9092
    - 9093:9093
  environment:
    ...
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SSL://localhost:9093
    KAFKA_SSL_CLIENT_AUTH: 'required'
    KAFKA_SSL_KEYSTORE_FILENAME: '/certs/kafka.server.keystore.jks'
    KAFKA_SSL_KEYSTORE_CREDENTIALS: '/certs/kafka_keystore_credentials'
    KAFKA_SSL_KEY_CREDENTIALS: '/certs/kafka_sslkey_credentials'
    KAFKA_SSL_TRUSTSTORE_FILENAME: '/certs/kafka.server.truststore.jks'
    KAFKA_SSL_TRUSTSTORE_CREDENTIALS: '/certs/kafka_truststore_credentials'
  volumes:
    - ./certs/:/etc/kafka/secrets/certs

ここでは、構成の ports セクションでSSLポート(9093)を公開しました。 さらに、構成のvolumeセクションにcertsプロジェクトフォルダーをマウントしました。 これには、必要な証明書と関連する資格情報が含まれています。

ここで、Composeを使用してスタックを再起動すると、ブローカーログに関連するSSLの詳細が表示されます。

...
kafka_1      | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka_1      | ===> Configuring ...
<strong>kafka_1      | SSL is enabled.</strong>
....
kafka_1      | [2021-08-20 22:45:10,772] INFO KafkaConfig values:
<strong>kafka_1      |  advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
kafka_1      |  ssl.client.auth = required</strong>
<strong>kafka_1      |  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]</strong>
kafka_1      |  ssl.endpoint.identification.algorithm = https
kafka_1      |  ssl.key.password = [hidden]
kafka_1      |  ssl.keymanager.algorithm = SunX509
<strong>kafka_1      |  ssl.keystore.location = /etc/kafka/secrets/certs/kafka.server.keystore.jks</strong>
kafka_1      |  ssl.keystore.password = [hidden]
kafka_1      |  ssl.keystore.type = JKS
kafka_1      |  ssl.principal.mapping.rules = DEFAULT
<strong>kafka_1      |  ssl.protocol = TLSv1.3</strong>
kafka_1      |  ssl.trustmanager.algorithm = PKIX
kafka_1      |  ssl.truststore.certificates = null
<strong>kafka_1      |  ssl.truststore.location = /etc/kafka/secrets/certs/kafka.server.truststore.jks</strong>
kafka_1      |  ssl.truststore.password = [hidden]
kafka_1      |  ssl.truststore.type = JKS
....

5. SpringBootクライアント

サーバーのセットアップが完了したので、必要なSpring Bootコンポーネントを作成します。 これらは、双方向認証にSSLを必要とするブローカーと相互作用します。

5.1. プロデューサー

まず、 KafkaTemplate を使用して、指定したトピックにメッセージを送信しましょう。

public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message, String topic) {
        log.info("Producing message: {}", message);
        kafkaTemplate.send(topic, "key", message)
          .addCallback(
            result -> log.info("Message sent to topic: {}", message),
            ex -> log.error("Failed to send message", ex)
          );
    }
}

sendメソッドは非同期操作です。 したがって、ブローカーがメッセージを受信すると、いくつかの情報をログに記録する単純なコールバックを添付しました。

5.2. 消費者

次に、@KafkaListenerを使用して単純なコンシューマーを作成しましょう。  これはブローカーに接続し、プロデューサーが使用するのと同じトピックからのメッセージを消費します。

public class KafkaConsumer {

    public static final String TOPIC = "test-topic";

    public final List<String> messages = new ArrayList<>();

    @KafkaListener(topics = TOPIC)
    public void receive(ConsumerRecord<String, String> consumerRecord) {
        log.info("Received payload: '{}'", consumerRecord.toString());
        messages.add(consumerRecord.value());
    }
}

私たちのデモアプリケーションでは、物事をシンプルに保ち、コンシューマーはメッセージをリストに保存するだけです。 実際のシステムでは、コンシューマーはメッセージを受信し、アプリケーションのビジネスロジックに従ってメッセージを処理します。

5.3. 構成

最後に、必要な構成をapplication.ymlに追加しましょう。

spring:
  kafka:
    security:
      protocol: "SSL"
    bootstrap-servers: localhost:9093
    ssl:
      trust-store-location: classpath:/client-certs/kafka.client.truststore.jks
      trust-store-password: <password>
      key-store-location:  classpath:/client-certs/kafka.client.keystore.jks
      key-store-password: <password>
    
    # additional config for producer/consumer 

ここでは、プロデューサーとコンシューマーを構成するためにSpring Bootによって提供される必要なプロパティを設定しました。 これらのコンポーネントは両方とも同じブローカーに接続しているため、spring.kafkaセクションですべての重要なプロパティを宣言できます。 ただし、プロデューサーとコンシューマーが異なるブローカーに接続している場合は、spring.kafka.producerセクションとspring.kafka.consumerセクションでそれぞれ指定します。

構成のsslセクションでは、Kafkaブローカーを認証するためにJKSトラストストアをポイントします。 これには、ブローカー証明書にも署名したCAの証明書が含まれています。 さらに、は、ブローカー側のトラストストアに存在する必要があるCAによって署名された証明書を含むSpringクライアントキーストアのパスも提供しました。

5.4. テスト

作成ファイルを使用しているので、 Testcontainers フレームワークを使用して、ProducerおよびConsumerでエンドツーエンドのテストを作成しましょう。

@ActiveProfiles("ssl")
@Testcontainers
@SpringBootTest(classes = KafkaSslApplication.class)
class KafkaSslApplicationLiveTest {

    private static final String KAFKA_SERVICE = "kafka";
    private static final int SSL_PORT = 9093;  

    @Container
    public DockerComposeContainer<?> container =
      new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
        .withExposedService(KAFKA_SERVICE, SSL_PORT, Wait.forListeningPort());

    @Autowired
    private KafkaProducer kafkaProducer;

    @Autowired
    private KafkaConsumer kafkaConsumer;

    @Test
    void givenSslIsConfigured_whenProducerSendsMessageOverSsl_thenConsumerReceivesOverSsl() {
        String message = generateSampleMessage();
        kafkaProducer.sendMessage(message, TOPIC);

        await().atMost(Duration.ofMinutes(2))
          .untilAsserted(() -> assertThat(kafkaConsumer.messages).containsExactly(message));
    }

    private static String generateSampleMessage() {
        return UUID.randomUUID().toString();
    }
}

テストを実行すると、TestcontainersはSSL構成を含むComposeファイルを使用してKafkaブローカーを起動します。 また、アプリケーションはSSL構成で開始し、暗号化および認証された接続を介してブローカーに接続します。 これはイベントの非同期シーケンスであるため、 Awaitlity を使用して、コンシューマーメッセージストアで予期されるメッセージをポーリングしました。 これにより、すべての構成と、ブローカーとクライアント間の双方向認証が成功したことが確認されます。

6. 結論

この記事では、KafkaブローカーとSpring Bootクライアントの間で必要なSSL認証設定の基本について説明しました。

最初に、双方向認証を有効にするために必要なブローカーのセットアップを確認しました。 次に、暗号化および認証された接続を介してブローカーに接続するためにクライアント側で必要な構成を確認しました。 最後に、統合テストを使用して、ブローカーとクライアント間の安全な接続を検証しました。

いつものように、完全なソースコードはGitHubから入手できます。