1. 序章 

このチュートリアルでは、アプリケーション統合を作成するためのSpring統合JavaDSLについて学習します。

はじめにSpring統合で構築したファイル移動統合を採用し、代わりにDSLを使用します。

2. 依存関係

Spring Integration Java DSLは、 Spring IntegrationCoreの一部です。

したがって、その依存関係を追加できます。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.0.6.RELEASE</version>
</dependency>

また、ファイル移動アプリケーションで作業するには、Spring統合ファイルも必要です。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>5.0.6.RELEASE</version>
</dependency>

3. Spring Integration Java DSL

Java DSLの前は、ユーザーはSpringIntegrationコンポーネントをXMLで構成していました。

DSLは、純粋にJavaで完全なSpringIntegrationパイプラインを簡単に作成できる流暢なビルダーをいくつか導入しています。 

したがって、パイプを通過するデータを大文字にするチャネルを作成したいとします。

過去に、私たちは次のことを行った可能性があります。

<int:channel id="input"/>

<int:transformer input-channel="input" expression="payload.toUpperCase()" />

そして今、私たちは代わりに行うことができます:

@Bean
public IntegrationFlow upcaseFlow() {
    return IntegrationFlows.from("input")
      .transform(String::toUpperCase)
      .get();
}

4. ファイル移動アプリ

ファイル移動の統合を開始するには、いくつかの簡単な構成要素が必要です。

4.1. 統合フロー

必要な最初のビルディングブロックは統合フローです。これは、IntegrationFlowsビルダーから取得できます。

IntegrationFlows.from(...)

にはいくつかのタイプがありますが、このチュートリアルでは、次の3つだけを見ていきます。

  • MessageSource s
  • MessageChannel s、および
  • 文字列s

3つすべてについては後ほど説明します。

from を呼び出した後、いくつかのカスタマイズ方法を利用できるようになりました。

IntegrationFlow flow = IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory())
  // add more components
  .get();

最終的に、 IntegrationFlows は、SpringIntegrationアプリの最終製品であるIntegrationFlow、のインスタンスを常に生成します。

入力を受け取り、適切な変換を実行し、結果を出力するこのパターンは、すべてのSpringIntegrationアプリの基本です

4.2. 入力ソースの説明

まず、ファイルを移動するには、統合フローにファイルを探す場所を指定する必要があります。そのためには、 MessageSource:が必要です。

@Bean
public MessageSource<File> sourceDirectory() {
  // .. create a message source
}

簡単に言えば、 MessageSource は、アプリケーションの外部にあるメッセージを送信できる場所です。

具体的には、その外部ソースをSpringメッセージング表現に適応できるものが必要です。 そして、この適応入力に焦点を合わせているため、これらはしばしば入力チャネルアダプターと呼ばれます。

spring -integration-file 依存関係は、ユースケースに最適な入力チャネルアダプターを提供します: FileReadingMessageSource:

@Bean
public MessageSource<File> sourceDirectory() {
    FileReadingMessageSource messageSource = new FileReadingMessageSource();
    messageSource.setDirectory(new File(INPUT_DIR));
    return messageSource;
}

ここで、 FileReadingMessageSource は、 INPUT_DIR で指定されたディレクトリを読み取り、そこからMessageSourceを作成します。

これをIntegrationFlows.from呼び出しのソースとして指定しましょう。

IntegrationFlows.from(sourceDirectory());

4.3. 入力ソースの構成

さて、これを長寿命のアプリケーションとして考えている場合、起動時にすでに存在するファイルを移動するだけでなく、に入ってきたファイルに気付くことができるようにしたいと思うでしょう。

これを容易にするために、 from は、入力ソースのさらなるカスタマイズとして、追加のconfigureersを使用することもできます。

IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));

この場合、Spring Integrationにそのソース(この場合はファイルシステム)を10秒ごとにポーリングするように指示することで、入力ソースの復元力を高めることができます。

もちろん、これはファイル入力ソースだけに当てはまるわけではありません。このポーラーを任意のMessageSourceに追加できます。

4.4. 入力ソースからのメッセージのフィルタリング

次に、ファイル移動アプリケーションで特定のファイルのみを移動したいとします。たとえば、jpg拡張子の付いた画像ファイルなどです。

このために、GenericSelectorを使用できます。

@Bean
public GenericSelector<File> onlyJpgs() {
    return new GenericSelector<File>() {

        @Override
        public boolean accept(File source) {
          return source.getName().endsWith(".jpg");
        }
    };
}

それでは、統合フローをもう一度更新しましょう。

IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs());

または、このフィルターは非常に単純なので、代わりにlambdaを使用して定義することもできます。

IntegrationFlows.from(sourceDirectory())
  .filter(source -> ((File) source).getName().endsWith(".jpg"));

