1.はじめに

このチュートリアルでは、データ処理アプリケーションの例を使用してhttps://crunch.apache.org/[Apache Crunch]を紹介します。このアプリケーションはhttps://en.wikipedia.org/wiki/MapReduce[MapReduce]フレームワークを使用して実行します。

まず、Apache Crunchの概念について簡単に説明します。それでは、サンプルアプリにジャンプしましょう。このアプリでは、テキスト処理を行います。

  • まず最初に、テキストファイルから行を読みます。

  • 後で、それらを単語に分割し、いくつかの一般的な単語を削除します。

  • 次に、残りの単語をグループ化して、ユニークな単語のリストを取得します。

そして彼らのカウント
** 最後に、このリストをテキストファイルに書き込みます。

2.クランチとは何ですか?

MapReduceは、サーバーのクラスター上で大量のデータを処理するための分散型並列プログラミングフレームワークです。 HadoopやSparkなどのソフトウェアフレームワークはMapReduceを実装しています。


  • Crunchは、JavaでMapReduceパイプラインを作成、テスト、および実行するためのフレームワークを提供します。

    ** ここでは、MapReduceの求人を直接書くことはありません。

むしろ、クランチAPIを使用してデータパイプライン(すなわち、入力、処理、および出力ステップを実行するための操作)を定義します。 Crunch PlannerはそれらをMapReduceジョブにマップし、必要に応じてそれらを実行します。

  • したがって、すべてのCrunchデータパイプラインは

    Pipeline

    インターフェイスのインスタンスによって調整されます** このインターフェイスは、

    Source

    インスタンスを介してデータをパイプラインに読み込み、パイプラインから

    Target

    インスタンスにデータを書き込むためのメソッドも定義します。

データを表現するためのインターフェースが3つあります。


  1. PCollection

    – 不変の分散型の要素のコレクション


  2. PTable <K

    、V


    _>

    _

    – 不変の分散無秩序マルチ

キーと値


PGroupedTable <K

、V


_>

_

– 型のキーの分散ソートマップ

Kを

Iterable

Vに変換します。


__

DoFn





はすべてのデータ処理関数


の基本クラスです。これはMapReduceの

Mapper



Reducer

、および

Combiner__クラスに対応しています。

ほとんどの開発時間は、それを使って論理計算を書いてテストすることに費やしています。

これでCrunchの知識が増えたので、それを使ってサンプルアプリケーションを作成しましょう。

3.クランチプロジェクトの設定

まず最初に、Mavenとクランチプロジェクトを立ち上げましょう。これには2つの方法があります。

  1. 既存のファイルの

    pom.xml

    ファイルに必要な依存関係を追加します.

プロジェクト
。スタータープロジェクトを生成するためにアーキタイプを使用する

両方のアプローチを簡単に見てみましょう。

3.1. Mavenの依存関係

既存のプロジェクトにCrunchを追加するには、

pom.xml

ファイルに必要な依存関係を追加しましょう。

まず、

crunch-core

ライブラリを追加しましょう。

<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>0.15.0</version>
</dependency>

次に、Hadoopと通信するための

hadoop-client

ライブラリを追加しましょう。

Hadoopのインストールと一致するバージョンを使用します。

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>


crunch-core

およびhttps:/の最新バージョンについては、Maven Centralを確認してください。/search.maven.org/search?q=g:org.apache.hadoop%20AND%20a:hadoop-client&core=gav[hadoop-client]ライブラリ

3.2. メイヴンアーキタイプ

  • もう1つの方法は、Crunchが提供するMavenアーキタイプを使ってスタータープロジェクトを素早く生成することです。

mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype

上記のコマンドでプロンプトが表示されたら、Crunchバージョンとプロジェクト成果物の詳細を入力します。

4.クランチパイプラインの設定

プロジェクトを設定したら、

Pipeline

オブジェクトを作成する必要があります。

Crunchは3つの

Pipeline

実装を持っています

:


  • MRPipeline

    – Hadoop MapReduce内で実行されます


  • SparkPipeline

    – 一連のSparkパイプラインとして実行されます。


  • MemPipeline

    – クライアント上でメモリ内で実行されます。

単体テスト

通常、

MemPipeline

のインスタンスを使用して開発とテストを行います。後で実際の実行には

MRPipeline

または

SparkPipeline

のインスタンスを使用します。

インメモリパイプラインが必要な場合は、静的メソッド

getInstance

を使用して

MemPipeline

インスタンスを取得できます。

Pipeline pipeline = MemPipeline.getInstance();

しかし今のところ、Hadoopでアプリケーションを実行するための

MRPipeline

のインスタンスを作成しましょう


_:

_

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

5.入力データを読み込む

パイプラインオブジェクトを作成した後、入力データを読みたいです。


Pipeline

インターフェースはテキストファイルから入力を読み込むための便利なメソッドを提供します

