1. 概要

Apache Spark は、大規模なデータエンジニアリングとデータサイエンスを可能にするオープンソースの分散型分析および処理システムです。 データ転送、大規模な変換、および配布のための統合APIを提供することにより、分析指向のアプリケーションの開発を簡素化します。

DataFrame は、SparkAPIの重要かつ不可欠なコンポーネントです。 このチュートリアルでは、簡単な顧客データの例を使用して、Spark DataFrameAPIのいくつかを調べます。

2. SparkのDataFrame

論理的には、 DataFrameは、名前付き列に編成された不変のレコードセットです。 これは、RDBMSのテーブルまたはJavaのResultSetと類似点を共有します。

APIとして、 DataFrame は、 Spark SQL、Spark Streaming、MLib、GraphXなどの複数のSparkライブラリへの統合アクセスを提供します。

Javaでは、 データセットを表すために DataFrame

基本的に、 Row は、 Tungsten と呼ばれる効率的なストレージを使用します。これは、の前身と比較してSpark操作を高度に最適化します。

3. Mavenの依存関係

spark-coreおよびspark-sqlの依存関係をpom.xmlに追加することから始めましょう。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.8</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.8</version>
</dependency>

4. DataFrameとスキーマ

基本的に、DataFrameはスキーマを持つRDDです。 スキーマは、StructTypeとして推測または定義できます。

StructTypeは、StructFieldオブジェクトのコレクションを表すために使用するSparkSQLの組み込みデータ型です

サンプルのCustomerスキーマStructTypeを定義しましょう。

public static StructType minimumCustomerDataSchema() {
    return DataTypes.createStructType(new StructField[] {
      DataTypes.createStructField("id", DataTypes.StringType, true),
      DataTypes.createStructField("name", DataTypes.StringType, true),
      DataTypes.createStructField("gender", DataTypes.StringType, true),
      DataTypes.createStructField("transaction_amount", DataTypes.IntegerType, true) }
    );
}

ここで、各 StructField には、 DataFrame 列の名前、タイプ、およびnull許容かどうかを表すboolean値を表す名前があります。

5. DataFramesの構築

すべてのSparkアプリケーションの最初の操作は、マスターを介してSparkSessionを取得することです。

DataFramesにアクセスするためのエントリポイントを提供します。 SparkSessionを作成することから始めましょう。

public static SparkSession getSparkSession() {
    return SparkSession.builder()
      .appName("Customer Aggregation pipeline")
      .master("local")
      .getOrCreate();
}

ここでは、ローカルマスターを使用してSparkに接続していることに注意してください。 クラスタに接続する場合は、代わりにクラスタアドレスを指定します。

SparkSession を取得したら、さまざまなメソッドを使用してDataFrameを作成できます。 それらのいくつかを簡単に見てみましょう。

5.1. DataFrame からリスト

構築しましょうリスト最初:

List<Customer> customers = Arrays.asList(
  aCustomerWith("01", "jo", "Female", 2000), 
  aCustomerWith("02", "jack", "Male", 1200)
);

次に、構築しましょう DataFrame からリストを使用して createDataFrame

Dataset<Row> df = SPARK_SESSION
  .createDataFrame(customerList, Customer.class);

5.2. DataFrame from Dataset

データセットがある場合、データセット toDF を呼び出すことにより、データフレームに簡単に変換できます。

を作成しましょうデータセットまず、 createDataset 、それはかかります org.apache.spark.sql.Encoders

Dataset<Customer> customerPOJODataSet = SPARK_SESSION
  .createDataset(CUSTOMERS, Encoders.bean(Customer.class));

次に、それをDataFrameに変換しましょう。

Dataset<Row> df = customerPOJODataSet.toDF();

5.3. RowFactoryを使用したPOJOからのRow

以来 DataFrame 本質的にデータセット 、作成する方法を見てみましょうからお客様 POJO。

