1. 概要

このチュートリアルでは、 Apache Storm 分散型リアルタイム計算システムの概要を説明します。

以下に焦点を当てて説明します。

  • Apache Stormとは正確には何であり、どのような問題を解決しますか
  • そのアーキテクチャ、および
  • プロジェクトでの使用方法

2. Apache Stormとは何ですか?

Apache Stormは、リアルタイム計算用の無料のオープンソース分散システムです。

フォールトトレランスとスケーラビリティを提供し、データ処理を保証します。特に、無制限のデータストリームの処理に優れています。 

Stormのいくつかの良い使用例は、クレジットカード操作を処理して不正を検出したり、スマートホームからのデータを処理して障害のあるセンサーを検出したりすることです。

Stormを使用すると、市場で入手可能なさまざまなデータベースやキューイングシステムと統合できます。

3. Mavenの依存関係

Apache Stormを使用する前に、プロジェクトにストームコア依存関係を含める必要があります。

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

Stormクラスターでアプリケーションを実行する場合は、が提供するスコープのみを使用する必要があります。

アプリケーションをローカルで実行するには、ローカルプロセスでStormクラスターをシミュレートするいわゆるローカルモードを使用できます。そのような場合は、提供されているを削除する必要があります。

4. データ・モデル

Apache Stormのデータモデルは、タプルとストリームの2つの要素で構成されています。

4.1. タプル

Tuple は、動的型を持つ名前付きフィールドの順序付きリストです。これは、フィールドの型を明示的に宣言する必要がないことを意味します。

Stormは、タプルで使用されるすべての値をシリアル化する方法を知る必要があります。 デフォルトでは、プリミティブ型、Stringsおよびbyte配列をすでにシリアル化できます。

また、StormはKryoシリアル化を使用するため、カスタムタイプを使用するには、Configを使用してシリアライザーを登録する必要があります。 これは、次の2つの方法のいずれかで実行できます。

まず、フルネームを使用してシリアル化するクラスを登録できます。

Config config = new Config();
config.registerSerialization(User.class);

このような場合、Kryoはを使用してクラスをシリアル化します FieldSerializer。 デフォルトでは、これにより、プライベートとパブリックの両方で、クラスのすべての非一時フィールドがシリアル化されます。

または、代わりに、シリアル化するクラスと、Stormがそのクラスに使用するシリアライザーの両方を提供できます。

Config config = new Config();
config.registerSerialization(User.class, UserSerializer.class);

カスタムシリアライザーを作成するには、でジェネリッククラス Serializerを拡張する必要があります。このクラスにはwritereadの2つのメソッドがあります。

4.2. ストリーム

Stream は、Stormエコシステムのコア抽象化です。 ストリームはタプルの無制限のシーケンスです。

Stormsを使用すると、複数のストリームを並行して処理できます。

すべてのストリームには、宣言時に提供および割り当てられるIDがあります。

5. トポロジー

リアルタイムStormアプリケーションのロジックは、トポロジーにパッケージ化されています。 トポロジは、スパウトボルトで構成されています。

5.1. 注ぎ口

注ぎ口は小川の源です。 それらはトポロジーにタプルを放出します。

タプルは、Kafka、Kestrel、ActiveMQなどのさまざまな外部システムから読み取ることができます。

注ぎ口は、信頼できるまたは信頼できないにすることができます。 Reliable は、ストームによる処理に失敗したタプルに注ぎ口が応答できることを意味します。 信頼性が低いは、ファイアアンドフォーゲットメカニズムを使用してタプルを放出するため、注ぎ口が応答しないことを意味します。

カスタムスパウトを作成するには、 IRichSpout インターフェイスを実装するか、インターフェイスをすでに実装しているクラス(たとえば、抽象 BaseRichSpout クラス)を拡張する必要があります。

信頼性の低いスパウトを作成しましょう。

public class RandomIntSpout extends BaseRichSpout {

    private Random random;
    private SpoutOutputCollector outputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        outputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }
}

カスタムRandomIntSpoutは、ランダムな整数とタイムスタンプを毎秒生成します。

5.2. ボルト

ボルトはストリーム内のタプルを処理します。フィルタリング、集計、カスタム関数などのさまざまな操作を実行できます。

いくつかの操作は複数のステップを必要とするので、そのような場合には複数のボルトを使用する必要があります。

カスタムBoltを作成するには、 IRichBolt を実装するか、より簡単な操作IBasicBoltインターフェイスを実装する必要があります。

実装に利用できる複数のヘルパークラスもありますボルト。 この場合、使用します BaseBasicBolt

