開発者ドキュメント

spring-boot-kotlin-coroutines

Kotlinコルーチンを使用したノンブロッキングスプリングブーツ

1. 概要

link:/kotlin-coroutines[Kotlin Coroutines]は、多くの場合、リアクティブでコールバックが多いコードを読みやすくします。
このチュートリアルでは、これらのコルーチンを活用して、ノンブロッキングlink:/spring-boot[Spring Boot]アプリケーションを構築する方法を説明します。 また、リアクティブアプローチとコルーチンアプローチを比較します。

2. コルーチンの動機

今日では、システムが数千または数百万ものリクエストを処理するのが一般的です。 その結果、開発の世界は、ノンブロッキング計算とリクエスト処理に向かっています。 *コアスレッドからI / O操作をオフロードすることにより、システムリソースを効率的に利用します。**従来のスレッドごとのアプローチ*と比較して、より多くの要求を処理できます。
非同期処理は簡単なタスクではなく、エラーが発生しやすくなります。 幸いなことに、Java link:/java-completablefuture[_CompletableFutures_]のようなこの複雑さに対処するためのツール、またはlink:/rxjava-tutorial[RxJava]のようなリアクティブライブラリがあります。 実際、https://www.baeldung.com/spring-tutorial [Spring framework]は、https://www.baeldung.com/spring-reactor [Reactor]およびhttps://www.baeldungを使用した事後対応アプローチを既にサポートしています。 .com / spring-webflux [WebFlux]フレームワーク。
非同期コードは読みにくい場合がありますが、* https://www.baeldung.com/kotlin [Kotlin language]はコルーチンの概念を提供し、同時および非同期コードを順次スタイルで記述できるようにします*。
コルーチンは非常に柔軟であるため、ジョブとスコープを介してタスクの実行をより詳細に制御できます。 それに加えて、* Kotlinコルーチンは、既存のJavaノンブロッキングフレームワークと完全に並行して動作します*。
Springは、バージョン5.2からKotlinコルーチンをサポートします。

3. プロジェクトのセットアップ

必要な依存関係を追加することから始めましょう。
このチュートリアルで使用する依存関係のほとんどにはまだ安定したリリースがないため、スナップショットとマイルストーンリポジトリを含める必要があります。
<pluginRepositories>
    <pluginRepository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </pluginRepository>
    <pluginRepository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
    </pluginRepository>
</pluginRepositories>
非同期クライアントサーバーイベント駆動型フレームワークであるlink:/netty[Netty]フレームワークを使用してみましょう。 リアクティブWebサーバーの組み込み実装として_NettyWebServer_を使用します。
さらに、*バージョン3.0以降、Servlet仕様*では、*アプリケーションを非ブロッキング方式で処理する*サポートが導入されています。 そのため、https://www.baeldung.com/jetty-embedded [Jetty]またはlink:/tomcat[Tomcat]のようなサーブレットコンテナも使用できます。
Spring Boot経由のSpring 5.2を含むこれらのバージョンを使用してみましょう。
<properties>
    <kotlin.version>1.3.31</kotlin.version>
    <r2dbc.version>1.0.0.M1</r2dbc.version>
    <r2dbc-spi.version>1.0.0.M7</r2dbc-spi.version>
    <h2-r2dbc.version>1.0.0.BUILD-SNAPSHOT</h2-r2dbc.version>
    <kotlinx-coroutines.version>1.2.1</kotlinx-coroutines.version>
    <spring-boot.version>2.2.0.M2</spring-boot.version>
</properties>
次に、非同期処理をWebFluxに依存しているため、https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux [_spring-boot-starter-webflux_を使用することが非常に重要です。 ]代わりに_spring-boot-starter-web_ *。 したがって、この依存関係を_pom.xml_に含める必要があります。
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>${spring-boot.version}</version>
</dependency>
次に、リアクティブデータベースアクセスをサポートするために* R2DBC依存関係を追加します*。
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-h2</artifactId>
    <version>${h2-r2dbc.version}</version>
</dependency>
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-spi</artifactId>
    <version>${r2dbc-spi.version}</version>
</dependency>
最後に、我々はKotlinコアとコルーチンの依存関係を追加します:
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
    <version>${kotlinx-coroutines.version}</version>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactor</artifactId>
    <version>${kotlinx-coroutines.version}</version>
</dependency>

4. コルーチンを使用したSpring Data R2DBC

このセクションでは、リアクティブスタイルとコルーチンスタイルの両方でデータベースにアクセスすることに焦点を当てます。

