1概要

Spring Cloud Streamは、Spring BootとSpring Integrationの上に構築されたフレームワークで、イベント駆動型またはメッセージ駆動型のマイクロサービスを作成するのに役立ちます。

この記事では、Spring Cloud Streamの概念と構成を簡単な例を使って紹介します。


2 Mavenの依存関係

始めるには、https://search.maven.org/classic/#search%7C1%7Cg%3A%22org.springframework.cloud%22%20AND%20a%3A%22spring-を追加する必要があります。 cloud-starter-stream-rabbit%22

pom.xml

へのメッセージングミドルウェアとしてのMavenの依存関係:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

そしてhttps://search.maven.org/classic/#search%7Cgav%7C1%7Cg%3A%22org.springframework.cloud%22%20AND%20a%3A%22spring-cloud-stream-testを追加しますJUnitサポートを有効にするための-support%22[Maven Centralからのモジュール依存]

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <version>1.3.0.RELEASE</version>
    <scope>test</scope>
</dependency>

** 3主なコンセプト

マイクロサービスアーキテクチャは、「https://martinfowler.com/articles/microservices.html#SmartEndpointsAndDumbPipes[スマートエンドポイントとダムパイプ]」の原則に従います。エンドポイント間の通信は、RabbitMQやApache Kafkaなどのメッセージングミドルウェア関係者によって推進されています。

  • サービスは、これらのエンドポイントまたはチャネル** を介してドメインイベントを公開することによって通信します。

Spring Cloud Streamフレームワークを構成する概念と、メッセージ駆動型サービスを構築するために注意しなければならないパラダイムを見ていきましょう。


3.1. コンストラクト

Spring Cloud Streamで

input

bindingをリッスンし、

output

bindingに応答を送信する単純なサービスを見てみましょう。

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[]args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}

アノテーション

@ EnableBinding

は、インターフェース

Processor

内で定義された

INPUT

および

OUTPUT

チャネルをバインドするようにアプリケーションを構成します。

どちらのチャネルも、具象メッセージングミドルウェアまたはバインダを使用するように設定できるバインディングです。

これらすべての概念の定義を見てみましょう。


  • Bindings

    – 入力を識別するインタフェースの集まり

宣言的に出力チャンネル
**

Binder

– Kafkaなどのメッセージングミドルウェアの実装

うさぎのMQ
**

Channel

– 間の通信パイプを表します

メッセージングミドルウェアとその応用
**

StreamListeners

– Bean内のメッセージ処理メソッド

その後のチャネルからのメッセージに対して自動的に呼び出されます。

MessageConverter

は、間のシリアライゼーション/デシリアライゼーションを行います。
ミドルウェア固有のイベントとドメインオブジェクトタイプ/POJO
**

Message


Schemas

– のシリアライゼーションとデシリアライゼーションに使用されます

メッセージ、これらのスキーマはロケーションから静的に読み取ることも動的にロードすることもでき、ドメインオブジェクト型の進化をサポート


3.2. コミュニケーションパターン

  • 宛先に指定されたメッセージは、

    Publish-Subscribe

    メッセージングパターンによって配信されます。** パブリッシャは、メッセージをトピックに分類し、それぞれを名前で識別します。購読者は1つ以上のトピックに関心を示します。ミドルウェアはメッセージをフィルタリングし、興味深いトピックを購読者に配信します。

これで、購読者をグループ化できました。

消費者グループ

は、グループまたはグループのパーティションからのメッセージが負荷分散された方法で配信される、グループIDによって識別される一連の加入者または消費者です。


4プログラミングモデル

このセクションでは、Spring Cloud Streamアプリケーションを構築するための基本について説明します。


4.1. 機能テスト

テストサポートは、チャネルとの対話およびメッセージの検査を可能にするバインダーの実装です。

上記の

enrichLogMessage

サービスにメッセージを送信し、応答にメッセージの先頭に

“[1]:“

というテキストが含まれているかどうかを確認します。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {

    @Autowired
    private Processor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage__thenResponseShouldUpdateText() {
        pipe.input()
          .send(MessageBuilder.withPayload(new LogMessage("This is my message"))
          .build());

        Object payload = messageCollector.forChannel(pipe.output())
          .poll()
          .getPayload();

        assertEquals("[1]: This is my message", payload.toString());
    }
}


