JavaでのExchangerの概要
1. 概要
このチュートリアルでは、調査します java .util.concurrent.Exchanger
2. Exchangerの概要
exchange が呼び出されると、ペアの他のスレッドもそれを呼び出すのを待ちます。 この時点で、2番目のスレッドは、最初のスレッドがそのオブジェクトで待機していることを検出します。 スレッドは、保持しているオブジェクトを交換し、交換のシグナルを送信します。これで、スレッドは戻ることができます。
Exchangerを使用した2つのスレッド間のメッセージ交換を理解するための例を見てみましょう。
@Test
public void givenThreads_whenMessageExchanged_thenCorrect() {
Exchanger<String> exchanger = new Exchanger<>();
Runnable taskA = () -> {
try {
String message = exchanger.exchange("from A");
assertEquals("from B", message);
} catch (InterruptedException e) {
Thread.currentThread.interrupt();
throw new RuntimeException(e);
}
};
Runnable taskB = () -> {
try {
String message = exchanger.exchange("from B");
assertEquals("from A", message);
} catch (InterruptedException e) {
Thread.currentThread.interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture.allOf(
runAsync(taskA), runAsync(taskB)).join();
}
ここでは、共通の交換器を使用して、2つのスレッドが相互にメッセージを交換しています。 メインスレッドのオブジェクトを新しいスレッドと交換する例を見てみましょう。
@Test
public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
Runnable runner = () -> {
try {
String message = exchanger.exchange("from runner");
assertEquals("to runner", message);
} catch (InterruptedException e) {
Thread.currentThread.interrupt();
throw new RuntimeException(e);
}
};
CompletableFuture<Void> result
= CompletableFuture.runAsync(runner);
String msg = exchanger.exchange("to runner");
assertEquals("from runner", msg);
result.join();
}
最初にrunnerスレッドを開始し、後でメインスレッドで exchange()を呼び出す必要があることに注意してください。
また、2番目のスレッドが交換時点に到達しない場合、最初のスレッドの呼び出しがタイムアウトする可能性があることに注意してください。 最初のスレッドが待機する時間は、オーバーロードされた exchange(T t、長いタイムアウト、TimeUnit timeUnit)を使用して制御できます。
3. GCデータ交換なし
Exchanger を使用して、あるスレッドから別のスレッドにデータを渡すパイプラインのようなパターンを作成できます。 このセクションでは、スレッドの単純なスタックを作成して、パイプラインとして相互にデータを継続的に渡します。
@Test
public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException {
Exchanger<Queue<String>> readerExchanger = new Exchanger<>();
Exchanger<Queue<String>> writerExchanger = new Exchanger<>();
Runnable reader = () -> {
Queue<String> readerBuffer = new ConcurrentLinkedQueue<>();
while (true) {
readerBuffer.add(UUID.randomUUID().toString());
if (readerBuffer.size() >= BUFFER_SIZE) {
readerBuffer = readerExchanger.exchange(readerBuffer);
}
}
};
Runnable processor = () -> {
Queue<String> processorBuffer = new ConcurrentLinkedQueue<>();
Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
processorBuffer = readerExchanger.exchange(processorBuffer);
while (true) {
writerBuffer.add(processorBuffer.poll());
if (processorBuffer.isEmpty()) {
processorBuffer = readerExchanger.exchange(processorBuffer);
writerBuffer = writerExchanger.exchange(writerBuffer);
}
}
};
Runnable writer = () -> {
Queue<String> writerBuffer = new ConcurrentLinkedQueue<>();
writerBuffer = writerExchanger.exchange(writerBuffer);
while (true) {
System.out.println(writerBuffer.poll());
if (writerBuffer.isEmpty()) {
writerBuffer = writerExchanger.exchange(writerBuffer);
}
}
};
CompletableFuture.allOf(
runAsync(reader),
runAsync(processor),
runAsync(writer)).join();
}
ここでは、リーダー、プロセッサ、およびライターの3つのスレッドがあります。 一緒に、それらはそれらの間でデータを交換する単一のパイプラインとして機能します。
readerExchangerはreaderとprocessorスレッド間で共有され、writerExchangerはprocessor間で共有されますおよびwriterスレッド。
ここでの例はデモンストレーションのみであることに注意してください。 while(true)で無限ループを作成するときは注意が必要です。 また、コードを読みやすくするために、いくつかの例外処理を省略しました。
バッファを再利用しながらデータを交換するこのパターンにより、ガベージコレクションを減らすことができます。交換メソッドは同じキューインスタンスを返すため、これらのオブジェクトのGCはありません。 他のブロッキングキューとは異なり、エクスチェンジャーはデータを保持および共有するためのノードまたはオブジェクトを作成しません。
このようなパイプラインの作成はDisrupterパターンに似ていますが、重要な違いがあります。Disrupterパターンは複数のプロデューサーとコンシューマーをサポートしますが、エクスチェンジャーはコンシューマーとプロデューサーのペア間で使用できます。
4. 結論
だから、私たちは何を学びました交換器
いつものように、コードはGitHubでから入手できます。