基本的に、実装することによって MapFunction とオーバーライド電話メソッド、それぞれをマッピングできますお客様を使用して RowFactory.create

public class CustomerToRowMapper implements MapFunction<Customer, Row> {
    
    @Override
    public Row call(Customer customer) throws Exception {
        Row row = RowFactory.create(
          customer.getId(),
          customer.getName().toUpperCase(),
          StringUtils.substring(customer.getGender(),0, 1),
          customer.getTransaction_amount()
        );
        return row;
    }
}

ここでCustomerデータを操作してから、Rowに変換できることに注意してください。

5.4. DataFrame からリスト

RowオブジェクトのリストからDataFrameを作成することもできます。

List<Row> rows = customer.stream()
  .map(c -> new CustomerToRowMapper().call(c))
  .collect(Collectors.toList());

さて、これをあげましょうリスト SparkSession 一緒に StructType スキーマ:

Dataset<Row> df = SparkDriver.getSparkSession()
  .createDataFrame(rows, SchemaFactory.minimumCustomerDataSchema());

ここで注意してくださいリストスキーマ定義に基づいてDataFrameに変換されます 。 スキーマに存在しないフィールドは、DataFrame。の一部にはなりません。

5.5。構造化ファイルおよびデータベースからのDataFrame

DataFrames は、CSVファイルのような列情報、およびJSONファイルのようなネストされたフィールドと配列を格納できます。

DataFrame APIは、CSVファイル、JSONファイル、その他の形式、およびデータベースを使用しているかどうかに関係なく、同じままです。

複数行のJSONデータからDataFrameを作成しましょう。

Dataset<Row> df = SparkDriver.getSparkSession()
  .read()
  .format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
  .option("multiline", true)
  .load("data/minCustomerData.json");

同様に、データベースから読み取る場合は、次のようになります。

Dataset<Row> df = SparkDriver.getSparkSession()
  .read()
  .option("url", "jdbc:postgresql://localhost:5432/customerdb")
  .option("dbtable", "customer")
  .option("user", "user")
  .option("password", "password")
  .option("serverTimezone", "EST")
  .format("jdbc")
  .load();

6. DataFrameDatasetに変換しています

それでは、DataFrameDatasetに変換する方法を見てみましょう。 この変換は、DataFrameにのみ適用される既存のPOJOと拡張APIを操作する場合に役立ちます。

前のセクションでJSONから作成されたDataFrameを続行します。

の各行を取得するマッパー関数を呼び出しましょうデータセットそしてそれをに変換しますお客様物体:

Dataset<Customer> ds = df.map(
  new CustomerMapper(),
  Encoders.bean(Customer.class)
);

ここでは、 CustomerMapper 実装 MapFunction

public class CustomerMapper implements MapFunction<Row, Customer> {

    @Override
    public Customer call(Row row) {
        Customer customer = new Customer();
        customer.setId(row.getAs("id"));
        customer.setName(row.getAs("name"));
        customer.setGender(row.getAs("gender"));
        customer.setTransaction_amount(Math.toIntExact(row.getAs("transaction_amount")));
        return customer;
    }
}

注意する必要があります MapFunction 処理する必要のあるレコードの数に関係なく、インスタンス化されるのは1回だけです

7. DataFrame操作と変換

それでは、顧客データの例を使用して簡単なパイプラインを構築しましょう。 2つの異なるファイルソースからDataFrames として顧客データを取り込み、それらを正規化してから、データに対していくつかの変換を実行します。

最後に、変換されたデータをデータベースに書き込みます。

これらの変革の目的は、性別と出所順に並べられた年間支出を見つけることです。

7.1. データの取り込み

まず、JSONデータから始まるSparkSessionreadメソッドを使用して、いくつかのソースからデータを取り込みましょう。

Dataset<Row> jsonDataToDF = SPARK_SESSION.read()
  .format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
  .option("multiline", true)
  .load("data/customerData.json");

