1. 序章

この記事では、Spring Cloudアプリスターターの使用方法を示します。これは、ブートストラップされたすぐに使用できるアプリケーションを提供し、将来の開発の開始点として機能します。

簡単に言うと、Task App Starterはデータベースの移行や分散テストなどのユースケース専用であり、StreamAppStarterは外部システムとの統合を提供します。

全体として、55を超えるスターターがいます。 これら2つの詳細については、公式ドキュメントhereおよびhereを確認してください。

次に、Twitterの投稿をHadoop分散ファイルシステムにストリーミングする小さな分散Twitterアプリケーションを構築します。

2. セットアップの取得

コンシューマーキーアクセストークンを使用して、簡単なTwitterアプリを作成します。

次に、Hadoopをセットアップして、将来のビッグデータの目的でTwitterストリームを永続化できるようにします。

最後に、提供されているSpring GitHubリポジトリを使用して、Mavenを使用してソースプロセッサ-シンクアーキテクチャパターンのスタンドアロンコンポーネントをコンパイルおよびアセンブルするか、ソース[ X232X]、プロセッサ、およびシンクは、SpringStreamバインディングインターフェイスを介して実行されます。

これを行うための両方の方法を見ていきます。

以前は、すべてのStreamAppStarterがgithub.com/spring -cloud / spring-cloud-stream-app-startersで1つの大きなリポジトリにまとめられていたことは注目に値します。 各スターターは簡素化され、分離されています。

3. Twitterのクレデンシャル

まず、Twitter開発者の資格情報を設定しましょう。 Twitter開発者の資格情報を取得するには、手順に従ってアプリを設定し、公式のTwitter開発者向けドキュメントからアクセストークンを作成します。

具体的には、次のものが必要です。

  1. コンシューマーキー
  2. 消費者の鍵の秘密
  3. アクセストークンシークレット
  4. アクセストークン

以下のものを使用するので、必ずそのウィンドウを開いたままにするか、それらを書き留めてください。

4. Hadoopのインストール

次に、Hadoopをインストールしましょう! 公式ドキュメントに従うか、Dockerを活用することができます。

$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1

5. アプリスターターのコンパイル

独立した完全に個別のコンポーネントを使用するために、GitHubリポジトリから目的のSpringCloudStreamアプリスターターを個別にダウンロードしてコンパイルできます。

5.1. Twitter SpringCloudStreamアプリスターター

Twitter Spring Cloud Stream App Starter(org。springframework.cloud.stream.app.twitterstream.source )をプロジェクトに追加しましょう。

git clone https://github.com/spring-cloud-stream-app-starters/twitter.git

次に、Mavenを実行します。

./mvnw clean install -PgenerateApps

結果としてコンパイルされたスターターアプリは、ローカルプロジェクトルートの「/target」で利用可能になります。

次に、コンパイルされた.jarを実行し、次のように関連するアプリケーションプロパティを渡すことができます。

java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
    --accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>

おなじみのSpringapplication.properties:を使用してクレデンシャルを渡すこともできます

twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...

5.2. HDFSSpringクラウドストリームアプリスターター

ここで(Hadoopがすでに設定されている状態で)、HDFS Spring Cloud Stream App Starter(org。springframework.cloud.stream.app.hdfs.sink )依存関係をプロジェクトに追加しましょう。

まず、関連するリポジトリのクローンを作成します。

git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git

次に、Mavenジョブを実行します。

./mvnw clean install -PgenerateApps

結果としてコンパイルされたスターターアプリは、ローカルプロジェクトルートの「/target」で利用可能になります。 次に、コンパイルされた.jarを実行して、関連するアプリケーションプロパティを渡すことができます。

java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/

hdfs://127.0.0.1:50010 / ‘はHadoopのデフォルトですが、デフォルトのHDFSポートはインスタンスの構成方法によって異なる場合があります。

