1. 概要

この記事では、 java .util.concurrent パッケージの2つの構成要素、LongAdderLongAccumulator。について説明します。

どちらもマルチスレッド環境で非常に効率的に作成されており、非常に巧妙な戦術を活用してロックフリーでありながらスレッドセーフを維持します。

2. LongAdder

AtomicLong を使用することがボトルネックになる可能性がある、いくつかの値を頻繁にインクリメントするロジックについて考えてみましょう。 これは、コンペアアンドスワップ操作を使用します。これは、激しい競合の下で、多くのCPUサイクルの浪費につながる可能性があります。

一方、 LongAdder は、非常に巧妙なトリックを使用して、スレッド間の競合を減らします。

LongAdder、のインスタンスをインクリメントする場合は、 increment()メソッドを呼び出す必要があります。 その実装は、オンデマンドで拡張できるカウンターの配列を保持します

したがって、より多くのスレッドが increment()を呼び出すと、配列は長くなります。 アレイ内の各レコードは個別に更新できるため、競合が減少します。 そのため、 LongAdder は、複数のスレッドからカウンターをインクリメントするための非常に効率的な方法です。

LongAdder クラスのインスタンスを作成し、複数のスレッドから更新してみましょう。

LongAdder counter = new LongAdder();
ExecutorService executorService = Executors.newFixedThreadPool(8);

int numberOfThreads = 4;
int numberOfIncrements = 100;

Runnable incrementAction = () -> IntStream
  .range(0, numberOfIncrements)
  .forEach(i -> counter.increment());

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(incrementAction);
}

LongAdder のカウンターの結果は、 sum()メソッドを呼び出すまで使用できません。 このメソッドは、下の配列のすべての値を反復処理し、それらの値を合計して適切な値を返します。 ただし、 sum()メソッドの呼び出しには非常にコストがかかる可能性があるため、注意が必要です。

assertEquals(counter.sum(), numberOfIncrements * numberOfThreads);

sum()を呼び出した後、 LongAdder のインスタンスに関連付けられているすべての状態をクリアして、最初からカウントを開始したい場合があります。 sumThenReset()メソッドを使用して、次のことを実現できます。

assertEquals(counter.sumThenReset(), numberOfIncrements * numberOfThreads);
assertEquals(counter.sum(), 0);

その後のsum()メソッドの呼び出しはゼロを返すことに注意してください。これは、状態が正常にリセットされたことを意味します。

さらに、Javaは DoubleAdder も提供し、LongAdder。と同様のAPIを使用してdouble値の合計を維持します。

3. LongAccumulator

LongAccumulator も非常に興味深いクラスです。これにより、さまざまなシナリオでロックフリーアルゴリズムを実装できます。 たとえば、提供された LongBinaryOperator に従って結果を累積するために使用できます。これは、StreamAPIのreduce()操作と同様に機能します。

LongAccumulator のインスタンスは、LongBinaryOperatorとそのコンストラクターに初期値を指定することで作成できます。 LongAccumulatorは、累積の順序が重要ではない可換関数を提供すると、正しく機能することを覚えておくことが重要です。

LongAccumulator accumulator = new LongAccumulator(Long::sum, 0L);

LongAccumulator を作成していますが、 ch は、すでにアキュムレータにあった値に新しい値を追加します。 LongAccumulator の初期値をゼロに設定しているため、 accumulate()メソッドの最初の呼び出しでは、previousValueの値はゼロになります。

複数のスレッドからaccumulate()メソッドを呼び出しましょう。

int numberOfThreads = 4;
int numberOfIncrements = 100;

Runnable accumulateAction = () -> IntStream
  .rangeClosed(0, numberOfIncrements)
  .forEach(accumulator::accumulate);

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(accumulateAction);
}

accumulate()メソッドに引数として数値を渡す方法に注目してください。 このメソッドは、 sum()関数を呼び出します。

LongAccumulator は、コンペアアンドスワップの実装を使用しています。これにより、これらの興味深いセマンティクスが実現します。

まず、 LongBinaryOperator、として定義されたアクションを実行し、次にpreviousValueが変更されたかどうかを確認します。 変更された場合は、新しい値でアクションが再実行されます。 そうでない場合は、アキュムレータに格納されている値を変更することに成功します。

これで、すべての反復からのすべての値の合計が20200であったと断言できます。

assertEquals(accumulator.get(), 20200);

興味深いことに、Javaは DoubleAccumulator にも同じ目的とAPIを提供しますが、double値用です。

4. ダイナミックストライピング

Javaのすべての加算器とアキュムレータの実装は、と呼ばれる興味深い基本クラスから継承しています。 Striped64。 このクラスは、現在の状態を維持するために1つの値だけを使用するのではなく、状態の配列を使用して、競合をさまざまなメモリ位置に分散します。 

Striped64の機能を簡単に説明します。

さまざまなスレッドがさまざまなメモリ位置を更新しています。 状態の配列(つまり、ストライプ)を使用しているため、このアイデアは動的ストライピングと呼ばれます。 興味深いことに、 Striped64 は、このアイデアと64ビットデータ型で機能するという事実にちなんで名付けられました。

ダイナミックストライピングにより、全体的なパフォーマンスが向上することが期待されます。 ただし、JVMがこれらの状態を割り当てる方法は、逆効果になる可能性があります。

より具体的には、JVMはこれらの状態をヒープ内で互いに近くに割り当てる場合があります。 これは、いくつかの状態が同じCPUキャッシュラインに存在できることを意味します。 したがって、 1つのメモリ位置を更新すると、近くの状態へのキャッシュミスが発生する可能性がありますこの現象は、偽共有と呼ばれ、パフォーマンスを低下させます

偽共有を防ぐため。 Striped64 実装は、各状態の周囲に十分なパディングを追加して、各状態が独自のキャッシュラインに存在することを確認します。

@Contended アノテーションは、このパディングを追加する役割を果たします。 パディングにより、メモリ消費量が増える代わりにパフォーマンスが向上します。

5. 結論

このクイックチュートリアルでは、LongAdderLongAccumulatorを見て、両方の構成を使用して非常に効率的でロックのないソリューションを実装する方法を示しました。

これらすべての例とコードスニペットの実装は、 GitHubプロジェクトにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。