1. 序章

このチュートリアルでは、Springやその他のツールやフレームワークを使用してJavaでリアクティブシステムを作成するための基本を理解します。

その過程で、リアクティブプログラミングがリアクティブシステムを作成するための単なる推進力である方法について説明します。 これは、リアクティブシステムを作成する理由と、その過程でインスピレーションを得たさまざまな仕様、ライブラリ、および標準を理解するのに役立ちます。

2. リアクティブシステムとは何ですか?

過去数十年にわたって、テクノロジーの展望は、テクノロジーの価値を私たちが見る方法に完全な変革をもたらしたいくつかの混乱を経験してきました。 インターネットの前のコンピューティングの世界は、それが私たちの現在を変える方法と手段を想像することはできませんでした。

インターネットが大衆に届き、インターネットが約束する絶え間なく進化する経験を持っているため、アプリケーションアーキテクトは彼らの需要を満たすために気を配る必要があります。

基本的に、これは、以前の方法でアプリケーションを設計することは決してできないことを意味します。 応答性の高いアプリケーションはもはや贅沢ではなく必需品です

それもまた、ランダムな障害と予測できない負荷に直面しています。 時間の必要性は、正しい結果を得るだけでなく、それを速く得ることです! 私たちが提供することを約束する素晴らしいユーザーエクスペリエンスを推進することは非常に重要です。

これが、リアクティブシステムを提供できるアーキテクチャスタイルの必要性を生み出すものです。

2.1. 反応性マニフェスト

2013年に、Jonas Bonerが率いる開発者チームが集まり、 ReactiveManifestoとして知られるドキュメントで一連のコア原則を定義しました。 これが、リアクティブシステムを作成するためのアーキテクチャスタイルの基礎を築いたものです。 それ以来、このマニフェストは開発者コミュニティから多くの関心を集めています。

基本的に、このドキュメントでは、リアクティブシステムのレシピを、柔軟性があり、緩く結合され、スケーラブルであると規定しています。 これにより、このようなシステムの開発が容易になり、障害に耐え、最も重要なことに応答性が高くなり、すばらしいユーザーエクスペリエンスの基盤となります。

それで、この秘密のレシピは何ですか? まあ、それはほとんど秘密ではありません! マニフェストは、リアクティブシステムの基本的な特性または原則を定義します。

  • Responsive :リアクティブシステムは、迅速で一貫した応答時間を提供し、したがって一貫したサービス品質を提供する必要があります
  • 復元力:レプリケーションと分離によるランダムな障害が発生した場合でも、リアクティブシステムは応答性を維持する必要があります
  • Elastic :このようなシステムは、費用対効果の高いスケーラビリティを通じて、予測できないワークロードの下でも応答性を維持する必要があります
  • メッセージ駆動型:システムコンポーネント間で受け渡される非同期メッセージに依存する必要があります

これらの原則は単純で賢明に聞こえますが、複雑なエンタープライズアーキテクチャに実装するのが常に簡単であるとは限りません。 このチュートリアルでは、これらの原則を念頭に置いて、Javaでサンプルシステムを開発します。

3. リアクティブプログラミングとは何ですか?

先に進む前に、リアクティブプログラミングとリアクティブシステムの違いを理解することが重要です。 私たちはこれらの用語を頻繁に使用し、一方を他方と誤解しがちです。 前に見たように、リアクティブシステムは特定のアーキテクチャスタイルの結果です。

対照的に、リアクティブプログラミングは、非同期および非ブロッキングコンポーネントの開発に焦点を当てたプログラミングパラダイムです。 リアクティブプログラミングの中核は、私たちが観察して反応することができ、背圧をかけることさえできるデータストリームです。 これにより、非ブロッキング実行が可能になり、実行スレッドが少なくなり、スケーラビリティが向上します。

さて、これはリアクティブシステムとリアクティブプログラミングが相互に排他的であることを意味するものではありません。 実際、リアクティブプログラミングはリアクティブシステムを実現するための重要なステップですが、それがすべてではありません。

3.1. リアクティブストリーム

Reactive Streams は、2013年に開始されたコミュニティイニシアチブであり、ノンブロッキングバックプレッシャを使用した非同期ストリーム処理の標準を提供します。 ここでの目的は、必要な操作とエンティティを記述できる一連のインターフェイス、メソッド、およびプロトコルを定義することでした。

