1. 概要

Spring Cloud Streamは、SpringBootとSpringIntegrationの上に構築されたフレームワークであり、はイベント駆動型またはメッセージ駆動型のマイクロサービスの作成に役立ちます。

この記事では、Spring CloudStreamの概念と構成をいくつかの簡単な例とともに紹介します。

2. Mavenの依存関係

開始するには、ブローカーRabbitMQMaven依存関係をメッセージングミドルウェアとしてSpring Cloudスターターストリームをpom.xmlに追加する必要があります。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>3.1.3</version>
</dependency>

また、Maven Central からのモジュール依存関係を追加して、JUnitサポートも有効にします。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <version>3.1.3</version>
    <scope>test</scope>
</dependency>

3. 主な概念

マイクロサービスアーキテクチャは、「スマートエンドポイントとダムパイプ」の原則に従います。 エンドポイント間の通信は、RabbitMQやApacheKafkaなどのメッセージングミドルウェアパーティによって駆動されます。 サービスは、これらのエンドポイントまたはチャネルを介してドメインイベントを公開することによって通信します

Spring Cloud Streamフレームワークを構成する概念と、メッセージ駆動型サービスを構築するために知っておく必要のある重要なパラダイムについて見ていきましょう。

3.1. コンストラクト

input バインディングをリッスンし、outputバインディングに応答を送信するSpring CloudStreamの単純なサービスを見てみましょう。

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}

アノテーション@EnableBindingは、インターフェイスProcessor内で定義されたチャネルINPUTおよびOUTPUTをバインドするようにアプリケーションを構成します。 両方のチャネルは、具体的なメッセージングミドルウェアまたはバインダーを使用するように構成できるバインディングです。

これらすべての概念の定義を見てみましょう。

  • Bindings —入力チャネルと出力チャネルを宣言的に識別するインターフェイスのコレクション
  • Binder —KafkaやRabbitMQなどのメッセージングミドルウェアの実装
  • Channel —メッセージングミドルウェアとアプリケーション間の通信パイプを表します
  • StreamListeners MessageConverterがミドルウェア固有のイベントとドメインオブジェクトタイプ/POJOの間でシリアル化/逆シリアル化を行った後、チャネルからのメッセージで自動的に呼び出されるBeanのメッセージ処理メソッド
  • Mes sage スキーマ—メッセージのシリアル化と逆シリアル化に使用されます。これらのスキーマは、場所から静的に読み取るか、動的にロードして、ドメインオブジェクトタイプの進化をサポートします。

3.2. コミュニケーションパターン

宛先に指定されたメッセージは、パブリッシュ/サブスクライブメッセージングパターンによって配信されます。パブリッシャーは、メッセージをトピックに分類し、それぞれに名前を付けます。 サブスクライバーは、1つ以上のトピックに関心を示します。 ミドルウェアはメッセージをフィルタリングし、興味深いトピックのメッセージをサブスクライバーに配信します。

これで、サブスクライバーをグループ化できます。 コンシューマーグループは、グループID で識別されるサブスクライバーまたはコンシューマーのセットであり、トピックまたはトピックのパーティションからのメッセージが負荷分散された方法で配信されます。

4. プログラミングモデル

このセクションでは、SpringCloudStreamアプリケーションの構築の基本について説明します。

4.1. 機能テスト

テストサポートは、チャネルとの対話とメッセージの検査を可能にするバインダー実装です。

上記のenrichLogMessageサービスにメッセージを送信し、メッセージの先頭に “[1]:”というテキストが含まれているかどうかを確認してみましょう。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {

    @Autowired
    private Processor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage_thenResponseShouldUpdateText() {
        pipe.input()
          .send(MessageBuilder.withPayload(new LogMessage("This is my message"))
          .build());

        Object payload = messageCollector.forChannel(pipe.output())
          .poll()
          .getPayload();

        assertEquals("[1]: This is my message", payload.toString());
    }
}

4.2. カスタムチャネル

上記の例では、SpringCloudが提供するProcessor インターフェイスを使用しました。このインターフェイスには、1つの入力チャネルと1つの出力チャネルしかありません。

1つの入力チャネルと2つの出力チャネルなど、別の何かが必要な場合は、カスタムプロセッサを作成できます。

public interface MyProcessor {
    String INPUT = "myInput";

    @Input
    SubscribableChannel myInput();

    @Output("myOutput")
    MessageChannel anOutput();

    @Output
    MessageChannel anotherOutput();
}

Springは、このインターフェースの適切な実装を提供します。 チャネル名は、 @Output( “myOutput”)のような注釈を使用して設定できます。

それ以外の場合、Springはメソッド名をチャネル名として使用します。 したがって、 myInput myOutput 、およびanotherOutputという3つのチャネルがあります。

ここで、値が10未満の場合にメッセージをある出力にルーティングし、値が10以上の場合に別の出力にメッセージをルーティングするとします。

@Autowired
private MyProcessor processor;

@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
    if (val < 10) {
        processor.anOutput().send(message(val));
    } else {
        processor.anotherOutput().send(message(val));
    }
}

private static final <T> Message<T> message(T val) {
    return MessageBuilder.withPayload(val).build();
}

4.3. 条件付きディスパッチ

@StreamListener アノテーションを使用すると、 SpEL式で定義した任意の条件を使用して、コンシューマーで予期されるメッセージをフィルタリングすることもできます。

例として、メッセージをさまざまな出力にルーティングする別のアプローチとして、条件付きディスパッチを使用できます。

@Autowired
private MyProcessor processor;

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
    processor.anOutput().send(message(val));
}

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
    processor.anotherOutput().send(message(val));
}