それでは、CSVソースでも同じことをしましょう。

Dataset<Row> csvDataToDF = SPARK_SESSION.read()
  .format("csv")
  .option("header", "true")
  .schema(SchemaFactory.customerSchema())
  .option("dateFormat", "m/d/YYYY")
  .load("data/customerData.csv"); 

csvDataToDF.show(); 
csvDataToDF.printSchema(); 
return csvData;

重要なのは、このCSVデータを読み取るために、列のデータ型を決定するStructTypeスキーマを提供していることです。

データを取り込んだら、showメソッドを使用してDataFrameの内容を検査できます。

さらに、でサイズを指定することで行を制限することもできます見せる方法 そして、私たちは使用することができます printSchema 新しく作成されたスキーマを検査します DataFrames。

2つのスキーマにはいくつかの違いがあることに気付くでしょう。 したがって、変換を実行する前に、スキーマを正規化する必要があります。

7.2. DataFramesの正規化

次に、CSVおよびJSONデータを表す生のDataFramesを正規化します。

ここで、実行された変換のいくつかを見てみましょう。

private Dataset<Row> normalizeCustomerDataFromEbay(Dataset<Row> rawDataset) {
    Dataset<Row> transformedDF = rawDataset
      .withColumn("id", concat(rawDataset.col("zoneId"),lit("-"), rawDataset.col("customerId")))
      .drop(column("customerId"))
      .withColumn("source", lit("ebay"))
      .withColumn("city", rawDataset.col("contact.customer_city"))
      .drop(column("contact"))
      .drop(column("zoneId"))
      .withColumn("year", functions.year(col("transaction_date")))
      .drop("transaction_date")
      .withColumn("firstName", functions.split(column("name"), " ")
        .getItem(0))
      .withColumn("lastName", functions.split(column("name"), " ")
        .getItem(1))
      .drop(column("name"));

    return transformedDF; 
}

上記の例のDataFrameに対するいくつかの重要な操作は次のとおりです。

  • c oncat は、複数の列とリテラルからのデータを結合して、新しいid列を作成します
  • lit 静的関数は、リテラル値を持つ列を返します
  • 機能。 年から年を抽出するには transactionDate
  • function.split は、namefirstname列とlastname列に分割します
  • drop メソッドは、データフレームの列を削除します
  • col メソッドは、その名前に基づいてデータセットの列を返します
  • withColumnRenamed は、値が変更された列を返します

重要なのは、DataFrameが不変であることがわかります。 したがって、何かを変更する必要があるときはいつでも、新しいものを作成する必要があります DataFrame

最終的に、両方のデータフレームは以下のように同じスキーマに正規化されます。

root
 |-- gender: string (nullable = true)
 |-- transaction_amount: long (nullable = true)
 |-- id: string (nullable = true)
 |-- source: string (nullable = false)
 |-- city: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)

7.3. DataFramesを組み合わせる

次に、正規化されたDataFramesを組み合わせてみましょう。

Dataset<Row> combineDataframes(Dataset<Row> df1, Dataset<Row> df2) {
    return df1.unionByName(df2); 
}

重要なのは、次の点に注意する必要があります。

  • 2つのDataFramesを結合するときに列名を気にする場合は、unionByNameを使用する必要があります。
  • 2つのDataFramesを結合するときに列名を気にしない場合は、unionを使用する必要があります。

7.4. Aggregating DataFrames

次に、組み合わせた DataFrames をグループ化して、年、ソース、性別ごとの年間支出を調べましょう。

次に、集計データを列 year の昇順、および年間使用量の降順で並べ替えます。

Dataset<Row> aggDF = dataset
  .groupBy(column("year"), column("source"), column("gender"))
  .sum("transactionAmount")
  .withColumnRenamed("sum(transaction_amount)", "yearly spent")
  .orderBy(col("year").asc(), col("yearly spent").desc());