それ以来、リアクティブストリーム仕様に準拠する複数のプログラミング言語でのいくつかの実装が登場しました。 これらには、いくつか例を挙げると、Akka Streams、Ratpack、Vert.xが含まれます。

3.2. Java用のリアクティブライブラリ

リアクティブストリームの背後にある当初の目的の1つは、最終的には公式のJava標準ライブラリとして含まれることでした。 その結果、リアクティブストリームの仕様は、Java9で導入されたJavaFlowライブラリと意味的に同等です。

それとは別に、Javaでリアクティブプログラミングを実装するための一般的な選択肢がいくつかあります。

  • Reactive Extensions :一般にReactiveXとして知られ、監視可能なストリームを使用した非同期プログラミング用のAPIを提供します。 これらは、 Rx Java として知られているJavaを含む、複数のプログラミング言語およびプラットフォームで使用できます。
  • Project Reactor :これは別のリアクティブライブラリであり、リアクティブストリームの仕様に基づいており、JVM上で非アプリケーションを構築することを目的としています。 また、Springエコシステムリアクティブスタックの基盤でもあります。

4. シンプルなアプリケーション

このチュートリアルの目的のために、最小限のフロントエンドを備えたマイクロサービスアーキテクチャに基づく単純なアプリケーションを開発します。 アプリケーションアーキテクチャには、リアクティブシステムを作成するのに十分な要素が必要です。

このアプリケーションでは、エンドツーエンドのリアクティブプログラミングやその他のパターンとツールを採用して、リアクティブシステムの基本的な特性を実現します。

4.1. 建築

まず、リアクティブシステムの特性を必ずしも示すとは限らない単純なアプリケーションアーキテクチャを定義することから始めます。 今後は、これらの特性を1つずつ実現するために必要な変更を加えていきます。

したがって、最初に、単純なアーキテクチャを定義することから始めましょう。

これは非常にシンプルなアーキテクチャであり、注文できるコマースのユースケースを促進するための多数のマイクロサービスがあります。 また、ユーザーエクスペリエンスのフロントエンドがあり、すべての通信はRESToverHTTPとして行われます。 さらに、すべてのマイクロサービスは、個々のデータベースでデータを管理します。これは、サービスごとのデータベースと呼ばれる手法です。

次のサブセクションでは、この単純なアプリケーションを作成します。 これは、このアーキテクチャの誤謬と、これをリアクティブシステムに変換できるように原則と実践を採用する方法と手段を理解するためのベースになります。

4.3. 在庫マイクロサービス

在庫マイクロサービスは、製品のリストとその現在の在庫を管理する責任があります。 また、注文の処理時に在庫を変更することもできます。 このサービスの開発には、MongoDBでSpring Bootを使用します。

いくつかのエンドポイントを公開するコントローラーを定義することから始めましょう。

@GetMapping
public List<Product> getAllProducts() {
    return productService.getProducts();
}
 
@PostMapping
public Order processOrder(@RequestBody Order order) {
    return productService.handleOrder(order);
}
 
@DeleteMapping
public Order revertOrder(@RequestBody Order order) {
    return productService.revertOrder(order);
}

およびビジネスロジックをカプセル化するサービス:

@Transactional
public Order handleOrder(Order order) {       
    order.getLineItems()
      .forEach(l -> {
          Product> p = productRepository.findById(l.getProductId())
            .orElseThrow(() -> new RuntimeException("Could not find the product: " + l.getProductId()));
          if (p.getStock() >= l.getQuantity()) {
              p.setStock(p.getStock() - l.getQuantity());
              productRepository.save(p);
          } else {
              throw new RuntimeException("Product is out of stock: " + l.getProductId());
          }
      });
    return order.setOrderStatus(OrderStatus.SUCCESS);
}

@Transactional
public Order revertOrder(Order order) {
    order.getLineItems()
      .forEach(l -> {
          Product p = productRepository.findById(l.getProductId())
            .orElseThrow(() -> new RuntimeException("Could not find the product: " + l.getProductId()));
          p.setStock(p.getStock() + l.getQuantity());
          productRepository.save(p);
      });
    return order.setOrderStatus(OrderStatus.SUCCESS);
}

トランザクション内のエンティティを永続化することに注意してください。これにより、例外が発生した場合に一貫性のない状態が発生することはありません。

これらとは別に、ドメインエンティティ、リポジトリインターフェイス、およびすべてが正しく機能するために必要な一連の構成クラスも定義する必要があります。

