1. 概要

このチュートリアルでは、R2DBCを使用してリアクティブな方法でデータベース操作を実行する方法を示します。

R2DBCを探索するために、単一のエンティティに対してCRUD操作を実装する単純なSpring WebFlux RESTアプリケーションを作成し、その目標を達成するために非同期操作のみを使用します。

2. R2DBC とは何ですか?

リアクティブな開発が増えており、新しいフレームワークが毎日登場し、既存のフレームワークの採用が増えています。 ただし、リアクティブ開発の主な問題は、Java/JVMの世界でのデータベースアクセスが基本的に同期のままであるという事実です。 これは、JDBCが設計された方法の直接的な結果であり、これら2つの根本的に異なるアプローチを適応させるための醜いハックにつながりました。

Javaランドでの非同期データベースアクセスの必要性に対処するために、2つの標準が登場しました。 最初のADBC(Asynchronous Database Access API)は、Oracleの支援を受けていますが、この記事の執筆時点では、明確なタイムラインがなく、やや行き詰まっているようです。

ここで取り上げる2つ目は、R2DBC(Reactive Relational Database Connectivity)です。これは、Pivotalや他の企業のチームが主導するコミュニティの取り組みです。 まだベータ版であるこのプロジェクトは、より活力を示しており、Postgres、H2、およびMSSQLデータベースのドライバーをすでに提供しています。

3. プロジェクトの設定

プロジェクトでR2DBCを使用するには、コアAPIと適切なドライバーに依存関係を追加する必要があります。 この例ではH2を使用するため、これは2つの依存関係のみを意味します。

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-spi</artifactId>
    <version>0.8.0.M7</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <version>0.8.0.M7</version>
</dependency>

Maven Centralには今のところまだR2DBCアーティファクトがないため、Springのリポジトリをいくつかプロジェクトに追加する必要があります。

<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
   </repository>
   <repository>
       <id>spring-snapshots</id>
       <name>Spring Snapshots</name>
       <url>https://repo.spring.io/snapshot</url>
       <snapshots>
           <enabled>true</enabled>
       </snapshots>
    </repository>
</repositories>

4. 接続ファクトリセットアップ

R2DBCを使用してデータベースにアクセスするために最初に行う必要があるのは、 ConnectionFactoryオブジェクトを作成することです。これは、JDBCのDataSourceと同様の役割を果たします。を作成する最も簡単な方法] ConnectionFactory は、ConnectionFactoriesクラスを介して行われます。

このクラスには、 ConnectionFactoryOptions オブジェクトを返し、 ConnectionFactory。 必要なのは1つのインスタンスだけなので ConnectionFactory 、作成しましょう @ Bean 後で必要な場所でインジェクションを介して使用できます。

@Bean
public ConnectionFactory connectionFactory(R2DBCConfigurationProperties properties) {
    ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(properties.getUrl());
    Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
    if (!StringUtil.isNullOrEmpty(properties.getUser())) {
        ob = ob.option(USER, properties.getUser());
    }
    if (!StringUtil.isNullOrEmpty(properties.getPassword())) {
        ob = ob.option(PASSWORD, properties.getPassword());
    }        
    return ConnectionFactories.get(ob.build());    
}

ここでは、 @ConfigurationProperties アノテーションで装飾されたヘルパークラスから受け取ったオプションを取得し、ConnectionFactoryOptionsインスタンスにデータを入力します。 これを設定するために、R2DBCは、Optionと値を受け取る単一のoptionメソッドを使用してビルダーパターンを実装します。

R2DBCは、上記で使用したUSERNAMEPASSWORDなど、多くのよく知られたオプションを定義しています。 これらのオプションを設定する別の方法は、接続文字列を ConnectionFactoryOptionsクラスのparse()メソッドに渡すことです。

典型的なR2DBC接続URLの例を次に示します。

r2dbc:h2:mem://./testdb

この文字列をコンポーネントに分割してみましょう。

  • r2dbc :R2DBC URLの固定スキーム識別子—別の有効なスキームは rd2bcs で、SSLで保護された接続に使用されます
  • h2 :適切な接続ファクトリを見つけるために使用されるドライバー識別子
  • mem :ドライバー固有のプロトコル—この場合、これはインメモリデータベースに対応します
  • //./ testdb :ドライバー固有の文字列。通常、ホスト、データベース、および追加のオプションが含まれます。

オプションセットの準備ができたら、それを get() static factoryメソッドに渡して、 ConnectionFactorybeanを作成します。

5. ステートメントの実行

JDBCと同様に、R2DBCの使用は、主にSQLステートメントをデータベースに送信して結果セットを処理することです。 ただし、R2DBCはリアクティブAPIであるため、パブリッシャーやサブスクライバーなどのリアクティブストリームタイプに大きく依存します。

