Spring統合でのサブフローの使用
1. 概要
Spring Integrationを使用すると、いくつかの Enterprise IntegrationPatternsを簡単に使用できます。 これらの方法の1つは、そのDSLを使用することです。
このチュートリアルでは、いくつかの構成を簡素化するためのサブフローに対するDSLのサポートを見ていきます。
2. 私たちのタスク
3つの異なるバケットに分離したい整数のシーケンスがあるとしましょう。
また、Spring Integrationを使用してこれを行う場合は、次の3つの出力チャネルを作成することから始めることができます。
- 0、3、6、9のような数字は、multipleOfThreeChannelに送られます
- 1、4、7、10などの番号は、remainderIsOneChannelに送られます
- そして、2、5、8、11のような数字はremainderIsTwoChannelに行きます
サブフローがどれほど役立つかを確認するために、サブフローなしでこれがどのように見えるかから始めましょう。
次に、サブフローを使用して、次のように構成を簡略化します。
- publishSubscribeChannel
- routeToRecipients
- Filter s、if-thenロジックを構成します
- ルーター、スイッチロジックを構成します
3. 前提条件
サブフローを構成する前に、これらの出力チャネルを作成しましょう。
これらのQueueChannelは、デモが少し簡単なので、次のようにします。
@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {
@Bean
QueueChannel multipleOfThreeChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsOneChannel() {
return new QueueChannel();
}
@Bean
QueueChannel remainderIsTwoChannel() {
return new QueueChannel();
}
boolean isMultipleOfThree(Integer number) {
return number % 3 == 0;
}
boolean isRemainderIOne(Integer number) {
return number % 3 == 1;
}
boolean isRemainderTwo(Integer number) {
return number % 3 == 2;
}
}
最終的に、これらはグループ化された番号が最終的になる場所です。
また、Spring Integrationは簡単に複雑に見え始める可能性があるため、読みやすくするためにいくつかのヘルパーメソッドを追加します。
4. サブフローなしで解決
次に、フローを定義する必要があります。
サブフローがない場合、簡単なアイデアは、数値のタイプごとに1つずつ、3つの個別の統合フローを定義することです。
同じシーケンスのメッセージを各IntegrationFlowコンポーネントに送信しますが、各コンポーネントの出力メッセージは異なります。
4.1. IntegrationFlowコンポーネントの定義
まず、SubflowConfigurationクラスで各IntegrationFlowBeanを定義しましょう。
@Bean
public IntegrationFlow multipleOfThreeFlow() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel");
}
このフローには、スプリッターとそれに続く Filt erの2つのエンドポイントが含まれています。
フィルターはそれがどのように聞こえるかを行います。 しかし、なぜスプリッターも必要なのですか? これはすぐにわかりますが、基本的には、入力コレクションを個々のメッセージに分割します。
もちろん、同じ方法でさらに2つのIntegrationFlowBeanを定義することもできます。
4.2. メッセージングゲートウェイ
フローごとに、メッセージゲートウェイも必要です。
簡単に言えば、これらは、RESTサービスがHTTPを抽象化する方法と同様に、呼び出し元からSpring IntegrationMessagesAPIを抽象化します。
@MessagingGateway
public interface NumbersClassifier {
@Gateway(requestChannel = "multipleOfThreeFlow.input")
void multipleOfThree(Collection<Integer> numbers);
@Gateway(requestChannel = "remainderIsOneFlow.input")
void remainderIsOne(Collection<Integer> numbers);
@Gateway(requestChannel = "remainderIsTwoFlow.input")
void remainderIsTwo(Collection<Integer> numbers);
}
それぞれについて、 @Gateway アノテーションを使用し、入力チャネルの暗黙の名前を指定する必要があります。これは、単にBeanの名前の後に「。input」が続くものです。 ラムダベースのフローを使用しているため、この規則を使用できることに注意してください。
これらのメソッドは、フローへのエントリポイントです。
4.3. メッセージの送信と出力の確認
そして今、テストしましょう:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SeparateFlowsConfiguration.class })
public class SeparateFlowsUnitTest {
@Autowired
private QueueChannel multipleOfThreeChannel;
@Autowired
private NumbersClassifier numbersClassifier;
@Test
public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
Message<?> outMessage = multipleOfThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 3);
outMessage = multipleOfThreeChannel.receive(0);
assertEquals(outMessage.getPayload(), 6);
outMessage = multipleOfThreeChannel.receive(0);
assertNull(outMessage);
}
}
メッセージをListとして送信したことに注意してください。そのため、単一の「リストメッセージ」を取得し、それを複数の「番号メッセージ」に変換するためにスプリッターが必要でした。
receiveとoを呼び出して、待機せずに次の利用可能なメッセージを取得します。 リストには3の倍数が2つあるので、2回呼び出すことができると予想されます。 receive への3番目の呼び出しは、nullを返します。
receive、はもちろん、 Message を返すため、getPayloadを呼び出して番号を抽出します。
同様に、他の2つについても同じことができます。
つまり、それはサブフローのないソリューションでした。維持する3つの別々のフローと、3つの別々のゲートウェイメソッドがあります。
ここで行うことは、3つの IntegrationFlow Beanを1つのBeanに置き換え、3つのゲートウェイメソッドを1つのBeanに置き換えることです。
5. publishSubscribeChannelを使用する
publishSubscribeChannel()メソッドは、サブスクライブしているすべてのサブフローにメッセージをブロードキャストします。 このようにして、3つではなく1つのフローを作成できます。
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.publishSubscribeChannel(subscription ->
subscription
.subscribe(subflow -> subflow
.<Integer> filter(this::isMultipleOfThree)
.channel("multipleOfThreeChannel"))
.subscribe(subflow -> subflow
.<Integer> filter(this::isRemainderOne)
.channel("remainderIsOneChannel"))
.subscribe(subflow -> subflow
.<Integer> filter(this::isRemainderTwo)
.channel("remainderIsTwoChannel")));
}
このように、サブフローは匿名であるため、個別にアドレス指定することはできません。
フローは1つしかないので、NumbersClassifierも編集しましょう。
@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);
これで、 IntegrationFlow Beanとゲートウェイメソッドが1つしかないため、リストを送信する必要があるのは1回だけです。
@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));
// same assertions as before
}
今後、統合フローの定義のみが変更されるため、テストを再度表示することはありません。
6. routeToRecipientsを使用する
同じことを実現するもう1つの方法は、 routeToRecipients です。これは、フィルタリングが組み込まれているので便利です。
この方法を使用して、
6.1. 受信者
以下のコードでは、条件に基づいて、受信者として multipleof3Channel 、 remainderIs1Channel、、およびreservederIsTwoChannelを指定します。
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.routeToRecipients(route -> route
.<Integer> recipient("multipleOfThreeChannel",
this::isMultipleOfThree)
.<Integer> recipient("remainderIsOneChannel",
this::isRemainderOne)
.<Integer> recipient("remainderIsTwoChannel",
this::isRemainderTwo));
}
受信者を無条件で呼び出すこともでき、routeToRecipientsは無条件にその宛先に公開します。
6.2. receiveFlow
そして、それに注意してください routeToRecipients のように、完全なフローを定義できます
上記のコードを変更して、匿名サブフローを最初の受信者として指定しましょう。
.routeToRecipients(route -> route
.recipientFlow(subflow -> subflow
.<Integer> filter(this::isMultipleOfThree)
.channel("mutipleOfThreeChannel"))
...);
このサブフローはメッセージのシーケンス全体を受信するため、同じ動作を得るには、前と同じようにフィルタリングする必要があります。
繰り返しになりますが、1つのIntegrationFlowBeanで十分でした。
それでは、if-elseコンポーネントに移りましょう。 それらの1つはフィルターです。
7. if-thenフローの使用
これまでのすべての例では、すでにFilterを使用しています。 幸いなことに、さらに処理するための条件だけでなく、チャネルまたは破棄されたメッセージのフローも指定できます。
elseブロックのような廃棄フローとチャネルを考えることができます。
@Bean
public IntegrationFlow classify() {
return flow -> flow.split()
.<Integer> filter(this::isMultipleOfThree,
notMultiple -> notMultiple
.discardFlow(oneflow -> oneflow
.<Integer> filter(this::isRemainderOne,
twoflow -> twoflow
.discardChannel("remainderIsTwoChannel"))
.channel("remainderIsOneChannel"))
.channel("multipleofThreeChannel");
}
この場合、if-elseルーティングロジックを実装しました。
数が3の倍数でない場合、次にはそれらのメッセージを破棄フローに破棄します。 宛先チャネルを知るために必要なロジックが多いため、ここではフローを使用します。 - 破棄フローでは、 の数が残りの1でない場合、次にはそれらのメッセージを破棄チャネルに破棄します。
8. スイッチ-計算値のing
そして最後に、 ルートメソッド、これは私たちよりも少し多くの制御を与えます
8.1. channelMapping
IntegrationFlowBeanを定義しましょう。
@Bean
public IntegrationFlow classify() {
return classify -> classify.split()
.<Integer, Integer> route(number -> number % 3,
mapping -> mapping
.channelMapping(0, "multipleOfThreeChannel")
.channelMapping(1, "remainderIsOneChannel")
.channelMapping(2, "remainderIsTwoChannel"));
}
上記のコードでは、除算を実行してルーティングキーを計算します。
route(p -> p % 3,...
このキーに基づいて、メッセージをルーティングします。
channelMapping(0, "multipleof3Channel")
8.2. subFlowMapping
これで、他の場合と同様に、channelMappingをsubFlowMappingに置き換えて、サブフローを指定することで、より詳細な制御を行うことができます。
.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))
または、チャネルメソッドの代わりにハンドルメソッドを呼び出すことにより、さらに制御します。
.subFlowMapping(2, subflow -> subflow
.<Integer> handle((payload, headers) -> {
// do extra work on the payload
return payload;
}))).channel("remainderIsTwoChannel");
この場合、サブフローは route()メソッドの後にメインフローに戻るため、チャネルremainderIsTwoChannel。を指定する必要があります。
9. 結論
このチュートリアルでは、サブフローを使用していくつかの方法でメッセージをフィルタリングおよびルーティングする方法について説明しました。
いつものように、完全なソースコードはGitHubで入手できます。