1. 概要

この簡単な記事では、リアクティブなイベント駆動型アプリケーションの実際のシナリオを設定することにより、reactor-バスを紹介します。

注:reactor-busプロジェクトは、Reactor 3.xで削除されました:アーカイブされたreactor-busリポジトリ

2. プロジェクトリアクターの基本

2.1. なぜリアクトル?

最新のアプリケーションは、膨大な数の同時リクエストを処理し、大量のデータを処理する必要があります。 標準のブロッキングコードは、これらの要件を満たすのにもはや十分ではありません。

リアクティブデザインパターンは、単一または複数のサービスハンドラーからの大量の同時サービス要求を非同期処理するためのイベントベースのアーキテクチャアプローチです。

Project Reactorはこのパターンに基づいており、JVMでノンブロッキングのリアクティブアプリケーションを構築するという明確で野心的な目標を持っています。

2.2. シナリオ例

始める前に、リアクティブなアーキテクチャスタイルを活用することが理にかなっているいくつかの興味深いシナリオを次に示します。これは、それをどこに適用できるかを理解するためです。

  • アマゾンのような大規模なオンラインショッピングプラットフォームの通知サービス
  • 銀行セクター向けの巨大なトランザクション処理サービス
  • 株価が同時に変動する商社

3. Mavenの依存関係

pom.xmlに次の依存関係を追加して、プロジェクトReactorバスの使用を開始しましょう:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bus</artifactId>
    <version>2.0.8.RELEASE</version>
</dependency>

MavenCentralreactor-busの最新バージョンを確認できます。

4. デモアプリケーションの構築

reactorベースのアプローチの利点をよりよく理解するために、実際の例を見てみましょう。

オンラインショッピングプラットフォームのユーザーに通知を送信するためのシンプルなアプリケーションを作成します。 たとえば、ユーザーが新しい注文を行うと、アプリはメールまたはSMSで注文確認を送信します。

一般的な同期実装は、当然、電子メールまたはSMSサービスのスループットによって制限されます。 したがって、休日などのトラフィックの急増は一般的に問題があります。

リアクティブアプローチを使用すると、システムをより柔軟に設計し、ゲートウェイサーバーなどの外部システムで発生する可能性のある障害やタイムアウトにうまく適応できるようになります。

アプリケーションを見てみましょう–より伝統的な側面から始めて、より反応性の高い構造に移ります。

4.1. シンプルなPOJO

まず、通知データを表すPOJOクラスを作成しましょう。

public class NotificationData {
	
    private long id;
    private String name;
    private String email;
    private String mobile;
    
    // getter and setter methods
}

4.2. サービスレイヤー

次に、単純なサービスレイヤーを定義しましょう。

public interface NotificationService {

    void initiateNotification(NotificationData notificationData) 
      throws InterruptedException;

}

そして、実行時間の長い操作をシミュレートする実装:

@Service
public class NotificationServiceimpl implements NotificationService {
	
    @Override
    public void initiateNotification(NotificationData notificationData) 
      throws InterruptedException {

      System.out.println("Notification service started for "
        + "Notification ID: " + notificationData.getId());
		
      Thread.sleep(5000);
		
      System.out.println("Notification service ended for "
        + "Notification ID: " + notificationData.getId());
    }
}

SMSまたは電子メールゲートウェイを介してメッセージを送信する実際のシナリオを説明するために、意図的に5秒の遅延を導入していることに注意してください。 beginNotification との方法 Thread.sleep(5000)。

したがって、スレッドがサービスにヒットすると、5秒間ブロックされます。

4.3. 消費者

ここで、アプリケーションのより反応的な側面に飛び込んで、コンシューマーを実装しましょう。次に、reactorイベントバスにマップします。

@Service
public class NotificationConsumer implements 
  Consumer<Event<NotificationData>> {

    @Autowired
    private NotificationService notificationService;
	
    @Override
    public void accept(Event<NotificationData> notificationDataEvent) {
        NotificationData notificationData = notificationDataEvent.getData();
        
        try {
            notificationService.initiateNotification(notificationData);
        } catch (InterruptedException e) {
            // ignore        
        }	
    }
}

 

ご覧のとおり、作成したコンシューマーは消費者インターフェース。 メインロジックはacceptメソッドにあります。

これは、一般的なSpringリスナーの実装で満たすことができる同様のアプローチです。

4.4. コントローラー

最後に、イベントを使用できるようになったので、イベントも生成しましょう。

単純なコントローラーでこれを行います。

@Controller
public class NotificationController {

    @Autowired
    private EventBus eventBus;

    @GetMapping("/startNotification/{param}")
    public void startNotification(@PathVariable Integer param) {
        for (int i = 0; i < param; i++) {
            NotificationData data = new NotificationData();
            data.setId(i);

            eventBus.notify("notificationConsumer", Event.wrap(data));

            System.out.println(
              "Notification " + i + ": notification task submitted successfully");
        }
    }
}

これは非常に自明です。ここでは、EventBusを介してイベントを発行しています。

たとえば、クライアントがパラメータ値が10のURLにヒットした場合、10個のイベントがイベントバスを介して送信されます。

4.5. Java構成

それでは、すべてをまとめて、単純なSpring Bootアプリケーションを作成しましょう。

まず、EventBusおよびEnvironmentBeanを構成する必要があります。

@Configuration
public class Config {

    @Bean
    public Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }

    @Bean
    public EventBus createEventBus(Environment env) {
        return EventBus.create(env, Environment.THREAD_POOL);
    }
}

この例では、環境で使用可能なデフォルトのスレッドプールを使用してEventBusをインスタンス化しています

または、カスタマイズされたディスパッチャーインスタンスを使用することもできます。

EventBus evBus = EventBus.create(
  env, 
  Environment.newDispatcher(
    REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT,   
    DispatcherType.THREAD_POOL_EXECUTOR));

これで、メインアプリケーションコードを作成する準備が整いました。

import static reactor.bus.selector.Selectors.$;

@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {

    @Autowired
    private EventBus eventBus;

    @Autowired
    private NotificationConsumer notificationConsumer;

    @Override
    public void run(String... args) throws Exception {
        eventBus.on($("notificationConsumer"), notificationConsumer);
    }

    public static void main(String[] args) {
        SpringApplication.run(NotificationApplication.class, args);
    }
}

run メソッドでは、通知が特定のセレクターと一致したときにトリガーされるnotificationConsumerを登録しています。

$属性の静的インポートを使用してSelectorオブジェクトを作成していることに注目してください。

5. アプリケーションをテストする

次に、NotificationApplicationの動作を確認するためのテストを作成しましょう。

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {

    @LocalServerPort
    private int port;

    @Test
    public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
    }
}

ご覧のとおり、リクエストが実行されるとすぐに、10個のタスクすべてがブロッキングを作成せずに即座に送信されます。 送信されると、通知イベントは並行して処理されます。

Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9

このシナリオでは、これらのイベントを特定の順序で処理する必要がないことを覚えておくことが重要です。

6. 結論

このクイックチュートリアルでは、単純なイベント駆動型アプリケーションを作成しました。 また、より反応的で非ブロッキングのコードを書き始める方法も見てきました。

ただし、このシナリオは、対象の表面を引っかいただけであり、リアクティブパラダイムの実験を開始するための適切な基盤を表しています。

いつものように、ソースコードはGitHubから入手できます。