これらの型を直接使用するのは少し面倒なので、MonoFluxなどのプロジェクトreactorの型を使用して、よりクリーンで簡潔なコードを記述します。

次のセクションでは、単純な Account クラスのリアクティブDAOクラスを作成して、データベース関連のタスクを実装する方法を説明します。 このクラスには3つのプロパティのみが含まれ、データベースに対応するテーブルがあります。

public class Account {
    private Long id;
    private String iban;
    private BigDecimal balance;
    // ... getters and setters omitted
}

5.1. 接続を取得する

ステートメントをデータベースに送信する前に、接続インスタンスが必要です。 ConnectionFactory を作成する方法はすでに見てきたので、Connectionを取得するためにそれを使用するのは当然です。 覚えておかなければならないのは、通常の接続を取得する代わりに、単一の接続のパブリッシャーを取得することです。

通常のSpring@ComponentであるReactiveAccountDaoは、コンストラクターインジェクションを介して ConnectionFactory を取得するため、ハンドラーメソッドですぐに利用できます。

findById()メソッドの最初の数行を見て、Connectionを取得して使用を開始する方法を確認しましょう。

public Mono<Account>> findById(Long id) {         
    return Mono.from(connectionFactory.create())
      .flatMap(c ->
          // use the connection
      )
      // ... downstream processing omitted
}

ここでは、ConnectionFactoryから返されたPublisherを、イベントストリームの初期ソースであるMonoに適合させています。

5.1. ステートメントの準備と提出

接続ができたので、それを使用してステートメントを作成し、それにパラメーターをバインドしましょう。

.flatMap( c -> 
    Mono.from(c.createStatement("select id,iban,balance from Account where id = $1")
      .bind("$1", id)
      .execute())
      .doFinally((st) -> close(c))
 )

ConnectionのメソッドcreateStatementは、SQLクエリ文字列を受け取ります。これは、オプションでバインドプレースホルダーを持つことができます—仕様では「マーカー」と呼ばれます。

ここで注目すべき点がいくつかあります。まず、 createStatementは同期操作であり、流暢なスタイルを使用して、返されたステートメントに値をバインドできます。 2番目の、そして非常に重要なプレースホルダー/マーカー構文はベンダー固有です!

この例では、 $nを使用してパラメーターをマークするH2固有の構文を使用しています。 他のベンダーは、:param @Pn 、またはその他の規則など、異なる構文を使用する場合があります。 これは、レガシーコードをこの新しいAPIに移行するときに注意しなければならない重要な側面です。

流暢なAPIパターンと簡略化されたタイピングにより、バインディングプロセス自体は非常に簡単です。すべてのタイピング変換を処理するオーバーロードされたbind()メソッドが1つだけあります —もちろんデータベースルールに従います。

bind()に渡される最初のパラメーターは、ステートメント内のマーカーの配置に対応するゼロベースの序数にすることも、実際のマーカーを含む文字列にすることもできます。

すべてのパラメーターに値を設定したら、 execute()を呼び出します。これにより、ResultオブジェクトのPublisherが返され、にラップされます。 ]Monoさらに処理します。 doFinally()ハンドラーをこの Mono にアタッチして、ストリーム処理が正常に完了したかどうかに関係なく、接続を確実に閉じるようにします。

5.2. 処理結果

パイプラインの次のステップは Resultオブジェクトを処理し、ResponseEntityのストリームを生成しますインスタンス

指定されたidを持つインスタンスは1つだけであることがわかっているため、実際にはMonoストリームを返します。 実際の変換は、受信した Resultmap()メソッドに渡された関数内で行われます。

.map(result -> result.map((row, meta) -> 
    new Account(row.get("id", Long.class),
      row.get("iban", String.class),
      row.get("balance", BigDecimal.class))))
.flatMap(p -> Mono.from(p));

結果のmap()メソッドは、2つのパラメーターを受け取る関数を想定しています。 1つ目は、 Row オブジェクトで、各列の値を収集し、アカウントインスタンスにデータを入力するために使用します。 2番目のmetaは、 RowMetadata オブジェクトであり、列名やタイプなど、現在の行に関する情報が含まれています。

以前地図() パイプラインの呼び出しは、 単核症 >> 、しかし、私たちは返す必要があります単核症この方法から。 これを修正するために、最後の flatMap()ステップを追加します。これは、ProducerMono。に適合させます。

5.3. バッチステートメント

R2DBCは、ステートメントバッチの作成と実行もサポートします。これにより、1回の execute()呼び出しで複数のSQLステートメントを実行できます。 通常のステートメントとは対照的に、バッチステートメントはバインディングをサポートせず、主にETLジョブなどのシナリオでパフォーマンス上の理由で使用されます。

サンプルプロジェクトでは、ステートメントのバッチを使用して Account テーブルを作成し、それにいくつかのテストデータを挿入します。