4.2. カスタムチャンネル

上記の例では、Spring Cloudが提供する

Processor

インターフェイスを使用しました。これには、入力チャネルと出力チャネルがそれぞれ1つだけあります。

1つの入力チャンネルと2つの出力チャンネルのように、何か違うものが必要な場合は、カスタムプロセッサを作成できます。

public interface MyProcessor {
    String INPUT = "myInput";

    @Input
    SubscribableChannel myInput();

    @Output("myOutput")
    MessageChannel anOutput();

    @Output
    MessageChannel anotherOutput();
}

Springはこのインターフェースの適切な実装を提供します。

チャンネル名は

@ Output(“ myOutput”)

のようにアノテーションを使って設定できます。

それ以外の場合、Springはメソッド名をチャンネル名として使用します。

したがって、

myInput



myOutput

、および

anotherOutput

という3つのチャネルがあります。

ここで、値が10より小さい場合にメッセージを1つの出力にルーティングし、値が10以上の場合は別の出力にルーティングするとします。

@Autowired
private MyProcessor processor;

@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
    if (val < 10) {
        processor.anOutput().send(message(val));
    } else {
        processor.anotherOutput().send(message(val));
    }
}

private static final <T> Message<T> message(T val) {
    return MessageBuilder.withPayload(val).build();
}


4.3. 条件付き発送


@ StreamListener

アノテーションを使用して、

SpEL式

で定義した任意の条件を使用して、

コンシューマに期待されるメッセージを

フィルタ処理することもできます。

例として、メッセージをさまざまな出力にルーティングするための別のアプローチとして、条件付きディスパッチを使用できます。

@Autowired
private MyProcessor processor;

@StreamListener(
  target = MyProcessor.INPUT,
  condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
    processor.anOutput().send(message(val));
}

@StreamListener(
  target = MyProcessor.INPUT,
  condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
    processor.anotherOutput().send(message(val));
}

このアプローチの唯一の制限は、これらのメソッドが値を返さないことです。


5セットアップ

RabbitMQブローカーからのメッセージを処理するアプリケーションを設定しましょう。


5.1. バインダー構成


META-INF/spring.binders

でデフォルトのバインダー実装を使うようにアプリケーションを設定できます。

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

バインダーの実装が提供されていない場合、Springはチャネル間の直接メッセージ通信を使用します。


5.2. RabbitMQの設定

RabbitMQバインダーを使うようにセクション3.1の例を設定するには、

src/main/resources

にある

application.yml

を更新する必要があります:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local__rabbit
        output:
          destination: queue.pretty.log.messages
          binder: local__rabbit
      binders:
        local__rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: 5672
                username: <username>
                password: <password>
                virtual-host:/----

__input__バインディングは__queue.log.messages__という名前の交換を使用し、__output__バインディングは交換__queue.pretty.log.messages__を使用します。両方のバインディングは__local__rabbit__と呼ばれるバインダーを使用します。

RabbitMQのエクスチェンジやキューを事前に作成する必要はありません。アプリケーションを実行すると、両方の交換が自動的に作成されます。

アプリケーションをテストするには、RabbitMQ管理サイトを使用してメッセージを公開します。交換__queue.log.messages__の__Publish Message__パネルで、リクエストをJSON形式で入力する必要があります。

====  **  5.3. メッセージ変換のカスタマイズ**

Spring Cloud Streamでは、特定のコンテンツタイプにメッセージ変換を適用することができます。上記の例では、JSON形式を使用する代わりに、プレーンテキストを提供したいと思います。

これを行うには、__MessageConverter__ ** を使用して__LogMessage__にカスタム変換を適用します。

[source,java,gutter:,true]

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
//…​

@Bean
public MessageConverter providesTextPlainMessageConverter() {
    return new TextPlainMessageConverter();
}

   //...
}

[source,java,gutter:,true]

public class TextPlainMessageConverter extends AbstractMessageConverter {

public TextPlainMessageConverter() {
    super(new MimeType("text", "plain"));
}

@Override
protected boolean supports(Class<?> clazz) {
    return (LogMessage.class == clazz);
}

