1概要


Maps

は当然、最も広く使われているスタイルの1つです。

そして重要なことに、


HashMap


はスレッドセーフな実装ではありませんが、

Hashtable

は操作を同期することによってスレッドセーフを提供します。


Hashtable

はスレッドセーフですが、あまり効率的ではありません。

もう1つの完全に同期された

Map、


Collections.synchronizedMap、

も、それほど効率がよくありません。高い同時実行性の下で高スループットのスレッドセーフを望むのであれば、これらの実装は不可能です。

この問題を解決するために、

Java Collections Framework

** は

Java 1.5



ConcurrentMap

を導入しました。

以下の説明は

Java 1.8

に基づいています。


2

ConcurrentMap



ConcurrentMap

は、

Map

インターフェースの拡張です。スループットとスレッドセーフを両立させるという問題を解決するための構造と手引きを提供することを目的としています。


ConcurrentMap

は、いくつかのインタフェースのデフォルトメソッドをオーバーライドすることによって、スレッドセーフでメモリと一貫性のあるアトミック操作を提供するための有効な実装のガイドラインを示します。


null

キー/値のサポートを無効にして、いくつかのデフォルト実装がオーバーライドされます。


  • getOrDefault


  • forEach


  • replaceAll


  • computeIfAbsent


  • computeIfPresent


  • コンピューター


  • マージ

以下の

APIs

も、デフォルトのインターフェース実装なしで、原子性をサポートするためにオーバーライドされます。


  • putIfAbsent


  • 削除


  • replace(key、oldValue、newValue)


  • replace(キー、値)

残りのアクションは

Map

と基本的に一致して直接継承されます。


3

ConcurrentHashMap



ConcurrentHashMap

は、すぐに使用可能な

ConcurrentMap

の実装です。

パフォーマンスを向上させるために、内部ではテーブルバケットとしてのノードの配列(

Java 8

より前のテーブルセグメント)で構成され、主にhttps://en.wikipedia.org/wiki/Compare-and-swap[を使用しています。更新中の[CAS]操作。

テーブルバケットは最初の挿入時に遅延初期化されます。各バケットは、バケット内の最初のノードをロックすることで個別にロックできます。読み取り操作はブロックされず、更新の競合は最小限に抑えられます。

必要なセグメント数は、テーブルにアクセスしているスレッドの数に比例します。そのため、セグメントごとに進行中の更新は、ほとんどの場合1つの時間になります。


  • Java 8

    より前は、セグメントごとに進行中の更新が最長1時間になるように、必要な「セグメント」の数はテーブルにアクセスするスレッドの数と相対的でした。

そのため、コンストラクタは

HashMap

と比較して、使用する推定スレッド数を制御するための追加の

concurrencyLevel

引数を提供します。

