1. 概要

このチュートリアルでは、SparkでRDDDataFrameに変換する方法を学習します。 異なるパラメーターを使用して各メソッドを呼び出すことにより、詳細を調べます。

途中で、概念をよりよく理解するのに役立ついくつかの興味深い例を見ていきます。

2. SparkのRDDとDataFrame

RDDとDataFrameは、データを保持および処理するためのSparkの2つの主要なAPIです。 RDDは、分散データを処理するための低レベルAPIを提供します。 一方、 DataFrameは、SQLメソッドをサポートする高レベルのAPIを提供します。

私たちのプログラムでは、RDDをDataFrameに、またはその逆に変換する必要があることがよくあります。 このチュートリアルでは、RDDをDataFrameに変換することに焦点を当てています。 これを実現するには2つの方法があり、それぞれについて詳しく説明します。

まず、一連のタプルを使用してSparkContextとRDDを作成しましょう。

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = spark.sparkContext
val rdd = sc.parallelize(
  Seq(
    ("John", "Manager", 38),
    ("Mary", "Director", 45),
    ("Sally", "Engineer", 30)
  )
)

3行を含む新しいRDDを作成しました。 次のセクションのすべての例でこれを使用します。

3. createDataFrameメソッドを使用して変換する

SparkSessionオブジェクトには、DataFrameを作成するためのユーティリティメソッド– createDataFrame。があります。このメソッドは、RDDを取得し、そこからDataFrameを作成できます。 createDataFrameはオーバーロードされたメソッドであり、RDDを単独で、またはスキーマとともに渡すことでメソッドを呼び出すことができます。

スキーマを提供せずに、持っているRDDを変換してみましょう。

val dfWitDefaultSchema = spark.createDataFrame(rdd)

それでは、新しく作成したDataFrameのスキーマを調べてみましょう。

dfWitDefaultSchema.printSchema()
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: integer (nullable = false)

列名は、デフォルトのテンプレートに基づくデフォルトの名前のシーケンスに従っていることがわかります。 デフォルトのスキーマは型推論に基づいて作成されますが、常に正しい必要はありません。

型の安全性と制御を向上させるために、事前定義されたスキーマオブジェクトを使用してDataFrameを作成することを常にお勧めします。 オーバーロードされたメソッドcreateDataFrameは、2番目のパラメーターとしてスキーマを取りますが、タイプRowのRDDのみを受け入れるようになりました。 したがって、最初のRDDをRowタイプのRDDに変換します。

val rowRDD:RDD[Row] = rdd.map(t => Row(t._1, t._2, t._3))

次に、スキーマオブジェクトが必要なので、作成しましょう。

val schema = new StructType()
  .add(StructField("Name", StringType, false))
  .add(StructField("Job", StringType, true))
  .add(StructField("Age", IntegerType, true))

追加のschemaパラメーターを使用して、メソッドをもう一度呼び出しましょう。

val dfWithSchema:DataFrame = spark.createDataFrame(rowRDD, schema)

スキーマ情報をもう一度印刷します。

dfWithSchema.printSchema()
|-- Name: string (nullable = false)
|-- Job: string (nullable = true)
|-- Age: integer (nullable = true)

列に適切な名前が付けられ、データ型が正しく定義されていることがわかります。

4. toDF ()暗黙的メソッドを使用した変換

RDDをDataFrameに変換するもう1つの一般的な方法は、 .toDF()暗黙的メソッドを使用することです。 開始する前に、SparkSessionから暗黙をインポートする必要があります。

import spark.implicits._

これで、RDDを変換する準備が整いました。 ただし、このメソッドは、選択したタイプのRDD(Int、Long、String、または scala.Product。のサブクラス)に対してのみ機能します。のシーケンスを使用してRDDを作成しました。 X178X]タプル。 インポートした暗黙的なメソッドを使用して変換してみましょう。

val dfUsingToDFMethod = rdd.toDF("Name", "Job", "Age")

新しいDataFrameのスキーマを調べてみましょう。

dfUsingToDFMethod.printSchema()
|-- Name: string (nullable = true)
|-- Job: string (nullable = true)
|-- Age: integer (nullable = false)

5. 結論

このチュートリアルでは、RDDをDataFrameに変換するさまざまな方法を学びました。 それぞれの方法の詳細と、それぞれが取るパラメーターの種類を学びました。 RowタイプのRDDを変換するためにcreateDataFrameメソッドを使用できます。 その他の場合は、 toDF()暗黙的メソッドを使用できます。

いつものように、この記事で使用されている完全なコードは、GitHubから入手できます。