    @Override
    protected Object convertFromInternal(Message<?> message,
        Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        String text = payload instanceof String
          ? (String) payload
          : new String((byte[]) payload);
        return new LogMessage(text);
    }
}

これらの変更を適用した後、__Publish Message__パネルに戻って、ヘッダ“ __contentTypes__”を“ __text/plain__”に、ペイロードを“ __Hello World__”に設定すれば、以前と同じように動作します。

====  **  5.4. 消費者グループ**

私たちのアプリケーションの複数のインスタンスを実行するとき、** 入力チャンネルに新しいメッセージがあるたびに、すべての加入者は** 通知されます。

ほとんどの場合、メッセージは一度だけ処理する必要があります。 Spring Cloud Streamはコンシューマグループを介してこの動作を実装します。

この動作を有効にするために、各コンシューマバインディングは__spring.cloud.stream.bindings。<CHANNEL> .group__プロパティを使用してグループ名を指定できます。

[source,text,gutter:,true]

spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages
binder: local__rabbit
group: logMessageConsumers
…​

===  **  6. メッセージ駆動型マイクロサービス**

このセクションでは、Spring Cloud Streamアプリケーションをマイクロサービスのコンテキストで実行するために必要なすべての機能を紹介します。

====  **  6.1. スケールアップする**

複数のアプリケーションが実行されている場合、データがコンシューマ間で正しく分割されるようにすることが重要です。そのために、Spring Cloud Streamには2つのプロパティがあります。

**  **  __spring.cloud.stream.instanceCount__ **   - 実行中のアプリケーションの数

**  **  __spring.cloud.stream.instanceIndex__ **   - 現在のインデックス

応用

たとえば、上記の__MyLoggerServiceApplication__アプリケーションの2つのインスタンスをデプロイした場合、両方のアプリケーションに対してプロパティ__spring.cloud.stream.instanceCount__は2になり、プロパティ__spring.cloud.stream.instanceIndex__はそれぞれ0と1になるはずです。

link:/spring-cloud-data-flow-stream-processing[この記事]で説明されているようにSpring Data Flowを使用してSpring Cloud Streamアプリケーションをデプロイすると、これらのプロパティは自動的に設定されます。

====  **  6.2. パーティショニング**

ドメインイベントは__Partitioned__メッセージになります。これは、ストレージをスケールアップしてアプリケーションのパフォーマンスを向上させるときに役立ちます。

ドメインイベントは通常パーティションキーを持っているので、関連するメッセージと共に同じパーティションに入ります。

ログメッセージをメッセージの最初の文字(パーティションキー)でパーティション化し、2つのパーティションにグループ化するとします。

__A-M__で始まるログメッセージ用のパーティションと__N-Z用のパーティションがあります。これは、次の2つのプロパティを使用して構成できます。

**  __spring.cloud.stream.bindings.output.producer.partitionKeyExpression__

 - ペイロードを分割する式
**  __spring.cloud.stream.bindings.output.producer.partitionCount__  -  the

グループ数

** パーティション化する式が複雑すぎて1行だけで記述できない場合があります** このような場合は、__spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass__プロパティを使用してカスタムパーティション戦略を記述できます。

====  **  6.3. 健康インジケーター**

マイクロサービスのコンテキストでは、サービスが停止したときや失敗したときを検出する必要もあります。 Spring Cloud Streamは、バインダーのヘルスインジケーターを有効にするために__management.health.binders.enabled__プロパティを提供します。

アプリケーションを実行すると、__http://<host>:<port>/health__で正常性状態を照会できます。

===  **  7. 結論**

このチュートリアルでは、Spring Cloud Streamの主な概念を紹介し、RabbitMQ上のいくつかの簡単な例を通してそれを使用する方法を示しました。

Spring Cloud Streamに関する詳細情報はhttps://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/[ここ]にあります。

この記事のソースコードはhttps://github.com/eugenp/tutorials/tree/master/spring-cloud/spring-cloud-stream/spring-cloud-stream-rabbit[over on GitHub]にあります。