1. 概要

この記事では、リアクティブストリームマニフェストに準拠するAkkaアクターフレームワークの上に構築されたakka-streamsライブラリについて説明します。 Akka Streams APIを使用すると、独立したステップからデータ変換フローを簡単に作成できます。

さらに、すべての処理は、リアクティブ、非ブロッキング、および非同期の方法で実行されます。

2. Mavenの依存関係

開始するには、 akka-streamおよびakka-stream-testkitライブラリをpom.xml:に追加する必要があります。

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.11</artifactId>
    <version>2.5.2</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-testkit_2.11</artifactId>
    <version>2.5.2</version>
</dependency>

3. Akka Streams API

Akka Streamsを使用するには、コアAPIの概念を知っておく必要があります。

  • Source –akka-streamライブラリで処理するためのエントリポイント–複数のソースからこのクラスのインスタンスを作成できます。 たとえば、単一の StringからSourceを作成する場合は、 single()メソッドを使用できます。または、Sourceを作成できます。要素のIterableからの
  • フロー–メイン処理ビルディングブロック –すべてのフローインスタンスには、1つの入力値と1つの出力値があります
  • マテリアライザー–フローに結果のログ記録や保存などの副作用を持たせたい場合は、マテリアライザーを使用できます; 最も一般的には、 NotUsedエイリアスをMaterializerとして渡し、Flowに副作用がないことを示します。
  • シンク操作–フローを構築しているとき、シンク操作を登録するまで実行されません–これはフロー全体のすべての計算をトリガーするターミナル操作です。

4. AkkaStreamsでのフローの作成

簡単な例を作成することから始めましょう。ここでは、複数のフローを作成して組み合わせる –整数のストリームを処理し、ストリームから整数ペアの平均移動ウィンドウを計算する方法を示します。

セミコロンで区切られた整数のStringを入力として解析し、例の akka-streamSourceを作成します。

4.1. フローを使用して入力を解析する

まず、後でFlowを作成するために使用するActorSystemのインスタンスを取得するDataImporterクラスを作成しましょう。

public class DataImporter {
    private ActorSystem actorSystem;

    // standard constructors, getters...
}

次に、を作成しましょう parseLine を生成するメソッドリスト整数区切られた入力から弦。 ここでは、解析のためにのみJavaStreamAPIを使用していることに注意してください。

private List<Integer> parseLine(String line) {
    String[] fields = line.split(";");
    return Arrays.stream(fields)
      .map(Integer::parseInt)
      .collect(Collectors.toList());
}

最初のFlowは、 parseLine を入力に適用して、入力タイプStringおよび出力タイプIntegerFlowを作成します。 ]:

private Flow<String, Integer, NotUsed> parseContent() {
    return Flow.of(String.class)
      .mapConcat(this::parseLine);
}

parseLine()メソッドを呼び出すと、コンパイラは、そのラムダ関数の引数が String になることを認識します。これは、Flowへの入力タイプと同じです。 。

Listをフラット化するため、 mapConcat()メソッド(Java 8 flatMap()メソッドと同等)を使用していることに注意してください。 parseLine()によってIntegerFlowに返される整数。これにより、処理の後続のステップでリストを処理する必要がなくなります。

4.2. Flowを使用して計算を実行する

この時点で、解析された整数のFlowがあります。 次に、すべての入力要素をペアにグループ化し、それらのペアの平均を計算するロジックを実装する必要があります

次に、整数のフローを作成し、grouped()メソッドを使用してそれらをグループ化します。

次に、平均を計算します。

これらの平均が処理される順序には関心がないため、mapAsyncUnordered()メソッドを使用して、複数のスレッドを使用して並列に平均を計算し、スレッド数を引数として渡すことができます。方法。

ラムダとしてFlowに渡されるアクションは、 CompleteableFuture を返す必要があります。これは、そのアクションが別のスレッドで非同期に計算されるためです。

private Flow<Integer, Double, NotUsed> computeAverage() {
    return Flow.of(Integer.class)
      .grouped(2)
      .mapAsyncUnordered(8, integers ->
        CompletableFuture.supplyAsync(() -> integers.stream()
          .mapToDouble(v -> v)
          .average()
          .orElse(-1.0)));
}

8つの並列スレッドの平均を計算しています。 平均の計算にはJava8StreamAPIを使用していることに注意してください。

4.3. 複数のフローを単一のフローに構成する

