1前書き

この記事では、Spring Cloud Appスターター(ブートストラップされたすぐに使えるアプリケーションを提供する)の使用方法を説明します。これは将来の開発の出発点として役立ちます。

簡単に言うと、Task App Starterはデータベースの移行や分散テストなどのユースケース専用で、Stream App Starterは外部システムとの統合を提供します。

全体として、55以上のスターターがあります。公式ドキュメントhttps://cloud.spring.io/spring-cloud-task-app-starters/[ここ]およびhttps://cloud.spring.io/spring-cloud-stream-app-starters/[こちら]をご覧ください。これら2つの詳細についてはここ。

次に、Twitterの投稿をHadoop Distributed File Systemにストリーミングする小さな分散Twitterアプリケーションを作成します。

2.セットアップを始める

シンプルなTwitterアプリケーションを作成するには、

consumer-key



access-token

を使用します。

それから、今後のビッグデータの目的のためにTwitterストリームを永続化できるようにHadoopを設定します。

最後に、提供されているSpring GitHubリポジトリを使用して、Mavenを使用して

sources



processor-sinks

アーキテクチャパターンのスタンドアロンコンポーネントをコンパイルおよびアセンブルするか、またはSpring Streamバインディングインタフェースを介して

sources



processors

、および

sinks

を組み合わせることができます。

これを行うには両方の方法を検討します。

注目すべきは、以前はすべてのStream App Starterがhttps://github.com/spring-cloud/spring-cloud-stream-app-starter/tree/master/hdfs/spring-cloud-で1つの大きなリポジトリにまとめられていたことです。 starter-stream-sink-hdfs[github.com/spring-cloud/spring-cloud-stream-app-starters]。

各スターターは単純化され分離されました。


3 Twitterのクレデンシャル

まず、Twitterデベロッパーの認証情報を設定しましょう。 Twitter開発者の資格情報を取得するには、手順に従ってアプリを設定し、アクセストークンhttps://apps.twitter.com/[Twitter開発者公式文書]を作成します。

具体的には、次のものが必要です。

  1. 消費者キー

  2. 消費者鍵の秘密

  3. アクセストークンの秘密

  4. アクセストークン

以下のウィンドウを使用するので、ウィンドウを開いたままにしておくか、閉じないでください。

4. Hadoopのインストール

次に、Hadoopをインストールしましょう!

公式ドキュメント

をフォローするか、単にDockerを利用することができます。

$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1

5.アプリスターターのコンパイル

完全に独立した独立したコンポーネントを使用するために、GitHubリポジトリから希望するSpring Cloud Stream App Starterを個別にダウンロードしてコンパイルすることができます。


5.1. Twitter Spring Cloud Streamアプリスターター

Twitter Spring Cloud Streamアプリスターター(

org.springframework.cloud.stream.app.twitterstream.source

)をプロジェクトに追加しましょう。

git clone https://github.com/spring-cloud-stream-app-starters/twitter.git

次に、Mavenを実行します。

./mvnw clean install -PgenerateApps

結果のコンパイル済みスターターアプリは、ローカルプロジェクトルートの「/target」で利用可能になります。

それからコンパイルした.jarを実行して、関連するアプリケーションプロパティを次のように渡します。

java -jar twitter__stream__source.jar --consumerKey=<CONSUMER__KEY> --consumerSecret=<CONSUMER__SECRET> \
    --accessToken=<ACCESS__TOKEN> --accessTokenSecret=<ACCESS__TOKEN__SECRET>

身近なSpring

application.properties:

を使って私たちの資格を渡すこともできます

twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...


5.2. HDFS Spring Cloud Streamアプリスターター

(Hadoopが既に設定されている)では、HDFS Spring Cloud Streamアプリスターター(

org.springframework.cloud.stream.app.hdfs.sink

)依存関係を私たちのプロジェクトに追加しましょう。

まず、関連レポをクローンします。

git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git

次に、Mavenのジョブを実行します。

./mvnw clean install -PgenerateApps

結果のコンパイル済みスターターアプリは、ローカルプロジェクトルートの「/target」で利用可能になります。その後、コンパイルした.jarを実行して、関連するアプリケーションプロパティを渡します。

java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/----

「__hdfs://127.0.0.1:50010/__」はHadoopのデフォルトですが、デフォルトのHDFSポートはインスタンスの設定方法によって異なる場合があります。

