春にサーバーから送信されたイベント
1. 概要
このチュートリアルでは、Springを使用してServer-Sent-EventsベースのAPIを実装する方法を説明します。
簡単に言うと、Server-Sent-Events(略してSSE)は、Webアプリケーションが単方向のイベントストリームを処理し、サーバーがデータを送信するたびに更新を受信できるようにするHTTP標準です。
Spring 4.2バージョンはすでにサポートしていますが、Spring 5以降、より慣用的で便利な処理方法が利用できるようになりました。
2. Spring5Webfluxを使用したSSE
これを実現するために、 Reactorライブラリによって提供されるFluxクラス、またはイベントメタデータを制御できるServerSentEventエンティティなどの実装を利用できます。
2.1. Fluxを使用してイベントをストリーミングする
Flux は、イベントのストリームのリアクティブな表現であり、指定された要求または応答メディアタイプに基づいて異なる方法で処理されます。
SSEストリーミングエンドポイントを作成するには、 W3C仕様に従い、そのMIMEタイプを text /event-streamとして指定する必要があります。
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Flux - " + LocalTime.now().toString());
}
interval メソッドは、long値を段階的に出力するFluxを作成します。 次に、これらの値を目的の出力にマップします。
アプリケーションを起動し、エンドポイントを参照して試してみましょう。
サーバーによって秒単位でプッシュされるイベントにブラウザーがどのように反応するかを確認します。 フラックスとリアクターコアの詳細については、この投稿をご覧ください。
2.2. ServerSentEvent要素を利用する
次に、出力StringをServerSentSeventオブジェクトにラップし、これを行うことの利点を調べます。
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("SSE - " + LocalTime.now().toString())
.build());
}
理解できるように、ServerSentEventエンティティを使用することにはいくつかの利点があります。
- 実際のシナリオで必要となるイベントメタデータを処理できます
- 「text/event-stream」メディアタイプ宣言は無視できます
この場合、 id 、イベント名、そして最も重要なことに、イベントの実際のデータを指定しました。
また、 comments 属性と、 try 値を追加することもできます。これは、イベントを送信しようとするときに使用される再接続時間を指定します。
2.3. WebClientを使用したサーバー送信イベントの消費
次に、WebClientを使用してイベントストリームを消費しましょう。
public void consumeServerSentEvent() {
WebClient client = WebClient.create("http://localhost:8080/sse-server");
ParameterizedTypeReference<ServerSentEvent<String>> type
= new ParameterizedTypeReference<ServerSentEvent<String>>() {};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-sse")
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(
content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
LocalTime.now(), content.event(), content.id(), content.data()),
error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}
subscribe メソッドを使用すると、イベントを正常に受信したとき、エラーが発生したとき、およびストリーミングが完了したときにどのように進むかを示すことができます。
この例では、 retrieve メソッドを使用しました。これは、応答本文を取得するための単純で簡単な方法です。
onStatus ステートメントを追加するシナリオを処理しない限り、4xxまたは5xx応答を受信すると、このメソッドは自動的にWebClientResponseExceptionをスローします。
一方、 exchange メソッドも使用できます。このメソッドは、 ClientResponse へのアクセスを提供し、失敗した応答でエラー信号を出しません。
イベントメタデータが必要ない場合は、ServerSentEventラッパーをバイパスできることを考慮に入れる必要があります。
3. SpringMVCでのSSEストリーミング
すでに述べたように、SSE仕様は、SseEmitterクラスが導入されたSpring4.2以降でサポートされていました。
簡単に言うと、 ExecutorService を定義します。これは、 SseEmitter がデータをプッシュする作業を行い、エミッタインスタンスを返し、接続を次のように開いたままにするスレッドです。
@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();
ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
sseMvcExecutor.execute(() -> {
try {
for (int i = 0; true; i++) {
SseEventBuilder event = SseEmitter.event()
.data("SSE MVC - " + LocalTime.now().toString())
.id(String.valueOf(i))
.name("sse event - mvc");
emitter.send(event);
Thread.sleep(1000);
}
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
ユースケースのシナリオに適したExecutorServiceを常に選択してください。
この興味深いチュートリアルを読むことで、Spring MVCのSSEについて詳しく学び、他の例を見ることができます。
4. サーバー送信イベントについて
SSEエンドポイントを実装する方法がわかったので、いくつかの基本的な概念を理解して、もう少し深く掘り下げてみましょう。
SSEは、イベントをいつでも一方向にストリーミングできるようにするために、ほとんどのブラウザーで採用されている仕様です。
「イベント」は、仕様で定義された形式に従うUTF-8でエンコードされたテキストデータのストリームです。
この形式は、改行で区切られた一連のキー値要素(id、retry、data、およびevent、名前を示します)で構成されます。
コメントもサポートされています。
この仕様は、データペイロード形式を制限するものではありません。 単純なStringまたはより複雑なJSONまたはXML構造を使用できます。
最後に考慮しなければならない点は、SSEストリーミングとWebSocketsの使用の違いです。
WebSockets はサーバーとクライアント間の全二重(双方向)通信を提供しますが、SSEは単方向通信を使用します。
また、 WebSockets はHTTPプロトコルではなく、SSEとは異なり、エラー処理標準を提供していません。
5. 結論
要約すると、この記事では、SSEストリーミングの主要な概念を学びました。これは、間違いなく、次世代システムを作成するための優れたリソースです。
私たちは今、このプロトコルを使用するときに内部で何が起こっているのかを理解するための優れた立場にあります。
さらに、Githubリポジトリにあるいくつかの簡単な例で理論を補完しました。