1. 概要

この記事は、 RedisJavaクライアントであるLettuceの紹介です。

Redisは、データベース、キャッシュ、またはメッセージブローカーとして使用できるメモリ内のKey-Valueストアです。 データは、Redisのメモリ内データ構造のキーを操作するコマンドを使用して、追加、クエリ、変更、および削除されます。

Lettuceは、データ構造、pub / subメッセージング、高可用性サーバー接続など、完全なRedisAPIの同期通信と非同期通信の両方の使用をサポートしています。

2. なぜレタス?

以前の投稿の1つでジェダイについて説明しました。レタスの違いは何ですか?

最も重要な違いは、Java8のCompletionStageインターフェイスを介した非同期サポートとリアクティブストリームのサポートです。 以下で説明するように、Lettuceは、Redisデータベースサーバーから非同期リクエストを作成し、ストリームを作成するための自然なインターフェイスを提供します。

また、サーバーとの通信にNettyを使用します。 これにより、「より重い」APIが作成されますが、複数のスレッドとの接続の共有にも適しています。

3. 設定

3.1. 依存

pom.xmlで必要な唯一の依存関係を宣言することから始めましょう。

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.0.1.RELEASE</version>
</dependency>

ライブラリの最新バージョンは、GithubリポジトリまたはMavenCentralで確認できます。

3.2. Redisのインストール

クラスタリングまたはセンチネルモードをテストする場合は、少なくとも1つのRedisインスタンスをインストールして実行する必要があります(センチネルモードが正しく機能するには3つのサーバーが必要です)。この記事では、4.0.xを使用しています。現時点での最新の安定バージョン。

LinuxおよびMacOSのダウンロードなど、Redisの使用を開始するための詳細については、こちらを参照してください。

Redisは正式にはWindowsをサポートしていませんが、サーバーのポートここがあります。 Docker でRedisを実行することもできます。これは、Windows 10のより優れた代替手段であり、起動して実行するための高速な方法です。

4. 接続

4.1. サーバーへの接続

Redisへの接続は、次の4つのステップで構成されます。

  1. RedisURIの作成
  2. URIを使用してRedisClientに接続する
  3. Redis接続を開く
  4. RedisCommandsのセットを生成する

実装を見てみましょう:

RedisClient redisClient = RedisClient
  .create("redis://password@localhost:6379/");
StatefulRedisConnection<String, String> connection
 = redisClient.connect();

StatefulRedisConnectionはそのように聞こえます。 サーバーへの接続を維持し、必要に応じて再接続するRedisサーバーへのスレッドセーフ接続。 接続が確立されると、それを使用して、同期的または非同期的にRedisコマンドを実行できます。

RedisClient は、Redisサーバーと通信するためのNettyリソースを保持しているため、かなりのシステムリソースを使用します。 複数の接続を必要とするアプリケーションは、単一のRedisClient。を使用する必要があります

4.2. URIを再利用する

静的ファクトリメソッドにURIを渡すことにより、RedisClientを作成します。

Lettuceは、RedisURIのカスタム構文を利用します。 これはスキーマです:

redis :// [password@] host [: port] [/ database]
  [? [timeout=timeout[d|h|m|s|ms|us|ns]]
  [&_database=database_]]

4つのURIスキームがあります。

  • redis –スタンドアロンのRedisサーバー
  • rediss –SSL接続を介したスタンドアロンのRedisサーバー
  • redis-socket –Unixドメインソケットを介したスタンドアロンのRedisサーバー
  • redis-sentinel –RedisSentinelサーバー

Redisデータベースインスタンスは、URLパスの一部として、または追加のパラメーターとして指定できます。 両方が指定されている場合、パラメーターの優先順位が高くなります。

上記の例では、String表現を使用しています。 レタスには、接続を構築するためのRedisURIクラスもあります。 Builderパターンを提供します。

RedisURI.Builder
  .redis("localhost", 6379).auth("password")
  .database(1).build();

そしてコンストラクター:

new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

4.3. 同期コマンド

Jedisと同様に、Lettuceはメソッドの形式で完全なRedisコマンドセットを提供します。

ただし、Lettuceは同期バージョンと非同期バージョンの両方を実装しています。 同期バージョンを簡単に見てから、チュートリアルの残りの部分では非同期実装を使用します。

接続を作成したら、それを使用してコマンドセットを作成します。

RedisCommands<String, String> syncCommands = connection.sync();