ただし、これらはほとんど定型文であるため、これらを使用することは避けます。これらは、この記事の最後のセクションで提供されているGitHubリポジトリで参照できます。

4.4. 配送マイクロサービス

出荷のマイクロサービスもそれほど違いはありません。 これは、注文に対して出荷を生成できるかどうかを確認し、可能であれば作成する責任があります。

前と同じように、エンドポイントを公開するコントローラーを定義します。実際には、単一のエンドポイントのみです。

@PostMapping
public Order process(@RequestBody Order order) {
    return shippingService.handleOrder(order);
}

注文の出荷に関連するビジネスロジックをカプセル化するサービス:

public Order handleOrder(Order order) {
    LocalDate shippingDate = null;
    if (LocalTime.now().isAfter(LocalTime.parse("10:00"))
      && LocalTime.now().isBefore(LocalTime.parse("18:00"))) {
        shippingDate = LocalDate.now().plusDays(1);
    } else {
        throw new RuntimeException("The current time is off the limits to place order.");
    }
    shipmentRepository.save(new Shipment()
      .setAddress(order.getShippingAddress())
      .setShippingDate(shippingDate));
    return order.setShippingDate(shippingDate)
      .setOrderStatus(OrderStatus.SUCCESS);
}

私たちのシンプルな配送サービスは、注文するための有効な時間枠をチェックするだけです。 以前のように、ボイラープレートコードの残りの部分について説明することは避けます。

4.5. マイクロサービスを注文する

最後に、他のものとは別に新しい注文の作成を担当する注文マイクロサービスを定義します。 興味深いことに、オーケストレーターサービスとしても機能し、注文の在庫サービスや配送サービスと通信します。

必要なエンドポイントを使用してコントローラーを定義しましょう。

@PostMapping
public Order create(@RequestBody Order order) {
    Order processedOrder = orderService.createOrder(order);
    if (OrderStatus.FAILURE.equals(processedOrder.getOrderStatus())) {
        throw new RuntimeException("Order processing failed, please try again later.");
    }
    return processedOrder;
}
@GetMapping
public List<Order> getAll() {
    return orderService.getOrders();
}

そして、注文に関連するビジネスロジックをカプセル化するサービス:

public Order createOrder(Order order) {
    boolean success = true;
    Order savedOrder = orderRepository.save(order);
    Order inventoryResponse = null;
    try {
        inventoryResponse = restTemplate.postForObject(
          inventoryServiceUrl, order, Order.class);
    } catch (Exception ex) {
        success = false;
    }
    Order shippingResponse = null;
    try {
        shippingResponse = restTemplate.postForObject(
          shippingServiceUrl, order, Order.class);
    } catch (Exception ex) {
        success = false;
        HttpEntity<Order> deleteRequest = new HttpEntity<>(order);
        ResponseEntity<Order> deleteResponse = restTemplate.exchange(
          inventoryServiceUrl, HttpMethod.DELETE, deleteRequest, Order.class);
    }
    if (success) {
        savedOrder.setOrderStatus(OrderStatus.SUCCESS);
        savedOrder.setShippingDate(shippingResponse.getShippingDate());
    } else {
        savedOrder.setOrderStatus(OrderStatus.FAILURE);
    }
    return orderRepository.save(savedOrder);
}

public List<Order> getOrders() {
    return orderRepository.findAll();
}

在庫および配送サービスへの電話を調整している注文の処理は、理想からはほど遠いものです。 複数のマイクロサービスを使用した分散トランザクションは、それ自体が複雑なトピックであり、このチュートリアルの範囲を超えています。

ただし、このチュートリアルの後半で、リアクティブシステムが分散トランザクションの必要性をある程度回避する方法を説明します。

前と同じように、ボイラープレートコードの残りの部分については説明しません。 ただし、これはGitHubリポジトリで参照できます。

4.6. フロントエンド

また、ディスカッションを完了するためのユーザーインターフェイスを追加しましょう。 ユーザーインターフェイスはAngularに基づいており、シンプルなシングルページアプリケーションになります。

作成注文とフェッチ注文を処理するには、Angularで単純なコンポーネントを作成する必要があります。 特に重要なのは、APIを呼び出して注文を作成する部分です。

createOrder() {
    let headers = new HttpHeaders({'Content-Type': 'application/json'});
    let options = {headers: headers}
    this.http.post('http://localhost:8080/api/orders', this.form.value, options)
      .subscribe(
        (response) => {
          this.response = response
        },
        (error) => {
          this.error = error
        }
      )
}

