Apache Sparkの紹介
1前書き
-
Apache Spark
は、オープンソースのクラスタコンピューティングフレームワーク** です。これは、開発者がHDFS、Cassandra、HBase、S3などを含む多様なデータソースにわたってデータ集約型のさまざまなワークロードを実行することを可能にするScala、Java、Python、およびR用の洗練された開発APIを提供します。
歴史的に、HadoopのMapReduceはいくつかの反復的でインタラクティブなコンピューティング仕事には非効率的であることを証明し、それは結局Sparkの開発につながりました。 ** Sparkを使用すると、Hadoopをメモリに使用する場合よりも最大2桁速く、またはディスクを使用する場合より1桁速くなります。
** 2スパーク建築
Sparkアプリケーションは、下記のhttps://spark.apache.org/docs/latest/cluster-overview.html[diagram]に記載されているように、クラスター上で独立したプロセスのセットとして実行されます。
これらの一連のプロセスは、メインプログラム(ドライバプログラムと呼ばれます)内の
SparkContext
オブジェクトによって調整されます。
SparkContext
は、複数の種類のクラスターマネージャー(Spark独自のスタンドアロンクラスターマネージャー、MesosまたはYARNのいずれか)に接続します。これらは、アプリケーション間でリソースを割り当てます。
いったん接続されると、Sparkはクラスタ内のノード上でエグゼキュータを取得します。これは計算を実行し、アプリケーションのデータを格納するプロセスです。
次に、アプリケーションコード(
SparkContext
に渡されるJARファイルまたはPythonファイルで定義されています)をエグゼキュータに送信します。最後に、
SparkContext
は
を実行するためにexecutorにタスクを送ります。
3コアコンポーネント
次のhttps://intellipaat.com/tutorial/spark-tutorial/apache-spark-components/[diagram]には、Sparkのさまざまなコンポーネントの概要がわかります。
3.1. スパークコア
Spark Coreコンポーネントは、すべての基本的なI/O機能、スパーククラスター上でのジョブのスケジューリングと監視、タスクのディスパッチ、さまざまなストレージシステムとのネットワーク、障害回復、および効率的なメモリ管理について責任を負います。
Hadoopとは異なり、SparkはRDD(Resilient Distributed Datasets)として知られる特別なデータ構造を使用することによって、共有データがAmazon S3やHDFSのような中間ストアに格納されるのを避けます。
レジリエントな分散データセットは不変であり、分割されたレコードの集合であり、並行して操作でき、フォールトトレラントな「メモリ内」計算を可能にします。
RDDは2種類の操作をサポートします。
-
変換 – Spark RDD変換は、
既存のRDDから新しいRDD。
トランスフォーマは入力としてRDDを取り、出力として1つ以上のRDDを生成します
。変換は本質的に怠惰です。つまり、アクションを呼び出すと実行されます。
-
Action
–
変換は互いにRDDを作成しますが、
実際のデータセットで作業したい場合は、その時点でアクションを実行します。したがって、**
Actions
は、RDD以外の値を与えるSpark RDD操作です。
アクションは、Executorからドライバにデータを送信する方法の1つです。
エグゼキュータは、タスクの実行を担当するエージェントです。ドライバはワーカーとタスクの実行を調整するJVMプロセスです。 Sparkのアクションの中には、数え切れないものがあります。
3.2. スパークSQL
Spark SQLは構造化データ処理のためのSparkモジュールです。主にSQLクエリの実行に使用されます。
DataFrame
は、Spark SQLの主な抽象概念です。名前付き列に順序付けられたデータの分散コレクションは、Sparkでは
DataFrame
と呼ばれています。
Spark SQLは、Hive、Avro、Parquet、ORC、JSON、JDBCなどのさまざまなソースからのデータの取得をサポートしています。また、Sparkエンジンを使用して、何千ものノードと数時間のクエリにも対応しています。
3.3. スパークストリーミング
Spark Streamingは、ライブデータストリームのスケーラブルで高スループット、フォールトトレラントなストリーム処理を可能にするコアSpark APIの拡張です。データは、Kafka、Flume、Kinesis、またはTCPソケットなど、さまざまなソースから取り込むことができます。
最後に、処理されたデータをファイルシステム、データベース、およびライブダッシュボードにプッシュすることができます。
3.4. Spark Mlib
MLlibは、Sparkの機械学習(ML)ライブラリです。その目標は、実用的な機械学習をスケーラブルかつ簡単にすることです。高レベルでは、以下のようなツールを提供します。
-
MLアルゴリズム – 分類などの一般的な学習アルゴリズム
回帰、クラスタリング、および協調フィルタリング
** 特徴化 – 特徴抽出、変換、次元数
削減と選択
** パイプライン – MLを構築、評価、調整するためのツール
パイプライン
** 永続性 – アルゴリズム、モデル、およびパイプラインの保存とロード
-
効用 – 線形代数、統計、データ処理など
3.5. スパークグラフX
-
GraphXは、グラフとグラフ並列計算のためのコンポーネントです** 高度に、GraphXは新しいGraph抽象化を導入することによってSpark RDDを拡張します。
グラフの計算をサポートするために、GraphXは一連の基本演算子(
subgraph
、
joinVertices
、および
aggregateMessages
など)を公開します。
さらに、GraphXには、グラフ分析タスクを簡素化するためのグラフアルゴリズムおよびビルダーのコレクションが増えています。
4 Sparkの「Hello World」
コアコンポーネントを理解できたので、次に、単純なMavenベースのSparkプロジェクト –
単語数の計算
について説明します。
ここでは、Sparkがローカルモードで実行され、すべてのコンポーネントがマスターノード、エグゼキュータノード、またはSparkのスタンドアロンクラスタマネージャと同じマシン上でローカルに実行されることを示します。
4.1. Mavenのセットアップ
https://search.maven.org/classic/#search%7C1%7Cg%3A%22org.apache.spark%22%20AND%20a%3A%22spark-core
2.10でJava Mavenプロジェクトを設定しましょう
pom.xml__ファイルの%22[スパーク関連の依存関係]
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core__2.10</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
4.2. ワードカウント – スパークジョブ
文章を含むファイルを処理し、ファイル内に異なる単語とその数を出力するSparkジョブを作成しましょう。
public static void main(String[]args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);
JavaRDD<String> words
= lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
JavaPairRDD<String, Integer> ones
= words.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> counts
= ones.reduceByKey((Integer i1, Integer i2) -> i1 + i2);
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?, ?> tuple : output) {
System.out.println(tuple.__1() + ": " + tuple.__2());
}
ctx.stop();
}
ローカルテキストファイルのパスをSparkジョブへの引数として渡すことに注意してください。
SparkContext
オブジェクトはSparkの主なエントリポイントであり、すでに実行中のSparkクラスターへの接続を表します。アプリケーションの設定を記述するために
SparkConf
オブジェクトを使います。
SparkContext
は、メモリ内のテキストファイルを
JavaRDD
オブジェクトとして読み取るために使用されます。
次に、
flatmap
メソッドを使用して行
JavaRDD
オブジェクトを単語
JavaRDD
オブジェクトに変換し、まず各行をスペース区切りの単語に変換してから、各行処理の出力を平坦化します。
基本的に各出現語を単語のタプルと1のカウントにマッピングする変換操作
mapToPair
を再度適用します。
次に、
reduceByKey
操作を使用して、カウント1の任意の単語の複数回の出現を1つの単語のタプルにグループ化し、その総数を合計します。
最後に、最終結果を得るためにc
_ollect
_
RDDアクションを実行します。
4.3. 実行 – スパークジョブ
Mavenを使用してプロジェクトをビルドして、ターゲットフォルダに
apache-spark-1.0-SNAPSHOT.jar
を生成します。
次に、このWordCountジョブをSparkに送信する必要があります。
${spark-install-dir}/bin/spark-submit --class com.baeldung.WordCount
--master local ${WordCount-MavenProject}/target/apache-spark-1.0-SNAPSHOT.jar
${WordCount-MavenProject}/src/main/resources/spark__example.txt
上記のコマンドを実行する前に、SparkのインストールディレクトリとWordCount Mavenプロジェクトディレクトリを更新する必要があります。
提出時に、いくつかのステップが舞台裏で起こります。
-
ドライバコードから、
SparkContext
はクラスタマネージャに接続します(
このケースでは、スタンドアロンのクラスタマネージャがローカルで実行されています。
。 Cluster Managerは他のアプリケーション間でリソースを割り当てます
-
Sparkはクラスタ内のノード上のエグゼキュータを取得します. ここで、私たちの単語数
アプリケーションは独自のexecutorプロセスを取得します
。アプリケーションコード(jarファイル)がエグゼキュータに送信されます
-
タスクは
SparkContext
によってexecutorに送信されます.
最後に、sparkジョブの結果がドライバに返され、ファイル内の単語数が出力として表示されます。
Hello 1
from 2
Baledung 2
Keep 1
Learning 1
Spark 1
Bye 1
5結論
この記事では、Apache Sparkのアーキテクチャーとさまざまなコンポーネントについて説明しました。また、ファイルから単語数を計算するSparkジョブの実用的な例も示しました。
いつものように、完全なソースコードはhttps://github.com/eugenp/tutorials/tree/master/apache-spark[over on GitHub]から入手可能です。