1概要

Spring Integrationは、いくつかのhttps://www.baeldung.com/spring-integration[企業統合パターン]を使いやすくします。これらの方法の1つはhttps://www.baeldung.com/spring-integration-java-dsl[its DSL]を通してあります。

このチュートリアルでは、いくつかの設定を簡略化するためのDSLによるサブフローのサポートについて説明します。


2私たちの仕事

3つの異なるバケットに分けたい一連の整数があるとしましょう。

Spring Integrationを使用してこれを実行したい場合は、3つの出力チャネルを作成することから始めることができます。

  • 0、3、6、9などの数字は

    multipleOfThreeChannel

    に行きます

  • 1、4、7、10のような数字は

    remainderIsOneChannel

    に行きます

  • そして、2、5、8、11のような数字は

    remainderIsTwoChannel

    に行きます

サブフローがいかに役立つかを見るために、サブフローなしでこれがどのように見えるかから始めましょう。

次に、サブフローを使用して構成を単純化します。


  • publishSubscribeChannel


  • routeToRecipients


  • if-then

    ロジックを設定するための

    __Filter

    __s


  • __Router

    __s、スイッチロジックを設定します


3前提条件

サブフローを設定する前に、それらの出力チャンネルを作成しましょう。

デモ用にはもう少し簡単なので、これらの

__QueueChannel

__を作成します。

@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {

    @Bean
    QueueChannel multipleOfThreeChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsOneChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsTwoChannel() {
        return new QueueChannel();
    }

    boolean isMultipleOfThree(Integer number) {
       return number % 3 == 0;
    }

    boolean isRemainderIOne(Integer number) {
        return number % 3 == 1;
    }

    boolean isRemainderTwo(Integer number) {
        return number % 3 == 2;
    }
}

最終的に、これらは我々のグループ化された数が終わるところです。

また、Spring Integrationは簡単に複雑に見え始めるので、読みやすさのためにいくつかのヘルパーメソッドを追加します。


4サブフローなしで解決する

今度はフローを定義する必要があります。

サブフローがない場合、単純な考え方は、各タイプの数値に対して1つずつ、3つの別々の統合フローを定義することです。



  • IntegrationFlow

    コンポーネントに同じメッセージシーケンスを送信しますが、各コンポーネントの出力メッセージは異なります。**


4.1.

IntegrationFlow

コンポーネントの定義

まず、


_ SubflowConfiguration


クラスにそれぞれの

IntegrationFlow_

Beanを定義しましょう。

@Bean
public IntegrationFlow multipleOfThreeFlow() {
    return flow -> flow.split()
      .<Integer> filter(this::isMultipleOfThree)
      .channel("multipleOfThreeChannel");
}

このフローには2つのエンドポイント、つまり

Filter

が続く____Splitterがあります。

フィルタはそれがどのように聞こえるかを行います。しかし、なぜスプリッタも必要なのでしょうか。

これはすぐにわかりますが、基本的には入力

コレクション

を個々のメッセージに分割します。

そして、もちろん、もう2つの

IntegrationFlow

Beanを同じ方法で定義できます。


4.2. メッセージングゲートウェイ

フローごとに、

Message Gateway

も必要です。

簡単に言うと、RESTサービスがHTTPを抽象化する方法と同様に、これらは呼び出し側からSpring Integration Messages APIを抽象化します。

@MessagingGateway
public interface NumbersClassifier {

    @Gateway(requestChannel = "multipleOfThreeFlow.input")
    void multipleOfThree(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsOneFlow.input")
    void remainderIsOne(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsTwoFlow.input")
    void remainderIsTwo(Collection<Integer> numbers);

}

それぞれについて、

__ @ Gateway


アノテーションを使用して、入力チャネルの暗黙の名前を指定する必要があります。これは単にBeanの名前の後に

“。input” __を付けたものです。ラムダベースのフローを使用しているため、この規約を使用できます。

これらのメソッドは私たちのフローへの入り口です。


4.3. メッセージの送信と出力の確認

それでは、テストしましょう。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SeparateFlowsConfiguration.class })
public class SeparateFlowsUnitTest {

    @Autowired
    private QueueChannel multipleOfThreeChannel;

    @Autowired
    private NumbersClassifier numbersClassifier;

    @Test
    public void whenSendMessagesToMultipleOf3Flow__thenOutputMultiplesOf3() {
        numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
        Message<?> outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 3);
        outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 6);
        outMessage = multipleOfThreeChannel.receive(0);
        assertNull(outMessage);
    }
}

メッセージを

List

として送信したことに注意してください。これが、単一の「リストメッセージ」を受け取り、それを複数の「番号メッセージ」に変換するためにスプリッタが必要だった理由です。

待つことなく次の利用可能なメッセージを取得するために

o



receive

を呼び出します。私たちのリストには2の3の倍数があるので、私たちはそれを2回呼ぶことができると予想します。

__receive


returns

null__への3番目の呼び出し。


__receive、


もちろん

Message

が返されるので、

getPayload__を呼び出して数値を抽出します。

同様に、他の2つについても同じことができます。

  • それで、それはサブフローのない解決策でした** 私たちは維持するための3つの別々のフローと3つの別々のゲートウェイメソッドがあります。

ここでは、3つの


_IntegrationFlow

_

Beanを1つのBeanに、3つのゲートウェイメソッドを1つのBeanに置き換えます。


