読者です 読者をやめる 読者になる 読者になる

SIerだけど技術やりたいブログ

4年目のSIerのブログです

ブロッキングとかノンブロッキングを理解したい

Java Servlet

Spring Framework5では「Reactive」対応が目玉だと言われているが、そもそもその前段のブロッキングやノンブロッキングというのが何なのか、いまいちしっくりこなかったので基本から調べた。(今回は特にネットワーク部分に絞って調べた)

また、tomcatの実装ではブロッキングI/OやノンブロッキングI/Oをどのように使っているのか、servlet3.1のノンブロッキングI/Oとは何なのかについても調べた。

調べたら余計にわからないところが増えたけど、とりあえず今の理解をまとめる。
ご指摘ありましたら是非ともコメントお願いします。

ブロッキングI/O

I/Oをする際(read,write)に、処理がブロックされる。

実装方法(ソケット)

クライアント SocketChannel
サーバ ServerSocketChannel

コード例

8080ポートにリクエストが来たらリクエスト内容をレスポンスとして返事する、エコーサーバを作ってみる。

Socketのaccept()を実行した時点で処理がブロックされ、次の行が実行されずに待機する。(ここがブロッキング)
socketをaccept()しているスレッドでリクエストに対する処理をすると、処理中は別のリクエストが受け付けられなくなる。そのため、最大でも1つのリクエストしか同時に処理できない。

