デフォルトでは、RxJSの監視可能なストリームは、ストリーム内の値のキャッシュをバッファリングまたは保持しません。 ただし、バッチで処理することが有益な場合は、値の一部をバッファリングすると便利な場合があります。 RxJSには、これを簡単にするための5つの演算子が用意されています。 簡単な例を使用して、バッファリング演算子を調べてみましょう。

バッファリングされた値は、スタンドアロン値ではなく、値の配列として出力されることに注意してください。

この例では、ソースの監視対象としてボタンクリックを使用し、クリックを乱数にマップします。

バッファ

buffer 演算子は、引数として別のobservableを取り、提供されたobservableが出力するまで、元のobservableから出力された値をバッファリングします。 その後、バッファはリセットされ、提供されたオブザーバブルがもう一度放出されるまでバッファリングを再開します。

次の例では、ボタンのクリックからマップされた値は、 release $ observableが放出されるまで( release ボタンがクリックされたとき)バッファリングされます。

const btn = document.querySelector('.click-me');
const releaseBtn = document.querySelector('.release');

const release$ = Rx.Observable.fromEvent(releaseBtn, 'click');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .buffer(release$)
  .subscribe(random => console.log(random));

bufferCount

bufferCount を使用すると、値が出力される前にバッファーに保持する値をいくつか指定できます。

ここでは、4つのマップされた値(4つのボタンクリック)が出力されてコンソールに記録されるまでバッファリングされます。

const btn = document.querySelector('.click-me');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferCount(4)
  .subscribe(random => console.log(random));

これにより、4回クリックするたびに、[86, 93, 57, 64]のような配列がコンソールに記録されます。

bufferTime

bufferTime は、の値をバッファリングするのにミリ秒かかります。 時間が経過すると、バッファリングされた値が出力され、バッファが再開されます。

ここで、クリックイベントからマップされた値は、1秒間バッファリングされてから、出力されます。

const btn = document.querySelector('.click-me');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferTime(1000)
  .take(15)
  .subscribe(random => console.log(random));

ユーザーが1秒間のバッファリング期間の1つで3回クリックすると、[44, 71, 90]のような配列がコンソールに記録されます。 これは永遠に続く可能性があるため、ここでは take 演算子を使用して、15個の配列値が発行された後にストリームを閉じています。

bufferWhen

bufferWhenbufferに似ていますが、オブザーバブルを取得する代わりに、クロージングセレクターと呼ばれるセレクター関数を使用します。 ここでは、 buffer と同じ例を使用しますが、 release$を返す関数を使用していることを少し区別しています。

const btn = document.querySelector('.click-me');
const releaseBuffer = document.querySelector('.release');

const release$ = Rx.Observable.fromEvent(releaseBuffer, 'click');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferWhen(() => release$)
  .subscribe(random => console.log(random));

bufferToggle

bufferTogglebufferに似ていますが、代わりに2つの引数を取ります。バッファーを開始するためのobservableと、バッファーを停止して値を出力するための終了セレクター関数です。

start $ observableが発行されるまで、ボタンのクリックは無視されます。 次に、 stop$オブザーバブルが発行されるまで値がバッファリングされます。 start $ は、新しいバッファを開始するためにもう一度発行する必要があります。

const btn = document.querySelector('.click-me');

const startBtn = document.querySelector('.start');
const stopBtn = document.querySelector('.stop');

const start$ = Rx.Observable
  .fromEvent(startBtn, 'click')
  .do(_ => console.log('Start buffering!'));

const stop$ = Rx.Observable
  .fromEvent(stopBtn, 'click')
  .do(_ => console.log('Stop buffering!'));

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferToggle(start$, () => stop$)
  .subscribe(random => console.log(random));

ここでは、 start$およびstop$オブザーバブルでdo演算子を使用して、メッセージが送信されたときにコンソールにログを記録しています。