1概要

この簡単な記事では、

java.util.concurrent

のhttps://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html[

ConcurrentSkipListMap

]クラスを見ていきます。パッケージ。

この構造により、ロックフリーな方法でスレッドセーフなロジックを作成できます。

他のスレッドがまだマップにデータを挿入している間に、不変のデータのスナップショットを作成したい場合は、問題に最適です。

私たちは

その構造を使って

イベントの流れをソートし、最後の60秒間に到着したイベントのスナップショットを取得する** という問題を解決します。


2ストリームソートロジック

複数のスレッドから継続的に発生する一連のイベントがあるとしましょう。最後の60秒からのイベント、および60秒より古いイベントも取得できる必要があります。

まず、イベントデータの構造を定義しましょう。

public class Event {
    private ZonedDateTime eventTime;
    private String content;

   //standard constructors/getters
}


eventTime

フィールドを使用してイベントをソートし続けたいです。

ConcurrentSkipListMapを使用してこれを実現するには、

そのインスタンスを作成しながら

Comparator

をそのコンストラクタに渡す必要があります。

ConcurrentSkipListMap<ZonedDateTime, String> events
 = new ConcurrentSkipListMap<>(
 Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));

到着したすべてのイベントをタイムスタンプを使用して比較します。

comparingLong()

メソッドを使用し、

ZonedDateTimeから

long__タイムスタンプを取得できるextract関数を渡しています。

イベントが到着したら、

put()

メソッドを使用してそれらをマップに追加するだけです。このメソッドは明示的な同期を必要としません。

public void acceptEvent(Event event) {
    events.put(event.getEventTime(), event.getContent());
}


ConcurrentSkipListMap

は、コンストラクターで渡された

Comparator

を使用して、その下のイベントのソートを処理します。


ConcurrentSkipListMap

の最も注目すべき長所は、ロックフリーな方法でそのデータの不変スナップショットを作成できるメソッドです。過去1分以内に到着したすべてのイベントを取得するには、

tailMap()

メソッドを使用して、要素を取得する時刻を渡します。

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
    return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}

過去1分間のすべてのイベントが返されます。これは不変のスナップショットになります。最も重要なことは、他の書き込みスレッドが明示的なロックを行う必要なく

ConcurrentSkipListMap

に新しいイベントを追加できることです。


headMap()

メソッドを使用することで、今からその1分後に到着したすべてのイベントを取得できます。

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
    return events.headMap(ZonedDateTime.now().minusMinutes(1));
}

これにより、1分以上経過したすべてのイベントの不変のスナップショットが返されます。上記のメソッドはすべて

EventWindowSort

クラスに属しています。これは次のセクションで使用します。


3ソートストリームロジックのテスト


ConcurrentSkipListMapを使用してソートロジックを実装したら、

それぞれ100個のイベントを送信する2つのライタースレッドを作成して** テストすることができます。

ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;

Runnable producer = () -> IntStream
  .rangeClosed(0, 100)
  .forEach(index -> eventWindowSort.acceptEvent(
      new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
  );

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(producer);
}

各スレッドは

acceptEvent()

メソッドを呼び出して、今から

eventTime

を持つイベントを「今マイナス100秒」まで送信します。

それまでの間、1分以内のイベントのスナップショットを返す

getEventsFromLastMinute()

メソッドを呼び出すことができます。

ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
  = eventWindowSort.getEventsFromLastMinute();


eventsFromLastMinute

内のイベント数は、プロデューサースレッドが__EventWindowSortにイベントを送信する速度に応じて、テストの実行ごとに異なります。返されたスナップショット内に古いイベントが1つもないことをアサートできます1分以上

long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();

assertEquals(eventsOlderThanOneMinute, 0);

スナップショットには、1分以内に発生するイベントが0個以上あることがわかります。

long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();

assertTrue(eventYoungerThanOneMinute > 0);


getEventsFromLastMinute()

では、その下にある

tailMap()

を使用しています。


ConcurrentSkipListMapの

headMap()

メソッドを使用している

getEventsOlderThatOneMinute()__をテストしましょう。

ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
  = eventWindowSort.getEventsOlderThatOneMinute();

今回は、1分以上経過したイベントのスナップショットを取得します。このようなイベントは0個以上あると主張できます。

long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();

assertTrue(eventsOlderThanOneMinute > 0);

そして次に、最後の1分以内にある単一のイベントはありません。

long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();

assertEquals(eventYoungerThanOneMinute, 0);

注意すべき最も重要なことは、

他のスレッドが

ConcurrentSkipListMap.

に新しい値を追加している間に

データのスナップショットを取得できることです。


4結論

このクイックチュートリアルでは、

ConcurrentSkipListMap

の基本と、いくつかの実用的な例を見てみました


_.

_


ConcurrentSkipListMap

の高性能を利用して、同時に複数のスレッドがマップを更新している場合でもデータの不変のスナップショットを提供できるノンブロッキングアルゴリズムを実装しました。

これらすべての例とコードスニペットの実装はhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency-collections[GitHubプロジェクト]にあります。これはMavenプロジェクトなので、そのままインポートして実行するのは簡単なはずです。