public class PrintingBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        System.out.println(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

このカスタムPrintingBoltは、すべてのタプルをコンソールに出力するだけです。

6. 単純なトポロジの作成

これらのアイデアを単純なトポロジーにまとめましょう。 このトポロジには、1つの注ぎ口と3つのボルトがあります。

6.1.  RandomNumberSpout

最初に、信頼性の低い注ぎ口を作成します。 毎秒(0,100)の範囲からランダムな整数を生成します。

public class RandomNumberSpout extends BaseRichSpout {
    private Random random;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, 
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        int operation = random.nextInt(101);
        long timestamp = System.currentTimeMillis();

        Values values = new Values(operation, timestamp);
        collector.emit(values);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.2.  FilteringBolt

次に、operationが0に等しいすべての要素を除外するボルトを作成します。

public class FilteringBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        int operation = tuple.getIntegerByField("operation");
        if (operation > 0) {
            basicOutputCollector.emit(tuple.getValues());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.3.  AggregatingBolt

次に、より複雑なボルトを作成して、毎日のすべてのポジティブな操作を集約しましょう。

この目的のために、単一のタプルではなくウィンドウで動作するボルトを実装するために特別に作成された特定のクラスBaseWindowedBoltを使用します。

Windows は、ストリーム処理の重要な概念であり、無限のストリームを有限のチャンクに分割します。 次に、各チャンクに計算を適用できます。 通常、ウィンドウには2つのタイプがあります。

タイムウィンドウは、タイムスタンプを使用して特定の期間の要素をグループ化するために使用されます。 時間枠には、異なる数の要素が含まれる場合があります。

カウントウィンドウは、定義されたサイズのウィンドウを作成するために使用されます。 このような場合、すべてのウィンドウのサイズは同じになり、定義されたサイズよりも要素が少ない場合、ウィンドウは放出されません。

AggregatingBolt は、タイムウィンドウからのすべての正の操作の合計と、その開始タイムスタンプと終了タイムスタンプを生成します。

public class AggregatingBolt extends BaseWindowedBolt {
    private OutputCollector outputCollector;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
    }

    @Override
    public void execute(TupleWindow tupleWindow) {
        List<Tuple> tuples = tupleWindow.get();
        tuples.sort(Comparator.comparing(this::getTimestamp));

        int sumOfOperations = tuples.stream()
          .mapToInt(tuple -> tuple.getIntegerByField("operation"))
          .sum();
        Long beginningTimestamp = getTimestamp(tuples.get(0));
        Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));

        Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
        outputCollector.emit(values);
    }

    private Long getTimestamp(Tuple tuple) {
        return tuple.getLongByField("timestamp");
    }
}

この場合、リストの最初の要素を直接取得するのが安全であることに注意してください。 これは、各ウィンドウがタプルのタイムスタンプフィールドを使用して計算されるため、したがって、各ウィンドウに少なくとも1つの要素が必要です。

6.4.  FileWritingBolt

最後に、 sumOfOperations が2000より大きいすべての要素を取得し、それらをシリアル化してファイルに書き込むボルトを作成します。

public class FileWritingBolt extends BaseRichBolt {
    public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
    private BufferedWriter writer;
    private String filePath;
    private ObjectMapper objectMapper;

    @Override
    public void cleanup() {
        try {
            writer.close();
        } catch (IOException e) {
            logger.error("Failed to close writer!");
        }
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, 
      OutputCollector outputCollector) {
        objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        
        try {
            writer = new BufferedWriter(new FileWriter(filePath));
        } catch (IOException e) {
            logger.error("Failed to open a file for writing.", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
        long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
        long endTimestamp = tuple.getLongByField("endTimestamp");

        if (sumOfOperations > 2000) {
            AggregatedWindow aggregatedWindow = new AggregatedWindow(
                sumOfOperations, beginningTimestamp, endTimestamp);
            try {
                writer.write(objectMapper.writeValueAsString(aggregatedWindow));
                writer.newLine();
                writer.flush();
            } catch (IOException e) {
                logger.error("Failed to write data to file.", e);
            }
        }
    }
    
    // public constructor and other methods
}

これがトポロジの最後のボルトになるため、出力を宣言する必要がないことに注意してください

6.5. トポロジの実行

最後に、すべてをまとめてトポロジを実行できます。

public static void runTopology() {
    TopologyBuilder builder = new TopologyBuilder();

    Spout random = new RandomNumberSpout();
    builder.setSpout("randomNumberSpout");

    Bolt filtering = new FilteringBolt();
    builder.setBolt("filteringBolt", filtering)
      .shuffleGrouping("randomNumberSpout");

    Bolt aggregating = new AggregatingBolt()
      .withTimestampField("timestamp")
      .withLag(BaseWindowedBolt.Duration.seconds(1))
      .withWindow(BaseWindowedBolt.Duration.seconds(5));
    builder.setBolt("aggregatingBolt", aggregating)
      .shuffleGrouping("filteringBolt"); 
      
    String filePath = "./src/main/resources/data.txt";
    Bolt file = new FileWritingBolt(filePath);
    builder.setBolt("fileBolt", file)
      .shuffleGrouping("aggregatingBolt");

    Config config = new Config();
    config.setDebug(false);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Test", config, builder.createTopology());
}

トポロジ内の各部分を介してデータフローを作成するには、それらを接続する方法を示す必要があります。 shuffleGroup を使用すると、filteringBoltのデータがrandomNumberSpoutから取得されることを指定できます。

ボルトごとに、このボルトの要素のソースを定義するshuffleGroupを追加する必要があります。 要素のソースは、 注ぎ口または別のボルト。 また、同じソースを複数のボルトに設定した場合 ソースはそれらのそれぞれにすべての要素を放出します。

この場合、トポロジはLocalClusterを使用してジョブをローカルで実行します。

7. 結論

このチュートリアルでは、分散型リアルタイム計算システムであるApacheStormを紹介しました。 注ぎ口といくつかのボルトを作成し、それらをまとめて完全なトポロジにしました。

そして、いつものように、すべてのコードサンプルはGitHubにあります。