R2DBC –リアクティブリレーショナルデータベース接続

1. 概要

このチュートリアルでは、R2DBCを使用して、*リアクティブな方法でデータベース操作を実行する方法*を示します。
R2DBCを探索するために、単一のエンティティに対してCRUD操作を実装する単純なSpring WebFlux RESTアプリケーションを作成し、その目的を達成するために非同期操作のみを使用します。

2. R2DBCとは何ですか?

リアクティブ開発が増加しており、新しいフレームワークが毎日登場し、既存のフレームワークの採用が増えています。 ただし、事後対応型開発の主要な問題は、* Java / JVMの世界でのデータベースアクセスは基本的に同期のまま*であるという事実です。 これは、JDBCの設計方法の直接的な結果であり、これら2つの根本的に異なるアプローチを適応させるためのtoいハックをもたらしました。
Javaでの非同期データベースアクセスの必要性に対処するために、2つの標準が登場しました。 最初の1つであるADBC(Asynchronous Database Access API)はOracleによって支援されていますが、この記事の執筆時点では、明確なタイムラインがなく、やや停滞しているようです。
ここで取り上げる2つ目は、R2DBC(Reactive Relational Database Connectivity)です。これは、Pivo​​talや他の企業のチームが率いるコミュニティの取り組みです。 このプロジェクトはまだベータ版ですが、より活力があり、Postgres、H2、およびMSSQLデータベースのドライバーを既に提供しています。

3. プロジェクトのセットアップ

プロジェクトでR2DBCを使用するには、コアAPIと適切なドライバーに依存関係を追加する必要があります。 この例では、H2を使用するため、2つの依存関係のみを意味します。
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-spi</artifactId>
    <version>0.8.0.M7</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <version>0.8.0.M7</version>
</dependency>
Maven CentralにはまだR2DBCアーティファクトがないため、Springのリポジトリをプロジェクトに追加する必要があります。
<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
   </repository>
   <repository>
       <id>spring-snapshots</id>
       <name>Spring Snapshots</name>
       <url>https://repo.spring.io/snapshot</url>
       <snapshots>
           <enabled>true</enabled>
       </snapshots>
    </repository>
</repositories>

4. 接続ファクトリーのセットアップ

R2DBCを使用してデータベースにアクセスするために最初に必要なことは、_ConnectionFactoryオブジェクトを作成することです*。これは、JDBCの_DataSource._と同様の役割を果たします。
このクラスには、_ConnectionFactoryOptions_オブジェクトを取り、__ ConnectionFactoryを返す静的メソッドがあります。 __必要なのは_ConnectionFactory_の1つのインスタンスだけなので、後で必要に応じてインジェクション経由で使用できる_ @ Bean_を作成しましょう。
@Bean
public ConnectionFactory connectionFactory(R2DBCConfigurationProperties properties) {
    ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(properties.getUrl());
    Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
    if (!StringUtil.isNullOrEmpty(properties.getUser())) {
        ob = ob.option(USER, properties.getUser());
    }
    if (!StringUtil.isNullOrEmpty(properties.getPassword())) {
        ob = ob.option(PASSWORD, properties.getPassword());
    }
    return ConnectionFactories.get(ob.build());
}
ここでは、_ @ ConfigurationProperties_アノテーションで装飾されたヘルパークラスから受け取ったオプションを取得し、_ConnectionFactoryOptions_インスタンスを設定します。 R2DBCにデータを入力するために、_Option_と値を取る単一の__option __methodを使用してビルダーパターンを実装します。
R2DBCでは、上記で使用した_USERNAME_や__PASSWORD __などの多数の既知のオプションを定義しています。 これらのオプションを設定する別の方法は、connection_ConnectionFactoryOptions_クラスの_parse()_メソッドに接続文字列を渡すことです。
一般的なR2DBC接続URLの例を次に示します。
r2dbc:h2:mem://./testdb
この文字列をコンポーネントに分けましょう。
  • r2dbc:R2DBC URLの固定スキーム識別子-別の有効なスキーム
    SSLで保護された接続に使用される_rd2bcs_

  • h2:適切な接続を見つけるために使用されるドライバー識別子
    工場

  • mem:ドライバー固有のプロトコル–この例では、これは
    インメモリデータベース

  • _ //。/ testdb_:通常はホストを含​​むドライバー固有の文字列、
    データベース、および追加のオプション。

    オプションの準備が整ったら、_get()_ staticファクトリメソッドに渡して__ConnectionFactory __beanを作成します。

5. ステートメントの実行