上記のコードスニペットは、注文データがフォームにキャプチャされ、コンポーネントのスコープ内で利用可能であることを期待しています。 Angularは、リアクティブフォームとテンプレート駆動型フォームを使用して、単純なフォームから複雑なフォームを作成するための素晴らしいサポートを提供します。

また、以前に作成された注文を取得する部分も重要です。

getOrders() {
  this.previousOrders = this.http.get(''http://localhost:8080/api/orders'')
}

Angular HTTPモジュールは本質的に非同期であるため、RxJSObservablesを返すことに注意してください。 ビューで応答を処理するには、応答を非同期パイプに通します。

<div class="container" *ngIf="previousOrders !== null">
  <h2>Your orders placed so far:</h2>
  <ul>
    <li *ngFor="let order of previousOrders | async">
      <p>Order ID: {{ order.id }}, Order Status: {{order.orderStatus}}, Order Message: {{order.responseMessage}}</p>
    </li>
  </ul>
</div>

もちろん、Angularが機能するにはテンプレート、スタイル、構成が必要ですが、これらはGitHubリポジトリで参照できます。 ここではすべてを1つのコンポーネントにバンドルしていることに注意してください。これは、理想的には実行すべきことではありません。

ただし、このチュートリアルでは、これらの懸念事項は対象外です。

4.7. アプリケーションの展開

アプリケーションの個々の部分をすべて作成したので、それらをどのようにデプロイする必要がありますか? ええと、私たちはいつでもこれを手動で行うことができます。 ただし、すぐに面倒になる可能性があることに注意する必要があります。

このチュートリアルでは、 Docker Compose を使用して、アプリケーションをDockerMachineビルドしてデプロイします。 これには、各サービスに標準のDockerfileを追加し、アプリケーション全体のDockerComposeファイルを作成する必要があります。

このdocker-compose.ymlファイルがどのように見えるか見てみましょう。

version: '3'
services:
  frontend:
    build: ./frontend
    ports:
      - "80:80"
  order-service:
    build: ./order-service
    ports:
      - "8080:8080"
  inventory-service:
    build: ./inventory-service
    ports:
      - "8081:8081"
  shipping-service:
    build: ./shipping-service
    ports:
      - "8082:8082"

これはDockerComposeのサービスのかなり標準的な定義であり、特別な注意は必要ありません。

4.8. このアーキテクチャの問題

複数のサービスが相互作用する単純なアプリケーションが用意できたので、このアーキテクチャの問題について説明できます。 次のセクションで対処しようとしていることがあり、最終的には、アプリケーションをリアクティブシステムに変換したであろう状態に到達します。

このアプリケーションは実稼働グレードのソフトウェアとはほど遠いものであり、いくつかの問題がありますが、リアクティブシステムの動機に関連する問題に焦点を当てます。

  • 在庫サービスまたは配送サービスのいずれかで障害が発生すると、連鎖的な影響が生じる可能性があります
  • 外部システムとデータベースへの呼び出しは、本質的にすべてブロックされています
  • 展開では、障害や変動する負荷を自動的に処理できません

5. リアクティブプログラミング

多くの場合、プログラムで呼び出しをブロックすると、重要なリソースが発生するのを待つだけになります。 これには、データベースコール、Webサービスへのコール、およびファイルシステムコールが含まれます。 この待機から実行のスレッドを解放し、結果が利用可能になったら元に戻すメカニズムを提供できれば、リソースの使用率が大幅に向上します。

これは、リアクティブプログラミングパラダイムを採用することで私たちにもたらされることです。 これらの呼び出しの多くでリアクティブライブラリに切り替えることは可能ですが、すべてで可能であるとは限りません。 幸いなことに、Springを使用すると、MongoDBおよびRESTAPIでリアクティブプログラミングを使用するのがはるかに簡単になります。

Spring Data Mongo は、MongoDB ReactiveStreamsJavaドライバーを介したリアクティブアクセスをサポートしています。 ReactiveMongoTemplateReactiveMongoRepositoryを提供し、どちらも広範なマッピング機能を備えています。

Spring WebFlux は、Spring用のリアクティブスタックWebフレームワークを提供し、ノンブロッキングコードとリアクティブストリームのバックプレッシャを有効にします。 Reactorをリアクティブライブラリとして活用します。 さらに、ReactiveStreamsバックプレッシャーを使用してHTTP要求を実行するためのWebClientを提供します。 HTTPクライアントライブラリとしてReactorNettyを使用します。

5.1. 在庫サービス

まず、エンドポイントを変更して、リアクティブなパブリッシャーを発行します。

@GetMapping
public Flux<Product> getAllProducts() {
    return productService.getProducts();
}
@PostMapping
public Mono<Order> processOrder(@RequestBody Order order) {
    return productService.handleOrder(order);
}

@DeleteMapping
public Mono<Order> revertOrder(@RequestBody Order order) {
    return productService.revertOrder(order);
}

もちろん、サービスにも必要な変更を加える必要があります。

@Transactional
public Mono<Order> handleOrder(Order order) {
    return Flux.fromIterable(order.getLineItems())
      .flatMap(l -> productRepository.findById(l.getProductId()))
      .flatMap(p -> {
          int q = order.getLineItems().stream()
            .filter(l -> l.getProductId().equals(p.getId()))
            .findAny().get()
            .getQuantity();
          if (p.getStock() >= q) {
              p.setStock(p.getStock() - q);
              return productRepository.save(p);
          } else {
              return Mono.error(new RuntimeException("Product is out of stock: " + p.getId()));
          }
      })
      .then(Mono.just(order.setOrderStatus("SUCCESS")));
}

@Transactional
public Mono<Order> revertOrder(Order order) {
    return Flux.fromIterable(order.getLineItems())
      .flatMap(l -> productRepository.findById(l.getProductId()))
      .flatMap(p -> {
          int q = order.getLineItems().stream()
            .filter(l -> l.getProductId().equals(p.getId()))
            .findAny().get()
            .getQuantity();
          p.setStock(p.getStock() + q);
          return productRepository.save(p);
      })
      .then(Mono.just(order.setOrderStatus("SUCCESS")));
}

5.2. 配送サービス

同様に、配送サービスのエンドポイントを変更します。

@PostMapping
public Mono<Order> process(@RequestBody Order order) {
    return shippingService.handleOrder(order);
}

そして、リアクティブプログラミングを活用するためのサービスの対応する変更:

public Mono<Order> handleOrder(Order order) {
    return Mono.just(order)
      .flatMap(o -> {
          LocalDate shippingDate = null;
          if (LocalTime.now().isAfter(LocalTime.parse("10:00"))
            && LocalTime.now().isBefore(LocalTime.parse("18:00"))) {
              shippingDate = LocalDate.now().plusDays(1);
          } else {
              return Mono.error(new RuntimeException("The current time is off the limits to place order."));
          }
          return shipmentRepository.save(new Shipment()
            .setAddress(order.getShippingAddress())
            .setShippingDate(shippingDate));
      })
      .map(s -> order.setShippingDate(s.getShippingDate())
        .setOrderStatus(OrderStatus.SUCCESS));
    }

5.3. 注文サービス

注文サービスのエンドポイントにも同様の変更を加える必要があります。

@PostMapping
public Mono<Order> create(@RequestBody Order order) {
    return orderService.createOrder(order)
      .flatMap(o -> {
          if (OrderStatus.FAILURE.equals(o.getOrderStatus())) {
              return Mono.error(new RuntimeException("Order processing failed, please try again later. " + o.getResponseMessage()));
          } else {
              return Mono.just(o);
          }
      });
}

@GetMapping
public Flux<Order> getAll() {
    return orderService.getOrders();
}

Spring WebClient を使用してインベントリと出荷のリアクティブエンドポイントを呼び出す必要があるため、サービスの変更はより複雑になります。

public Mono<Order> createOrder(Order order) {
    return Mono.just(order)
      .flatMap(orderRepository::save)
      .flatMap(o -> {
          return webClient.method(HttpMethod.POST)
            .uri(inventoryServiceUrl)
            .body(BodyInserters.fromValue(o))
            .exchange();
      })
      .onErrorResume(err -> {
          return Mono.just(order.setOrderStatus(OrderStatus.FAILURE)
            .setResponseMessage(err.getMessage()));
      })
      .flatMap(o -> {
          if (!OrderStatus.FAILURE.equals(o.getOrderStatus())) {
              return webClient.method(HttpMethod.POST)
                .uri(shippingServiceUrl)
                .body(BodyInserters.fromValue(o))
                .exchange();
          } else {
              return Mono.just(o);
          }
      })
      .onErrorResume(err -> {
          return webClient.method(HttpMethod.POST)
            .uri(inventoryServiceUrl)
            .body(BodyInserters.fromValue(order))
            .retrieve()
            .bodyToMono(Order.class)
            .map(o -> o.setOrderStatus(OrderStatus.FAILURE)
              .setResponseMessage(err.getMessage()));
      })
      .map(o -> {
          if (!OrderStatus.FAILURE.equals(o.getOrderStatus())) {
              return order.setShippingDate(o.getShippingDate())
                .setOrderStatus(OrderStatus.SUCCESS);
          } else {
              return order.setOrderStatus(OrderStatus.FAILURE)
                .setResponseMessage(o.getResponseMessage());
          }
      })
      .flatMap(orderRepository::save);
}

public Flux<Order> getOrders() {
    return orderRepository.findAll();
}

リアクティブAPIを使用したこの種類のオーケストレーションは、簡単な演習ではなく、多くの場合、エラーが発生しやすく、デバッグが困難です。 次のセクションで、これをどのように簡略化できるかを見ていきます。

5.4. フロントエンド

APIは発生したイベントをストリーミングできるので、フロントエンドでもそれを活用できるのは当然のことです。 幸い、Angularは EventSource サーバー送信イベントのインターフェースをサポートしています。

以前のすべての注文をイベントのストリームとしてプルして処理する方法を見てみましょう。

getOrderStream() {
    return Observable.create((observer) => {
        let eventSource = new EventSource('http://localhost:8080/api/orders')
        eventSource.onmessage = (event) => {
            let json = JSON.parse(event.data)
            this.orders.push(json)
            this._zone.run(() => {
                observer.next(this.orders)
            })
        }
        eventSource.onerror = (error) => {
            if(eventSource.readyState === 0) {
                eventSource.close()
                this._zone.run(() => {
                    observer.complete()
                })
            } else {
                this._zone.run(() => {
                    observer.error('EventSource error: ' + error)
                })
            }
        }
    })
}

6. メッセージ駆動型アーキテクチャ

ここで取り上げる最初の問題は、サービス間の通信に関連しています。 現在、これらの通信は同期的であり、いくつかの問題があります。 これらには、カスケード障害、複雑なオーケストレーション、分散トランザクションなどが含まれます。

この問題を解決する明白な方法は、これらの通信を非同期にすることです。 すべてのサービス間通信を容易にするためのメッセージブローカーは、私たちのためにトリックを行うことができます。 メッセージブローカーとしてKafkaを使用し、メッセージを生成および消費するために Spring forKafkaを使用します。

単一のトピックを使用して、サービスが反応するためのさまざまな注文ステータスの注文メッセージを生成および消費します。

各サービスをどのように変更する必要があるかを見てみましょう。

6.1. 在庫サービス

インベントリサービスのメッセージプロデューサーを定義することから始めましょう。

@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;

public void sendMessage(Order order) {
    this.kafkaTemplate.send("orders", order);
}

次に、トピックに関するさまざまなメッセージに反応するために、在庫サービスのメッセージコンシューマーを定義する必要があります。

@KafkaListener(topics = "orders", groupId = "inventory")
public void consume(Order order) throws IOException {
    if (OrderStatus.RESERVE_INVENTORY.equals(order.getOrderStatus())) {
        productService.handleOrder(order)
          .doOnSuccess(o -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_SUCCESS));
          })
          .doOnError(e -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_FAILURE)
                .setResponseMessage(e.getMessage()));
          }).subscribe();
    } else if (OrderStatus.REVERT_INVENTORY.equals(order.getOrderStatus())) {
        productService.revertOrder(order)
          .doOnSuccess(o -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_REVERT_SUCCESS));
          })
          .doOnError(e -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.INVENTORY_REVERT_FAILURE)
                .setResponseMessage(e.getMessage()));
          }).subscribe();
    }
}

