1. 概要

Maps は、当然、最も広く使用されているJavaコレクションのスタイルの1つです。

そして重要なことに、 HashMap はスレッドセーフな実装ではありませんが、Hashtableは操作を同期することでスレッドセーフを提供します。

Hashtable はスレッドセーフですが、あまり効率的ではありません。 別の完全に同期されたMap、 Collections.synchronizedMap、も大きな効率を示しません。 高い同時実行性の下で高いスループットでスレッドセーフが必要な場合、これらの実装は適切ではありません。

この問題を解決するために、JavaコレクションフレームワークはJava1.5にConcurrentMapを導入しました。

以下の説明は、 Java1.8に基づいています。

2. ConcurrentMap

ConcurrentMap は、Mapインターフェースの拡張です。 これは、スループットとスレッドセーフの調整の問題を解決するための構造とガイダンスを提供することを目的としています。

ConcurrentMap は、いくつかのインターフェイスのデフォルトメソッドをオーバーライドすることにより、スレッドセーフでメモリ整合性のあるアトミック操作を提供するための有効な実装のガイドラインを提供します。

いくつかのデフォルトの実装がオーバーライドされ、nullキー/値のサポートが無効になります。

  • getOrDefault
  • forEach
  • replaceAll
  • ComputeIfAbsent
  • ComputeIfPresent
  • 計算する
  • マージ

次のAPIも、デフォルトのインターフェース実装なしで、アトミック性をサポートするためにオーバーライドされます。

  • putIfAbsent
  • 削除する
  • replace(key、oldValue、newValue)
  • replace(key、value)

残りのアクションは、基本的にMapと一貫して直接継承されます。

3. ConcurrentHashMap

ConcurrentHashMap は、すぐに使用できるConcurrentMap実装です。

パフォーマンスを向上させるために、内部ではテーブルバケット( Java 8 より前のテーブルセグメントとして使用されていました)としてノードの配列で構成され、主に更新中にCAS操作を使用します。

テーブルバケットは、最初の挿入時に遅延初期化されます。 各バケットは、バケットの最初のノードをロックすることで個別にロックできます。 読み取り操作はブロックされず、更新の競合は最小限に抑えられます。

必要なセグメントの数は、テーブルにアクセスするスレッドの数に比例するため、セグメントごとに進行中の更新は、ほとんどの場合1つ以下になります。

Java 8 より前は、必要な「セグメント」の数はテーブルにアクセスするスレッドの数に比例していたため、セグメントごとに進行中の更新はほとんどの場合1つ以下でした。

そのため、コンストラクターは HashMap と比較して、使用する推定スレッド数を制御するための追加のconcurrencyLevel引数を提供します。