@Bean
public CommandLineRunner initDatabase(ConnectionFactory cf) {
    return (args) ->
      Flux.from(cf.create())
        .flatMap(c -> 
            Flux.from(c.createBatch()
              .add("drop table if exists Account")
              .add("create table Account(" +
                "id IDENTITY(1,1)," +
                "iban varchar(80) not null," +
                "balance DECIMAL(18,2) not null)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120980198201982',100.00)")
              .add("insert into Account(iban,balance)" +
                "values('BR430120998729871000',250.00)")
              .execute())
            .doFinally((st) -> c.close())
          )
        .log()
        .blockLast();
}

ここでは、 createBatch()から返された Batch を使用して、いくつかのSQLステートメントを追加します。 次に、ステートメントインターフェイスで使用可能なのと同じ execute()メソッドを使用して、これらのステートメントを実行用に送信します。

この特定のケースでは、結果には関心がありません。ステートメントがすべて正常に実行されるだけです。 生成された結果が必要な場合は、このストリームにダウンストリームステップを追加して、発行されたResultオブジェクトを処理するだけです。

6. トランザクション

このチュートリアルで取り上げる最後のトピックはトランザクションです。 これまでに予想されるように、JDBCと同様に、つまりConnectionオブジェクトで使用可能なメソッドを使用してトランザクションを管理します。

以前と同様に、主な違いは、すべてのトランザクション関連のメソッドが非同期になり、適切なポイントでストリームに追加する必要があるPublisherを返すことです。

サンプルプロジェクトでは、 createAccount()メソッドの実装でトランザクションを使用しています。

public Mono<Account> createAccount(Account account) {    
    return Mono.from(connectionFactory.create())
      .flatMap(c -> Mono.from(c.beginTransaction())
        .then(Mono.from(c.createStatement("insert into Account(iban,balance) values($1,$2)")
          .bind("$1", account.getIban())
          .bind("$2", account.getBalance())
          .returnGeneratedValues("id")
          .execute()))
        .map(result -> result.map((row, meta) -> 
            new Account(row.get("id", Long.class),
              account.getIban(),
              account.getBalance())))
        .flatMap(pub -> Mono.from(pub))
        .delayUntil(r -> c.commitTransaction())
        .doFinally((st) -> c.close()));   
}

ここでは、トランザクション関連の呼び出しを2つのポイントで追加しました。 まず、データベースから新しい接続を取得した直後に、 beginTransactionMethod()を呼び出します。 トランザクションが正常に開始されたことを確認したら、insertステートメントを準備して実行します。

今回は、 returnGeneratedValues()メソッドを使用して、この新しいアカウントに対して生成されたID値を返すようにデータベースに指示しました。 R2DBCは、生成されたすべての値を含む単一の行を含む Result でこれらの値を返します。これを使用して、Accountインスタンスを作成します。

もう一度、着信を適応させる必要があります単核症 >>単核症 、だから私たちは追加します flatMap() これを解決するには 。 次に、トランザクションをコミットします delayUntil() ステップ。 これが必要なのは、返されたアカウントがすでにデータベースにコミットされていることを確認するためです。

最後に、 doFinally ステップをこのパイプラインにアタッチして、返されたMonoからのすべてのイベントが消費されたときにConnectionを閉じます。

7. DAOの使用例

リアクティブDAOができたので、それを使用して単純な Spring WebFlux アプリケーションを作成し、一般的なアプリケーションでの使用方法を紹介します。 このフレームワークはすでにリアクティブコンストラクトをサポートしているため、これは簡単な作業になります。 たとえば、GETメソッドの実装を見てみましょう。

@RestController
public class AccountResource {
    private final ReactiveAccountDao accountDao;

    public AccountResource(ReactiveAccountDao accountDao) {
        this.accountDao = accountDao;
    }

    @GetMapping("/accounts/{id}")
    public Mono<ResponseEntity<Account>> getAccount(@PathVariable("id") Long id) {
        return accountDao.findById(id)
          .map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
          .switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
    }
    // ... other methods omitted
}

ここでは、DAOから返された Mono を使用して、適切なステータスコードでResponseEntityを構築しています。 これは、指定されたIDのアカウントがない場合に NOT_FOUND (404)ステータスコードが必要なためです。

8. 結論

この記事では、R2DBCを使用したリアクティブデータベースアクセスの基本について説明しました。 このプロジェクトはまだ始まったばかりですが、2020年初頭のリリース日を目標に、急速に進化しています。

間違いなくJava12の一部ではないADBAと比較すると、R2DBCはより有望であり、すでにいくつかの人気のあるデータベースのドライバーを提供しているようです。Oracleはここでは注目に値しません。

いつものように、このチュートリアルで使用される完全なソースコードは、Githubから入手できます。