1. 概要

グローバリゼーションとインターネットを介した物理的な距離の無効化とともに、私たちがますますつながり、デジタルの存在を体現するにつれて、私たちが処理および作成するデータの量は日々ますます増加しています。

MapReduceプログラミングモデル、コンピュータークラスターの並列処理、データを操作するための分散データシステムを提供する Apache Spark フレームワークなど、これらのデータ量を処理するためにいくつかの手法とテクノロジーが実装されています。

このチュートリアルでは、Sparkの1つの基本データ構造であるResilientDistributedDatasetまたはRDDについて説明します。

2. Spark RDD

RDDは、クラスター内のすべてのノードに分割されたレコードのコレクションの不変で復元力のある、分散された表現です。

Sparkプログラミングでは、RDDは基本的なデータ構造です。 データセットとデータフレームはRDDの上に構築されています。

Spark RDDは、データセットがオブジェクトとして表されるAPIを介して提示され、メソッドを使用して、それにロジックを適用できます。 SparkがこのAPIを使用してすべての変換を実行および実行する方法を定義します。

また、この低レベルAPIを使用すると、型の安全性を実現し、データを柔軟に操作できます。

2.1. Sparkアーキテクチャ

Apache Sparkは、マシンのクラスターから最高のパフォーマンスを利用するように設計されています。 アーキテクチャは、ドライバプログラム、クラスタマネージャ、およびワーカーの3つの主要コンポーネントで構成されています。

ドライバープログラムはsparkアプリケーションを調整し、クラスターマネージャーは使用されるすべてのリソースとワーカーを管理し、最後にワーカーがプログラムのタスクを実行します。

2.2. 特徴

RDDデータ構造は、コードを制御および最適化するための原則と機能に基づいています。 それらのいくつかは次のとおりです。

  • 不変性:これは関数型プログラミングの重要な概念であり、並列処理を容易にするという利点があります。 RDDの状態を変更したいときはいつでも、すべての変換が実行された新しいものを作成します。
  • メモリ内計算:Sparkを使用すると、ディスクではなくRAM内のデータを処理できます。 ディスクと比較してRAMを使用すると、ロードと処理のパフォーマンスが向上するためです。
  • フォールトトレラント:このアーキテクチャで最終的に障害が発生した場合でも、システムは正常に動作し続けます。
  • パーティショニング:RDDデータはノード全体に分散され、より適切な計算を実行します。
  • 遅延評価:パフォーマンスに加えて、Spark RDDは遅延評価され、必要なものだけを処理し、今後最適化されます(DataFramesおよびDatasetsにはによって最適化されたクエリプランがありますX215X] Catalyst )。

RDDは、ApacheSparkの最初の構造でした。 さらに、最近の他の構造は、場合によってはより効率的であることが証明されています。 ただし、RDDは非推奨ではなく、一般的に使用されています。

すでに述べたように、DataFramesDatasetsはRDDの上に構築されているため、Sparkのコアです。

また、DataFramesDatasetsは、Catalystによるコードの最適化、Tungstenによるスペース効率、および構造化データについて話すときに最適です。 さらに、ハイエンドAPIは、コーディングと理解が容易です。

ただし、 RDDは依然として適切な選択であり、低レベルAPIを使用してデータセットを柔軟に制御し、DSL(ドメイン固有言語)を使用せずにデータを操作します。

特に、構造化されていないデータを操作する場合、パフォーマンスが優先されることはありません。

2.3. RDDを作成する

この理論的な情報をすべて考慮して、作成しましょう。

コレクションを並列化する方法と、ソースファイルからデータを読み取る方法の2つがあります。

コレクションを並列化するRDDを作成する方法を見てみましょう。

val animals = List("dog", "cat", "frog", "horse")
val animalsRDD = sc.parallelize(animals)

上記の例では、 animalsRDD:RDDがあります。

2番目の方法は、どこかからデータをロードすることです。 見てみましょう:

val data = sc.textFile("data.txt")
// rdd: RDD[String] = data.txt MapPartitionsRDD[1] at textFile...

さらに、いつでも DataFrame / Dataset として読み取り、メソッド.rddを使用してRDDに変換できます。

val dataDF = spark.read.csv("data.csv")
val rdd = df.rdd

rdd: RDD

3. オペレーション

関数はRDDですべての操作を行います。 これらの機能には、データ操作、永続性、相互作用、ロードが含まれます。 例: map() filter() save()

Spark関数が分類するカテゴリには、変換とアクションの2つがあります。

3.1. 変換

変換演算は、指定されたRDD変換を返す関数です。 これらの機能は怠惰です。 つまり、アクションが発生したときにのみ実行されます

つまり、RDDの変更を実行する機能は、必要な場合にのみ実行されます。 たとえば、 map() join() sort() filter()、は変換です。

つまり、この怠惰のために、私たちは不必要に労働者を運転手に送りません。 例:RDDをマップして縮小する場合、ドライバーにとって重要なのは縮小された結果だけなので、マップ結果は送信しません。