4.1. リアクティブR2DBC

リアクティブリレーショナルデータベースクライアントから始めましょう。 簡単に言えば、* R2DBCは、データベースベンダーによって実装されるリアクティブAPIを宣言するAPI仕様です*。
データストアは、インメモリlink:/spring-boot-h2-database[H2データベース]によって強化されます。 さらに、PostgreSQLおよびMicrosoft SQL用の*リアクティブリレーショナルドライバが利用可能です*。
最初に、リアクティブアプローチを使用して簡単なリポジトリを実装しましょう。
@Repository
class ProductRepository(private val client: DatabaseClient) {

    fun getProductById(id: Int): Mono<Product> {
        return client.execute()
          .sql("SELECT * FROM products WHERE id = $1")
          .bind(0, id)
          .`as`(Product::class.java)
          .fetch()
          .one()
    }

    fun addNewProduct(name: String, price: Float): Mono<Void> {
        return client.execute()
          .sql("INSERT INTO products (name, price) VALUES($1, $2)")
          .bind(0, name)
          .bind(1, price)
          .then()
    }

    fun getAllProducts(): Flux<Product> {
        return client.select()
          .from("products")
          .`as`(Product::class.java)
          .fetch()
          .all()
    }
}
ここでは、* non-blocking _DatabaseClient_ *を使用して、データベースに対してクエリを実行しています。 それでは、サスペンド関数と対応するKotlin型を使用して、リポジトリクラスを書き換えましょう。

4.2. コルーチンを使用したR2DBC

関数をリアクティブからコルーチンAPIに変換するには、関数定義の前に中断修飾子を追加します。
fun noResultFunc(): Mono<Void>
suspend fun noResultFunc()
さらに、_Void_戻り値の型を省略できます。 void以外の結果の場合、定義された型の結果を_Mono_クラスにラップせずに返します。
fun singleItemResultFunc(): Mono<T>
fun singleItemResultFunc(): T?
次に、ソースが複数のアイテムを放出する可能性がある場合、次のように_Flux_を_Flow_に変更します。
fun multiItemsResultFunc(): Flux<T>
fun mutliItemsResultFunc(): Flow<T>
これらのルールを適用して、リポジトリをリファクタリングしましょう。
@Repository
class ProductRepositoryCoroutines(private val client: DatabaseClient) {

    suspend fun getProductById(id: Int): Product? =
        client.execute()
          .sql("SELECT * FROM products WHERE id = $1")
          .bind(0, id)
          .`as`(Product::class.java)
          .fetch()
          .one()
          .awaitFirstOrNull()

    suspend fun addNewProduct(name: String, price: Float) =
        client.execute()
          .sql("INSERT INTO products (name, price) VALUES($1, $2)")
          .bind(0, name)
          .bind(1, price)
          .then()
          .awaitFirstOrNull()

    @FlowPreview
    fun getAllProducts(): Flow<Product> =
        client.select()
          .from("products")
          .`as`(Product::class.java)
          .fetch()
          .all()
          .asFlow()
}
上記のスニペットには、注意が必要ないくつかのポイントがあります。 これらの_await * _関数はどこから来たのですか? これらは、_kotlin-coroutines-reactive_ライブラリの* Kotlin拡張関数として定義されています*。
さらに、https://github.com/spring-projects/spring-data-r2dbc [_spring-data-r2dbc_ library]で利用可能な拡張機能がさらにあります。

5. Spring WebFluxコントローラー

これまで、リポジトリの実装方法を見てきましたが、データストアへの実際のクエリはまだ行っていません。 したがって、このセクションでは、ノンブロッキングコントローラーを作成して、Spring WebFluxフレームワークにコルーチンを適用する方法を見つけます。

5.1. リアクティブコントローラー

リポジトリを介してデータベースにクエリを実行する2つの単純なエンドポイントを定義しましょう。
より身近なリアクティブスタイルから始めましょう。
@RestController
class ProductController {
    @Autowired
    lateinit var productRepository: ProductRepository

    @GetMapping("/{id}")
    fun findOne(@PathVariable id: Int): Mono<Product> {
        return productRepository.getProductById(id)
    }

    @GetMapping("/")
    fun findAll(): Flux<Product> {
        return productRepository.getAllProducts()
    }
}
これにより、「実際のI / O操作を実行するスレッドはどれですか?」という疑問が生じます。デフォルトでは、各クエリの操作は、基礎となるスケジューラー実装によって選択された個別のリアクターNIOスレッドで実行されます。