@Slf4j
public class BlockingAndSingleEchoServer {
  public void start() {
    try (ServerSocketChannel ssc = ServerSocketChannel.open();) {
      ssc.socket().bind(new InetSocketAddress(8080));
      log.info("サーバを起動しました");
      while (true) {
        try (SocketChannel sc = ssc.accept();//ブロックされる
           BufferedReader in = new BufferedReader(new InputStreamReader(sc.socket().getInputStream()));
           PrintWriter out = new PrintWriter(sc.socket().getOutputStream(), true);
        ) {
          String line;
          while ((line = in.readLine()) != null) {
            log.info("echo " + line + " to " + sc.socket().getRemoteSocketAddress());
            out.println(line);
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}


具体的な処理は別スレッドで実行すると、あるリクエストの処理中にも平行して別のリクエストを処理することができる。

public class BlockingAndMultiUserServer  {
  public void start() {
    try (ServerSocketChannel channel = ServerSocketChannel.open()) {
      channel.socket().bind(new InetSocketAddress(8080));
      while (true) {
        // Socketがcloseされるとレスポンスが書き込めないので
        // レスポンスを返却するスレッドでcloseする
        final Socket socket = channel.socket().accept();//ブロックされる
        new Thread(new HelloWorldTask(socket)).start();
      }
    } catch (IOException e) {
        e.printStackTrace();
    }
  }


レスポンスを返却する処理で忘れずにsocketをクローズする。

class EchoTask implements Runnable {
  private final Socket socket;

  EchoTask(Socket socket) {
    this.socket = socket;
  }

  @Override
  public void run() {
    try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
       PrintWriter out = new PrintWriter(socket.getOutputStream(), true);) {
      String line;
      while ((line = in.readLine()) != null) {
        log.info("recieved " + line + " from " + socket.getRemoteSocketAddress());
        log.info("echo " + line + " to " + socket.getRemoteSocketAddress());
        out.println(line);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      try {
        socket.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

ブロッキングI/Oで何がいいか

  • read/writeすればブロックして処理が完了するのを待つため、シンプルにコーディングできる

ブロッキングI/Oで何が困るか

  • 複数リクエストを処理するためには、リクエストに対する具体的な処理は別スレッドで実行するしか方法がない。
  • スレッドを生成するためにはメモリを割り当てないといけないため、同時に処理したいリクエスト数分だけメモリが必要になる。(仮に1スレッドに1MB使うとすると10Kリクエストを同時にさばくためには10GB必要になる)
  • ソケットの読み書きが終わるまでスレッドがブロックされるため、低速なネットワークの場合に非効率。

ノンブロッキングI/O

I/Oをする際(read,write)に、処理がブロックされない。

実装方法(ソケット)

利用するクラスはブロッキングI/Oと同じだけど、実装方法が違う。

クライアント SocketChannel
サーバ ServerSocketChannel

コード例(サーバ側)

ノンブロッキングI/Oは読み書きしたときに読み書きができない状態ならば、即座にリターンされる。そのため、読み書き可能かどうかをSelectorを使って監視して、読み書きできるときにだけ処理する。

@Slf4j
public class NonBlockingEchoServer {
  public void start() {
    try (ServerSocketChannel ssc = ServerSocketChannel.open();
       Selector selector = Selector.open();) {
      // ノンブロッキングモードにしてSelectorに受付チャネルを登録する
      ssc.configureBlocking(false);
      ssc.socket().bind(new InetSocketAddress(8080));
      ssc.register(selector, SelectionKey.OP_ACCEPT);
      log.info("サーバが起動しました " + ssc.socket().getLocalSocketAddress());

      // チャネルにイベントが登録されるまで待つ
      while (selector.select() > 0) {
        for (Iterator it = selector.selectedKeys().iterator(); it.hasNext(); ) {
          SelectionKey key = (SelectionKey) it.next();
          it.remove();

          if (key.isAcceptable()) {
            doAccept((ServerSocketChannel) key.channel(), selector);
          } else if (key.isReadable()) {
            doRead((SocketChannel) key.channel(), selector);
          } else if (key.isWritable()) {
            byte[] message = (byte[]) key.attachment();
            doWrite((SocketChannel) key.channel(), selector, message);
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }


ソケットを受け付けたときのメソッド。
読み込み可能かどうかを監視するためにSelectorにchannelを登録する。

  private void doAccept(ServerSocketChannel ssc, Selector selector) {
    try {
      SocketChannel channel = ssc.accept();
      log.info("connected " + channel.socket().getRemoteSocketAddress());
      channel.configureBlocking(false);
      channel.register(selector, SelectionKey.OP_READ);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }


読み込み可能になったときのメソッド。
読み込みにはByteBufferを使う。つらい。
読み込み終わったらレスポンスに書き込むので、Selectorにchannelを登録する。

  public void doRead(SocketChannel channel, Selector selector) {
    try {
      ByteBuffer buffer = ByteBuffer.allocate(1024);

      // ソケットから入力を読み込む
      // コネクションが切れていればチャネルをクローズし、読み込めなければリターンする
      int readBytes = channel.read(buffer);
      if (readBytes == -1) {
        log.info("disconnected " + channel.socket().getRemoteSocketAddress());
        channel.close();
        return;
      }
      if (readBytes == 0) {
        return;
      }

      // 入力されたメッセージを取り出し、チャネルに登録する
      buffer.flip();
      byte[] bytes = new byte[buffer.limit()];
      buffer.get(bytes);

      String line = new String(buffer.array(), "UTF-8").replaceAll(System.getProperty("line.separator"), "");
      log.info("recieved " + line + " from " + channel.socket().getRemoteSocketAddress());

      channel.register(selector, SelectionKey.OP_WRITE, bytes);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }


書き込み可能になったときのメソッド。
書き込みにもByteBufferを使う。ほんとつらい。
全てを書き込めるとは限らないのでhasRemaining()で残りがあれば再度書き込む。
全部レスポンスに書き出したらまた読み込みを受け付ける。

  public void doWrite(SocketChannel channel, Selector selector, byte[] message) {
    try {
      ByteBuffer byteBuffer = ByteBuffer.wrap(message);
      channel.write(byteBuffer);
      ByteBuffer restByteBuffer = byteBuffer.slice();

      // ログに送信したメッセージを表示する
      byteBuffer.flip();
      byte[] sendBytes = new byte[byteBuffer.limit()];
      byteBuffer.get(sendBytes);
      String line = new String(sendBytes, "UTF-8").replaceAll(System.getProperty("line.separator"), "");
      log.info("echo " + line + " to " + channel.socket().getRemoteSocketAddress());

      // メッセージを最後まで出力したら入力を受け付ける
      if (restByteBuffer.hasRemaining()) {
        byte[] restBytes = new byte[restByteBuffer.limit()];
        restByteBuffer.get(restBytes);
        channel.register(selector, SelectionKey.OP_WRITE, restBytes);
      } else {
        channel.register(selector, SelectionKey.OP_READ);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

コード例(クライアント側)

Telnetでもいいけど今回はJavaでクライアントを用意する。
コンソールから1行読み込み、サーバに送信する。
サーバからレスポンスが返ってきたらログに出し、コンソールからまた1行読み込む。

@Slf4j
public class Client {
  public static void main(String[] args) {
     try (
        Socket socket = new Socket("localhost", 8080);
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        BufferedReader keyIn = new BufferedReader(new InputStreamReader(System.in));
    ) {
      log.info("サーバに接続しました " + socket.getLocalSocketAddress() + " to " + socket.getRemoteSocketAddress());

      String input;
      while ((input = keyIn.readLine()).length() > 0) {
        out.println(input);
        String line = in.readLine();
        if (line != null) {
          log.info("recieved " + line + " from " + socket.getRemoteSocketAddress());
        } else {
          break;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }  
  }
}


サーバを起動する。

java -jar target\server-1.0-SNAPSHOT-jar-with-dependencies.jar


クライアントを起動する。(x2)

java -jar target\client-1.0-SNAPSHOT-jar-with-dependencies.jar
aaaa //コンソールから入力する
23:24:57.086 [main] INFO Client - send aaaa to localhost/127.0.0.1:8080


1スレッドで複数クライアントの接続を同時に処理できることがわかる。
また、ソケットにデータを流すたびに(読み込み可能になるたびに)リクエストが処理される。

11:54:54.988 [main] INFO server.nonblocking.NonBlockingEchoServer - サーバが起動しました /0:0:0:0:0:0:0:0:8080
11:55:05.855 [main] INFO server.nonblocking.NonBlockingEchoServer - connected /127.0.0.1:58961
11:55:10.026 [main] INFO server.nonblocking.NonBlockingEchoServer - connected /127.0.0.1:58964
11:55:16.350 [main] INFO server.nonblocking.NonBlockingEchoServer - recieved aaaaa from /127.0.0.1:58961
11:55:16.350 [main] INFO server.nonblocking.NonBlockingEchoServer - echo aaaaa to /127.0.0.1:58961
11:55:19.156 [main] INFO server.nonblocking.NonBlockingEchoServer - recieved bbbbb from /127.0.0.1:58964
11:55:19.157 [main] INFO server.nonblocking.NonBlockingEchoServer - echo bbbbb to /127.0.0.1:58964
11:58:29.292 [main] INFO server.nonblocking.NonBlockingEchoServer - recieved ccccc from /127.0.0.1:58961
11:58:29.292 [main] INFO server.nonblocking.NonBlockingEchoServer - echo ccccc to /127.0.0.1:58961

ノンブロッキングI/Oで何がいいか

  • 1つのスレッドで複数のリクエストを処理することが可能
  • 低速なネットワークでも効率的に処理できる

1つのスレッドで複数のリクエストを処理することが"可能"と書いたのは、実際には、リクエストごとにスレッドを割り当てるアーキテクチャ(Tomcatなど)と本当に1つのスレッドで複数のリクエストを処理するアーキテクチャ(Node.jsなど)の2通りあるため。

シングルスレッドのイベントループモデルの場合、あるリクエストを処理している間は
別のリクエストが処理されないため、重たい処理をする用途には向かない(と思う)。

ノンブロッキングI/Oで何が困るか

  • 実装が難しい

Tomcatではどうなっているか

TomcatブロッキングI/OやノンブロッキングI/Oの仕組みをどう利用しているのか調べた。

動作確認環境

BIO Tomcat 8.0.33
NIO Tomcat 8.5.8
Servlet3.0 Tomcat 8.5.8
Servlet3.1 Tomcat 8.5.8

Tomcatの仕組み

Tomcatは役割ごとに複数のコンポーネントから構成されている。(外部からの接続を受け付けるコネクタや実行エンジン部分など)

この本にめちゃめちゃ詳しく載ってるので、中身を知りたい方は必読。
https://www.amazon.co.jp/%e8%a9%b3%e8%a7%a3-tomcat-%e8%97%a4%e9%87%8e-%e5%9c%ad%e4%b8%80/dp/4873117054

ブロッキングI/O

Tomcat7以前のデフォルトのHTTP1.1のコネクタであるHttp11Protocol(JIoEndpoint+Http11ConnectionHandler)はブロッキングI/Oを利用している。
https://tomcat.apache.org/tomcat-8.0-doc/config/http.html#connector_comparison

f:id:kimulla:20161210011429p:plain

JIoEndpointクラスのAcceptorでsocketをaccept()して、workerスレッドでリクエスト処理を実行する。

protected class Acceptor extends AbstractEndpoint.Acceptor {
  @Override
  public void run() {
    // Loop until we receive a shutdown command
    while (running) {
      ...
      // (socket.accept()で待ち受ける)
      socket = serverSocketFactory.acceptSocket(serverSocket);
      ...
      // 
      if (!processSocket(socket)) {
        countDownConnection();
        // Close socket right away
        closeSocket(socket);
      }
      ... 
    }

  protected boolean processSocket(Socket socket) {
    ...
    //別スレッドで実行する
    getExecutor().execute(new SocketProcessor(wrapper));
    ...
    return true;
  }
..
public class DefaultServerSocketFactory implements ServerSocketFactory {
  ...
  @Override
  public Socket acceptSocket(ServerSocket socket) throws IOException {
    return socket.accept();
  }
  ...
}
HTTP Keep alive

HTTP Keep aliveを実現するためにはSocketを開きっぱなしで次のリクエストを待たないといけないため、workerを占有して次のリクエストを待つことになる。

f:id:kimulla:20161210011859p:plain

HTTP Keep aliveを実現するために、リクエスト処理が完了したあともスレッドがRUNNINGのままになる。

f:id:kimulla:20161208232936p:plain

今までありがとうBIOコネクタ

Tomcat8.5.0からはBIOコネクタが無くなり、デフォルトはNioコネクタになった。
今までありがとうBIOコネクタ!あったばっかりだけど達者でな!

ノンブロッキングI/O

TomcatのHttp11NioProtocol(+NioEndpoint+Http11ConnectionHandler)はノンブロッキングI/O。ソースコードの抜粋が難しいので図だけ。

f:id:kimulla:20161210012011p:plain

SocketProcessorの実行タイミング

本当にSocketがReadableになったらSocketProcessorを起動するのか確かめる。

まずHTTPリクエストをラインごとに送信できるクライアントを用意する。

@Slf4j
public class Client {
  public static void main(String[] args) {
  try {
      try (
          Socket socket = new Socket("localhost", 8080);
          PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
          BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
          BufferedReader keyIn = new BufferedReader(new InputStreamReader(System.in));
      ) {
        log.info("サーバに接続しました " + socket.getLocalSocketAddress() + " to " + socket.getRemoteSocketAddress());

        String input;
        while (!(input = keyIn.readLine()).equals("enter")) {
          out.println(input);
          log.info("send " + input + " to "  + socket.getRemoteSocketAddress());
        }

        while (true) {
          input = in.readLine();
          if (input == null) {
            break;
          }
          log.info(input);
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}


コンソールから以下のHTTPリクエストを1行ずつ送信してみる。

GET / HTTP/1.1
Host: localhost:8080
Connection: keep-alive
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Encoding: gzip, deflate, sdch, br
Accept-Language: ja,en-US;q=0.8,en;q=0.6


このとき、NioEndpointのPollerにブレークを打つと、1行送信のたびにworkerがリクエスト解析処理を始めるのがわかった。

以下がSelectorの監視イベントをループして拾ってるところ。

public class Poller implements Runnable {
  @Override
  public void run() {
    ...
   Iterator<SelectionKey> iterator =
     keyCount > 0 ? selector.selectedKeys().iterator() : null;
    ...
    // Walk through the collection of ready keys and dispatch
    // any active event.
    while (iterator != null && iterator.hasNext()) {
      SelectionKey sk = iterator.next();
      NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
      // Attachment may be null if another thread has called
      // cancelledKey()
      if (attachment == null) {
        iterator.remove();
      } else {
        iterator.remove();
        processKey(sk, attachment);
      }
    }//while
    ...
  }
...
}
Tomcatはリクエストごとにworkerスレッドを消費する

PollerがSelectorを監視するスレッドは共有されるが、リクエスト解析以降の処理はリクエストごとにworkerスレッドを占有する。そのため、worker数の上限が同時処理数の上限になる。

tomcatコネクタについてはこれが参考になる。
http://events.linuxfoundation.org/sites/events/files/slides/tomcat%20connectors.pdf

HTTP Keep alive

NioはSelectorを利用すれば読み込み可能になったタイミングで通知されるため、リクエストがあったタイミングでSocketProcessorを割り当てることができる。
そのため、HTTP Keep aliveのためにworkerを占有しなくて済む。ここがNioにしてよかった一番のポイントな気がする。

f:id:kimulla:20161210012113p:plain

1つのリクエスト処理が完了したあとは、スレッドがWAITになる。

f:id:kimulla:20161208233402p:plain

同時処理数の上限はworker数のため、BIOと比較してHTTP Keep aliveして次のリクエスト待ちになっているソケット分を節約できるようになった。

Servlet3.0の非同期処理

重たい処理や長い処理をするときにworkerスレッドを解放するための仕組み。
ロングポーリングやSSEなどでHTTPのコネクションを維持しているとworkerスレッドを占有するため、同時リクエスト数の上限にひっかかる可能性がある。

f:id:kimulla:20161210012123p:plain

そのため、workerスレッドは重たくない処理だけで使うようにする。

レスポンスをまとめて書き込むパターン。

f:id:kimulla:20161210012159p:plain

chunckedで送るパターン。

f:id:kimulla:20161210013317p:plain

コード例

ServletのdoGet()の処理は何もせずにとりあえず抜ける。
別スレッドを起動してAsyncContextを利用してレスポンスを書き込む。

今回はchunckedで送る方法を試す。

@Slf4j
@WebServlet(urlPatterns = "/", asyncSupported = true)
public class EchoServlet extends HttpServlet {
  Executor executor = new ScheduledThreadPoolExecutor(10);

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    log.info("begin doGet");
    AsyncContext ac = req.startAsync();
    executor.execute(new SlowTask(ac));
    log.info("end doGet");
  }
}

@Slf4j
class SlowTask implements Runnable {
  private AsyncContext ac;

  SlowTask(AsyncContext ac) {
    this.ac = ac;
  }

  @Override
  public void run() {
    log.info("begin AsyncContext#start");

    try {
      PrintWriter writer = ac.getResponse().getWriter();
      for (int i = 0; i < 5; i++) {
        writer.println("task :" + i);
        writer.flush();
        log.info("send chuncked data");
        TimeUnit.SECONDS.sleep(2);
      }
    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }
    ac.complete();
    log.info("end AsyncContext#start");
  }
}

動作確認

curlでリクエストを送るとちょっとずつ受信されるのがわかる。

$ curl localhost:8080/async-context/
task :0 // チャンクでちょっとずつ返ってくる
task :1 // チャンクでちょっとずつ返ってくる
task :2 // チャンクでちょっとずつ返ってくる
task :3 // チャンクでちょっとずつ返ってくる
task :4 // チャンクでちょっとずつ返ってくる


またログを見るとSlowTaskが別スレッドで実行されていることがわかる。

22:26:45.003 [http-nio-8080-exec-1] INFO example.com.echo.EchoServlet - begin doGet
22:26:45.009 [http-nio-8080-exec-1] INFO example.com.echo.EchoServlet - end doGet
22:26:45.009 [pool-1-thread-1] INFO example.com.echo.SlowTask - begin AsyncContext#start
22:26:45.013 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
22:26:47.015 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
22:26:49.016 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
22:26:51.018 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
22:26:53.019 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
22:26:55.021 [pool-1-thread-1] INFO example.com.echo.SlowTask - end AsyncContext#start

疑問

結局はアプリケーション内でスレッドを生成するんだから、APサーバ全体で見たときのスレッド生成コストは変わらない気がする。
アプリケーション内でスレッドを生成せずに、workerスレッド数の上限を上げればいいと思った。いまいち理解できてない。

Servlet APIブロッキングAPI

HttpServletRequest,HttpServletResponseのreadやwriteをするためのAPIとして
HttpServletRequest#getInputStream(),HttpServletResponse#OutputStream()がある
が、これはブロッキングAPI

なぜなら、ServletInputStreamの実装クラスはInputStream#read()を実装しないといけない。http://docs.oracle.com/javaee/7/api/javax/servlet/servletinputstream.html

This is an abstract class that a servlet container implements. Subclasses of this class must implement the java.io.InputStream.read() method.

で、InputStreamのAPIhttps://docs.oracle.com/javase/jp/8/docs/api/java/io/inputstream.html#read--

入力ストリームからデータの次のバイトを読み込みます。バイト値は、0 - 255の範囲のintとして返されます。ストリームの終わりに達したために読み込むバイトがない場合は、-1が返されます。入力データが読み込めるようになるか、ストリームの終わりが検出されるか、または例外が発生するまで、このメソッドはブロックされます。

つまりブロッキングAPI。HttpServletResponse#OutputStream()も同様。

Nioコネクタのマニュアルを見ても、HTTPヘッダを読み込むまではノンブロッキングだが、肝心のHTTPボディを読み込む箇所はブロッキングな処理らしい。
Tomcatのマニュアルhttp://tomcat.apache.org/tomcat-8.0-doc/config/http.html#connector_comparison

疑問

Tomcatサーバはリクエストを読み込み終わる前にHttpServletを実行するけど、なんでなんだろう。Servletの仕様書やTomcatのリファレンスに記載が見当たらないので正解はよくわからないけど、無限にリクエストボディを送り続けられたときに処理できなくなるからかな?リクエストボディの値が処理で必要になるとも限らないので、HttpServletで実際に使いたいときに読み込めばよい、ということかな?

Servlet3.1のノンブロッキングI/O

HTTPのボディに対する読み書きをノンブロッキングにするための仕組み。
https://blogs.oracle.com/wlc/entry/javaee_c116

実装方法

読み込み ReadListner
書き込み WriteListener

コード例

とりあえず読み込みしたらログに出すだけ。

@Slf4j
@WebServlet(urlPatterns = "/non-blocking", asyncSupported = true)
public class NonBlockingIOServlet extends HttpServlet {

  @Override
  public void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    log.info("begin doGet");
    AsyncContext ctx = req.startAsync();
    ServletInputStream input = req.getInputStream();
    input.setReadListener(new NonBlockingReadListener(input, ctx));
    log.info("end doGet");
  }
}

@Slf4j
class NonBlockingReadListener implements ReadListener {
  private final ServletInputStream inputStream;
  private final AsyncContext ctx;

  NonBlockingReadListener(ServletInputStream inputStream, AsyncContext ctx) {
    log.info("ReadListener is initialized");
    this.inputStream = inputStream;
    this.ctx = ctx;
  }

  @Override
  public void onDataAvailable() throws IOException {
    log.info("onDataAvaliable");

    StringBuilder sb = new StringBuilder();
    int len = -1;
    byte b[] = new byte[1024];

    while (!inputStream.isFinished() && inputStream.isReady() && (len = inputStream.read(b)) != -1) {
      String data = new String(b, 0, len);
      log.info("recieved : " + data);
    }
  }

  @Override
  public void onAllDataRead() throws IOException {
    log.info("onAllDataRead");
    ctx.complete();
  }

  @Override
  public void onError(Throwable throwable) {
    log.info("onError : " + throwable);
    ctx.complete();
  }
}


またクライアントを起動してリクエストを投げてみる。(Telnetでもいいです)

java -jar target\client-1.0-SNAPSHOT-jar-with-dependencies.jar
POST /servlet-api/non-blocking HTTP/1.1
Host: localhost:8080
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Encoding: gzip, deflate, sdch, br
Content-Length: 210000003

request body .... //ここを1行ずつ送る
request body .... //ここを1行ずつ送る


データ到着のたびにonDataAvailable()が実行される。

12 08, 2016 11:03:34 午後 org.apache.coyote.AbstractProtocol start
情報: Starting ProtocolHandler [http-nio-8080]
23:03:48.905 [http-nio-8080-exec-1] INFO example.com.NonBlockingIOServlet - begin doGet
23:03:48.909 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - ReadListener is initialized
23:03:48.909 [http-nio-8080-exec-1] INFO example.com.NonBlockingIOServlet - end doGet
23:03:48.910 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - onDataAvaliable
23:03:48.910 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - recieved : request body ....

23:04:04.735 [http-nio-8080-exec-2] INFO example.com.NonBlockingReadListener - onDataAvaliable
23:04:04.735 [http-nio-8080-exec-2] INFO example.com.NonBlockingReadListener - recieved : request body ....

何がいいのか

  • HTTPのボディも効率的に読み書きできるようになるため、でかいファイルアップロードや低速なネットワークからのアクセスを効率的に処理できる。

何が困るか

  • 実装が難しい

そして時代はSpring5へ

Controllerの引数や戻り値をReactorのAPIにしとくと、Servlet3.1のノンブロッキングI/Oを利用して読み書きしてくれるようになる。
http://www.slideshare.net/takuyaiwatsuka/spring-5

結論

IOTやモバイル端末の普及により、低速なネットワークからの大量のアクセスが増えてきた昨今、cpuを効率的に利用しつつ大量のリクエストを処理できるノンブロッキングな処理というのが注目を集めている。
ただし、ノンブロッキング処理はたいがい難易度が上がるので、必要になったら採用を検討するくらいでちょうど良さそう。

ということで俺はギョウミーアプリに戻るぞー!ジョジョー!

あとがき

これ調べるのにまるまる1ヶ月くらいかかったので、虚無感がすごい。

サンプルコード

github.com