Apache Sparkを使用したSpring Cloudデータフロー

1. 前書き

  • Spring Cloud Data Flowは、データ統合とリアルタイムのデータ処理パイプラインを構築するためのツールキットです。 *

    この場合のパイプラインは、https://cloud.spring.io/spring-cloud-stream/ [Spring Cloud Stream]またはhttps://spring.io/projects/springを使用して構築されたSpring Bootアプリケーションです。 -cloud-task [Spring Cloud Task]フレームワーク。
    このチュートリアルでは、https://www.baeldung.com/apache-spark [Apache Spark]でSpring Cloud Data Flowを使用する方法を示します。

2. データフローローカルサーバー

まず、https://www.baeldung.com/spring-cloud-data-flow-stream-processing [データフローサーバー]を実行して、ジョブを展開できるようにする必要があります。
Data Flow Serverをローカルで実行するには、https://search.maven.org/search?q = spring-cloud-starter-dataflow-server-local [the _spring-cloud-starter-dataflowを使用して新しいプロジェクトを作成する必要があります-server-local_依存関係]:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>
その後、サーバーのメインクラスに_ @ EnableDataFlowServer_の注釈を付ける必要があります。
@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}
このアプリケーションを実行すると、ポート9393にローカルデータフローサーバーが作成されます。

3. プロジェクトを作成する

スタンドアロンのローカルアプリケーションとしてlink:/apache-spark[Spark Jobsを作成]を実行します。これにより、クラスターを実行する必要がなくなります。

3.1. 依存関係

まず、https://search.maven.org/artifact/org.apache.spark/spark-core_2.10/2.2.3/jar [Spark依存関係]を追加します。
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.4.0</version>
</dependency>

3.2. ジョブを作成する

そして、私たちの仕事のために、piを概算しましょう:
public class PiApproximation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");
        JavaSparkContext context = new JavaSparkContext(conf);
        int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2;
        int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;

        List<Integer> xs = IntStream.rangeClosed(0, n)
          .mapToObj(element -> Integer.valueOf(element))
          .collect(Collectors.toList());

        JavaRDD<Integer> dataSet = context.parallelize(xs, slices);

        JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
           double x = Math.random() * 2 - 1;
           double y = Math.random() * 2 - 1;
           return (x * x + y * y ) < 1 ? 1: 0;
        });

        int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);

        System.out.println("The pi was estimated as:" + count / n);

        context.stop();
    }
}

4. データフローシェル

データフローシェルは、サーバーとの対話を可能にするアプリケーションです*。 シェルはDSLコマンドを使用してデータフローを記述します。
link:/spring-cloud-data-flow-stream-processing [データフローシェルを使用]するには、実行できるプロジェクトを作成する必要があります。 まず、https://search.maven.org/search?q = spring-cloud-dataflow-shell [_spring-cloud-dataflow-shell_依存関係]が必要です。
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>
依存関係を追加したら、データフローシェルを実行するクラスを作成できます。
@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

5. プロジェクトの展開

プロジェクトをデプロイするには、Apache Sparkで使用可能な3つのバージョン(_cluster _、_ yarn_、および_client_)で使用されるいわゆるタスクランナーを使用します。 ローカル_client_バージョンを続行します。
*タスクランナーは、Sparkジョブを実行するものです。*
これを行うには、まずData Flow Shellを使用してタスクを登録する必要があります*:
app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT
このタスクにより、複数の異なるパラメーターを指定できます。それらの一部はオプションですが、Sparkジョブを適切にデプロイするにはいくつかのパラメーターが必要です。
  • spark.app-class、送信されたジョブのメインクラス

  • spark.app-jar、ジョブを含むファットjarへのパス

  • spark.app-name、私たちの仕事に使用される名前

  • spark.app-args、ジョブに渡される引数

    登録されたタスク_spark-client_を使用して、必要なパラメーターを提供することを忘れずにジョブを送信できます。
task create spark1 --definition "spark-client \
  --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
  --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"
_spark.app-jar_は、ジョブのfat-jarへのパスであることに注意してください。
タスクが正常に作成されたら、次のコマンドを使用してタスクを実行できます。
task launch spark1
これにより、タスクの実行が呼び出されます。

6. 概要

このチュートリアルでは、Spring Cloud Data Flowフレームワークを使用してApache Sparkでデータを処理する方法を示しました。 Spring Cloud Data Flowフレームワークの詳細については、https://cloud.spring.io/spring-cloud-dataflow/ [ドキュメント]をご覧ください。
すべてのコードサンプルhttps://github.com/eugenp/tutorials/tree/master/spring-cloud-data-flow/apache-spark-job[GitHubにあります。]