1. 序章

このチュートリアルでは、データ処理アプリケーションの例を使用して ApacheCrunchを示します。 このアプリケーションは、MapReduceフレームワークを使用して実行します。

まず、ApacheCrunchの概念について簡単に説明します。 次に、サンプルアプリにジャンプします。 このアプリでは、テキスト処理を行います。

  • まず、テキストファイルから行を読み取ります
  • 後で、それらを単語に分割し、いくつかの一般的な単語を削除します
  • 次に、残りの単語をグループ化して、一意の単語とその数のリストを取得します
  • 最後に、このリストをテキストファイルに書き込みます

2. クランチとは何ですか?

MapReduceは、サーバーのクラスター上で大量のデータを処理するための分散並列プログラミングフレームワークです。 HadoopやSparkなどのソフトウェアフレームワークはMapReduceを実装しています。

Crunchは、JavaでMapReduceパイプラインを作成、テスト、実行するためのフレームワークを提供します。 ここでは、MapReduceジョブを直接記述しません。 むしろ、データパイプラインを定義します(つまり Crunch APIを使用して、入力、処理、および出力の各ステップを実行する操作)。 Crunch PlannerはそれらをMapReduceジョブにマップし、必要に応じて実行します。

したがって、すべてのCrunchデータパイプラインはPipelineインターフェイスのインスタンスによって調整されます。このインターフェイスは、 Source インスタンスを介してパイプラインにデータを読み取り、パイプラインからターゲットインスタンス。

データを表すための3つのインターフェースがあります。

  1. PCollection –要素の不変の分散コレクション
  2. PTable 、V >> –キーと値の不変、分散、順序付けされていないマルチマップ
  3. PGroupedTable 、V >> –タイプKのキーの分散されたソートされたマップ反復可能正確に1回繰り返される可能性のあるV

DoFn は、すべてのデータ処理関数の基本クラスです。 これは、MapReduceの Mapper Reducer 、およびCombinerクラスに対応します。 開発時間のほとんどは、それを使用した論理計算の記述とテストに費やしています

Crunchに慣れてきたので、それを使用してサンプルアプリケーションを作成しましょう。

3. クランチプロジェクトの設定

まず、Mavenを使用してクランチプロジェクトを設定しましょう。 これは2つの方法で行うことができます。

  1. 既存のプロジェクトのpom.xmlファイルに必要な依存関係を追加します
  2. アーキタイプを使用してスタータープロジェクトを生成する

両方のアプローチを簡単に見てみましょう。

3.1. Mavenの依存関係

Crunchを既存のプロジェクトに追加するために、pom.xmlファイルに必要な依存関係を追加しましょう。

まず、crunch-coreライブラリを追加しましょう。

<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>0.15.0</version>
</dependency>

次に、Hadoopと通信するためのhadoop-clientライブラリを追加しましょう。 Hadoopのインストールに一致するバージョンを使用します。

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>

Maven Centralで、crunch-coreおよびhadoop-clientライブラリの最新バージョンを確認できます。

3.2. Mavenアーキタイプ

別のアプローチは、Crunchが提供するMavenアーキタイプを使用してスタータープロジェクトをすばやく生成することです。

mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype

上記のコマンドでプロンプトが表示されたら、Crunchバージョンとプロジェクトアーティファクトの詳細を提供します。

4. クランチパイプラインのセットアップ

プロジェクトを設定したら、Pipelineオブジェクトを作成する必要があります。 Crunchには3つのパイプライン実装があります

  • MRPipeline –HadoopMapReduce内で実行
  • SparkPipeline –一連のSparkパイプラインとして実行されます
  • MemPipeline –クライアントでメモリ内で実行され、単体テストに役立ちます

通常、MemPipelineのインスタンスを使用して開発およびテストします。 後で、実際の実行にMRPipelineまたはSparkPipelineのインスタンスを使用します。

メモリ内パイプラインが必要な場合は、静的メソッド getInstance を使用して、MemPipelineインスタンスを取得できます。

Pipeline pipeline = MemPipeline.getInstance();

ただし、ここでは、 MRPipeline のインスタンスを作成して、Hadoop でアプリケーションを実行してみましょう。

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

5. 入力データの読み取り

パイプラインオブジェクトを作成した後、入力データを読み取ります。  Pipelineインターフェースは、テキストファイル readTextFile(pathName)。から入力を読み取るための便利なメソッドを提供します。