、__readTextFile(pathName)

このメソッドを呼び出して入力テキストファイルを読みましょう。

PCollection<String> lines = pipeline.readTextFile(inputPath);

上記のコードはテキストファイルを

String

のコレクションとして読み取ります。

次のステップとして、入力を読むためのテストケースを書きましょう。

@Test
public void givenPipeLine__whenTextFileRead__thenExpectedNumberOfRecordsRead() {
    Pipeline pipeline = MemPipeline.getInstance();
    PCollection<String> lines = pipeline.readTextFile(INPUT__FILE__PATH);

    assertEquals(21, lines.asCollection()
      .getValue()
      .size());
}

このテストでは、テキストファイルを読み込むときに予想される行数が得られることを確認します。

6.データ処理手順

入力データを読んだ後、それを処理する必要があります。 ** クランチAPIには、一般的なデータ処理シナリオを処理するための

DoFn

のサブクラスがいくつか含まれています。


  • FilterFn

    – ブール値に基づいてコレクションのメンバーをフィルタリングします

調子
**

MapFn

– 各入力レコードを正確に1つの出力レコードにマップします


  • CombineFn

    – 多数の値を単一の値に結合します


  • JoinFn

    – 内部結合、左外部結合、右などの結合を実行します。

外部結合と完全外部結合

これらのクラスを使用して、次のデータ処理ロジックを実装しましょう。

  1. 入力ファイルの各行を単語に分割する

  2. ストップワードを削除

  3. ユニークな言葉を数える

6.1. 一行のテキストを単語に分割する

まず最初に、行を単語に分割するための

Tokenizer

クラスを作成しましょう。


DoFn

クラスを拡張します。このクラスには

process

という抽象メソッドがあります。このメソッドは

PCollection

からの入力レコードを処理して

Emitterに出力を送ります。

このメソッドで分割ロジックを実装する必要があります。

public class Tokenizer extends DoFn<String, String> {
    private static final Splitter SPLITTER = Splitter
      .onPattern("\\s+")
      .omitEmptyStrings();

    @Override
    public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
            emitter.emit(word);
        }
    }
}

上記の実装では、https://github.com/google/guava/wiki/StringsExplained#splitter[Guava]ライブラリの

Splitter

クラスを使用して行から単語を抽出しました。

次に、

Tokenizer

クラスの単体テストを書きましょう。

@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {

    @Mock
    private Emitter<String> emitter;

    @Test
    public void givenTokenizer__whenLineProcessed__thenOnlyExpectedWordsEmitted() {
        Tokenizer splitter = new Tokenizer();
        splitter.process("  hello  world ", emitter);

        verify(emitter).emit("hello");
        verify(emitter).emit("world");
        verifyNoMoreInteractions(emitter);
    }
}

上記のテストは正しい単語が返されることを確認します。

最後に、このクラスを使って入力テキストファイルから読み込んだ行を分割しましょう。


PCollection

インタフェースの

parallelDo

メソッドは、指定された

DoFn

をすべての要素に適用し、新しい

PCollection

を返します。

linesコレクションでこのメソッドを呼び出して、

Tokenizer

のインスタンスを渡しましょう。

PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

その結果、入力テキストファイル内の単語のリストが取得されます。次のステップでストップワードを削除します。

6.2. ストップワードを削除

前の手順と同様に、ストップワードを除外するための

StopWordFilter

クラスを作成しましょう。

ただし、

FoFn

ではなく

FilterFn

を拡張します。

FilterFn

には、

accept

という抽象メソッドがあります。このメソッドでフィルタリングロジックを実装する必要があります。

public class StopWordFilter extends FilterFn<String> {

   //English stop words, borrowed from Lucene.
    private static final Set<String> STOP__WORDS = ImmutableSet
      .copyOf(new String[]{ "a", "and", "are", "as", "at", "be", "but", "by",
        "for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
        "or", "s", "such", "t", "that", "the", "their", "then", "there",
        "these", "they", "this", "to", "was", "will", "with" });

    @Override
    public boolean accept(String word) {
        return !STOP__WORDS.contains(word);
    }
}

次に、

StopWordFilter

クラスの単体テストを書きましょう。

public class StopWordFilterUnitTest {

    @Test
    public void givenFilter__whenStopWordPassed__thenFalseReturned() {
        FilterFn<String> filter = new StopWordFilter();

        assertFalse(filter.accept("the"));
        assertFalse(filter.accept("a"));
    }

    @Test
    public void givenFilter__whenNonStopWordPassed__thenTrueReturned() {
        FilterFn<String> filter = new StopWordFilter();

        assertTrue(filter.accept("Hello"));
        assertTrue(filter.accept("World"));
    }

    @Test
    public void givenWordCollection__whenFiltered__thenStopWordsRemoved() {
        PCollection<String> words = MemPipeline
          .collectionOf("This", "is", "a", "test", "sentence");
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        assertEquals(ImmutableList.of("This", "test", "sentence"),
         Lists.newArrayList(noStopWords.materialize()));
    }
}

