Nettyの紹介
1前書き
この記事では、非同期イベント駆動型ネットワーク・アプリケーション・フレームワークであるNettyについて見ていきます。
Nettyの主な目的は、ネットワークとビジネスロジックコンポーネントを分離し、疎結合にして、NIO(またはおそらくNIO.2)に基づいて高性能プロトコルサーバーを構築することです。それはHTTPのような広く知られているプロトコル、またはあなた自身の特定のプロトコルを実装するかもしれません。
2コアコンセプト
Nettyはノンブロッキングフレームワークです。これにより、ブロッキングIOに比べて高いスループットが得られます。
ノンブロッキングIOを理解することは、Nettyのコアコンポーネントとそれらの関係を理解するために不可欠です。
2.1. チャネル
Channel
はJava NIOの基盤です。読み書きなどのIO操作が可能なオープン接続を表します。
2.2. 未来
Nettyの
Channel
上のすべてのIO操作はノンブロッキングです。
つまり、呼び出しの直後にすべての操作が返されます。
標準のJavaライブラリには
Future
インターフェースがありますが、Nettyの目的には不都合です。操作が完了するか、または操作が完了するまで現在のスレッドをブロックすることだけを
Future
に依頼できます。
-
Nettyが独自の
ChannelFuture
インターフェースを持っているのはそのためです。
**
__.
コールバックを
ChannelFuture__に渡すことができます。これは操作の完了時に呼び出されます。
2.3. イベントとハンドラ
Nettyはイベント駆動型のアプリケーションパラダイムを使用しているので、データ処理のパイプラインはハンドラを通過する一連のイベントです。イベントとハンドラは、受信データフローと送信データフローに関連付けることができます。受信イベントは次のとおりです。
-
チャンネルの有効化と無効化
-
読み取り操作イベント
-
例外イベント
-
ユーザーイベント
アウトバウンドイベントはより単純で、一般に、接続のオープン/クローズおよびデータの書き込み/フラッシュに関連しています。
Nettyアプリケーションは、2つのネットワーキングおよびアプリケーションロジックイベントとそれらのハンドラーで構成されています。チャネルイベントハンドラの基本インタフェースは
ChannelHandler
とその上位クラスの
ChannelOutboundHandler
および
ChannelInboundHandler
です。
Nettyは
ChannelHandlerの実装の巨大な階層を提供します。
ChannelInboundHandlerAdapter
および
ChannelOutboundHandlerAdapter__。
すべてのイベントのサブセットのみを処理する必要がある場合は、これらのアダプタを拡張できます。
また、HTTPのような特定のプロトコルの多くの実施態様がある。
HttpRequestDecoder、HttpResponseEncoder、HttpObjectAggregator.
NettyのJavadocで彼らと知り合いになるといいでしょう。
2.4. エンコーダとデコーダ
ネットワークプロトコルを扱うときは、データのシリアル化とシリアル化解除を実行する必要があります。この目的のために、Nettyは、入ってくるデータをデコードすることができる
デコーダ
のために
ChannelInboundHandler
の特別な拡張を導入します。ほとんどのデコーダの基本クラスは__ByteToMessageDecoderです。
送信データをエンコードするために、Nettyには** エンコーダと呼ばれる
ChannelOutboundHandler
の拡張機能があります。
3サーバーアプリケーションの例
リクエストを受信し、計算を実行し、レスポンスを送信する単純なプロトコルサーバーを表すプロジェクトを作成しましょう。
3.1. 依存関係
まず最初に、__pom.xmlでNetty依存関係を提供する必要があります。
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.10.Final</version>
</dependency>
3.2. データ・モデル
要求データクラスは次のような構造になります。
public class RequestData {
private int intValue;
private String stringValue;
//standard getters and setters
}
サーバーがリクエストを受信し、
intValue
に2を掛けた値を返すとします。レスポンスは単一のint値を持つことになります。
public class ResponseData {
private int intValue;
//standard getters and setters
}
3.3. リクエストデコーダ
今度は、プロトコルメッセージ用のエンコーダとデコーダを作成する必要があります。
-
Nettyはソケット受信バッファ** と連携して動作することに注意してください。これはキューとしてではなく単なるバイトの束として表されます。これは、完全なメッセージがサーバーによって受信されていないときに、インバウンドハンドラを呼び出すことができることを意味します。
-
処理前に全メッセージを受信したことを確認する必要があります** そしてそれを行うには多くの方法があります。
まず最初に、一時的な
ByteBuf
を作成し、必要なバイト数になるまで、すべての受信バイトに追加することができます。
public class SimpleProcessingHandler
extends ChannelInboundHandlerAdapter {
private ByteBuf tmp;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("Handler added");
tmp = ctx.alloc().buffer(4);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
System.out.println("Handler removed");
tmp.release();
tmp = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
tmp.writeBytes(m);
m.release();
if (tmp.readableBytes() >= 4) {
//request processing
RequestData requestData = new RequestData();
requestData.setIntValue(tmp.readInt());
ResponseData responseData = new ResponseData();
responseData.setIntValue(requestData.getIntValue() ** 2);
ChannelFuture future = ctx.writeAndFlush(responseData);
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
上の例は少し奇妙に見えますが、Nettyがどのように機能するのかを理解するのに役立ちます。対応するイベントが発生すると、ハンドラのすべてのメソッドが呼び出されます。そのため、ハンドラが追加されたときにバッファを初期化し、新しいバイトを受信したときにデータを埋め、十分なデータが得られたときに処理を開始します。
意図的に
stringValue
を使わなかった – そのような方法でのデコードは不必要に複雑になるだろう。 Nettyが
ChannelInboundHandler
の実装である便利なデコーダクラスを提供するのはそのためです。
ByteToMessageDecoder
および
ReplayingDecoder.
上記のとおり、Nettyを使用してチャネル処理パイプラインを作成できます。そのため、私たちのデコーダを最初のハンドラとし、その後に処理ロジックハンドラを置くことができます。
RequestDataのデコーダを次に示します。
public class RequestDecoder extends ReplayingDecoder<RequestData> {
private final Charset charset = Charset.forName("UTF-8");
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf in, List<Object> out) throws Exception {
RequestData data = new RequestData();
data.setIntValue(in.readInt());
int strLen = in.readInt();
data.setStringValue(
in.readCharSequence(strLen, charset).toString());
out.add(data);
}
}
このデコーダの考え方はとても簡単です。これは
ByteBuf
の実装を使用しています。これは読み込み操作に十分なデータがバッファにない場合に例外をスローします。
例外が捕捉されると、バッファは先頭まで巻き戻され、デコーダはデータの新しい部分を待ちます。
decode
実行後に
out
リストが空でない場合、デコードは停止します。
3.4. 応答エンコーダ
RequestData
をデコードする以外に、メッセージをエンコードする必要があります。書き込み操作が発生したときに完全なメッセージデータがあるため、この操作は簡単です。
メインハンドラで
Channel
にデータを書き込むことも、ロジックを分離してwrite
ResponseData
オペレーションをキャッチする
MessageToByteEncoder
を拡張するハンドラを作成することもできます。
public class ResponseDataEncoder
extends MessageToByteEncoder<ResponseData> {
@Override
protected void encode(ChannelHandlerContext ctx,
ResponseData msg, ByteBuf out) throws Exception {
out.writeInt(msg.getIntValue());
}
}
3.5. リクエスト処理
デコードとエンコードは別々のハンドラで実行したので、
ProcessingHandler
を変更する必要があります。
public class ProcessingHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
RequestData requestData = (RequestData) msg;
ResponseData responseData = new ResponseData();
responseData.setIntValue(requestData.getIntValue() ** 2);
ChannelFuture future = ctx.writeAndFlush(responseData);
future.addListener(ChannelFutureListener.CLOSE);
System.out.println(requestData);
}
}
3.6. サーバーブートストラップ
それでは、すべてまとめてサーバーを実行しましょう。
public class NettyServer {
private int port;
//constructor
public static void main(String[]args) throws Exception {
int port = args.length > 0
? Integer.parseInt(args[0]);
: 8080;
new NettyServer(port).run();
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new RequestDecoder(),
new ResponseDataEncoder(),
new ProcessingHandler());
}
}).option(ChannelOption.SO__BACKLOG, 128)
.childOption(ChannelOption.SO__KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
上記のサーバーブートストラップの例で使用されているクラスの詳細は、それぞれのJavadocにあります。最も興味深い部分はこの行です。
ch.pipeline().addLast(
new RequestDecoder(),
new ResponseDataEncoder(),
new ProcessingHandler());
ここでは、リクエストを処理し正しい順序で出力するインバウンドハンドラとアウトバウンドハンドラを定義します。
4クライアントアプリケーション
クライアントは逆エンコーディングとデコーディングを実行する必要があるので、
RequestDataEncoder
と
ResponseDataDecoder
が必要です。
public class RequestDataEncoder
extends MessageToByteEncoder<RequestData> {
private final Charset charset = Charset.forName("UTF-8");
@Override
protected void encode(ChannelHandlerContext ctx,
RequestData msg, ByteBuf out) throws Exception {
out.writeInt(msg.getIntValue());
out.writeInt(msg.getStringValue().length());
out.writeCharSequence(msg.getStringValue(), charset);
}
}
public class ResponseDataDecoder
extends ReplayingDecoder<ResponseData> {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf in, List<Object> out) throws Exception {
ResponseData data = new ResponseData();
data.setIntValue(in.readInt());
out.add(data);
}
}
また、リクエストを送信し、サーバーからレスポンスを受信する
ClientHandler
を定義する必要があります。
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
RequestData msg = new RequestData();
msg.setIntValue(123);
msg.setStringValue(
"all work and no play makes jack a dull boy");
ChannelFuture future = ctx.writeAndFlush(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println((ResponseData)msg);
ctx.close();
}
}
それでは、クライアントをブートストラップしましょう。
public class NettyClient {
public static void main(String[]args) throws Exception {
String host = "localhost";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO__KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new RequestDataEncoder(),
new ResponseDataDecoder(), new ClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
ご覧のとおり、サーバーのブートストラップには多くの詳細が共通しています。
これで、クライアントのmainメソッドを実行してコンソールの出力を確認できます。予想通り、
intValue
が246に等しい
ResponseData
が得られました。
5結論
この記事では、Nettyについて簡単に紹介しました。
Channel
や
ChannelHandler
などのコアコンポーネントを紹介しました。また、単純なノンブロッキングプロトコルサーバーとそのクライアントを作成しました。
いつものように、すべてのコードサンプルはhttps://github.com/eugenp/tutorials/tree/master/libraries-server[GitHubで利用可能]で利用可能です。