1. 序章

このチュートリアルでは、HazelcastJetについて学習します。 これは、Hazelcast、Incが提供する分散データ処理エンジンです。 HazelcastIMDGの上に構築されています。

Hazelcast IMDGについて知りたい場合は、こちらが開始するための記事です。

2. ヘーゼルキャストジェットとは何ですか?

Hazelcast Jetは、データをストリームとして扱う分散データ処理エンジンです。 データベースまたはファイルに保存されているデータと、Kafkaサーバーによってストリーミングされているデータを処理できます。

さらに、ストリームをサブセットに分割し、各サブセットに集計を適用することで、無限のデータストリームに対して集計関数を実行できます。 この概念は、Jetの用語ではウィンドウ処理として知られています。

Jetをマシンのクラスターにデプロイしてから、データ処理ジョブを送信できます。 Jetは、クラスターのすべてのメンバーにデータを自動的に処理させます。 クラスタの各メンバーはデータの一部を消費するため、任意のレベルのスループットに簡単にスケールアップできます。

HazelcastJetの一般的な使用例は次のとおりです。

  • リアルタイムストリーム処理
  • 高速バッチ処理
  • Java8ストリームを分散して処理する
  • マイクロサービスでのデータ処理

3. 設定

私たちの環境でHazelcastJetをセットアップするには、pom.xmlに単一のMaven依存関係を追加する必要があります。

これが私たちのやり方です:

<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet</artifactId>
    <version>4.2</version>
</dependency>

この依存関係を含めると、分散データ処理パイプラインを構築するために必要なすべてのインフラストラクチャを提供する10Mbjarファイルがダウンロードされます。

HazelcastJetの最新バージョンはここにあります。

4. サンプルアプリケーション

Hazelcast Jetの詳細を学ぶために、文の入力とそれらの文で検索する単語を受け取り、それらの文で指定された単語の数を返すサンプルアプリケーションを作成します。

4.1. パイプライン

パイプラインは、Jetアプリケーションの基本構造を形成します。 パイプライン内の処理は次の手順に従います。

  • ソースからデータを読み取る
  • データを変換する
  • シンクにデータを書き込む

このアプリケーションの場合、パイプラインは分散リストから読み取り、グループ化と集約の変換を適用し、最後に分散マップに書き込みます。

パイプラインの記述方法は次のとおりです。

private Pipeline createPipeLine() {
    Pipeline p = Pipeline.create();
    p.readFrom(Sources.<String>list(LIST_NAME))
      .flatMap(word -> traverseArray(word.toLowerCase().split("\\W+")))
      .filter(word -> !word.isEmpty())
      .groupingKey(wholeItem())
      .aggregate(counting())
      .writeTo(Sinks.map(MAP_NAME));
    return p;
}

ソースから読み取ったら、データをトラバースし、正規表現を使用してスペース全体に分割します。 その後、空白を除外します。

最後に、単語をグループ化し、それらを集約して、結果を地図。

4.2. 仕事

パイプラインが定義されたので、パイプラインを実行するためのジョブを作成します。

パラメータを受け入れてカウントを返すcountWord関数の記述方法は次のとおりです。

public Long countWord(List<String> sentences, String word) {
    long count = 0;
    JetInstance jet = Jet.newJetInstance();
    try {
        List<String> textList = jet.getList(LIST_NAME);
        textList.addAll(sentences);
        Pipeline p = createPipeLine();
        jet.newJob(p).join();
        Map<String, Long> counts = jet.getMap(MAP_NAME);
        count = counts.get(word);
        } finally {
            Jet.shutdownAll();
      }
    return count;
}

ジョブを作成してパイプラインを使用するために、最初にJetインスタンスを作成します。 次に、入力リストを分散リストにコピーして、すべてのインスタンスで使用できるようにします。

次に、上記で構築したパイプラインを使用してジョブを送信します。 メソッドnewJob()は、Jetによって非同期的に開始された実行可能ジョブを返します。 join メソッドは、ジョブが完了するのを待ち、ジョブがエラーで完了した場合は例外をスローします。

ジョブが完了すると、パイプラインで定義したマップに結果が取得されます。 したがって、Jetインスタンスから Map を取得し、それに対する単語のカウントを取得します。

最後に、Jetインスタンスをシャットダウンします。 Jetインスタンスは独自のスレッドを開始するため、実行の終了後にシャットダウンすることが重要です。 そうしないと、メソッドが終了した後でもJavaプロセスは存続します。

Jet用に作成したコードをテストする単体テストは次のとおりです。

@Test
public void whenGivenSentencesAndWord_ThenReturnCountOfWord() {
    List<String> sentences = new ArrayList<>();
    sentences.add("The first second was alright, but the second second was tough.");
    WordCounter wordCounter = new WordCounter();
    long countSecond = wordCounter.countWord(sentences, "second");
    assertEquals(3, countSecond);
}

5. 結論

この記事では、HazelcastJetについて学びました。 それとその機能の詳細については、マニュアルを参照してください。

いつものように、この記事で使用されている例のコードは、Githubにあります。