public ConcurrentHashMap(

public ConcurrentHashMap(
 int initialCapacity, float loadFactor, int concurrencyLevel)

他の2つの引数

initialCapacity



loadFactor

は、まったく同じリンクで動作しました:/java-hashmap[as

HashMap

]。

  • しかし、

    Java 8

    以降、コンストラクタは下位互換性のためにのみ存在します。パラメータはマップの初期サイズにのみ影響を与えます** 。


3.1. スレッドセーフ


ConcurrentMap

は、マルチスレッド環境におけるキー/値操作でのメモリの一貫性を保証します。

オブジェクトをキーまたは値として

ConcurrentMap

に配置する前のスレッド内のアクション別のスレッドでそのオブジェクトをアクセスまたは削除した後の

happen-before

アクション。

確認するために、メモリの矛盾するケースを見てみましょう。

@Test
public void givenHashMap__whenSumParallel__thenError() throws Exception {
    Map<String, Integer> map = new HashMap<>();
    List<Integer> sumList = parallelSum100(map, 100);

    assertNotEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();

    assertTrue(wrongResultCount > 0);
}

private List<Integer> parallelSum100(Map<String, Integer> map,
  int executionTimes) throws InterruptedException {
    List<Integer> sumList = new ArrayList<>(1000);
    for (int i = 0; i < executionTimes; i++) {
        map.put("test", 0);
        ExecutorService executorService =
          Executors.newFixedThreadPool(4);
        for (int j = 0; j < 10; j++) {
            executorService.execute(() -> {
                for (int k = 0; k < 10; k++)
                    map.computeIfPresent(
                      "test",
                      (key, value) -> value + 1
                    );
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        sumList.add(map.get("test"));
    }
    return sumList;
}

各map.computeIfPresentアクションに対して、

HashMap

は現在の整数値がどうなるべきかについて一貫した見方を提供しないため、一貫性のない望ましくない結果になります。


ConcurrentHashMap

に関しては、一貫した正しい結果が得られます。

@Test
public void givenConcurrentMap__whenSumParallel__thenCorrect()
  throws Exception {
    Map<String, Integer> map = new ConcurrentHashMap<>();
    List<Integer> sumList = parallelSum100(map, 1000);

    assertEquals(1, sumList
      .stream()
      .distinct()
      .count());
    long wrongResultCount = sumList
      .stream()
      .filter(num -> num != 100)
      .count();

    assertEquals(0, wrongResultCount);
}


3.2.

Null

キー/値


ConcurrentMap

が提供するほとんどの

__API


では、

null__キーまたは値を使用できません。次に例を示します。

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap__whenPutWithNullKey__thenThrowsNPE() {
    concurrentMap.put(null, new Object());
}

@Test(expected = NullPointerException.class)
public void givenConcurrentHashMap__whenPutNullValue__thenThrowsNPE() {
    concurrentMap.put("test", null);
}

ただし、

compute



および

merge

アクションの場合、計算値は

null

になります。これは、キーと値のマッピングが存在する場合は削除されるか、それ以前に存在しない場合は存在しないことを示します 。

@Test
public void givenKeyPresent__whenComputeRemappingNull__thenMappingRemoved() {
    Object oldValue = new Object();
    concurrentMap.put("test", oldValue);
    concurrentMap.compute("test", (s, o) -> null);

    assertNull(concurrentMap.get("test"));
}


3.3. ストリームサポート


Java 8

は、

ConcurrentHashMap

でも

Stream

サポートを提供します。

ほとんどのストリーム方式とは異なり、一括(順次および並列)操作では並行変更を安全に実行できます。


ConcurrentModificationException

はスローされません。これはそのイテレータにも適用されます。ストリームに関連して、いくつかの

forEach





search

、および

reduce

メソッドも、より豊富なトラバーサルおよびマップ縮小操作をサポートするために追加されています。


3.4. パフォーマンス

  • 内部的には、

    ConcurrentHashMap



    HashMap

    ** と多少似ていますが、データアクセスと更新はハッシュテーブルに基づいています(ただしより複雑です)。

もちろん、

ConcurrentHashMap

を使用すると、データの取得と更新に関して、ほとんどの場合にはるかに優れたパフォーマンスが得られます。


get



put

のパフォーマンスに関する簡単なマイクロベンチマークを書き、それを

Hashtable



Collections.synchronizedMap

と比較して、両方の操作を4つのスレッドで500,000回実行します。

@Test
public void givenMaps__whenGetPut500KTimes__thenConcurrentMapFaster()
  throws Exception {
    Map<String, Object> hashtable = new Hashtable<>();
    Map<String, Object> synchronizedHashMap =
      Collections.synchronizedMap(new HashMap<>());
    Map<String, Object> concurrentHashMap = new ConcurrentHashMap<>();

    long hashtableAvgRuntime = timeElapseForGetPut(hashtable);
    long syncHashMapAvgRuntime =
      timeElapseForGetPut(synchronizedHashMap);
    long concurrentHashMapAvgRuntime =
      timeElapseForGetPut(concurrentHashMap);

    assertTrue(hashtableAvgRuntime > concurrentHashMapAvgRuntime);
    assertTrue(syncHashMapAvgRuntime > concurrentHashMapAvgRuntime);
}

private long timeElapseForGetPut(Map<String, Object> map)
  throws InterruptedException {
    ExecutorService executorService =
      Executors.newFixedThreadPool(4);
    long startTime = System.nanoTime();
    for (int i = 0; i < 4; i++) {
        executorService.execute(() -> {
            for (int j = 0; j < 500__000; j++) {
                int value = ThreadLocalRandom
                  .current()
                  .nextInt(10000);
                String key = String.valueOf(value);
                map.put(key, value);
                map.get(key);
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return (System.nanoTime() - startTime)/500__000;
}

マイクロベンチマークはただ1つのシナリオを検討しているだけであり、実際のパフォーマンスを常によく反映しているとは限りません。

そうは言っても、平均的なdevシステムを備えたOS Xシステムでは、100回連続して実行したときの平均サンプル結果が見られます(ナノ秒単位)。

Hashtable: 1142.45
SynchronizedHashMap: 1273.89
ConcurrentHashMap: 230.2

複数のスレッドが共通の

Map

にアクセスすると予想されるマルチスレッド環境では、

ConcurrentHashMap

が明らかに望ましいです。

ただし、

Map

が単一スレッドにしかアクセスできない場合、

HashMap

はその単純さと堅実なパフォーマンスのためにはより良い選択になります。


3.5. 落とし穴

検索操作は通常

ConcurrentHashMap

でブロックされず、更新操作とオーバーラップする可能性があります。そのため、パフォーマンスを向上させるために、https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.htmlに記載されているように、最新の更新操作の結果のみが反映されます。公式Javadoc]。

念頭に置くべき他のいくつかの事実があります。


  • size



    isEmpty

    、およびを含む集約ステータスメソッドの結果


containsValue

は通常、マップが他のスレッドで同時更新されていない場合にのみ役立ちます。

@Test
public void givenConcurrentMap__whenUpdatingAndGetSize__thenError()
  throws InterruptedException {
    Runnable collectMapSizes = () -> {
        for (int i = 0; i < MAX__SIZE; i++) {
            mapSizes.add(concurrentMap.size());
        }
    };
    Runnable updateMapData = () -> {
        for (int i = 0; i < MAX__SIZE; i++) {
            concurrentMap.put(String.valueOf(i), i);
        }
    };
    executorService.execute(updateMapData);
    executorService.execute(collectMapSizes);
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);

    assertNotEquals(MAX__SIZE, mapSizes.get(MAX__SIZE - 1).intValue());
    assertEquals(MAX__SIZE, concurrentMap.size());
}

同時更新が厳密に管理されている場合でも、集計ステータスは信頼できます。

これらのステータス集計方法はリアルタイムの正確性を保証するものではありませんが、監視や見積もりの​​目的には十分かもしれません。

後者のメソッドは

long

カウントを返すため、

ConcurrentHashMap



size()

の使用は

mappingCount()

に置き換える必要があります。



  • ハッシュコード

    問題

    :まったく同じキーを多くのキーで使うことに注意してください


hashCode()

は、ハッシュテーブルのパフォーマンスを低下させる確実な方法です。

キーが

Comparable

であるときの影響を改善するために、

ConcurrentHashMap

はキー間の比較順序を使用して関係を壊すことができます。それでも、できる限り同じ__hashCode()を使用しないでください。

  • イテレータは単一スレッドで使用するように設計されています

ファストフェイルトラバーサルではなく、弱い一貫性
__ConcurrentModificationExceptionをスローします。
** デフォルトの初期テーブル容量は16です。

指定された並行性レベル:

public ConcurrentHashMap(
  int initialCapacity, float loadFactor, int concurrencyLevel) {

   //...
    if (initialCapacity < concurrencyLevel) {
        initialCapacity = concurrencyLevel;
    }
   //...
}

  • 関数の再マッピングに関する注意:再マッピング操作は可能ですが

提供されている

compute



merge **

メソッドを使って、それらを速く、短く、そしてシンプルに保ち、現在のマッピングに集中して予期しないブロッキングを避けるべきです。


ConcurrentHashMap

の** キーはソート順になっていません。

順序付けが必要な場合は、

ConcurrentSkipListMap

が適切な選択です。


4

ConcurrentNavigableMap


キーの順序付けが必要な場合は、

TreeMap

の並行バージョンである

ConcurrentSkipListMap

を使用できます。


ConcurrentMap

の補足として、

ConcurrentNavigableMap

はそのキーの完全な順序付け(デフォルトでは昇順)をサポートし、同時にナビゲート可能です。マップのビューを返すメソッドは、同時実行性の互換性のためにオーバーライドされます。


  • subMap


  • headMap


  • tailMap


  • subMap


  • headMap


  • tailMap


  • descendingMap


keySet()

ビューのイテレータとスプリテレータは、弱いメモリの一貫性で強化されました。


  • navigableKeySet


  • keySet


  • descendingKeySet


5

ConcurrentSkipListMap


以前は、

NavigableMap

インターフェースとその実装リンク:/java-treemap[

TreeMap

]について説明しました。

ConcurrentSkipListMap

は、スケーラブルな同時バージョンの

TreeMap

と見なすことができます。

実際には、赤黒ツリーをJavaで同時に実装することはできません。

https://en.wikipedia.org/wiki/Skip


list[

SkipLists

]の並行変種は

ConcurrentSkipListMap

に実装されており、

containsKey



get



put

、および

remove__操作、およびそれらの変種の平均平均log(n)時間コストを提供します。

TreeMapの機能に加えて、キーの挿入、削除、更新、およびアクセス操作がスレッドセーフで保証されています。並行して移動したときの

TreeMap

との比較は次のとおりです。

@Test
public void givenSkipListMap__whenNavConcurrently__thenCountCorrect()
  throws InterruptedException {
    NavigableMap<Integer, Integer> skipListMap
      = new ConcurrentSkipListMap<>();
    int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);

    assertEquals(10000 **  4, count);
}

@Test
public void givenTreeMap__whenNavConcurrently__thenCountError()
  throws InterruptedException {
    NavigableMap<Integer, Integer> treeMap = new TreeMap<>();
    int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);

    assertNotEquals(10000 **  4, count);
}

private int countMapElementByPollingFirstEntry(
  NavigableMap<Integer, Integer> navigableMap,
  int elementCount,
  int concurrencyLevel) throws InterruptedException {

    for (int i = 0; i < elementCount **  concurrencyLevel; i++) {
        navigableMap.put(i, i);
    }

    AtomicInteger counter = new AtomicInteger(0);
    ExecutorService executorService
      = Executors.newFixedThreadPool(concurrencyLevel);
    for (int j = 0; j < concurrencyLevel; j++) {
        executorService.execute(() -> {
            for (int i = 0; i < elementCount; i++) {
                if (navigableMap.pollFirstEntry() != null) {
                    counter.incrementAndGet();
                }
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);
    return counter.get();
}

舞台裏でのパフォーマンスの懸念についての完全な説明は、この記事の範囲を超えています。詳細は、

src.zip

ファイルの

java/util/concurrent

の下にある__ConcurrentSkipListMapのJavadocにあります。


6. 結論

この記事では、主に

ConcurrentMap

インターフェースと

ConcurrentHashMap

の機能を紹介し、

ConcurrentNavigableMap

が必要なキーの順序付けについて説明しました。

この記事で使用されているすべての例の完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/core-java-concurrency-collections[GitHubプロジェクト]にあります。