4.5. ServiceActivatorsを使用したメッセージの処理

フィルタリングされたファイルのリストができたので、それらを新しい場所に書き込む必要があります。

Service Activator s は、Spring統合の出力について考えるときに使用するものです。

spring-integration-fileFileWritingMessageHandlerサービスアクティベーターを使用してみましょう。

@Bean
public MessageHandler targetDirectory() {
    FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
    handler.setFileExistsMode(FileExistsMode.REPLACE);
    handler.setExpectReply(false);
    return handler;
}

ここで、 FileWritingMessageHandler は、受信した各MessageペイロードをOUTPUT_DIRに書き込みます。

繰り返しますが、更新しましょう:

IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory());

ちなみに、setExpectReplyの使用法に注意してください。 統合フローは双方向である可能性があるため、この呼び出しは、この特定のパイプが一方向であることを示します。

4.6. 統合フローのアクティブ化

すべてのコンポーネントを追加したら、IntegrationFlowをBean として登録して、アクティブ化する必要があります。

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
      .filter(onlyJpgs())
      .handle(targetDirectory())
      .get();
}

get メソッドは、SpringBeanとして登録する必要があるIntegrationFlowインスタンスを抽出します。

アプリケーションコンテキストが読み込まれるとすぐに、IntegrationFlowに含まれるすべてのコンポーネントがアクティブ化されます。

これで、アプリケーションはソースディレクトリからターゲットディレクトリへのファイルの移動を開始します。

5. 追加コンポーネント

DSLベースのファイル移動アプリケーションでは、インバウンドチャネルアダプター、メッセージフィルター、およびサービスアクティベーターを作成しました。

他のいくつかの一般的なSpring統合コンポーネントを見て、それらをどのように使用するかを見てみましょう。

5.1. メッセージチャネル

前述のように、メッセージチャネルはフローを初期化する別の方法です。

IntegrationFlows.from("anyChannel")

これは、「anyChannelというチャネルBeanを検索または作成してください。 次に、他のフローからanyChannelに入力されたデータを読み取ります。」

しかし、実際にはそれよりも汎用的です。

簡単に言えば、チャネルはプロデューサーをコンシューマーから抽象化し、Java Queueと考えることができます。 チャネルはフローの任意のポイントに挿入できます

たとえば、あるディレクトリから次のディレクトリに移動するときにファイルに優先順位を付けたいとします。

@Bean
public PriorityChannel alphabetically() {
    return new PriorityChannel(1000, (left, right) -> 
      ((File)left.getPayload()).getName().compareTo(
        ((File)right.getPayload()).getName()));
}

次に、フローの間にchannelへの呼び出しを挿入できます。

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlows.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("alphabetically")
      .handle(targetDirectory())
      .get();
}

選択できるチャネルは数十あり、並行性、監査、または中間永続性(KafkaまたはJMSバッファーを考えてください)用のより便利なチャネルがいくつかあります。

また、 Bridge と組み合わせると、チャネルが強力になる可能性があります。

5.2. 橋

2つのチャネルを組み合わせる場合は、ブリッジを使用します。

出力ディレクトリに直接書き込む代わりに、ファイル移動アプリに別のチャネルに書き込むようにしたと想像してみてください。

@Bean
public IntegrationFlow fileReader() {
    return IntegrationFlows.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("holdingTank")
      .get();
}

これで、チャネルに書き込んだだけなので、そこから他のフローにブリッジできます。

保持タンクをポーリングしてメッセージを送信し、宛先に書き込むブリッジを作成しましょう。

@Bean
public IntegrationFlow fileWriter() {
    return IntegrationFlows.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
      .handle(targetDirectory())
      .get();
}

繰り返しになりますが、中間チャネルに書き込んだため、これらの同じファイルを取得して異なるレートで書き込む別のフローを追加できます

@Bean
public IntegrationFlow anotherFileWriter() {
    return IntegrationFlows.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10)))
      .handle(anotherTargetDirectory())
      .get();
}

ご覧のとおり、個々のブリッジはさまざまなハンドラーのポーリング構成を制御できます。

アプリケーションコンテキストが読み込まれるとすぐに、ソースディレクトリから2つのターゲットディレクトリへのファイルの移動を開始する、より複雑なアプリが動作するようになります。

6. 結論

この記事では、Spring IntegrationJavaDSLを使用してさまざまな統合パイプラインを構築するさまざまな方法について説明しました。

基本的に、今回は純粋なJavaを使用して、前のチュートリアルからファイル移動アプリケーションを再作成することができました。

また、チャネルやブリッジなど、他のいくつかのコンポーネントも調べました。

このチュートリアルで使用される完全なソースコードは、Githubから入手できます。