このテストでは、フィルタリングロジックが正しく実行されたことを確認します。

最後に、

StopWordFilter

を使用して、前の手順で生成された単語のリストをフィルタリングしましょう。

PCollection

インターフェースの

filter

メソッドは、指定された

FilterFn

をすべての要素に適用し、新しい

PCollection

を返します。

単語コレクションに対してこのメ​​ソッドを呼び出し、

StopWordFilter

のインスタンスを渡します。

PCollection<String> noStopWords = words.filter(new StopWordFilter());

その結果、フィルタリングされた単語のコレクションが得られます。

6.3. ユニークな単語を数える

フィルタリングされた単語の集まりを取得したら、各単語が出現する頻度を数えます。 **

PCollection

インターフェースには、一般的な集約を実行するためのメソッドがいくつかあります。


  • min

    – コレクションの最小要素を返します


  • max

    – コレクションの最大要素を返します


  • length

    – コレクション内の要素数を返す


  • count

    – 各一意のカウントを含む

    PTable

    を返します

コレクションの要素


count

メソッドを使用して、ユニークな単語とその数を取得します。

----//The count method applies a series of Crunch primitives and returns//a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();
----

7.出力を指定する

前のステップの結果として、単語とその数の表があります。この結果をテキストファイルに書き込みます。 **

Pipeline

インターフェースは出力を書くための便利なメソッドを提供します。

void write(PCollection<?> collection, Target target);

void write(PCollection<?> collection, Target target,
  Target.WriteMode writeMode);

<T> void writeTextFile(PCollection<T> collection, String pathName);

そのため、

writeTextFile

メソッドを呼び出しましょう。

pipeline.writeTextFile(counts, outputPath);

8.パイプライン実行を管理する

これまでのすべてのステップで、データパイプラインが定義されました。入力が読み取られていないか、または処理されていません。これは、Crunchが遅延実行モデルを使用しているためです。

ジョブの計画と実行を制御するメソッドがPipelineインターフェースで呼び出されるまで、MapReduceジョブは実行されません。


  • run

    – 必要な出力を作成するための実行計画を準備します。

それを同期的に実行します
**

done

– 出力を生成するために必要な残りのジョブを実行してから

作成された中間データファイルをクリーンアップします。
**

runAsync

– runメソッドに似ていますが、非ブロッキングで実行されます。

ファッション

したがって、MapReduceジョブとしてパイプラインを実行するために

done

メソッドを呼び出しましょう。

PipelineResult result = pipeline.done();

上記のステートメントはMapReduceジョブを実行して入力を読み取り、それらを処理して結果を出力ディレクトリーに書き込みます。

9.パイプラインをまとめる

これまで、入力データを読み取り、それを処理して出力ファイルに書き込むロジックを開発し、単体テストしました。

次に、それらをまとめてデータパイプライン全体を構築しましょう。

public int run(String[]args) throws Exception {
    String inputPath = args[0];
    String outputPath = args[1];

   //Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

   //Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

   //Define a function that splits each line in a PCollection of Strings into
   //a PCollection made up of the individual words in the file.
   //The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

   //Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

   //The count method applies a series of Crunch primitives and returns
   //a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

   //Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

   //Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
}

10. Hadoopの起動設定

これでデータパイプラインの準備が整いました。

しかし、それを起動するためのコードが必要です。したがって、アプリケーションを起動するための

main

メソッドを書きましょう。

public class WordCount extends Configured implements Tool {

    public static void main(String[]args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }


  • ToolRunner.run

    は、コマンドラインからHadoop設定を解析してMapReduceジョブを実行します。

11.アプリケーションを実行

完全なアプリケーションはこれで準備が整いました。次のコマンドを実行してビルドしましょう。

mvn package

上記のコマンドの結果として、パッケージ化されたアプリケーションと特別なジョブjarがターゲットディレクトリに配置されます。

Hadoopでアプリケーションを実行するために、このジョブjarを使用しましょう。

hadoop jar target/crunch-1.0-SNAPSHOT-job.jar <input file path> <output directory>

アプリケーションは入力ファイルを読み取り、その結果を出力ファイルに書き込みます。出力ファイルには、次のようなユニークな単語とその数が含まれます。

----[Add,1][Added,1][Admiration,1][Admitting,1][Allowance,1]----

Hadoopに加えて、スタンドアロンのアプリケーションとして、または単体テストとして、IDE内でアプリケーションを実行できます。

12.結論

このチュートリアルでは、MapReduce上で動作するデータ処理アプリケーションを作成しました。 Apache Crunchを使えば、JavaでMapReduceパイプラインを簡単に作成、テスト、および実行できます。

いつものように、完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/libraries-data[Githubに追加]を見つけることができます。