これは、冗長エンドポイントの一部をコントローラーから安全に削除できることも意味します。 これらの変更は、アプリケーションで非同期通信を実現するのに十分です。

6.2. 配送サービス

配送サービスの変更は、以前に在庫サービスで行った変更と比較的似ています。 メッセージプロデューサーは同じであり、メッセージコンシューマーは出荷ロジックに固有です。

@KafkaListener(topics = "orders", groupId = "shipping")
public void consume(Order order) throws IOException {
    if (OrderStatus.PREPARE_SHIPPING.equals(order.getOrderStatus())) {
        shippingService.handleOrder(order)
          .doOnSuccess(o -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.SHIPPING_SUCCESS)
                .setShippingDate(o.getShippingDate()));
          })
          .doOnError(e -> {
              orderProducer.sendMessage(order.setOrderStatus(OrderStatus.SHIPPING_FAILURE)
                .setResponseMessage(e.getMessage()));
          }).subscribe();
    }
}

必要がなくなったので、コントローラー内のすべてのエンドポイントを安全にドロップできます。

6.3. 注文サービス

オーダーサービスの変更は、以前にすべてのオーケストレーションを行っていた場所であるため、もう少し複雑になります。

それにもかかわらず、メッセージプロデューサーは変更されないままであり、メッセージコンシューマーは注文サービス固有のロジックを引き受けます。