Flow APIは流暢な抽象化であり、複数のFlowインスタンスを作成して、最終的な処理目標を達成できます。 たとえば、1つが JSONを解析し、が何らかの変換を実行し、別のフローが統計を収集する、きめ細かいフローを作成できます。

このような粒度は、各処理ステップを個別にテストできるため、よりテスト可能なコードを作成するのに役立ちます。

上記の2つのフローを作成しました。これらは、互いに独立して機能します。 今、私たちはそれらを一緒に構成したいと思います。

まず、入力 String を解析し、次に、要素のストリームの平均を計算します。

via()メソッドを使用してフローを作成できます。

Flow<String, Double, NotUsed> calculateAverage() {
    return Flow.of(String.class)
      .via(parseContent())
      .via(computeAverage());
}

入力タイプがStringFlowとそれに続く2つのフローを作成しました。 parseContent() Flow は、 String 入力を受け取り、Integerを出力として返します。 computeAverage()フローは、その Integer を取得し、出力タイプとしてDoubleを返す平均を計算します。

5. シンクフローに追加します

前述したように、これまでのところ、 Flow 全体は、遅延しているため、まだ実行されていません。 フローの実行を開始するには、シンクを定義する必要があります。 シンク操作では、たとえば、データをデータベースに保存したり、結果を外部のWebサービスに送信したりできます。

データベースに結果を書き込む次のsave()メソッドを持つAverageRepositoryクラスがあるとします。

CompletionStage<Double> save(Double average) {
    return CompletableFuture.supplyAsync(() -> {
        // write to database
        return average;
    });
}

ここで、このメソッドを使用してFlow処理の結果を保存するSink操作を作成します。 シンクを作成するには、最初に処理の結果を入力タイプとして受け取るフローを作成する必要があります。 次に、すべての結果をデータベースに保存します。

ここでも、要素の順序は気にしないため、 mapAsyncUnordered()メソッドを使用して save()操作を並列で実行できます。

FlowからSinkを作成するには、最初の引数として Sink.ignore()を指定して toMat()を呼び出す必要があります。処理のステータスを返したいので、2番目として Keep.right()

private Sink<Double, CompletionStage<Done>> storeAverages() {
    return Flow.of(Double.class)
      .mapAsyncUnordered(4, averageRepository::save)
      .toMat(Sink.ignore(), Keep.right());
}

6. フローのソースの定義

私たちがする必要がある最後のことはすることです入力からソースを作成します弦。 適用することができます calculateAverage() フローを使用してこのソースに経由() 方法。

次に、シンクを処理に追加するには、 runWith()メソッドを呼び出して、作成した storeAverages()シンクを渡す必要があります。

CompletionStage<Done> calculateAverageForContent(String content) {
    return Source.single(content)
      .via(calculateAverage())
      .runWith(storeAverages(), ActorMaterializer.create(actorSystem))
      .whenComplete((d, e) -> {
          if (d != null) {
              System.out.println("Import finished ");
          } else {
              e.printStackTrace();
          }
      });
}

処理が終了すると、 whenComplete()コールバックが追加されることに注意してください。このコールバックでは、処理の結果に応じて何らかのアクションを実行できます。

7. AkkaStreamsのテスト

を使用して処理をテストできます akka-stream-testkit。

処理の実際のロジックをテストする最良の方法は、すべての Flow ロジックをテストし、 TestSink を使用して計算をトリガーし、結果をアサートすることです。

このテストでは、テストする Flow を作成し、次に、テスト入力コンテンツからSourceを作成します。

@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
    // given
    Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
    String input = "1;9;11;0";

    // when
    Source<Double, NotUsed> flow = Source.single(input).via(tested);

    // then
    flow
      .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
      .request(4)
      .expectNextUnordered(5d, 5.5);
}

4つの入力引数を期待していることを確認しています。処理は非同期で並列に行われるため、平均である2つの結果が任意の順序で到着する可能性があります。

8. 結論

この記事では、akka-streamライブラリについて説明しました。

複数のフローを組み合わせて要素の移動平均を計算するプロセスを定義しました。 次に、ストリーム処理のエントリポイントである Source と、実際の処理をトリガーするSinkを定義しました。

最後に、akka-stream-testkitを使用して処理のテストを作成しました。

これらすべての例とコードスニペットの実装は、 GitHubプロジェクトにあります。これはMavenプロジェクトであるため、そのままインポートして実行するのは簡単です。