1前書き


SpringアプリケーションでCouchbaseを使う

のチュートリアルへのこのフォローアップでは、Couchbase SDKの非同期性とそれがバッチで永続化操作を実行するためにどのように使われるかを探ります。 Couchbaseのリソースを最大限に活用するためのアプリケーションです。


1.1.

CrudService

インターフェース

まず、一般的な

CrudService

インタフェースを拡張してバッチ操作を含めます。

public interface CrudService<T> {
    ...

    List<T> readBulk(Iterable<String> ids);

    void createBulk(Iterable<T> items);

    void updateBulk(Iterable<T> items);

    void deleteBulk(Iterable<String> ids);

    boolean exists(String id);
}


1.2.

CouchbaseEntity

インタフェース

永続化したいエンティティのインタフェースを定義します。

public interface CouchbaseEntity {

    String getId();

    void setId(String id);

}


1.3.

AbstractCrudService

クラス

それから、これらの各メソッドを一般的な抽象クラスに実装します。このクラスは

前のチュートリアル

で使用した

PersonCrudService

クラスから派生したもので、次のように始まります。

public abstract class AbstractCrudService<T extends CouchbaseEntity> implements CrudService<T> {
    private BucketService bucketService;
    private Bucket bucket;
    private JsonDocumentConverter<T> converter;

    public AbstractCrudService(BucketService bucketService, JsonDocumentConverter<T> converter) {
        this.bucketService = bucketService;
        this.converter = converter;
    }

    protected void loadBucket() {
        bucket = bucketService.getBucket();
    }

    ...
}


2非同期バケットインタフェース

Couchbase SDKは非同期操作を実行するための

AsyncBucket

インターフェースを提供します。

Bucket

インスタンスが与えられると、

async()

メソッドを介してその非同期バージョンを取得できます。

AsyncBucket asyncBucket = bucket.async();


3バッチ操作


AsyncBucket

インターフェースを使用してバッチ操作を実行するには、


RxJava


ライブラリを使用します。


3.1. 一括読み取り

ここでは

readBulk

メソッドを実装しています。最初にRxJavaの

AsyncBucket

および

flatMap

メカニズムを使用して文書を非同期的に

Observable <JsonDocument>

に取得し、次に

RxJava



toBlocking

メカニズムを使用してこれらをエンティティのリストに変換します。

@Override
public List<T> readBulk(Iterable<String> ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable<JsonDocument> asyncOperation = Observable
      .from(ids)
      .flatMap(new Func1<String, Observable<JsonDocument>>() {
          public Observable<JsonDocument> call(String key) {
              return asyncBucket.get(key);
          }
    });

    List<T> items = new ArrayList<T>();
    try {
        asyncOperation.toBlocking()
          .forEach(new Action1<JsonDocument>() {
              public void call(JsonDocument doc) {
                  T item = converter.fromDocument(doc);
                  items.add(item);
              }
        });
    } catch (Exception e) {
        logger.error("Error during bulk get", e);
    }

    return items;
}


3.2. バッチ挿入


createBulk

メソッドを実装するために、

RxJavaのflatMap

コンストラクトを再度使用します。

一括変換リクエストはレスポンスが生成されるよりも早く生成され、過負荷状態になることがあるので、

BackpressureException

が発生したときはいつでも指数関数的な遅延で再試行を開始します。

@Override
public void createBulk(Iterable<T> items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1<T, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(final T t) {
              if(t.getId() == null) {
                  t.setId(UUID.randomUUID().toString());
              }
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.insert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}


3.3. バッチ更新


updateBulk

メソッドでも同様のメカニズムを使用します。

@Override
public void updateBulk(Iterable<T> items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1<T, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(final T t) {
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.upsert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}


3.4. 一括削除

そして

deleteBulk

メソッドを次のように書きます。

@Override
public void deleteBulk(Iterable<String> ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(ids)
      .flatMap(new Func1<String, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(String key) {
              return asyncBucket.remove(key)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}


4

PersonCrudService


最後に、

Person

エンティティ用に

AbstractCrudService

を拡張したSpringサービス、

PersonCrudService

を作成します。

Couchbaseの対話はすべて抽象クラスで実装されているため、エンティティクラスの実装は簡単です。すべての依存関係がインジェクトされ、バケットが読み込まれるようにするだけで済みます。

@Service
public class PersonCrudService extends AbstractCrudService<Person> {

    @Autowired
    public PersonCrudService(
      @Qualifier("TutorialBucketService") BucketService bucketService,
      PersonDocumentConverter converter) {
        super(bucketService, converter);
    }

    @PostConstruct
    private void init() {
        loadBucket();
    }
}


5結論

このチュートリアルに示されているソースコードはhttps://github.com/eugenp/tutorials/tree/master/couchbase[githubプロジェクト]にあります。

Couchbase Java SDKの詳細については、公式のhttp://developer.couchbase.com/documentation/server/4.1/sdks/java-2.2/java-intro.html[Couchbase開発者向けドキュメントサイト]を参照してください。