データノード(およびそれらの現在のポート)のリストは、以前に渡した構成を前提として、「http://0.0.0.0:50070」で確認できます。

コンパイル前に使い慣れたSpringapplication.properties を使用してクレデンシャルを渡すこともできるため、CLIを介してこれらを常に渡す必要はありません。

デフォルトのHadoopポートを使用するようにapplication.properties を構成しましょう:

hdfs.fs-uri=hdfs://127.0.0.1:50010/

6. AggregateApplicationBuilderの使用

または、SpringStreamソースシンクorg.springframework.cloud.stream.aggregate.AggregateApplicationBuilderを介して単純なSpringBootアプリケーションに結合することもできます。

まず、2つのStreamAppStarterをpom.xmlに追加します。

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
</dependencies>

次に、2つのStream App Starter依存関係を、それぞれのサブアプリケーションにラップすることで結合し始めます。

6.1. アプリコンポーネントの構築

SourceApp は、変換または消費されるSourceを指定します。

@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
    @InboundChannelAdapter(Source.OUTPUT)
    public String timerMessageSource() {
        return new SimpleDateFormat().format(new Date());
    }
}

SourceApporg.springframework.cloud.stream.messaging.Sourceにバインドし、適切な構成クラスを挿入して、環境プロパティから必要な設定を取得することに注意してください。

次に、単純な org.springframework.cloud.stream.messaging.Processor binding をセットアップします。

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        log.info("Payload received!");
        return payload;
    }
}

次に、コンシューマー(シンク)を作成します。

@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
    @ServiceActivator(inputChannel= Sink.INPUT)
    public void loggerSink(Object payload) {
        log.info("Received: " + payload);
    }
}

ここでは、SinkApporg.springframework.cloud.stream.messaging.Sinkにバインドし、指定されたHadoop設定を使用するために正しい構成クラスを再度挿入します。

最後に、 AggregateAppメインメソッドでAggregateApplicationBuilder を使用して、 SourceApp ProcessorApp 、およびSinkAppを組み合わせます。 :

@SpringBootApplication
public class AggregateApp {
    public static void main(String[] args) {
        new AggregateApplicationBuilder()
          .from(SourceApp.class).args("--fixedDelay=5000")
          .via(ProcessorApp.class)
          .to(SinkApp.class).args("--debug=true")
          .run(args);
    }
}

他のSpringBootアプリケーションと同様に、プログラムでapplication.propertiesまたはを介して環境プロパティとして指定された設定を挿入できます。

Spring Streamフレームワークを使用しているため、引数をAggregateApplicationBuilderコンストラクターに渡すこともできます。

6.2. 完成したアプリの実行

次に、次のコマンドライン命令を使用してアプリケーションをコンパイルして実行できます。

    $ mvn install
    $ java -jar twitterhdfs.jar

@SpringBootApplicationクラスを個別のパッケージに保持することを忘れないでください(そうしないと、いくつかの異なるバインディング例外がスローされます)。 AggregateApplicationBuilder の使用方法の詳細については、公式ドキュメントを参照してください。

アプリをコンパイルして実行すると、コンソールに次のようなものが表示されます(当然、内容はツイートによって異なります)。

2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
c.b.twitterhdfs.processor.ProcessorApp   : Payload received!
2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
com.baeldung.twitterhdfs.sink.SinkApp    : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...

これらは、ソースからデータを受信する際のプロセッサおよびシンクの正しい動作を示しています。 この例では、HDFSシンクが多くのことを実行するように構成していません。「ペイロードを受信しました!」というメッセージを出力するだけです。

7. 結論

このチュートリアルでは、2つの素晴らしいSpringストリームアプリスターターを1つの甘いSpring Bootの例に組み合わせる方法を学びました。

スプリングブートスターターカスタマイズスターターの作成方法に関するその他の優れた公式記事をいくつか紹介します。

いつものように、この記事で使用されているコードは、GitHubにあります。