5.2. コルーチンを備えたコントローラー

一時停止機能を活用し、対応するリポジトリクラスを使用して、コントローラーをリファクタリングしましょう。
@RestController
class ProductControllerCoroutines {
    @Autowired
    lateinit var productRepository: ProductRepositoryCoroutines

    @GetMapping("/{id}")
    suspend fun findOne(@PathVariable id: Int): Product? {
        return productRepository.getProductById(id)
    }

    @FlowPreview
    @GetMapping("/")
    fun findAll(): Flow<Product> {
        return productRepository.getAllProducts()
    }
}
まず、_findAll()_機能が停止ものではないことに注意してください。 ただし、_Flowを返す限り、_は内部的に中断関数を呼び出します。
このバージョンでは、データベースクエリは*リアクティブな例と同じリアクタスレッドで実行されます*。

6. Spring WebFlux WebClient

次に、システムにlink:/spring-cloud-tutorial[microservices]があることを想像してください。
リクエストを完了するには、別のサービスにクエリを実行して追加のデータを取得する必要があります。 したがって、この場合の良い例は、製品の在庫数量を取得することです。 APIを介して別のサービスを呼び出すには、WebFluxフレームワークの_https://www.baeldung.com/spring-5-webclient [WebClient] _を使用します。

6.1. リアクティブ_WebClient_

まず、簡単なリクエストの作成方法を見てみましょう。
val htmlResponse = webClient.get()
  .uri("link:/")
  .retrieve().bodyToMono<String>()
次に、外部在庫サービスを呼び出して在庫数を取得し、結合された結果をクライアントに返します。 最初に、リポジトリから製品を取得し、ストックサービスを照会します。
@GetMapping("/{id}/stock")
fun findOneInStock(@PathVariable id: Int): Mono<ProductStockView> {
   val product = productRepository.getProductById(id)

   val stockQuantity = webClient.get()
     .uri("/stock-service/product/$id/quantity")
     .accept(MediaType.APPLICATION_JSON)
     .retrieve()
     .bodyToMono<Int>()
   return product.zipWith(stockQuantity) {
       productInStock, stockQty ->
         ProductStockView(productInStock, stockQty)
   }
}
リポジトリから_Mono <Product> _タイプのオブジェクトを返していることに注意してください。 次に、_WebClient_から_Mono <Int> _を取得します。 最後に、https://www.baeldung.com/reactor-combine-streams [_zipWith()_]メソッドを呼び出すと、実際のサブスクリプションが発生します。 両方のリクエストが完了するのを待って、最終的にそれらを新しいオブジェクトに結合します。

6.2. コルーチンを使用した_WebClient_

ここで、_WebClient_をコルーチンで使用する方法を見てみましょう。
GETリクエストを実行するには、_awaitExchange()_および_awaitBody()_の拡張機能を一時停止します。
val htmlResponse = webClient.get()
  .uri("link:/")
  .awaitExchange()
  .awaitBody<String>()
マイクロサービスの例に戻りましょう。 ストックサービスに対してリクエストを実行できます。
@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView {
    val product = productRepository.getProductById(id)
    val quantity = webClient.get()
      .uri("/stock-service/product/$id/quantity")
      .accept(APPLICATION_JSON)
      .awaitExchange()
      .awaitBody<Int>()
    return ProductStockView(product!!, quantity)
}
これはブロッキングコードのように見えることに注意してください。 コルーチンを使用する主な利点の1つは、非同期コードを流かつ読み取り可能な方法で作成できることです。
上記の例では、データベースクエリとWebリクエストが次々に実行されます。 これは、*コルーチンがデフォルトでシーケンシャルであるためです*。
中断機能を並行して実行できますか? 絶対に! クエリを並行して実行するためにエンドポイントメソッドを変更しましょう。
@GetMapping("/{id}/stock")
suspend fun findOneInStock(@PathVariable id: Int): ProductStockView = coroutineScope {
    val product: Deferred<Product?> = async(start = CoroutineStart.LAZY) {
        productRepository.getProductById(id)
    }
    val quantity: Deferred<Int> = async(start = CoroutineStart.LAZY) {
        webClient.get()
          .uri("/stock-service/product/$id/quantity")
          .accept(APPLICATION_JSON)
          .awaitExchange().awaitBody<Int>()
    }
    ProductStockView(product.await()!!, quantity.await())
}
ここでは、_async \ {} _ブロックで一時停止関数をラップすることにより、_Deferred <> _タイプのオブジェクトを取得します。 デフォルトでは、*コルーチンはすぐに実行がスケジュールされます*。 その結果、_await()_メソッドが呼び出されたときに正確に実行するには、to_CoroutineStart.LAZY_をオプションの_start_パラメーターとして渡す必要があります。
最後に、関数の実行を開始するために、_await()_メソッドを呼び出します。 その場合、2つの関数は並行して実行されます。 この手法は、_並列分解とも呼ばれます。
非同期ブロックの関数は、*個別のワーカースレッドにディスパッチされることに注意するのは興味深いです。 その後、* Reactor NIOプールのスレッドで*実際のI / O操作が発生します*。
構造化された同時実行を強制するために、_coroutineScope \ {} _ scoping関数を使用して独自のスコープを作成しました。 ブロック内のすべてのコルーチンが完了するのを待ってから完了します。 ただし、_coroutineScope \ {} _関数はhttps://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html[runBlocking]と比較して現在のスレッドをブロックしません。

