SparkDataFrame
1. 概要
Apache Spark は、大規模なデータエンジニアリングとデータサイエンスを可能にするオープンソースの分散型分析および処理システムです。 データ転送、大規模な変換、および配布のための統合APIを提供することにより、分析指向のアプリケーションの開発を簡素化します。
DataFrame は、SparkAPIの重要かつ不可欠なコンポーネントです。 このチュートリアルでは、簡単な顧客データの例を使用して、Spark DataFrameAPIのいくつかを調べます。
2. SparkのDataFrame
論理的には、 DataFrameは、名前付き列に編成された不変のレコードセットです。 これは、RDBMSのテーブルまたはJavaのResultSetと類似点を共有します。
APIとして、 DataFrame は、 Spark SQL、Spark Streaming、MLib、GraphXなどの複数のSparkライブラリへの統合アクセスを提供します。
Javaでは、 データセット
基本的に、 Row は、 Tungsten と呼ばれる効率的なストレージを使用します。これは、の前身と比較してSpark操作を高度に最適化します。
3. Mavenの依存関係
spark-coreおよびspark-sqlの依存関係をpom.xmlに追加することから始めましょう。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.8</version>
</dependency>
4. DataFrameとスキーマ
基本的に、DataFrameはスキーマを持つRDDです。 スキーマは、StructTypeとして推測または定義できます。
StructTypeは、StructFieldオブジェクトのコレクションを表すために使用するSparkSQLの組み込みデータ型です。
サンプルのCustomerスキーマStructTypeを定義しましょう。
public static StructType minimumCustomerDataSchema() {
return DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("gender", DataTypes.StringType, true),
DataTypes.createStructField("transaction_amount", DataTypes.IntegerType, true) }
);
}
ここで、各 StructField には、 DataFrame 列の名前、タイプ、およびnull許容かどうかを表すboolean値を表す名前があります。
5. DataFramesの構築
すべてのSparkアプリケーションの最初の操作は、マスターを介してSparkSessionを取得することです。
DataFramesにアクセスするためのエントリポイントを提供します。 SparkSessionを作成することから始めましょう。
public static SparkSession getSparkSession() {
return SparkSession.builder()
.appName("Customer Aggregation pipeline")
.master("local")
.getOrCreate();
}
ここでは、ローカルマスターを使用してSparkに接続していることに注意してください。 クラスタに接続する場合は、代わりにクラスタアドレスを指定します。
SparkSession を取得したら、さまざまなメソッドを使用してDataFrameを作成できます。 それらのいくつかを簡単に見てみましょう。
5.1. DataFrame からリスト
構築しましょうリスト
List<Customer> customers = Arrays.asList(
aCustomerWith("01", "jo", "Female", 2000),
aCustomerWith("02", "jack", "Male", 1200)
);
次に、構築しましょう DataFrame からリスト
Dataset<Row> df = SPARK_SESSION
.createDataFrame(customerList, Customer.class);
5.2. DataFrame from Dataset
データセットがある場合、データセットで toDF を呼び出すことにより、データフレームに簡単に変換できます。
を作成しましょうデータセット
Dataset<Customer> customerPOJODataSet = SPARK_SESSION
.createDataset(CUSTOMERS, Encoders.bean(Customer.class));
次に、それをDataFrameに変換しましょう。
Dataset<Row> df = customerPOJODataSet.toDF();
5.3. RowFactoryを使用したPOJOからのRow
以来 DataFrame 本質的にデータセット
基本的に、実装することによって MapFunction
public class CustomerToRowMapper implements MapFunction<Customer, Row> {
@Override
public Row call(Customer customer) throws Exception {
Row row = RowFactory.create(
customer.getId(),
customer.getName().toUpperCase(),
StringUtils.substring(customer.getGender(),0, 1),
customer.getTransaction_amount()
);
return row;
}
}
ここでCustomerデータを操作してから、Rowに変換できることに注意してください。
5.4. DataFrame からリスト|
RowオブジェクトのリストからDataFrameを作成することもできます。
List<Row> rows = customer.stream()
.map(c -> new CustomerToRowMapper().call(c))
.collect(Collectors.toList());
さて、これをあげましょうリスト
Dataset<Row> df = SparkDriver.getSparkSession()
.createDataFrame(rows, SchemaFactory.minimumCustomerDataSchema());
ここで注意してくださいリスト
5.5。構造化ファイルおよびデータベースからのDataFrame
DataFrames は、CSVファイルのような列情報、およびJSONファイルのようなネストされたフィールドと配列を格納できます。
DataFrame APIは、CSVファイル、JSONファイル、その他の形式、およびデータベースを使用しているかどうかに関係なく、同じままです。
複数行のJSONデータからDataFrameを作成しましょう。
Dataset<Row> df = SparkDriver.getSparkSession()
.read()
.format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
.option("multiline", true)
.load("data/minCustomerData.json");
同様に、データベースから読み取る場合は、次のようになります。
Dataset<Row> df = SparkDriver.getSparkSession()
.read()
.option("url", "jdbc:postgresql://localhost:5432/customerdb")
.option("dbtable", "customer")
.option("user", "user")
.option("password", "password")
.option("serverTimezone", "EST")
.format("jdbc")
.load();
6. DataFrameをDatasetに変換しています
それでは、DataFrameをDatasetに変換する方法を見てみましょう。 この変換は、DataFrameにのみ適用される既存のPOJOと拡張APIを操作する場合に役立ちます。
前のセクションでJSONから作成されたDataFrameを続行します。
の各行を取得するマッパー関数を呼び出しましょうデータセット
Dataset<Customer> ds = df.map(
new CustomerMapper(),
Encoders.bean(Customer.class)
);
ここでは、 CustomerMapper 実装 MapFunction
public class CustomerMapper implements MapFunction<Row, Customer> {
@Override
public Customer call(Row row) {
Customer customer = new Customer();
customer.setId(row.getAs("id"));
customer.setName(row.getAs("name"));
customer.setGender(row.getAs("gender"));
customer.setTransaction_amount(Math.toIntExact(row.getAs("transaction_amount")));
return customer;
}
}
注意する必要があります MapFunction
7. DataFrame操作と変換
それでは、顧客データの例を使用して簡単なパイプラインを構築しましょう。 2つの異なるファイルソースからDataFrames として顧客データを取り込み、それらを正規化してから、データに対していくつかの変換を実行します。
最後に、変換されたデータをデータベースに書き込みます。
これらの変革の目的は、性別と出所順に並べられた年間支出を見つけることです。
7.1. データの取り込み
まず、JSONデータから始まるSparkSessionのreadメソッドを使用して、いくつかのソースからデータを取り込みましょう。
Dataset<Row> jsonDataToDF = SPARK_SESSION.read()
.format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
.option("multiline", true)
.load("data/customerData.json");
それでは、CSVソースでも同じことをしましょう。
Dataset<Row> csvDataToDF = SPARK_SESSION.read()
.format("csv")
.option("header", "true")
.schema(SchemaFactory.customerSchema())
.option("dateFormat", "m/d/YYYY")
.load("data/customerData.csv");
csvDataToDF.show();
csvDataToDF.printSchema();
return csvData;
重要なのは、このCSVデータを読み取るために、列のデータ型を決定するStructTypeスキーマを提供していることです。
データを取り込んだら、showメソッドを使用してDataFrameの内容を検査できます。
さらに、でサイズを指定することで行を制限することもできます見せる方法 。 そして、私たちは使用することができます printSchema 新しく作成されたスキーマを検査します
2つのスキーマにはいくつかの違いがあることに気付くでしょう。 したがって、変換を実行する前に、スキーマを正規化する必要があります。
7.2. DataFramesの正規化
次に、CSVおよびJSONデータを表す生のDataFramesを正規化します。
ここで、実行された変換のいくつかを見てみましょう。
private Dataset<Row> normalizeCustomerDataFromEbay(Dataset<Row> rawDataset) {
Dataset<Row> transformedDF = rawDataset
.withColumn("id", concat(rawDataset.col("zoneId"),lit("-"), rawDataset.col("customerId")))
.drop(column("customerId"))
.withColumn("source", lit("ebay"))
.withColumn("city", rawDataset.col("contact.customer_city"))
.drop(column("contact"))
.drop(column("zoneId"))
.withColumn("year", functions.year(col("transaction_date")))
.drop("transaction_date")
.withColumn("firstName", functions.split(column("name"), " ")
.getItem(0))
.withColumn("lastName", functions.split(column("name"), " ")
.getItem(1))
.drop(column("name"));
return transformedDF;
}
上記の例のDataFrameに対するいくつかの重要な操作は次のとおりです。
- c oncat は、複数の列とリテラルからのデータを結合して、新しいid列を作成します
- lit 静的関数は、リテラル値を持つ列を返します
機能。 年 から年を抽出するには transactionDate- function.split は、nameをfirstname列とlastname列に分割します
- drop メソッドは、データフレームの列を削除します
- col メソッドは、その名前に基づいてデータセットの列を返します
- withColumnRenamed は、値が変更された列を返します
最終的に、両方のデータフレームは以下のように同じスキーマに正規化されます。
root
|-- gender: string (nullable = true)
|-- transaction_amount: long (nullable = true)
|-- id: string (nullable = true)
|-- source: string (nullable = false)
|-- city: string (nullable = true)
|-- year: integer (nullable = true)
|-- firstName: string (nullable = true)
|-- lastName: string (nullable = true)
7.3. DataFramesを組み合わせる
次に、正規化されたDataFramesを組み合わせてみましょう。
Dataset<Row> combineDataframes(Dataset<Row> df1, Dataset<Row> df2) {
return df1.unionByName(df2);
}
重要なのは、次の点に注意する必要があります。
- 2つのDataFramesを結合するときに列名を気にする場合は、unionByNameを使用する必要があります。
- 2つのDataFramesを結合するときに列名を気にしない場合は、unionを使用する必要があります。
7.4. Aggregating DataFrames
次に、組み合わせた DataFrames をグループ化して、年、ソース、性別ごとの年間支出を調べましょう。
次に、集計データを列 year の昇順、および年間使用量の降順で並べ替えます。
Dataset<Row> aggDF = dataset
.groupBy(column("year"), column("source"), column("gender"))
.sum("transactionAmount")
.withColumnRenamed("sum(transaction_amount)", "yearly spent")
.orderBy(col("year").asc(), col("yearly spent").desc());
上記の例のDataFrameに対するいくつかの重要な操作は次のとおりです。
- groupBy は、 DataFrame で同一のデータをグループに配置し、 SQL“ GROUP BY”句と同様の集計関数を実行するために使用されます。
- sum は、グループ化後に列transactionAmountに集計関数を適用します
- orderBy は、DataFrameを1つ以上の列で並べ替えます
- 列クラスのascおよびdesc関数を使用して、並べ替え順序を指定できます
最後に、 show メソッドを使用して、変換後のデータフレームがどのようになるかを確認しましょう。
+----+------+------+---------------+
|year|source|gender|annual_spending|
+----+------+------+---------------+
|2018|amazon| Male| 10600|
|2018|amazon|Female| 6200|
|2018| ebay| Male| 5500|
|2021| ebay|Female| 16000|
|2021| ebay| Male| 13500|
|2021|amazon| Male| 4000|
|2021|amazon|Female| 2000|
+----+------+------+---------------+
したがって、最終変換後のスキーマは次のようになります。
root
|-- source: string (nullable = false)
|-- gender: string (nullable = true)
|-- year: integer (nullable = true)
|-- yearly spent: long (nullable = true)
7.5. DataFrameからリレーショナルデータベースへの書き込み
最後に、変換されたDataFrameをリレーショナルデータベースのテーブルとして記述して終了しましょう。
Properties dbProps = new Properties();
dbProps.setProperty("connectionURL", "jdbc:postgresql://localhost:5432/customerdb");
dbProps.setProperty("driver", "org.postgresql.Driver");
dbProps.setProperty("user", "postgres");
dbProps.setProperty("password", "postgres");
次に、Sparkセッションを使用してデータベースに書き込むことができます。
String connectionURL = dbProperties.getProperty("connectionURL");
dataset.write()
.mode(SaveMode.Overwrite)
.jdbc(connectionURL, "customer", dbProperties);
8. テスト
これで、postgresとpgAdmin Dockerイメージを使用して、2つの取り込みソースを使用してパイプラインをエンドツーエンドでテストできます。
@Test
void givenCSVAndJSON_whenRun_thenStoresAggregatedDataFrameInDB() throws Exception {
Properties dbProps = new Properties();
dbProps.setProperty("connectionURL", "jdbc:postgresql://localhost:5432/customerdb");
dbProps.setProperty("driver", "org.postgresql.Driver");
dbProps.setProperty("user", "postgres");
dbProps.setProperty("password", "postgres");
pipeline = new CustomerDataAggregationPipeline(dbProps);
pipeline.run();
String allCustomersSql = "Select count(*) from customer";
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(allCustomersSql);
resultSet.next();
int count = resultSet.getInt(1);
assertEquals(7, count);
}
これを実行した後、DataFrameに対応する列と行を持つテーブルが存在することを確認できます。 最後に、pgAdmin4クライアントを介してこの出力を確認することもできます。
ここでいくつかの重要な点に注意する必要があります。
- customer テーブルは、write操作の結果として自動的に作成されます。
- 使用されるモードはSaveMode.Overwrite。です。したがって、これにより、テーブルにすでに存在するものがすべて上書きされます。 使用可能な他のオプションは、 Append 、 Ignore 、およびErrorIfExistsです。
さらに、 write を使用して、DataFrameデータをCSV、 JSON 、またはparquetとしてエクスポートすることもできます。他の形式の中で。
9. 結論
このチュートリアルでは、DataFramesを使用してApacheSparkでデータ操作と集計を実行する方法について説明しました。
まず、さまざまな入力ソースからDataFramesを作成しました。 次に、いくつかのAPIメソッドを使用して、データを正規化し、結合してから集計しました。
最後に、DataFrameをリレーショナルデータベースのテーブルとしてエクスポートしました。
いつものように、完全なソースコードはGitHubでから入手できます。