このメソッドを呼び出して、入力テキストファイルを読み取ります。

PCollection<String> lines = pipeline.readTextFile(inputPath);

上記のコードは、テキストファイルをStringのコレクションとして読み取ります。

次のステップとして、入力を読み取るためのテストケースを作成しましょう。

@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
    Pipeline pipeline = MemPipeline.getInstance();
    PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);

    assertEquals(21, lines.asCollection()
      .getValue()
      .size());
}

このテストでは、テキストファイルを読み取るときに予想される行数を取得することを確認します。

6. データ処理手順

入力データを読み取った後、それを処理する必要があります。  Crunch APIには、一般的なデータ処理シナリオを処理するためのDoFnのサブクラスがいくつか含まれています

  • FilterFn –ブール条件に基づいてコレクションのメンバーをフィルタリングします
  • MapFn –各入力レコードを正確に1つの出力レコードにマップします
  • CombineFn –複数の値を1つの値に結合します
  • JoinFn –内部結合、左外部結合、右外部結合、完全外部結合などの結合を実行します

これらのクラスを使用して、次のデータ処理ロジックを実装しましょう。

  1. 入力ファイルの各行を単語に分割します
  2. ストップワードを削除する
  3. ユニークな単語を数える

6.1. テキストの行を単語に分割する

まず、 Tokenizer クラスを作成して、行を単語に分割しましょう。

DoFnクラスを拡張します。 このクラスには、processという抽象メソッドがあります。 このメソッドは、からの入力レコードを処理します PCollection そして出力をに送信しますエミッタ。 

このメソッドで分割ロジックを実装する必要があります。

public class Tokenizer extends DoFn<String, String> {
    private static final Splitter SPLITTER = Splitter
      .onPattern("\\s+")
      .omitEmptyStrings();

    @Override
    public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
            emitter.emit(word);
        }
    }
}

上記の実装では、GuavaライブラリのSplitterクラスを使用して、行から単語を抽出しました。

次に、Tokenizerクラスの単体テストを作成しましょう。

@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {
 
    @Mock
    private Emitter<String> emitter;

    @Test
    public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
        Tokenizer splitter = new Tokenizer();
        splitter.process("  hello  world ", emitter);

        verify(emitter).emit("hello");
        verify(emitter).emit("world");
        verifyNoMoreInteractions(emitter);
    }
}

上記のテストは、正しい単語が返されることを確認します。

最後に、このクラスを使用して、入力テキストファイルから読み取った行を分割してみましょう。

PCollectionインターフェイスのparallelDoメソッドは、指定された DoFn をすべての要素に適用し、新しいPCollectionを返します。

行コレクションでこのメソッドを呼び出し、Tokenizerのインスタンスを渡します。

PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

その結果、入力テキストファイル内の単語のリストを取得します。 次のステップでストップワードを削除します。

6.2. ストップワードを削除する

前の手順と同様に、 StopWordFilter クラスを作成して、ストップワードを除外してみましょう。

ただし、DoFnの代わりにFilterFnを拡張します。 FilterFn には、acceptという抽象メソッドがあります。 このメソッドでフィルタリングロジックを実装する必要があります。

public class StopWordFilter extends FilterFn<String> {

    // English stop words, borrowed from Lucene.
    private static final Set<String> STOP_WORDS = ImmutableSet
      .copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
        "for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
        "or", "s", "such", "t", "that", "the", "their", "then", "there",
        "these", "they", "this", "to", "was", "will", "with" });

    @Override
    public boolean accept(String word) {
        return !STOP_WORDS.contains(word);
    }
}

次に、StopWordFilterクラスの単体テストを記述しましょう。

public class StopWordFilterUnitTest {

    @Test
    public void givenFilter_whenStopWordPassed_thenFalseReturned() {
        FilterFn<String> filter = new StopWordFilter();
 
        assertFalse(filter.accept("the"));
        assertFalse(filter.accept("a"));
    }

    @Test
    public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
        FilterFn<String> filter = new StopWordFilter();
 
        assertTrue(filter.accept("Hello"));
        assertTrue(filter.accept("World"));
    }

    @Test
    public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
        PCollection<String> words = MemPipeline
          .collectionOf("This", "is", "a", "test", "sentence");
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        assertEquals(ImmutableList.of("This", "test", "sentence"),
         Lists.newArrayList(noStopWords.materialize()));
    }
}

