1. 序章

Apache Pulsarは、Yahooで開発された分散型のオープンソースのパブリケーション/サブスクリプションベースのメッセージングシステムです。

これは、Yahoo Mail、Yahoo Finance、YahooSportsなどのYahooの重要なアプリケーションを強化するために作成されました。 その後、2016年に、ApacheSoftwareFoundationの下でオープンソース化されました。

2. 建築

Pulsarは、サーバー間メッセージング用のマルチテナントの高性能ソリューションです。 ブローカーとブックメーカーのセットと、構成および管理用の組み込みの ApacheZooKeeperで構成されています。 ブックメーカーはApacheBookKeeper からのもので、メッセージが消費されるまでメッセージを保存します。

クラスターでは、次のようになります。

  • プロデューサーからの着信メッセージを処理し、メッセージをコンシューマーにディスパッチする複数のクラスターブローカー
  • メッセージの永続性をサポートするApacheBookKeeper
  • クラスタ構成を保存するApacheZooKeeper

これをよりよく理解するために、ドキュメントのアーキテクチャ図を見てみましょう。

3. 主な機能

主な機能のいくつかを簡単に見てみましょう。

  • 複数のクラスターの組み込みサポート
  • 複数のクラスターにわたるメッセージのジオレプリケーションのサポート
  • 複数のサブスクリプションモード
  • 何百万ものトピックにスケーラブル
  • ApacheBookKeeperを使用してメッセージ配信を保証します。
  • 低遅延

それでは、いくつかの重要な機能について詳しく説明しましょう。

3.1. メッセージングモデル

フレームワークは、柔軟なメッセージングモデルを提供します。 一般に、メッセージングアーキテクチャには2つのメッセージングモデルがあります。 キューイングとパブリッシャー/サブスクライバー。 パブリッシャー/サブスクライバーは、メッセージがすべてのコンシューマーに送信されるブロードキャストメッセージングシステムです。 一方、キューイングはポイントツーポイント通信です。

Pulsarは、両方の概念を1つの一般化されたAPIに組み合わせています。 パブリッシャーは、メッセージをさまざまなトピックに公開します。 次に、これらのメッセージはすべてのサブスクリプションにブロードキャストされます。

消費者はメッセージを受け取るためにサブスクライブします。 ライブラリを使用すると、コンシューマーは、排他的、共有、フェイルオーバーなど、同じサブスクリプションでメッセージを消費するさまざまな方法を選択できます。 これらのサブスクリプションタイプについては、後のセクションで詳しく説明します。

3.2. 展開モード

Pulsarには、さまざまな環境での展開のサポートが組み込まれています。 これは、標準のオンプレミスマシンで使用することも、Kubernetesクラスター、Google、またはAWSクラウドにデプロイすることもできることを意味します。

開発およびテストの目的で、単一ノードとして実行できます。 この場合、すべてのコンポーネント(ブローカー、BookKeeper、およびZooKeeper)は単一のプロセスで実行されます。

3.3. ジオレプリケーション

ライブラリは、データの地理的複製をすぐにサポートします。異なる地理的領域を構成することにより、複数のクラスター間でメッセージの複製を有効にできます。

メッセージデータはほぼリアルタイムで複製されます。 クラスタ間でネットワーク障害が発生した場合、データは常に安全で、BookKeeperに保存されます。 レプリケーションシステムは、レプリケーションが成功するまで再試行を続けます。

ジオレプリケーション機能により、組織はさまざまなクラウドプロバイダーにPulsarをデプロイし、データをレプリケートすることもできます。 これにより、独自のクラウドプロバイダーAPIの使用を回避できます。

3.4. 永続

Pulsarがデータを読み取って確認した後、データが失われないことを保証します。 データの耐久性は、データを保存するために構成されたディスクの数に関連しています。

Pulsarは、ストレージノードで実行されているbookies(Apache BookKeeperインスタンス)を使用して耐久性を確保します。 ブックメーカーはメッセージを受信するたびに、コピーをメモリに保存し、データをWAL(ログ先行書き込み)に書き込みます。 このログは、データベースWALと同じように機能します。 Bookiesはデータベーストランザクションの原則に基づいて動作し、マシンに障害が発生した場合でもデータが失われないようにします。