JDBCと同様に、R2DBCの使用は、主にSQLステートメントをデータベースに送信し、結果セットを処理することに関するものです。 *ただし、R2DBCはリアクティブAPIであるため、__ Publisher __and _Subscriber_ *などのリアクティブストリームタイプに大きく依存しています。
これらのタイプを直接使用するのは少し面倒なので、よりクリーンで簡潔なコードを書くのに役立つ__Mono __and _Flux_のようなプロジェクトリアクターのタイプを使用します。
次のセクションでは、単純な_Account_クラスのリアクティブDAOクラスを作成して、データベース関連のタスクを実装する方法を見ていきます。 このクラスには3つのプロパティのみが含まれ、データベースに対応するテーブルがあります。
public class Account {
    private Long id;
    private String iban;
    private BigDecimal balance;
    // ... getters and setters omitted
}

5.1. 接続する

データベースにステートメントを送信する前に、* Connection_インスタンス*が必要です。 _ConnectionFactory_を作成する方法は既に見たので、_Connection_を取得するためにそれを使用することは驚くことではありません。 覚えておく必要があるのは、通常の_Connection_を取得する代わりに、単一の_Connection._の__Publisher ___を取得することです。
通常のSpring _ @ Component_である_ReactiveAccountDao_は、コンストラクターの注入を介して_ConnectionFactory_を取得するため、ハンドラーメソッドで簡単に使用できます。
_findById()_メソッドの最初の数行を見て、a__Connection_を取得して使用を開始する方法を見てみましょう。
public Mono<Account>> findById(Long id) {
    return Mono.from(connectionFactory.create())
      .flatMap(c ->
          // use the connection
      )
      // ... downstream processing omitted
}
ここでは、_ConnectionFactory_から返された_Publisher_を、イベントストリームの初期ソースである_Mono_に適合させています。

5.1. 声明の準備と提出

_Connection_ができたので、それを使用して_Statement_を作成し、パラメーターをバインドします。
.flatMap( c ->
    Mono.from(c.createStatement("select id,iban,balance from Account where id = $1")
      .bind("$1", id)
      .execute())
      .doFinally((st) -> close(c))
 )
_Connection_のメソッド_createStatement_は、SQLクエリ文字列を受け取ります。このクエリ文字列には、オプションで「マーカー」と呼ばれるバインドプレースホルダーを含めることができますhttps://r2dbc.io/spec/0.8.0.M8/spec/html /#statements.parameterized [仕様内]。
ここで注意すべき点がいくつかあります。まず、* _ createStatement_は同期操作*であり、流なスタイルを使用して値を返された__Statementにバインドできます。 __秒、非常に重要な、*プレースホルダー/マーカー構文はベンダー固有です!*
この例では、パラメータをマークするために_ $ n_を使用するH2固有の構文を使用しています。 他のベンダーは、_:param_、_ @ Pn_、またはその他の規則など、異なる構文を使用する場合があります。 *これは、レガシーコードをこの新しいAPIに移行する際に注意する必要がある重要な側面です*。
流processなAPIパターンと単純化された入力により、バインディングプロセス自体は非常に簡単です。
__bind()__に渡される最初のパラメーターは、ステートメント内のマーカーの配置に対応するゼロベースの序数にすることも、実際のマーカーを含む文字列にすることもできます。
すべてのパラメータに値を設定したら、_execute()_を呼び出して、__ Publisher __of __Result __objectsを返します。これをさらに処理するためにa again__Mono ___にラップします。 _doFinally()_ハンドラーをthis __Mono ___にアタッチして、ストリーム処理が正常に完了したかどうかに関係なく接続を閉じるようにします。

5.2. 処理結果

パイプラインの次のステップでは、* _ Result_オブジェクトを処理し、_ResponseEntity <Account> _インスタンスのストリームを生成します*。
指定された_id_を持つインスタンスは1つだけであることがわかっているため、実際には_Mono_ストリームを返します。 実際の変換は、受信した_Result_の_map()_メソッドに渡された関数内で行われます。
.map(result -> result.map((row, meta) ->
    new Account(row.get("id", Long.class),
      row.get("iban", String.class),
      row.get("balance", BigDecimal.class))))
.flatMap(p -> Mono.from(p));
結果の_map()_メソッドには、2つのパラメーターを受け取る関数が必要です。 1つ目は、各列の値を収集し、__Account ___instanceを設定するために使用する_Row_オブジェクトです。 2番目の_meta_は、列名や型など、現在の行に関する情報を含む__RowMetadata __objectです。
パイプラインの前の_map()_呼び出しは_Mono <Producer <Account >> _に解決されますが、このメソッドから_Mono <Account> _を返す必要があります。 これを修正するために、最後の_flatMap()_ステップを追加し、_Producer_をa__Mono._に適合させます。

5.3. バッチステートメント