このテストは、フィルタリングロジックが正しく実行されていることを確認します。

最後に、 StopWordFilter を使用して、前の手順で生成された単語のリストをフィルタリングしてみましょう。 PCollectionインターフェイスのfilterメソッドは、指定されたFilterFnをすべての要素に適用し、新しいPCollectionを返します。

単語コレクションでこのメソッドを呼び出し、StopWordFilterのインスタンスを渡します。

PCollection<String> noStopWords = words.filter(new StopWordFilter());

その結果、フィルタリングされた単語のコレクションが得られます。

6.3. ユニークな単語を数える

フィルタリングされた単語のコレクションを取得した後、各単語が出現する頻度をカウントします。  PCollectionインターフェイスには、一般的な集計を実行するためのいくつかのメソッドがあります。

  • min –コレクションの最小要素を返します
  • max –コレクションの最大要素を返します
  • length –コレクション内の要素の数を返します
  • count –コレクションの各一意の要素のカウントを含むPTableを返します

count メソッドを使用して、一意の単語とその数を取得してみましょう。

// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();

7. 出力を指定

前の手順の結果として、単語とその数の表ができました。 この結果をテキストファイルに書き込みたいと思います。 パイプラインインターフェースは、出力を書き込むための便利なメソッドを提供します:

void write(PCollection<?> collection, Target target);

void write(PCollection<?> collection, Target target,
  Target.WriteMode writeMode);

<T> void writeTextFile(PCollection<T> collection, String pathName);

したがって、writeTextFileメソッドを呼び出しましょう。

pipeline.writeTextFile(counts, outputPath);

8. パイプラインの実行を管理する

これまでのすべてのステップで、データパイプラインが定義されました。 入力が読み取られたり処理されたりしていません。  これは、Crunchが遅延実行モデルを使用しているためです。

パイプラインインターフェイスでジョブの計画と実行を制御するメソッドが呼び出されるまで、MapReduceジョブは実行されません。

  • run –必要な出力を作成するための実行プランを準備し、それを同期的に実行します
  • done –出力の生成に必要な残りのジョブを実行し、作成された中間データファイルをクリーンアップします
  • runAsync – runメソッドに似ていますが、非ブロッキング方式で実行されます

したがって、 done メソッドを呼び出して、パイプラインをMapReduceジョブとして実行しましょう。

PipelineResult result = pipeline.done();

上記のステートメントは、MapReduceジョブを実行して入力を読み取り、処理して、結果を出力ディレクトリに書き込みます。

9. パイプラインをまとめる

これまで、入力データを読み取り、処理して出力ファイルに書き込むロジックを開発し、単体テストを行いました。

次に、それらを組み合わせて、データパイプライン全体を構築しましょう。

public int run(String[] args) throws Exception {
    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
}

10. Hadoop起動構成

これで、データパイプラインの準備が整いました。

ただし、起動するにはコードが必要です。 したがって、mainメソッドを記述してアプリケーションを起動しましょう。

public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

ToolRunner.run は、コマンドラインからHadoop構成を解析し、MapReduceジョブを実行します。

11. アプリケーションを実行する

これで、完全なアプリケーションの準備が整いました。 次のコマンドを実行してビルドしてみましょう。

mvn package

上記のコマンドの結果として、パッケージ化されたアプリケーションと特別なジョブjarがターゲットディレクトリに取得されます。

このジョブjarを使用して、Hadoopでアプリケーションを実行してみましょう。

hadoop jar target/crunch-1.0-SNAPSHOT-job.jar <input file path> <output directory>

アプリケーションは入力ファイルを読み取り、結果を出力ファイルに書き込みます。 出力ファイルには、次のような一意の単語とその数が含まれています。

[Add,1]
[Added,1]
[Admiration,1]
[Admitting,1]
[Allowance,1]

Hadoopに加えて、スタンドアロンアプリケーションとして、または単体テストとして、IDE内でアプリケーションを実行できます。

12. 結論

このチュートリアルでは、MapReduceで実行されるデータ処理アプリケーションを作成しました。 Apache Crunchを使用すると、JavaでMapReduceパイプラインを簡単に作成、テスト、実行できます。

いつものように、完全なソースコードはGithubにあります。