これで、Redisと通信するための直感的なインターフェイスができました。

文字列値を設定および取得できます:

syncCommands.set("key", "Hello, Redis!");

String value = syncommands.get(“key”);

ハッシュを使用できます。

syncCommands.hset("recordName", "FirstName", "John");
syncCommands.hset("recordName", "LastName", "Smith");
Map<String, String> record = syncCommands.hgetall("recordName");

この記事の後半で、Redisについて詳しく説明します。

Lettuce同期APIは非同期APIを使用します。 ブロッキングはコマンドレベルで行われます。 これは、複数のクライアントが同期接続を共有できることを意味します。

4.4. 非同期コマンド

非同期コマンドを見てみましょう。

RedisAsyncCommands<String, String> asyncCommands = connection.async();

同期セットを取得したのと同様に、接続からRedisAsyncCommandsのセットを取得します。 これらのコマンドは、 RedisFuture (内部的には CompleteableFuture を返します。

RedisFuture<String> result = asyncCommands.get("key");

CompleteableFuture の操作ガイドは、ここにあります。

4.5. リアクティブAPI

最後に、ノンブロッキングリアクティブAPIを使用する方法を見てみましょう。

RedisStringReactiveCommands<String, String> reactiveCommands = connection.reactive();

これらのコマンドは、ProjectReactorからのMonoまたはFluxでラップされた結果を返します。

ProjectReactorの操作ガイドはここにあります。

5. Redisデータ構造

上記の文字列とハッシュについて簡単に説明しました。LettuceがRedisの残りのデータ構造をどのように実装するかを見てみましょう。 予想どおり、各Redisコマンドには同様の名前のメソッドがあります。

5.1. リスト

リストは、挿入順序が保持された文字列のリストです。値は、どちらかの端から挿入または取得されます。

asyncCommands.lpush("tasks", "firstTask");
asyncCommands.lpush("tasks", "secondTask");
RedisFuture<String> redisFuture = asyncCommands.rpop("tasks");

String nextTask = redisFuture.get();

この例では、nextTaskは「firstTask」と同じです。 Lpush は値をリストの先頭にプッシュし、rpopはリストの最後から値をポップします。

もう一方の端から要素をポップすることもできます。

asyncCommands.del("tasks");
asyncCommands.lpush("tasks", "firstTask");
asyncCommands.lpush("tasks", "secondTask");
redisFuture = asyncCommands.lpop("tasks");

String nextTask = redisFuture.get();

del でリストを削除することから、2番目の例を開始します。 次に、同じ値を再度挿入しますが、 lpop を使用してリストの先頭から値をポップするため、nextTaskは「secondTask」テキストを保持します。

5.2. セット

Redisセットは、Javaセットと同様の文字列の順序付けられていないコレクションです。 重複する要素はありません:

asyncCommands.sadd("pets", "dog");
asyncCommands.sadd("pets", "cat");
asyncCommands.sadd("pets", "cat");
 
RedisFuture<Set<String>> pets = asyncCommands.smembers("nicknames");
RedisFuture<Boolean> exists = asyncCommands.sismember("pets", "dog");

RedisセットをSetとして取得すると、重複する「cat」が無視されたため、サイズは2になります。 sismemberを使用して「dog」の存在をRedisに照会すると、応答はtrueです。

5.3. ハッシュ

以前、ハッシュの例を簡単に見てきました。 それらは簡単に説明する価値があります。

Redisハッシュは、文字列フィールドと値を持つレコードです。各レコードには、プライマリインデックスにもキーがあります。

asyncCommands.hset("recordName", "FirstName", "John");
asyncCommands.hset("recordName", "LastName", "Smith");

RedisFuture<String> lastName 
  = syncCommands.hget("recordName", "LastName");
RedisFuture<Map<String, String>> record 
  = syncCommands.hgetall("recordName");

hset を使用してフィールドをハッシュに追加し、ハッシュの名前、フィールドの名前、および値を渡します。

次に、 hget、レコードの名前とフィールドを使用して、個々の値を取得します。 最後に、hgetall。を使用してレコード全体をハッシュとしてフェッチします。

5.4. ソートされたセット

並べ替えられたセットには、値とそれらが並べ替えられるランクが含まれます。ランクは64ビット浮動小数点値です。

アイテムはランク付きで追加され、次の範囲で取得されます。

asyncCommands.zadd("sortedset", 1, "one");
asyncCommands.zadd("sortedset", 4, "zero");
asyncCommands.zadd("sortedset", 2, "two");

RedisFuture<List<String>> valuesForward = asyncCommands.zrange(key, 0, 3);
RedisFuture<List<String>> valuesReverse = asyncCommands.zrevrange(key, 0, 3);

zaddの2番目の引数はランクです。 ランクごとに範囲を取得します。昇順はzrange、降順はzrevrangeです。

ランク4の「zero」を追加したため、valuesForwardの最後とvaluesReverse。の最初に表示されます。

6. トランザクション

トランザクションにより、単一のアトミックステップで一連のコマンドを実行できます。 これらのコマンドは、順番に排他的に実行されることが保証されています。 別のユーザーからのコマンドは、トランザクションが終了するまで実行されません。

すべてのコマンドが実行されるか、実行されないかのいずれかです。 Redisは、いずれかが失敗した場合、ロールバックを実行しません。 exec()が呼び出されると、すべてのコマンドが指定された順序で実行されます。

例を見てみましょう:

asyncCommands.multi();
    
RedisFuture<String> result1 = asyncCommands.set("key1", "value1");
RedisFuture<String> result2 = asyncCommands.set("key2", "value2");
RedisFuture<String> result3 = asyncCommands.set("key3", "value3");

RedisFuture<TransactionResult> execResult = asyncCommands.exec();

TransactionResult transactionResult = execResult.get();

String firstResult = transactionResult.get(0);
String secondResult = transactionResult.get(0);
String thirdResult = transactionResult.get(0);

multi を呼び出すと、トランザクションが開始されます。 トランザクションが開始されると、 exec()が呼び出されるまで、後続のコマンドは実行されません。

同期モードでは、コマンドは nullを返します。非同期モードでは、コマンドはRedisFutureを返します。 Exec は、応答のリストを含むTransactionResultを返します。

RedisFutures も結果を受け取るため、非同期APIクライアントは2か所でトランザクション結果を受け取ります。

7. バッチ処理

通常の状態では、LettuceはAPIクライアントによって呼び出されるとすぐにコマンドを実行します。

これは、特にコマンド結果を連続して受信することに依存している場合、ほとんどの通常のアプリケーションが望んでいることです。

ただし、アプリケーションがすぐに結果を必要としない場合、または大量のデータが大量にアップロードされている場合、この動作は効率的ではありません。

非同期アプリケーションは、この動作をオーバーライドできます。

commands.setAutoFlushCommands(false);

List<RedisFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < iterations; i++) {
    futures.add(commands.set("key-" + i, "value-" + i);
}
commands.flushCommands();

boolean result = LettuceFutures.awaitAll(5, TimeUnit.SECONDS,
  futures.toArray(new RedisFuture[0]));

setAutoFlushCommandsがfalseに設定されている場合、アプリケーションはflushCommandsを手動で呼び出す必要があります。 この例では、複数の set コマンドをキューに入れてから、チャネルをフラッシュしました。 AwaitAll は、すべてのRedisFuturesが完了するのを待ちます。

この状態は接続ごとに設定され、接続を使用するすべてのスレッドに影響します。 この機能は、同期コマンドには適用されません。

8. パブリッシュ/サブスクライブ

Redisは、シンプルなパブリッシュ/サブスクライブメッセージングシステムを提供します。 サブスクライバーは、subscribeコマンドを使用してチャネルからのメッセージを消費します。 メッセージは永続化されません。 ユーザーがチャンネルに登録している場合にのみ、ユーザーに配信されます。

Redisはpub/subシステムを使用して、Redisデータセットに関する通知を行い、クライアントがキーの設定、削除、期限切れなどに関するイベントを受信できるようにします。

詳細については、ドキュメントこちらを参照してください。

8.1. サブスクライバー

RedisPubSubListenerはpub/subメッセージを受信します。 このインターフェースはいくつかのメソッドを定義しますが、ここではメッセージを受信するためのメソッドを示します。

public class Listener implements RedisPubSubListener<String, String> {

    @Override
    public void message(String channel, String message) {
        log.debug("Got {} on channel {}",  message, channel);
        message = new String(s2);
    }
}

RedisClient を使用して、pub / subチャネルを接続し、リスナーをインストールします。

StatefulRedisPubSubConnection<String, String> connection
 = client.connectPubSub();
connection.addListener(new Listener())

RedisPubSubAsyncCommands<String, String> async
 = connection.async();
async.subscribe("channel");

リスナーがインストールされている状態で、 RedisPubSubAsyncCommands のセットを取得し、チャネルにサブスクライブします。

8.2. 出版社

公開は、Pub / Subチャネルを接続し、コマンドを取得するだけです。

StatefulRedisPubSubConnection<String, String> connection 
  = client.connectPubSub();

RedisPubSubAsyncCommands<String, String> async 
  = connection.async();
async.publish("channel", "Hello, Redis!");

公開にはチャネルとメッセージが必要です。

8.3. リアクティブサブスクリプション

Lettuceは、pub/subメッセージをサブスクライブするためのリアクティブインターフェイスも提供します。

StatefulRedisPubSubConnection<String, String> connection = client
  .connectPubSub();

RedisPubSubAsyncCommands<String, String> reactive = connection
  .reactive();

reactive.observeChannels().subscribe(message -> {
    log.debug("Got {} on channel {}",  message, channel);
    message = new String(s2);
});
reactive.subscribe("channel").subscribe();

observeChannelsによって返されるFluxは、すべてのチャネルのメッセージを受信しますが、これはストリームであるため、フィルタリングは簡単に実行できます。

9. 高可用性

Redisは、高可用性とスケーラビリティのためのいくつかのオプションを提供します。 完全に理解するには、Redisサーバー構成の知識が必要ですが、Lettuceがそれらをどのようにサポートするかについて簡単に説明します。

9.1. マスター/スレーブ

Redisサーバーは、マスター/スレーブ構成で自分自身を複製します。 マスターサーバーは、マスターキャッシュをスレーブに複製するコマンドのストリームをスレーブに送信します。 Redisは双方向レプリケーションをサポートしていないため、スレーブは読み取り専用です。

Lettuceは、マスター/スレーブシステムに接続し、トポロジを照会してから、読み取り操作用のスレーブを選択できます。これにより、スループットが向上します。

RedisClient redisClient = RedisClient.create();

StatefulRedisMasterSlaveConnection<String, String> connection
 = MasterSlave.connect(redisClient, 
   new Utf8StringCodec(), RedisURI.create("redis://localhost"));
 
connection.setReadFrom(ReadFrom.SLAVE);

9.2. センチネル

Redis Sentinelは、マスターインスタンスとスレーブインスタンスを監視し、マスターフェイルオーバーが発生した場合にスレーブへのフェイルオーバーを調整します。

LettuceはSentinelに接続し、それを使用して現在のマスターのアドレスを検出してから、Sentinelへの接続を返すことができます。

これを行うには、別の RedisURI を作成し、RedisClientをそれに接続します。

RedisURI redisUri = RedisURI.Builder
  .sentinel("sentinelhost1", "clustername")
  .withSentinel("sentinelhost2").build();
RedisClient client = new RedisClient(redisUri);

RedisConnection<String, String> connection = client.connect();

最初のSentinelのホスト名(またはアドレス)とクラスター名、続いて2番目のSentinelアドレスを使用してURIを作成しました。 Sentinelに接続すると、Lettuceはトポロジについてクエリを実行し、現在のマスターサーバーへの接続を返します。

完全なドキュメントはこちらから入手できます。

9.3. クラスター

Redis Clusterは分散構成を使用して、高可用性と高スループットを提供します。

最大1000ノードにまたがるシャードキーをクラスター化するため、クラスター内でトランザクションを使用することはできません。

RedisURI redisUri = RedisURI.Builder.redis("localhost")
  .withPassword("authentication").build();
RedisClusterClient clusterClient = RedisClusterClient
  .create(rediUri);
StatefulRedisClusterConnection<String, String> connection
 = clusterClient.connect();
RedisAdvancedClusterCommands<String, String> syncCommands = connection
  .sync();

RedisAdvancedClusterCommands は、クラスターでサポートされている一連のRedisコマンドを保持し、それらをキーを保持するインスタンスにルーティングします。

完全な仕様はこちらで入手できます。

10. 結論

このチュートリアルでは、Lettuceを使用して、アプリケーション内からRedisサーバーに接続してクエリを実行する方法について説明しました。

Lettuceは、完全にスレッドセーフな非同期インターフェイスのボーナスとともに、Redis機能の完全なセットをサポートします。 また、Java8のCompletionStage インターフェイスを多用して、アプリケーションがデータを受信する方法をきめ細かく制御できるようにします。

コードサンプルは、いつものように、GitHubにあります。