5

publishSubscribeChannel


を使用する


publishSubscribeChannel()

メソッドは、購読中のすべてのサブフローにメッセージをブロードキャストします。このようにして、3つではなく1つのフローを作成できます。

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .publishSubscribeChannel(subscription ->
           subscription
             .subscribe(subflow -> subflow
               .<Integer> filter(this::isMultipleOfThree)
               .channel("multipleOfThreeChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderOne)
                .channel("remainderIsOneChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderTwo)
                .channel("remainderIsTwoChannel")));
}

このように、** サブフローは匿名です。つまり、独立して対処することはできません。

フローは1つだけなので、

__ NumbersClassifier

__を同様に編集しましょう。

@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);

今、私たちは

IntegrationFlow

beanとゲートウェイメソッドを一つだけ持っているので、私達はリストを一度だけ送る必要があります。

@Test
public void whenSendMessagesToFlow__thenNumbersAreClassified() {
    numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));

   //same assertions as before
}

今後は統合フローの定義のみが変更されるため、テストを再度表示することはありません。


6.

routeToRecipients


を使用する

  • 同じことを実現するもう1つの方法は

    routeToRecipients

    です。これはフィルタリングが組み込まれているので便利です。**

この方法を使用すると、ブロードキャスト用のチャネルとサブフローの両方を指定できます。 **


6.1.

受信者


以下のコードでは、条件に基づいて、受信者として

multipleof3Channel



remainderIs1Channel、

、および

remainderIsTwoChannel

を指定します。

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .routeToRecipients(route -> route
          .<Integer> recipient("multipleOfThreeChannel",
            this::isMultipleOfThree)
          .<Integer> recipient("remainderIsOneChannel",
            this::isRemainderOne)
          .<Integer> recipient("remainderIsTwoChannel",
            this::isRemainderTwo));
}

条件なしで

__recipient


を呼び出すこともでき、

__routeToRecipientsは無条件にその宛先にパブリッシュします。

6.2.

recipientFlow


routeToRecipients

を使用すると、

publishSubscribeChannelと同様に、完全なフローを定義できます。

上記のコードを修正し、最初の受信者として匿名サブフローを指定しましょう。

.routeToRecipients(route -> route
  .recipientFlow(subflow -> subflow
      .<Integer> filter(this::isMultipleOfThree)
      .channel("mutipleOfThreeChannel"))
  ...);

  • このサブフローはメッセージのシーケンス全体を受け取るので、** 同じ振る舞いをするためには以前のようにフィルタをかける必要があります。

繰り返しますが、1つの

IntegrationFlow

Beanで十分です

それでは、

if-else

コンポーネントに移りましょう。そのうちの一つが

Filter

です。


7.

if-then

フローを使用する

前の例のすべてで既に

Filter

を使用しています。良いニュースは、さらに処理するための条件だけでなく、破棄されたメッセージのための

チャネルまたは

** フローも指定できることです。

  • フローとチャネルの破棄は、

    __else

    __blockのように考えられます。

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .<Integer> filter(this::isMultipleOfThree,
           notMultiple -> notMultiple
             .discardFlow(oneflow -> oneflow
               .<Integer> filter(this::isRemainderOne,
                 twoflow -> twoflow
                   .discardChannel("remainderIsTwoChannel"))
               .channel("remainderIsOneChannel"))
        .channel("multipleofThreeChannel");
}

この場合、

if-else

ルーティングロジックを実装しました。

  • 数が3の倍数ではない場合、それらを破棄します

廃棄フローへのメッセージ

他にもありますのでここでフローを使用します
宛先チャネルを知るために必要なロジック。

破棄フローでは、数が残りの1の数ではない場合、

the

それらのメッセージを破棄チャネルに破棄します。


8計算値に対する

__switch –

__ing

そして最後に、

__route


methodを試してみましょう。

Router

はフローを任意の数の部分に分割できますが、

__Filterは2つしかできません。

8.1.

channelMapping


IntegrationFlow

Beanを定義しましょう。

@Bean
public IntegrationFlow classify() {
    return classify -> classify.split()
      .<Integer, Integer> route(number -> number % 3,
        mapping -> mapping
         .channelMapping(0, "multipleOfThreeChannel")
         .channelMapping(1, "remainderIsOneChannel")
         .channelMapping(2, "remainderIsTwoChannel"));
}

上記のコードでは、除算を実行してルーティングキーを計算します。

route(p -> p % 3,...

このキーに基づいて、メッセージをルーティングします。

channelMapping(0, "multipleof3Channel")

8.2.

subFlowMapping

さて、他の人たちと同じように、

channelMapping



subFlowMapping

に置き換えて、サブフローを指定することでもっと制御を取ることができます:

.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))

あるいは、

__channel


methodの代わりに


handle

__methodを呼び出すことでさらに制御できます。

.subFlowMapping(2, subflow -> subflow
  .<Integer> handle((payload, headers) -> {
     //do extra work on the payload
     return payload;
  }))).channel("remainderIsTwoChannel");

この場合、サブフローは

route()

メソッドの後でメインフローに戻るので、

remainderIsTwoChannel.

チャンネルを指定する必要があります。

9.まとめ

このチュートリアルでは、サブフローを使用していくつかの方法でメッセージをフィルタ処理してルーティングする方法について説明しました。

いつものように、完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/spring-integration[on GitHub]から入手可能です。