7. WebFlux.fn DSLルート

最後に、https://www.baeldung.com/spring-webflux-kotlin [DSL Routes definition]でコルーチンを使用する方法を見てみましょう。
Kotlinを搭載したlink:/spring-5-functional-web[WebFlux Functional Framework]は、エンドポイントを定義するための簡潔で流fluentな方法を提供します。 _https://docs.spring.io/spring-framework/docs/5.2.0.M1/kdoc-api/spring-framework/org.springframework.web.reactive.function.server/co-router.html [coRouter \ {}] _ DSLは、ルーター機能を定義するためにKotlinコルーチンをサポートしています。
まず、DSLの方法でルーターエンドポイントを定義しましょう。
@Configuration
class RouterConfiguration {
    @FlowPreview
    @Bean
    fun productRoutes(productsHandler: ProductsHandler) = coRouter {
        GET("/", productsHandler::findAll)
        GET("/{id}", productsHandler::findOne)
        GET("/{id}/stock", productsHandler::findOneInStock)
    }
}
これでルート定義ができたので、_ProductController_と同じ機能を持つ_ProductsHandler_を実装しましょう:
@Component
class ProductsHandler(
  @Autowired var webClient: WebClient,
  @Autowired var productRepository: ProductRepositoryCoroutines) {

    @FlowPreview
    suspend fun findAll(request: ServerRequest): ServerResponse =
        ServerResponse.ok().json().bodyAndAwait(productRepository.getAllProducts())

    suspend fun findOneInStock(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toInt()
        val product: Deferred<Product?> = GlobalScope.async {
            productRepository.getProductById(id)
        }
        val quantity: Deferred<Int> = GlobalScope.async {
            webClient.get()
              .uri("/stock-service/product/$id/quantity")
              .accept(MediaType.APPLICATION_JSON)
              .awaitExchange().awaitBody<Int>()
        }
        return ServerResponse.ok()
          .json()
          .bodyAndAwait(ProductStockView(product.await()!!, quantity.await()))
    }

    suspend fun findOne(request: ServerRequest): ServerResponse {
        val id = request.pathVariable("id").toInt()
        return ServerResponse.ok()
          .json()
          .bodyAndAwait(productRepository.getProductById(id)!!)
    }
}
_ProductsHandler_クラスを定義するために中断関数を使用したことに注意してください。 要求タイプと応答タイプを除いて、コントローラーと比べてあまり変更はありません。
簡単なRESTコントローラーをセットアップするために必要なのはこれだけです。 その結果、KotlinコルーチンとともにRoutes DSLを使用したおかげで、流endpointかつ簡潔なエンドポイント定義ができました。

8. 結論

この記事では、* Kotlinコルーチンを調査し、特にSpringフレームワーク*、R2DBC、およびWebFluxと統合する方法を見つけました。
プロジェクトにノンブロッキングアプローチを適用すると、アプリケーションのパフォーマンスとスケーラビリティが向上する場合があります。 さらに、* Kotlinコルーチンを使用すると、非同期コードがより読みやすくなる*ことも確認しました。
上記のライブラリの開発中バージョンは、安定したリリースに到達する前に大幅に変更される可能性があり、マイナーバージョンの違いは互いに互換性がない可能性があることに注意してください。
例のコードは、いつものようにhttps://github.com/eugenp/tutorials/tree/master/spring-boot-kotlin[GitHub]で入手できます。
モバイルバージョンを終了