public ConcurrentHashMap(
public ConcurrentHashMap(
 int initialCapacity, float loadFactor, int concurrencyLevel)

他の2つの引数:initialCapacityloadFactorは、HashMapとまったく同じで機能しました。

ただし、Java 8以降、コンストラクターは下位互換性のためにのみ存在します。パラメーターは、マップの初期サイズにのみ影響を与えることができます

3.1. スレッドセーフ

ConcurrentMap は、マルチスレッド環境でのキー/値操作のメモリの一貫性を保証します。

別のスレッドでそのオブジェクトにアクセスまたは削除した後のキーまたは値happen-beforeアクションとしてオブジェクトをConcurrentMapに配置する前のスレッドでのアクション。

確認するために、メモリに一貫性のないケースを見てみましょう。

@Test
public void givenHashMap_whenSumParallel_thenError() throws Exception {
    Map<String, Integer> map = new HashMap<>();
    List<Integer> sumList = parallelSum100(map, 100);

    assertNotEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();
    
    assertTrue(wrongResultCount > 0);
}

private List<Integer> parallelSum100(Map<String, Integer> map, 
  int executionTimes) throws InterruptedException {
    List<Integer> sumList = new ArrayList<>(1000);
    for (int i = 0; i < executionTimes; i++) {
        map.put("test", 0);
        ExecutorService executorService = 
          Executors.newFixedThreadPool(4);
        for (int j = 0; j < 10; j++) {
            executorService.execute(() -> {
                for (int k = 0; k < 10; k++)
                    map.computeIfPresent(
                      "test", 
                      (key, value) -> value + 1
                    );
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        sumList.add(map.get("test"));
    }
    return sumList;
}

並列のmap.computeIfPresentアクションごとに、 HashMap は現在の整数値の一貫したビューを提供せず、一貫性のない望ましくない結果につながります。

ConcurrentHashMap に関しては、一貫性のある正しい結果を得ることができます。

@Test
public void givenConcurrentMap_whenSumParallel_thenCorrect() 
  throws Exception {
    Map<String, Integer> map = new ConcurrentHashMap<>();
    List<Integer> sumList = parallelSum100(map, 1000);

    assertEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();
    
    assertEquals(0, wrongResultCount);
}

3.2. Nullキー/値

ConcurrentMapによって提供されるほとんどのAPIは、nullキーまたは値を許可しません。次に例を示します。

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutWithNullKey_thenThrowsNPE() {
    concurrentMap.put(null, new Object());
}

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap_whenPutNullValue_thenThrowsNPE() {
    concurrentMap.put("test", null);
}

ただし、計算*およびマージアクションの場合、計算値はnullになる可能性があります。これは、キーと値のマッピングが存在する場合は削除されるか、以前に存在しない場合は存在しないままであることを示します

@Test
public void givenKeyPresent_whenComputeRemappingNull_thenMappingRemoved() {
    Object oldValue = new Object();
    concurrentMap.put("test", oldValue);
    concurrentMap.compute("test", (s, o) -> null);

    assertNull(concurrentMap.get("test"));
}

3.3. ストリームサポート

Java 8 は、ConcurrentHashMapでもStreamサポートを提供します。

ほとんどのストリームメソッドとは異なり、バルク(シーケンシャルおよびパラレル)操作により、安全に同時変更が可能になります。 ConcurrentModificationException はスローされません。これは、そのイテレーターにも適用されます。 ストリームに関連して、いくつかの forEach * search 、および reduce * メソッドも追加され、より豊富なトラバーサルおよびmap-reduce操作をサポートします。

3.4. パフォーマンス

内部的には、ConcurrentHashMapはHashMap にいくぶん似ていますが、データアクセスと更新はハッシュテーブルに基づいています(ただし、より複雑です)。

そしてもちろん、 ConcurrentHashMap は、データの取得と更新のほとんどの同時発生の場合に、はるかに優れたパフォーマンスをもたらすはずです。

getおよびputのパフォーマンスの簡単なマイクロベンチマークを作成し、それをHashtableおよびCollections.synchronizedMapと比較して、両方の操作を実行してみましょう。 4スレッドで500,000回。

@Test
public void givenMaps_whenGetPut500KTimes_thenConcurrentMapFaster() 
  throws Exception {
    Map<String, Object> hashtable = new Hashtable<>();
    Map<String, Object> synchronizedHashMap = 
      Collections.synchronizedMap(new HashMap<>());
    Map<String, Object> concurrentHashMap = new ConcurrentHashMap<>();

    long hashtableAvgRuntime = timeElapseForGetPut(hashtable);
    long syncHashMapAvgRuntime = 
      timeElapseForGetPut(synchronizedHashMap);
    long concurrentHashMapAvgRuntime = 
      timeElapseForGetPut(concurrentHashMap);

    assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);
    assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);
}

private long timeElapseForGetPut(Map<String, Object> map) 
  throws InterruptedException {
    ExecutorService executorService = 
      Executors.newFixedThreadPool(4);
    long startTime = System.nanoTime();
    for (int i = 0; i < 4; i++) {
        executorService.execute(() -> {
            for (int j = 0; j < 500_000; j++) {
                int value = ThreadLocalRandom
                  .current()
                  .nextInt(10000);
                String key = String.valueOf(value);
                map.put(key, value);
                map.get(key);
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return (System.nanoTime() - startTime) / 500_000;
}

マイクロベンチマークは単一のシナリオのみを対象としており、実際のパフォーマンスを常に適切に反映しているとは限らないことに注意してください。

そうは言っても、平均的な開発システムを備えたOS Xシステムでは、100回の連続実行(ナノ秒単位)の平均サンプル結果が表示されます。

Hashtable: 1142.45
SynchronizedHashMap: 1273.89
ConcurrentHashMap: 230.2

複数のスレッドが共通のMapにアクセスすることが予想されるマルチスレッド環境では、ConcurrentHashMapが明らかに望ましいです。

ただし、 Map にアクセスできるのが単一のスレッドのみである場合は、 HashMap の方が、そのシンプルさと確かなパフォーマンスのために適しています。

3.5. 落とし穴

通常、取得操作は ConcurrentHashMap でブロックされず、更新操作と重複する可能性があります。 したがって、パフォーマンスを向上させるために、公式Javadoc に記載されているように、最近完了した更新操作の結果のみが反映されます。

覚えておくべき他のいくつかの事実があります:

  • size isEmpty containsValue などの集計ステータスメソッドの結果は、通常、マップが他のスレッドで同時に更新されていない場合にのみ役立ちます。
@Test
public void givenConcurrentMap_whenUpdatingAndGetSize_thenError() 
  throws InterruptedException {
    Runnable collectMapSizes = () -> {
        for (int i = 0; i < MAX_SIZE; i++) {
            mapSizes.add(concurrentMap.size());
        }
    };
    Runnable updateMapData = () -> {
        for (int i = 0; i < MAX_SIZE; i++) {
            concurrentMap.put(String.valueOf(i), i);
        }
    };
    executorService.execute(updateMapData);
    executorService.execute(collectMapSizes);
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);

    assertNotEquals(MAX_SIZE, mapSizes.get(MAX_SIZE - 1).intValue());
    assertEquals(MAX_SIZE, concurrentMap.size());
}

同時更新が厳密に管理されている場合でも、集約ステータスは信頼できます。

これらの集計ステータスメソッドはリアルタイムの精度を保証するものではありませんが、監視または推定の目的には十分な場合があります

ConcurrentHashMapsize()の使用は、 mappingCount()に置き換える必要があることに注意してください。後者のメソッドは、longカウントを返します。深くはありますが、それらは同じ推定に基づいています。

  • hashCodeの問題:まったく同じ hashCode()で多くのキーを使用することは、ハッシュテーブルのパフォーマンスを低下させる確実な方法であることに注意してください。

キーがComparableの場合の影響を改善するために、 ConcurrentHashMap は、キー間の比較順序を使用して、関係を断ち切ることができます。 それでも、同じ hashCode()をできるだけ使用しないようにする必要があります。

  • イテレータは、高速失敗トラバーサルではなく弱い一貫性を提供するため、単一スレッドでのみ使用するように設計されており、ConcurrentModificationException。をスローすることはありません。
  • デフォルトの初期テーブル容量は16であり、指定された同時実行レベルによって調整されます。
public ConcurrentHashMap(
  int initialCapacity, float loadFactor, int concurrencyLevel) {
 
    //...
    if (initialCapacity < concurrencyLevel) {
        initialCapacity = concurrencyLevel;
    }
    //...
}
  • 再マッピング機能に関する注意:提供されているcomputeおよびmerge* メソッドを使用して再マッピング操作を実行できますが、予期しないことを避けるために、それらを高速、短く、シンプルに保ち、現在のマッピングに焦点を当てる必要があります。ブロッキング。
  • ConcurrentHashMap のキーは並べ替えられた順序ではないため、順序付けが必要な場合は、ConcurrentSkipListMapが適切な選択です。

4. ConcurrentNavigableMap

キーの順序付けが必要な場合は、TreeMapの同時バージョンであるConcurrentSkipListMapを使用できます。

ConcurrentMap の補足として、 ConcurrentNavigableMap は、キーの全体的な順序(デフォルトでは昇順)をサポートし、同時にナビゲートできます。 マップのビューを返すメソッドは、同時実行性の互換性のためにオーバーライドされます。

  • サブマップ
  • ヘッドマップ
  • tailMap
  • サブマップ
  • ヘッドマップ
  • tailMap
  • 下降マップ

keySet()ビューのイテレーターとスプリッターはweak-memory-consistencyで拡張されています。

  • navigableKeySet
  • keySet
  • 下降キーセット

5. ConcurrentSkipListMap

以前、NavigableMapインターフェースとその実装TreeMapについて説明しました。 ConcurrentSkipListMap は、TreeMapのスケーラブルな同時バージョンであることがわかります。

実際には、Javaには赤黒木を同時に実装することはできません。 SkipLists の同時バリアントは、 ConcurrentSkipListMap に実装され、 containsKey get 、の予想平均log(n)時間コストを提供します。 putおよびremove操作とそのバリアント。

TreeMap の機能に加えて、キーの挿入、削除、更新、およびアクセス操作がスレッドセーフで保証されています。 同時にナビゲートする場合のTreeMapとの比較は次のとおりです。

@Test
public void givenSkipListMap_whenNavConcurrently_thenCountCorrect() 
  throws InterruptedException {
    NavigableMap<Integer, Integer> skipListMap
      = new ConcurrentSkipListMap<>();
    int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);
 
    assertEquals(10000 * 4, count);
}

@Test
public void givenTreeMap_whenNavConcurrently_thenCountError() 
  throws InterruptedException {
    NavigableMap<Integer, Integer> treeMap = new TreeMap<>();
    int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);
 
    assertNotEquals(10000 * 4, count);
}

private int countMapElementByPollingFirstEntry(
  NavigableMap<Integer, Integer> navigableMap, 
  int elementCount, 
  int concurrencyLevel) throws InterruptedException {
 
    for (int i = 0; i < elementCount * concurrencyLevel; i++) {
        navigableMap.put(i, i);
    }
    
    AtomicInteger counter = new AtomicInteger(0);
    ExecutorService executorService
      = Executors.newFixedThreadPool(concurrencyLevel);
    for (int j = 0; j < concurrencyLevel; j++) {
        executorService.execute(() -> {
            for (int i = 0; i < elementCount; i++) {
                if (navigableMap.pollFirstEntry() != null) {
                    counter.incrementAndGet();
                }
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return counter.get();
}

舞台裏でのパフォーマンスの懸念の完全な説明は、この記事の範囲を超えています。 詳細は、src.zipファイルのjava/util/concurrentの下にあるConcurrentSkipListMapのJavadocにあります。

6. 結論

この記事では、主にConcurrentMapインターフェイスとConcurrentHashMapの機能を紹介し、ConcurrentNavigableMapでキーの順序付けが必要であることを説明しました。

この記事で使用されているすべての例の完全なソースコードは、GitHubプロジェクトにあります。