map関数の実装を見てみましょう。

val countLengthRDD = animalsRDD.map(animal => (animal, animal.length))

これらのコードを使用して、RDDを調べると、次のようになります。( “dog”、3)、( “car”、3)、( “frog”、4)…

それでは、「c」で始まるすべての動物を削除しましょう。

val noCRDD = animalsRDD.filter(_.startsWith("c"))

RDDを調べると、結果として( “cat”)が得られます。

主に使用される変換には、 map()、flatMap()、filter()、sample()、union()、join()。があります。

副作用のある操作や非関連操作を作成しないことが重要です。

3.2. 行動

一方、アクション操作はデータをドライバーに返します。 また、アクションは、タスク計算を開始するアクション、つまり、すべての変換を実行するアクションです。

RDDの内容を確認する1つの方法は、 collect メソッドを使用することです。これは、アクションであり、 Seq[T]を返します。

countLengthRDD.collect
// res0: Array[String] = Array((dog,3), (cat,3), (frog, 4), (horse, 5))

noCRDD.collect
//res1: Array[String] = Array(cat)

この場合、Array[String]の結果がわかります

もう1つの広く使用されているアクションは、reduceメソッドです。 それがどのように機能するか見てみましょう:

val numbers = sc.parallelize(List(1, 2, 3, 4, 5))
numbers.reduce(_ + _)

上記の例では、結果として15という数字が表示されます。

主に使用される変換には、 collect()、reduce()、save()、count()があります。

3.3. キーペアRDD

1つの特定のタイプのRDDであるキーペアRDDには、いくつかの特別な操作があります。

これらのメソッドは通常、キーごとにデータをグループ化および集約するための分散型の「シャッフル」操作です。

シャッフルは、ノード間でデータを再配布するためのSparkメカニズムです。 Sparkは、ディスク内のデータ操作、データのシリアル化、ネットワークトランスポートなどのコストのかかるタスクを実行して、この再配布を実行します。 また、シャッフルは中間ファイルを作成し、コストとメモリ使用量を増加させます。

シャッフルを明確にするために、結合メソッドを例として取り上げましょう。

val rddOne = sc.parallelize(List((1, "cat"), (2, "dog"), (3, "frog")))
val rddTwo = sc.parallelize(List((1, "mammal"), (3, "amphibian")))

rddOne.join(rddTwo).collect

その結果、 Array((1、(猫、哺乳類))、(3、(カエル、両生類)))が得られます。 Sparkは、この結果を達成するために、各パーティションで計算し、まとめて最終結果を再計算する必要があります。

4. 人事考課

Apache Sparkでは、パフォーマンスが考慮すべき基本的な側面です。

新しいデータ構造DataFrameおよびDatasetは、すべての最適化が適用されているため、より効率的です。 ただし、前に示したように、RDDは依然として多くの場合非常に具体的です。 もう1つの重要な情報は、RDDが他の言語よりもScalaで優れたパフォーマンスを発揮することです。

さらに、Spark RDD APIは、実行プランに関する情報を提供します。 たとえば、数値RDDを考えてみましょう。

val data = List(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)

rdd
  .filter(_ % 2 == 0)
  .map(_ * 2)

RDD系統の詳細については、.toDebugStringメソッドを使用できます。

rdd.toDebugString
res0: String =
(8) MapPartitionsRDD[2] at map at <console>:26 []
 |  MapPartitionsRDD[1] at filter at <console>:26 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

Datasets、で使用する場合、.explainメソッドがあります。

val df = rdd.toDF() 
df 
  .filter($"value" % 2 === 0) 
  .withColumn("value", $"value" * 2) 

df.explain("formatted")

Explain メソッドは、手順とそれぞれの説明を示しています。

== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * SerializeFromObject (2)
      +- Scan (1)

(1) Scan
Output [1]: [obj#1]
Arguments: obj#1: int, ParallelCollectionRDD[0] at parallelize at <console>:27

(2) SerializeFromObject [codegen id : 1]
Input [1]: [obj#1]
Arguments: [input[0, int, false] AS value#2]

(3) Filter [codegen id : 1]
Input [1]: [value#2]
Condition : ((value#2 % 2) = 0)

(4) Project [codegen id : 1]
Output [1]: [(value#2 * 2) AS value#11]
Input [1]: [value#2]

5. 結論

この記事では、Sparkが複数のデータ構造を提示して、そのすべての複雑さと機能を備えたデータ処理を最適化することを確認しました。

要約すると、RDDは非構造化データに最適であり、パフォーマンスは必須ではありません。

機能パターンを操作し、提供されたAPIを介してすべてのデータを制御できるため、柔軟性があり、Sparkアーキテクチャのメリットを享受できます。

結論として、Spark自体とRDDの理論的な説明をいくつか示しました。 私たちはそれを利用するための使用法と最良の事例を学んでいました。

ソースコードは、GitHubから入手できます。