1. 概要

このチュートリアルでは、Spring統合フレームワークでのトランザクションサポートについて説明します。

2. メッセージフローのトランザクション

Springは、初期のバージョン以降、リソースとトランザクションの同期をサポートしています。 複数のトランザクションマネージャーによって管理されるトランザクションを同期するためによく使用されます。

たとえば、JMSコミットをJDBCコミットと同期させることができます。

一方、メッセージフローにはより複雑なユースケースもあります。 これには、非トランザクションリソースとさまざまなタイプのトランザクションリソースの同期が含まれます。

通常、メッセージングフローは、2つの異なるタイプのメカニズムによって開始できます。

2.1. ユーザープロセスによって開始されたメッセージフロー

一部のメッセージフローは、一部のメッセージチャネルでメッセージをトリガーしたり、メッセージゲートウェイメソッドを呼び出したりするなど、サードパーティプロセスの開始に依存します。

これらのフローのトランザクションサポートは、Springの標準トランザクションサポートを介して構成します。トランザクションをサポートするために、Spring統合によってフローを明示的に構成する必要はありません。 Spring Integrationメッセージフローは、Springコンポーネントのトランザクションセマンティクスを自然に尊重します。

たとえば、ServiceActivatorまたはそのメソッドに@Transactionalで注釈を付けることができます。

@Transactional
public class TxServiceActivator {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void storeTestResult(String testResult) {
        this.jdbcTemplate.update("insert into STUDENT values(?)", testResult);
        log.info("Test result is stored: {}", testResult);
    }
}

storeTestResult メソッドは任意のコンポーネントから実行でき、トランザクションコンテキストは通常どおり適用されます。 このアプローチでは、トランザクション構成を完全に制御できます。

2.2. デーモンプロセスによって開始されたメッセージフロー

このタイプのメッセージフローは、自動化によく使用されます。 たとえば、 Poller がメッセージキューをポーリングして、ポーリングされたメッセージで新しいメッセージフローを開始したり、スケジューラーが新しいメッセージを作成して事前定義された時間にメッセージフローを開始したりしてプロセスをスケジュールします。

本質的に、これらはトリガープロセス(デーモンプロセス)によって開始されるトリガーベースのフローです。 これらのフローでは、新しいメッセージフローが開始されるたびにトランザクションコンテキストを作成するためのトランザクション構成を提供する必要があります。

構成を通じて、フローをSpringの既存のトランザクションサポートに委任します。

この記事の残りの部分では、このタイプのメッセージフローのトランザクションサポートに焦点を当てます。

3. ポーラートランザクションサポート

Poller は、統合フローの一般的なコンポーネントです。 さまざまなソースから定期的にデータを取得し、統合チェーンを介して渡します。

Spring Integrationは、すぐに使用できるポーラーのトランザクションサポートを提供します。 Poller コンポーネントを構成するときはいつでも、トランザクション構成を提供できます。

@Bean
@InboundChannelAdapter(value = "someChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> someMessageSource() {
    ...
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(5000)
      .advice(transactionInterceptor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory)
      .get();
}

private TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder()
      .transactionManager(txManager)
      .build();
}

TransactionManagerとカスタムTransactionSynchronizationFactoryへの参照を提供する必要があります。そうでない場合は、デフォルトに依存できます。 内部的には、Springのネイティブトランザクションがプロセスをラップします。 その結果、このポーラーによって開始されるすべてのメッセージフローはトランザクションです。

4. トランザクション境界

トランザクションが開始されると、トランザクションコンテキストは常に現在のスレッドにバインドされます。 メッセージフローに含まれる可能性のあるエンドポイントとチャネルの数に関係なく、フローが同じスレッドに存在する限り、トランザクションコンテキストは常に保持されます。

あるサービスで新しいスレッドを開始してそれを破ると、Transactionalの境界も破られます。 基本的に、トランザクションはその時点で終了します。

スレッド間で正常なハンドオフが発生した場合、フローは成功したと見なされます。 これにより、その時点でトランザクションがコミットされますが、フローは続行され、ダウンストリームのどこかで例外が発生する可能性があります。

その結果、その Exception はフローのイニシエーターに戻ることができるため、トランザクションはロールバックに終わる可能性があります。 そのため、スレッドの境界を破ることができる任意のポイントでトランザクションチャネルを使用する必要があります

たとえば、JMS、JDBC、またはその他のトランザクションチャネルを使用する必要があります。

5. トランザクションの同期

一部のユースケースでは、特定の操作をフロー全体を含むトランザクションと同期させることが有益です。

たとえば、着信ファイルを読み取り、その内容に基づいてデータベースの更新を実行するPollerの使用方法を示します。 データベース操作が完了すると、操作の成功に応じてファイルの名前も変更されます。

例に移る前に、 このアプローチは、ファイルシステムの操作をトランザクションと同期させることを理解することが重要です。 本質的にトランザクションではないファイルシステムが実際にトランザクションになることはありません。

トランザクションはポーリングの前に開始され、フローが完了するとコミットまたはロールバックされ、その後にファイルシステムで同期された操作が続きます。

まず、単純なPollerを使用してInboundChannelAdapterを定義します。

@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(5000)
      .advice(transactionInterceptor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory)
      .get();
}

Poller には、前に説明したように、 TransactionManager、への参照が含まれています。 さらに、TransactionSynchronizationFactoryへの参照も含まれています。 このコンポーネントは、ファイルシステム操作をトランザクションと同期するためのメカニズムを提供します。

@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor =
      new ExpressionEvaluatingTransactionSynchronizationProcessor();

    SpelExpressionParser spelParser = new SpelExpressionParser();
 
    processor.setAfterCommitExpression(
      spelParser.parseExpression(
        "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"));
 
    processor.setAfterRollbackExpression(
      spelParser.parseExpression(
        "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"));

    return new DefaultTransactionSynchronizationFactory(processor);
}

トランザクションがコミットされると、 TransactionSynchronizationFactory は、ファイル名に「.PASSED」を追加してファイルの名前を変更します。 ただし、ロールバックすると「.FAILED」が追加されます。

InputChannel は、 FileToStringTransformer を使用してペイロードを変換し、toServiceChannelに委任します。 このチャネルはServiceActivatorにバインドされています。

@Bean
public MessageChannel inputChannel() {
    return new DirectChannel();
}
    
@Bean
@Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel")
public FileToStringTransformer fileToStringTransformer() {
    return new FileToStringTransformer();
}

ServiceActivator は、学生の試験結果を含む受信ファイルを読み取ります。 結果をデータベースに書き込みます。 結果に文字列「fail」が含まれている場合、 Exception がスローされ、データベースがロールバックされます。

@ServiceActivator(inputChannel = "toServiceChannel")
public void serviceActivator(String payload) {

    jdbcTemplate.update("insert into STUDENT values(?)", payload);

    if (payload.toLowerCase().startsWith("fail")) {
        log.error("Service failure. Test result: {} ", payload);
        throw new RuntimeException("Service failure.");
    }

    log.info("Service success. Test result: {}", payload);
}

データベース操作が正常にコミットまたはロールバックされた後、TransactionSynchronizationFactoryはファイルシステム操作をその結果と同期します。

6. 結論

この記事では、 SpringIntegrationフレームワークでのトランザクションサポートについて説明しました。 さらに、トランザクションをファイルシステムなどの非トランザクションリソースの操作と同期させる方法を示しました。

この例の完全なソースコードは、GitHubでから入手できます。