上記の例のDataFrameに対するいくつかの重要な操作は次のとおりです。

  • groupBy は、 DataFrame で同一のデータをグループに配置し、 SQL“ GROUP BY”句と同様の集計関数を実行するために使用されます。
  • sum は、グループ化後に列transactionAmountに集計関数を適用します
  • orderBy は、DataFrameを1つ以上の列で並べ替えます
  • クラスのascおよびdesc関数を使用して、並べ替え順序を指定できます

最後に、 show メソッドを使用して、変換後のデータフレームがどのようになるかを確認しましょう。

+----+------+------+---------------+
|year|source|gender|annual_spending|
+----+------+------+---------------+
|2018|amazon|  Male|          10600|
|2018|amazon|Female|           6200|
|2018|  ebay|  Male|           5500|
|2021|  ebay|Female|          16000|
|2021|  ebay|  Male|          13500|
|2021|amazon|  Male|           4000|
|2021|amazon|Female|           2000|
+----+------+------+---------------+

したがって、最終変換後のスキーマは次のようになります。

root
 |-- source: string (nullable = false)
 |-- gender: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- yearly spent: long (nullable = true)

7.5. DataFrameからリレーショナルデータベースへの書き込み

最後に、変換されたDataFrameをリレーショナルデータベースのテーブルとして記述して終了しましょう。

Properties dbProps = new Properties();

dbProps.setProperty("connectionURL", "jdbc:postgresql://localhost:5432/customerdb");
dbProps.setProperty("driver", "org.postgresql.Driver");
dbProps.setProperty("user", "postgres");
dbProps.setProperty("password", "postgres");

次に、Sparkセッションを使用してデータベースに書き込むことができます。

String connectionURL = dbProperties.getProperty("connectionURL");

dataset.write()
  .mode(SaveMode.Overwrite)
  .jdbc(connectionURL, "customer", dbProperties);

8. テスト

これで、postgrespgAdmin Dockerイメージを使用して、2つの取り込みソースを使用してパイプラインをエンドツーエンドでテストできます。

@Test
void givenCSVAndJSON_whenRun_thenStoresAggregatedDataFrameInDB() throws Exception {
    Properties dbProps = new Properties();
    dbProps.setProperty("connectionURL", "jdbc:postgresql://localhost:5432/customerdb");
    dbProps.setProperty("driver", "org.postgresql.Driver");
    dbProps.setProperty("user", "postgres");
    dbProps.setProperty("password", "postgres");

    pipeline = new CustomerDataAggregationPipeline(dbProps);
    pipeline.run();

    String allCustomersSql = "Select count(*) from customer";

    Statement statement = conn.createStatement();
    ResultSet resultSet = statement.executeQuery(allCustomersSql);
    resultSet.next();
    int count = resultSet.getInt(1);
    assertEquals(7, count);
}

これを実行した後、DataFrameに対応する列と行を持つテーブルが存在することを確認できます。 最後に、pgAdmin4クライアントを介してこの出力を確認することもできます。

ここでいくつかの重要な点に注意する必要があります。

  • customer テーブルは、write操作の結果として自動的に作成されます。
  • 使用されるモードはSaveMode.Overwrite。です。したがって、これにより、テーブルにすでに存在するものがすべて上書きされます。 使用可能な他のオプションは、 Append Ignore 、およびErrorIfExistsです。

さらに、 write を使用して、DataFrameデータをCSV JSON 、またはparquetとしてエクスポートすることもできます。他の形式の中で。

9. 結論

このチュートリアルでは、DataFramesを使用してApacheSparkでデータ操作と集計を実行する方法について説明しました。

まず、さまざまな入力ソースからDataFramesを作成しました。 次に、いくつかのAPIメソッドを使用して、データを正規化し、結合してから集計しました。

最後に、DataFrameをリレーショナルデータベースのテーブルとしてエクスポートしました。

いつものように、完全なソースコードはGitHubから入手できます。