R2DBCは、単一の__execute()__callで複数のSQLステートメントを実行できるステートメントバッチの作成と実行もサポートしています。 通常のステートメントとは対照的に、*バッチステートメントはバインディングをサポートしていません*。主にETLジョブなどのシナリオでパフォーマンス上の理由で使用されます。
サンプルプロジェクトでは、ステートメントのバッチを使用して_Account_テーブルを作成し、テストデータを挿入します。
@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
    return (args) ->
      Flux.from(cf.create())
        .flatMap(c ->
            Flux.from(c.createBatch()
              .add("drop table if exists Account")
              .add("create table Account(" +
                "id IDENTITY(1,1)," +
                "iban varchar(80) not null," +
                "balance DECIMAL(18,2) not null)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120980198201982',100.00)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120998729871000',250.00)")
              .execute())
            .doFinally((st) -> c.close())
          )
        .log()
        .blockLast();
}
ここでは、__Batch ___returned from _createBatch()_を使用し、いくつかのSQLステートメントを追加します。 次に、_Statement_インターフェースで使用可能な同じ_execute()_メソッドを使用して、これらのステートメントを送信します。
この特定のケースでは、結果に関心はありません。文がすべて正常に実行されるだけです。 生成された結果が必要だった場合は、このストリームにダウンストリームステップを追加して、発行された_Result_オブジェクトを処理するだけです。

6. トランザクション

このチュートリアルで扱う最後のトピックは、トランザクションです。 これから予想されるように、JDBCと同様に、つまり__Connection ____objectで利用可能なメソッドを使用して、トランザクションを管理します。
前と同じように、主な違いは、*すべてのトランザクション関連のメソッドが非同期*であり、適切なポイントでストリームに追加する必要がある_Publisher_を返すことです。
サンプルプロジェクトでは、_createAccount()_メソッドの実装でトランザクションを使用しています。
public Mono<Account> createAccount(Account account) {
    return Mono.from(connectionFactory.create())
      .flatMap(c -> Mono.from(c.beginTransaction())
        .then(Mono.from(c.createStatement("insert into Account(iban,balance) values($1,$2)")
          .bind("$1", account.getIban())
          .bind("$2", account.getBalance())
          .returnGeneratedValues("id")
          .execute()))
        .map(result -> result.map((row, meta) ->
            new Account(row.get("id", Long.class),
              account.getIban(),
              account.getBalance())))
        .flatMap(pub -> Mono.from(pub))
        .delayUntil(r -> c.commitTransaction())
        .doFinally((st) -> c.close()));
}
ここでは、トランザクション関連の呼び出しを2つのポイントに追加しました。 まず、データベースから新しい接続を取得した直後に、_beginTransactionMethod()_を呼び出します。 トランザクションが正常に開始されたことを確認したら、_insert_ステートメントを準備して実行します。
今回は、_returnGeneratedValues()_メソッドも使用して、この新しい_Account_に対して生成されたID値をデータベースに返すように指示しました。 R2DBCは、生成されたすべての値を含む単一行を含む__Result values__でこれらの値を返します。これを使用して、_Account_インスタンスを作成します。
ここでも、着信_Mono <Publisher <Account >> _を_Mono <Account> _に適合させる必要があるため、_flatMap()_を追加してthis__を解決します。 __次に、_delayUntil()_ステップでトランザクションをコミットします。 これが必要なのは、返された___Account ___hasがすでにデータベースにコミットされていることを確認したいからです。
最後に、返された_Mono_からのすべてのイベントが消費されたときに_Connection_を閉じるこのパイプラインに_doFinally_ステップをアタッチします。

7. サンプルDAOの使用法

リアクティブDAOができたので、これを使用して単純なlink:/spring-webflux[Spring WebFlux]アプリケーションを作成し、一般的なアプリケーションでの使用方法を紹介します。 このフレームワークは既にリアクティブ構成をサポートしているため、これは簡単なタスクになります。 たとえば、_GET_メソッドの実装を見てみましょう。
@RestController
public class AccountResource {
    private final ReactiveAccountDao accountDao;

    public AccountResource(ReactiveAccountDao accountDao) {
        this.accountDao = accountDao;
    }

    @GetMapping("/accounts/{id}")
    public Mono<ResponseEntity<Account>> getAccount(@PathVariable("id") Long id) {
        return accountDao.findById(id)
          .map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
          .switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
    }
    // ... other methods omitted
}
ここでは、DAOから返された_Mono_を使用して、適切なステータスコードで_ResponseEntity_を構築しています。 これは、特定のIDの_Account_がない場合に_NOT_FOUND_(404)__ __statusコードが必要なためです。

8. 結論

この記事では、R2DBCを使用したリアクティブデータベースアクセスの基本について説明しました。 まだ初期段階ですが、このプロジェクトは急速に進化しており、2020年初頭のリリース日を目標にしています。
Java 12の一部ではないADBAと比較すると、R2DBCはより有望であり、いくつかの人気のあるデータベースのドライバーを既に提供しているようです。
いつものように、このチュートリアルで使用される完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/persistence-modules/r2dbc[Github上]で入手できます。