@KafkaListener(topics = "orders", groupId = "orders")
public void consume(Order order) throws IOException {
    if (OrderStatus.INITIATION_SUCCESS.equals(order.getOrderStatus())) {
        orderRepository.findById(order.getId())
          .map(o -> {
              orderProducer.sendMessage(o.setOrderStatus(OrderStatus.RESERVE_INVENTORY));
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    } else if ("INVENTORY-SUCCESS".equals(order.getOrderStatus())) {
        orderRepository.findById(order.getId())
          .map(o -> {
              orderProducer.sendMessage(o.setOrderStatus(OrderStatus.PREPARE_SHIPPING));
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    } else if ("SHIPPING-FAILURE".equals(order.getOrderStatus())) {
        orderRepository.findById(order.getId())
          .map(o -> {
              orderProducer.sendMessage(o.setOrderStatus(OrderStatus.REVERT_INVENTORY));
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    } else {
        orderRepository.findById(order.getId())
          .map(o -> {
              return o.setOrderStatus(order.getOrderStatus())
                .setResponseMessage(order.getResponseMessage());
          })
          .flatMap(orderRepository::save)
          .subscribe();
    }
}

ここでのコンシューマーは、異なる注文ステータスの注文メッセージに反応しているだけです。 これが、異なるサービス間の振り付けを提供するものです。

最後に、この振り付けをサポートするために、注文サービスも変更する必要があります。

public Mono<Order> createOrder(Order order) {
    return Mono.just(order)
      .flatMap(orderRepository::save)
      .map(o -> {
          orderProducer.sendMessage(o.setOrderStatus(OrderStatus.INITIATION_SUCCESS));
          return o;
      })
      .onErrorResume(err -> {
          return Mono.just(order.setOrderStatus(OrderStatus.FAILURE)
            .setResponseMessage(err.getMessage()));
      })
      .flatMap(orderRepository::save);
}

これは、前のセクションでリアクティブエンドポイントを使用して作成する必要があったサービスよりもはるかに単純であることに注意してください。 非同期コレオグラフィーは、結果整合性と複雑なデバッグおよび監視を犠牲にしてもたらされますが、多くの場合、はるかに単純なコードになります。 ご想像のとおり、フロントエンドは注文の最終ステータスをすぐには取得しません。

7. コンテナオーケストレーションサービス

私たちが解決したいパズルの最後のピースは、展開に関連しています。

アプリケーションに必要なのは、十分な冗長性と、必要に応じて自動的にスケールアップまたはスケールダウンする傾向です。

Dockerを介してサービスのコンテナー化をすでに達成しており、DockerComposeを介してサービス間の依存関係を管理しています。 これらはそれ自体が素晴らしいツールですが、私たちが望むものを達成するのに役立ちません。

したがって、アプリケーションの冗長性とスケーラビリティを処理できるコンテナオーケストレーションサービスが必要です。 いくつかのオプションがありますが、人気のあるものの1つにKubernetesがあります。 Kubernetesは、コンテナ化されたワークロードの高度にスケーラブルなデプロイを実現するための、クラウドベンダーに依存しない方法を提供します。

Kubernetes は、Dockerなどのコンテナーをデプロイの最小単位であるポッドにラップします。 さらに、 Deployment を使用して、目的の状態を宣言的に記述することができます。

デプロイメントはReplicaSetsを作成します。これは、ポッドの起動を内部的に担当します。 任意の時点で実行する必要がある同一のポッドの最小数を説明できます。 これにより、冗長性が提供され、高可用性が実現します。

アプリケーションのKubernetesデプロイを定義する方法を見てみましょう。

apiVersion: apps/v1
kind: Deployment
metadata: 
  name: inventory-deployment
spec: 
  replicas: 3
  selector:
    matchLabels:
      name: inventory-deployment
  template: 
    metadata: 
      labels: 
        name: inventory-deployment
    spec: 
      containers:
      - name: inventory
        image: inventory-service-async:latest
        ports: 
        - containerPort: 8081
---
apiVersion: apps/v1
kind: Deployment
metadata: 
  name: shipping-deployment
spec: 
  replicas: 3
  selector:
    matchLabels:
      name: shipping-deployment
  template: 
    metadata: 
      labels: 
        name: shipping-deployment
    spec: 
      containers:
      - name: shipping
        image: shipping-service-async:latest
        ports: 
        - containerPort: 8082
---
apiVersion: apps/v1
kind: Deployment
metadata: 
  name: order-deployment
spec: 
  replicas: 3
  selector:
    matchLabels:
      name: order-deployment
  template: 
    metadata: 
      labels: 
        name: order-deployment
    spec: 
      containers:
      - name: order
        image: order-service-async:latest
        ports: 
        - containerPort: 8080

ここでは、ポッドの3つの同一のレプリカをいつでも維持するためのデプロイメントを宣言しています。 これは冗長性を追加するための良い方法ですが、さまざまな負荷には不十分な場合があります。 Kubernetesは、水平ポッドオートスケーラーと呼ばれる別のリソースを提供します。これは、CPU使用率などの観測されたメトリックに基づいて、デプロイ内のポッドの数をスケーリングできます。

Kubernetesクラスタでホストされるアプリケーションのスケーラビリティの側面について説明したばかりであることに注意してください。 これは、基盤となるクラスター自体がスケーラブルであることを必ずしも意味するものではありません。 高可用性Kubernetesクラスターの作成は簡単な作業ではなく、このチュートリアルの範囲を超えています。

8. 結果として生じる反応システム

アーキテクチャにいくつかの改善を加えたので、おそらくこれをリアクティブシステムの定義と照らし合わせて評価するときが来ました。 チュートリアルの前半で説明したリアクティブシステムの4つの特性に対する評価を維持します。

  • Responsive :リアクティブプログラミングパラダイムの採用は、エンドツーエンドのノンブロッキング、したがって応答性の高いアプリケーションを実現するのに役立つはずです。
  • 復元力:必要な数のポッドのReplicaSetを使用したKubernetesのデプロイにより、ランダムな障害に対する復元力が提供されます
  • Elastic :Kubernetesクラスターとリソースは、予測できない負荷に直面しても弾力性があるために必要なサポートを提供する必要があります
  • メッセージ駆動型:すべてのサービス間通信をKafkaブローカーを介して非同期で処理することは、ここで役立つはずです。

これは非常に有望に見えますが、まだ終わっていません。 正直なところ、真にリアクティブなシステムの探求は、継続的な改善の実行である必要があります。 アプリケーションがごく一部である非常に複雑なインフラストラクチャで失敗する可能性のあるすべてを先取りすることはできません。

したがって、リアクティブシステムは、全体を構成するすべての部分に信頼性を要求します。 物理ネットワークからDNSなどのインフラストラクチャサービスに至るまで、最終目標の達成を支援するために、それらすべてが一致している必要があります。

多くの場合、これらすべての部品に必要な保証を管理および提供できない場合があります。 そして、これがマネージドクラウドインフラストラクチャが私たちの苦痛を軽減するのに役立つ場所です。 IaaS(Infeastrure-as-a-Service)、BaaS(Backend-as-a-Service)、PaaS(Platform-as-a-Service)などのサービスのホストから選択して、外部の関係者に責任を委任することができます。 これにより、可能な限りアプリケーションの責任を負うことになります。

9. 結論

このチュートリアルでは、リアクティブシステムの基本と、リアクティブプログラミングとの比較について説明しました。 複数のマイクロサービスを使用して単純なアプリケーションを作成し、リアクティブシステムで解決しようとしている問題を強調しました。

さらに、リアクティブプログラミング、メッセージベースのアーキテクチャ、およびアーキテクチャにコンテナオーケストレーションサービスを導入して、リアクティブシステムを実現しました。

最後に、結果として得られるアーキテクチャと、それがリアクティブシステムへの道のりであり続ける方法について説明しました。 このチュートリアルでは、リアクティブシステムの作成に役立つすべてのツール、フレームワーク、またはパターンを紹介しているわけではありませんが、その過程を紹介しています。

いつものように、この記事のソースコードはGitHubにあります。