このアプローチの唯一の制限は、これらのメソッドが値を返さないことです。

5. 設定

RabbitMQブローカーからのメッセージを処理するアプリケーションをセットアップしましょう。

5.1. バインダー構成

META-INF / spring.binders を介して、デフォルトのバインダー実装を使用するようにアプリケーションを構成できます。

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

または、この依存関係を含めることで、RabbitMQのバインダーライブラリをクラスパスに追加できます。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

バインダーの実装が提供されていない場合、Springはチャネル間の直接メッセージ通信を使用します。

5.2. RabbitMQ構成

セクション3.1の例でRabbitMQバインダーを使用するように構成するには、 src / main /resourcesにあるapplication.ymlを更新する必要があります。

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
        output:
          destination: queue.pretty.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: 5672
                username: <username>
                password: <password>
                virtual-host: /

input バインディングは、 queue.log.messages と呼ばれる交換を使用し、 output バインディングは、交換queue.pretty.log.messagesを使用します。 。 どちらのバインディングも、local_rabbitというバインダーを使用します。

事前にRabbitMQ交換またはキューを作成する必要がないことに注意してください。 アプリケーションを実行すると、両方の交換が自動的に作成されます

アプリケーションをテストするために、RabbitMQ管理サイトを使用してメッセージを公開できます。 交換queue.log.messagesPublishMessage パネルで、JSON形式でリクエストを入力する必要があります。

5.3. メッセージ変換のカスタマイズ

Spring Cloud Streamを使用すると、特定のコンテンツタイプにメッセージ変換を適用できます。 上記の例では、JSON形式を使用する代わりに、プレーンテキストを提供します。

これを行うには、MessageConverterを使用してカスタム変換をLogMessageに適用します。

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    //...

    @Bean
    public MessageConverter providesTextPlainMessageConverter() {
        return new TextPlainMessageConverter();
    }

    //...
}
public class TextPlainMessageConverter extends AbstractMessageConverter {

    public TextPlainMessageConverter() {
        super(new MimeType("text", "plain"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (LogMessage.class == clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, 
        Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        String text = payload instanceof String 
          ? (String) payload 
          : new String((byte[]) payload);
        return new LogMessage(text);
    }
}

これらの変更を適用した後、メッセージの公開パネルに戻り、ヘッダー「 contentTypes」を「text/ plain 」に設定し、ペイロードを「 Hello World “、以前と同じように機能するはずです。

5.4. 消費者グループ

アプリケーションの複数のインスタンスを実行している場合、入力チャネルに新しいメッセージがあるたびに、すべてのサブスクライバーに通知されます

ほとんどの場合、メッセージを1回だけ処理する必要があります。 Spring Cloud Streamは、コンシューマーグループを介してこの動作を実装します。

この動作を有効にするために、各コンシューマバインディングは spring.cloud.stream.bindings。 。グループグループ名を指定するプロパティ:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
          group: logMessageConsumers
          ...

6. メッセージ駆動型マイクロサービス

このセクションでは、マイクロサービスのコンテキストでSpringCloudStreamアプリケーションを実行するために必要なすべての機能を紹介します。

6.1. スケールアップする

複数のアプリケーションが実行されている場合、データがコンシューマー間で適切に分割されていることを確認することが重要です。 そのために、SpringCloudStreamは2つのプロパティを提供します。

  • spring.cloud.stream.instanceCount —実行中のアプリケーションの数
  • spring.cloud.stream.instanceIndex —現在のアプリケーションのインデックス

たとえば、上記の MyLoggerServiceApplication アプリケーションの2つのインスタンスをデプロイした場合、プロパティ spring .cloud.stream.instanceCount は両方のアプリケーションで2であり、プロパティ spring.cloud.stream.instanceIndexはそれぞれ0と1である必要があります。

この記事で説明されているように、SpringDataFlowを使用してSpringCloudStreamアプリケーションをデプロイすると、これらのプロパティは自動的に設定されます。

6.2. パーティショニング

ドメインイベントは、Partitionedメッセージである可能性があります。 これは、ストレージをスケールアップしてアプリケーションのパフォーマンスを向上させる場合に役立ちます。

ドメインイベントには通常、パーティションキーがあるため、関連するメッセージと同じパーティションになります。

ログメッセージをメッセージの最初の文字(パーティションキー)でパーティション化し、2つのパーティションにグループ化するとします。

AM で始まるログメッセージ用に1つのパーティションがあり、 NZ用に別のパーティションがあります。これは、次の2つのプロパティを使用して構成できます。

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression —ペイロードを分割する式
  • spring.cloud.stream.bindings.output.producer.partitionCount —グループの数

パーティション化する式が複雑すぎて、1行だけで記述できない場合があります。このような場合、プロパティ spring.cloud.stream.bindings.output.producerを使用して、カスタムパーティション戦略を記述できます。 .partitionKeyExtractorClass

6.3. 健康指標

マイクロサービスのコンテキストでは、サービスがダウンしたとき、または失敗し始めたときに検出する必要もあります。 Spring Cloud Streamは、バインダーのヘルスインジケーターを有効にするプロパティmanagement.health.binders.enabledを提供します。

アプリケーションを実行するとき、次の場所でヘルスステータスを照会できます。 http:// /健康

7. 結論

このチュートリアルでは、Spring Cloud Streamの主要な概念を示し、RabbitMQでのいくつかの簡単な例を通じてそれを使用する方法を示しました。 Spring Cloud Streamの詳細については、こちらをご覧ください。

この記事のソースコードは、GitHubにあります。