上記とは別に、Pulsarは複数のノードの障害にも耐えることができます。 ライブラリはデータを複数のブックに複製してから、確認メッセージをプロデューサーに送信します。 このメカニズムにより、複数のハードウェア障害が発生した場合でもデータ損失がゼロになることが保証されます。

4. シングルノードセットアップ

それでは、ApachePulsarのシングルノードクラスターをセットアップする方法を見てみましょう。

Apacheは、Java、Python、およびC++のバインディングを備えたシンプルなクライアントAPIも提供します。 後で、簡単なJavaプロデューサーとサブスクリプションの例を作成します。

4.1. インストール

Apache Pulsarは、バイナリディストリビューションとして利用できます。 それをダウンロードすることから始めましょう:

wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-2.1.1-incubating-bin.tar.gz

ダウンロードが完了すると、zipファイルのアーカイブを解除できます。 アーカイブされていないディストリビューションには、 bin、conf、例、licensesおよびlibフォルダーが含まれます。

その後、組み込みのコネクタをダウンロードする必要があります。 これらは現在、個別のパッケージとして出荷されています。

wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz

コネクタのアーカイブを解除し、コネクタフォルダをPulsarフォルダにコピーしてみましょう。

4.2. インスタンスの開始

スタンドアロンインスタンスを開始するには、次のコマンドを実行できます。

bin/pulsar standalone

5. Javaクライアント

次に、メッセージを生成および消費するJavaプロジェクトを作成します。 また、さまざまなサブスクリプションタイプの例を作成します。

5.1. プロジェクトの設定

プロジェクトにpulsar-client依存関係を追加することから始めます。

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.1.1-incubating</version>
</dependency>

5.2. プロデューサー

Producerの例を作成して続けましょう。 ここでは、トピックとプロデューサーを作成します。

最初に、独自のプロトコルを使用して特定のホストとポート上のPulsarサービスに接続するPulsarClientを作成する必要があります。 多くのプロデューサーとコンシューマーは、単一のクライアントオブジェクトを共有できます。

次に、特定のトピック名でProducerを作成します。

private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test-topic";
PulsarClient client = PulsarClient.builder()
  .serviceUrl(SERVICE_URL)
  .build();

Producer<byte[]> producer = client.newProducer()
  .topic(TOPIC_NAME)
  .compressionType(CompressionType.LZ4)
  .create();

プロデューサーは5つのメッセージを送信します。

IntStream.range(1, 5).forEach(i -> {
    String content = String.format("hi-pulsar-%d", i);

    Message<byte[]> msg = MessageBuilder.create()
      .setContent(content.getBytes())
      .build();
    MessageId msgId = producer.send(msg);
});

5.3. 消費者

次に、プロデューサーによって作成されたメッセージを取得するためのコンシューマーを作成します。 コンシューマーは、サーバーに接続するために同じPulserClientも必要とします。

Consumer<byte[]> consumer = client.newConsumer()
  .topic(TOPIC_NAME)
  .subscriptionType(SubscriptionType.Shared)
  .subscriptionName(SUBSCRIPTION_NAME)
  .subscribe();

ここでは、クライアントを作成しました共有サブスクリプションタイプ 。 これにより、複数のコンシューマーが同じサブスクリプションに接続してメッセージを取得できます。

5.4. 消費者向けのサブスクリプションタイプ

上記の消費者の例では、 共有タイプのサブスクリプションを作成しました。 排他的サブスクリプションとフェイルオーバーサブスクリプションを作成することもできます。

排他的サブスクリプションでは、1人のコンシューマーのみをサブスクリプションできます。

一方、af ailover Subscription を使用すると、次のApacheダイアグラムに示すように、1つのコンシューマーに障害が発生した場合に、ユーザーがフォールバックコンシューマーを定義できます。

6. 結論

この記事では、メッセージングモデル、ジオレプリケーション、強力な耐久性の保証など、Pulsarメッセージングシステムの機能に焦点を当てました。

また、単一ノードのセットアップ方法とJavaクライアントの使用方法も学びました。

いつものように、このチュートリアルの完全な実装は、Githubにあります。