前に渡した設定を使用して、データノード(およびその現在のポート)の一覧を '__http://0.0.0.0:50070__'に表示できます。

コンパイル前に、おなじみのSpring __application.properties__を使用して資格情報を渡すこともできます。そのため、常にCLI経由でこれらを渡す必要はありません。

__application.properties__を設定しましょう

[source,text,gutter:,true]

hdfs.fs-uri=hdfs://127.0.0.1:50010/—-


6.

AggregateApplicationBuilder


を使用する

あるいは、

org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder

を通じて、Spring Streamの

Source



Sink

を単純なSpring Bootアプリケーションに統合することもできます。

まず、2つのStream App Starterをpom.xmlに追加します。

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        <version>1.3.1.BUILD-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
        <version>1.3.1.BUILD-SNAPSHOT</version>
    </dependency>
</dependencies>

それから、2つのStream App Starter依存関係をそれぞれのサブアプリケーションにラップすることによって結合します。


6.1. アプリコンポーネントの構築


SourceApp

は、変換または消費される

Source

を指定します。

@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
    @InboundChannelAdapter(Source.OUTPUT)
    public String timerMessageSource() {
        return new SimpleDateFormat().format(new Date());
    }
}


SourceApp



org.springframework.cloud.stream.messaging.Source

にバインドし、適切な設定クラスを挿入して、環境プロパティから必要な設定を取得します。

次に、簡単な

org.springframework.cloud.stream.messaging.Processor

binding ____を設定します。

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        log.info("Payload received!");
        return payload;
    }
}

次に、コンシューマ(

Sink

)を作成します。

@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
    @ServiceActivator(inputChannel= Sink.INPUT)
    public void loggerSink(Object payload) {
        log.info("Received: " + payload);
    }
}

ここでは、

SinkApp



org.springframework.cloud.stream.messaging.Sink

にバインドし、指定したHadoop設定を使用するために正しい設定クラスを再度挿入します。

最後に、

AggregateApp

mainメソッドで

AggregateApplicationBuilder

を使用して、

SourceApp



ProcessorApp

、および

SinkApp

を結合します。

@SpringBootApplication
public class AggregateApp {
    public static void main(String[]args) {
        new AggregateApplicationBuilder()
          .from(SourceApp.class).args("--fixedDelay=5000")
          .via(ProcessorApp.class)
          .to(SinkApp.class).args("--debug=true")
          .run(args);
    }
}

他のSpring Bootアプリケーションと同様に、__application.propertiesを通じて、またはプログラムによって、指定された設定を環境プロパティとしてインジェクトでき​​ます。

Spring Streamフレームワークを使用しているので、引数を

AggregateApplicationBuilder

コンストラクタに渡すこともできます。


6.2. 完成したアプリケーションの実行

その後、次のコマンドラインの指示に従ってアプリケーションをコンパイルして実行できます。

    $ mvn install
    $ java -jar twitterhdfs.jar



@ SpringBootApplication

クラスを別々のパッケージに入れておくことを忘れないでください(そうでなければ、いくつかの異なるバインディング例外がスローされます)。

AggregateApplicationBuilder

の使用方法の詳細については、https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/

programming

model.html#__aggregation[official docs]をご覧ください。

アプリをコンパイルして実行すると、コンソールに次のようなものが表示されます(当然、内容はTweetによって異なります)。

2018-01-15 04:38:32.255  INFO 28778 ---[itterSource-1-1]
c.b.twitterhdfs.processor.ProcessorApp   : Payload received!
2018-01-15 04:38:32.255  INFO 28778 ---[itterSource-1-1]
com.baeldung.twitterhdfs.sink.SinkApp    : Received: {"created__at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id__str":
"952761898239385601","text":"RT @mighty__jimin: 180114 ...

これらは、

Source

!からデータを受信したときの

Processor



Sink

の正しい動作を示しています。この例では、HDFSシンクを設定していないので、「ペイロードを受信しました」というメッセージが表示されるだけです。


7. 結論

このチュートリアルでは、2つのすばらしいSpring Stream App Starterを1つのすばらしいSpring Bootの例に組み合わせる方法を学びました。

リンクに関するその他の優れた公式記事があります。

いつものように、この記事で使われているコードはhttps://github.com/eugenp/tutorials/tree/master/spring-cloud/spring-cloud-stream[over on GitHub]にあります。