読者です 読者をやめる 読者になる 読者になる

SIerだけど技術やりたいブログ

4年目のSIerのブログです

SpringのContextHolderいろいろ

Springには、…ContextHolderというクラスがある。スレッドローカルに値を保存しておくことで、情報をいろんなところから参照できるようにする。

スレッドローカルは…まあスレッド固有の値ですよね。(あたりまえ)

TomcatなどのAPサーバはリクエストごとにworkerスレッドを割り当てるので、
kimulla.hatenablog.com

@AsyncやExecutorServiceを利用して別スレッドでXXXContextHolderを利用しない限り、どこからでもスレッドローカルに登録した同じ値を参照できる。具体的なクラスは、

Spring MVC RequestContextHolder リクエスト情報
Spring Security SecurityContextHolder 認証情報

使い方

Spring Bootでリクエストと認証のログを出してみる。spring-boot-starter-securityがclasspathに含まれてるとデフォルトでDIGEST認証がかかるので、今回は認証にそれを使う。

ログ処理はHandlerInterceptorを使う。
引数にHttpServletRequestが取れるけど、わざとRequestContextHolderから取得する。

@Slf4j
public class LoggingInterceptor extends HandlerInterceptorAdapter {

  @Override
  public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    HttpServletRequest req = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
    String msg = (String) request.getParameter("msg");
    log.info("request : " + msg);

    SecurityContext sc = SecurityContextHolder.getContext();
    Authentication authentication = sc.getAuthentication();
    log.info("authentication : " + authentication);
    return true;
  }
}

HandlerInterceptorを登録する。

@Configuration
public class WebMvcConfig extends WebMvcConfigurerAdapter{

    @Override
  public void addInterceptors(InterceptorRegistry registry) {
    registry.addInterceptor(new LoggingInterceptor())
        .addPathPatterns("/**");
  }
}

リクエストするためのメソッドを用意する。

@RestController
@SpringBootApplication
public class ContextholderApplication {

  public static void main(String[] args) {
    SpringApplication.run(ContextholderApplication.class, args);
  }

  @GetMapping(value = "sample", params = "msg")
  public String param() {
    return "success";
  }
}

起動するとログに認証パスワードが表示されるので、それを使ってリクエストする。

...
2017-02-18 19:39:47.925  INFO 13184 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2017-02-18 19:39:47.955  INFO 13184 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/**/favicon.ico] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2017-02-18 19:39:48.227  INFO 13184 --- [           main] b.a.s.AuthenticationManagerConfiguration : 

Using default security password: e5ad05bf-3daa-4a1b-91ad-81d726ed712a
curl --user user:e5ad05bf-3daa-4a1b-91ad-81d726ed712a http://localhost:8080/sample?msg=hello

ログに出る。

2017-02-18 19:40:01.634  INFO 13184 --- [nio-8080-exec-1] com.example.LoggingInterceptor           : request : hello
2017-02-18 19:40:01.634  INFO 13184 --- [nio-8080-exec-1] com.example.LoggingInterceptor           : authentication : org.springframework.security.authentication.UsernamePasswordAuthenticationToken@442b5a9f: Principal: org.springframework.security.core.userdetails.User@36ebcb: Username: user; Password: [PROTECTED]; Enabled: true; AccountNonExpired: true; credentialsNonExpired: true; AccountNonLocked: true; Granted Authorities: ROLE_USER; Credentials: [PROTECTED]; Authenticated: true; Details: org.springframework.security.web.authentication.WebAuthenticationDetails@b364: RemoteIpAddress: 0:0:0:0:0:0:0:1; SessionId: null; Granted Authorities: ROLE_USER

うん。


・・・うん。

jenkinsのpipeline入門(jenkinsfile)

本記事について、Jenkinsのpipelineが最近(2017/2/20時点で)DSLがごっそり入れ替わったっぽいので内容が古いです。注意。


Jenkins2では、Groovy DSLを用いたpipelineの記述ができるようになったらしい。若干の時代遅れ感があるけど、最近仕事で使う機会があり、土日にわからないところを整理したのでメモる。

ジョブ定義を画面からぽちぽちするなんて時代遅れ!技術者として恥ずかしい!これこそがモダン!とか煽るつもりはまったくないけど、ジョブ定義の柔軟性が高いし、パイプラインに人の承認フローが組み込めたり、実行ノードを簡単に選べるし、何よりコード管理できるので、普通に良さそうだと思いました。情報が少ないことを除けばな!

Jenkins2のインストール

公式サイトからwarを落とすなりdockerで起動するなりして、Jenkinsをインストールする。(今回利用したのはver 2.32.1)

f:id:kimulla:20170128200815p:plain

ジョブを作る。

f:id:kimulla:20170128201726p:plain

通常はジョブをJenkinsfileに記述し、リポジトリのトップディレクトリに置いておく。
が、今回は勉強が目的なのでここにジョブを記述する。

f:id:kimulla:20170130224408p:plain

とりあえず動かす

とりあえず動かしてみる。

node {
    print "Hello World"
}

実行結果は

f:id:kimulla:20170130224816p:plain

コンソール出力で確かめる。

f:id:kimulla:20170130224632p:plain

とりあえず動いた。


node

実行ノードを指定できる。Jenkinsが複数のslaveを持つときに、nodeの引数にslave名を指定する。

slaveをひとつ作って試してみる。

左側のビルド実行状態 > 新規ノード作成 から、以下の通り作成する。

f:id:kimulla:20170130225138p:plain

slaveが作れた。

f:id:kimulla:20170130225159p:plain

nodeの引数にslave名を指定する。

node('slave') {
    print "hello world"
}

実行してコンソール出力を確かめると、slaveで実行されていることがわかる。

Started by user root
[Pipeline] node
Running on slave in /tmp\jenkins/slave\workspace\pipeline-sample
[Pipeline] {
[Pipeline] echo
hello world
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
Finished: SUCCESS

環境変数の参照

job内では複数の環境変数を参照できる。

node {
    print "BUILD_NUMBER: ${env.BUILD_NUMBER}"
    print "BUILD_ID: ${env.BUILD_ID}"
    print "WORKSPACE: ${env.WORKSPACE}"
    print "JENKINS_URL: ${env.JENKINS_URL}"
    print "BUILD_URL: ${env.BUILD_URL}"
    print "JOB_URL: ${env.JOB_URL}"
}

実行してコンソール出力を確かめる。

Started by user root
[Pipeline] node
Running on master in /var/jenkins_home/workspace/pipeline-sample
[Pipeline] {
[Pipeline] echo
BUILD_NUMBER: 2
[Pipeline] echo
BUILD_ID: 2
[Pipeline] echo
WORKSPACE: /var/jenkins_home/workspace/pipeline-sample
[Pipeline] echo
JENKINS_URL: http://192.168.11.100:18080/
[Pipeline] echo
BUILD_URL: http://192.168.11.100:18080/job/pipeline-sample/2/
[Pipeline] echo
JOB_URL: http://192.168.11.100:18080/job/pipeline-sample/
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
Finished: SUCCESS

参照できる環境変数の一覧は以下から確認できる。

f:id:kimulla:20170130230007p:plain

f:id:kimulla:20170130230017p:plain

f:id:kimulla:20170130230037p:plain


変数の定義

Groovyなので、変数定義や展開ができる。

def MESSAGE='sample'

node {
    print MESSAGE
    print "展開される ${MESSAGE}"
    print '展開されない ${MESSAGE}'
}

実行してコンソール出力を確かめる。ダブルクオートは中の変数が展開されるけど、シングルクオートは展開されないことがわかる。(Groovyの構文覚えないといけない)

Started by user root
[Pipeline] node
Running on master in /var/jenkins_home/workspace/pipeline-sample
[Pipeline] {
[Pipeline] echo
sample
[Pipeline] echo
展開される sample
[Pipeline] echo
展開されない ${MESSAGE}
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
Finished: SUCCESS

制御文

Groovyなので、制御文も書ける。

node {
    for (i in 0..9) {
        println i
    }
}

ただセキュリティの観点から、Groovy DSLは色々と機能を絞ったサンドボックス上で実行されるので、デフォルトだとRejectedAccessExceptionになる。

Started by user root
[Pipeline] node
Running on master in /var/jenkins_home/workspace/pipeline-sample
[Pipeline] {
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
org.jenkinsci.plugins.scriptsecurity.sandbox.RejectedAccessException: Scripts not permitted to use staticMethod org.codehaus.groovy.runtime.ScriptBytecodeAdapter createRange java.lang.Object java.lang.Object boolean
	at org.jenkinsci.plugins.scriptsecurity.sandbox.whitelists.StaticWhitelist.rejectStaticMethod(StaticWhitelist.java:192)
	at org.jenkinsci.plugins.scriptsecurity.sandbox.groovy.SandboxInterceptor.onStaticCall(SandboxInterceptor.java:142)
	at org.kohsuke.groovy.sandbox.impl.Checker$2.call(Checker.java:180)
	at org.kohsuke.groovy.sandbox.impl.Checker.checkedStaticCall(Checker.java:177)

実行したければ Use Groovy Sandbox のチェックを外すか、

f:id:kimulla:20170130230516p:plain

Jenkinsの管理からScriptを許可する。

f:id:kimulla:20170130230420p:plain

f:id:kimulla:20170130230428p:plain


ここらへんは詳しく書いている人がいたのでそっちを参照する。

arasio.hatenablog.com


Groovyのメソッド

Groovyのメソッドも書ける。(やっぱりセキュリティにひっかかったりする)

node {
    println new Date().format("yyyyMMddHHmmssSSS")
}

実行してコンソール出力を確かめると、確かにメソッドが実行できる。

Started by user root
[Pipeline] node
Running on master in /var/jenkins_home/workspace/pipeline-sample
[Pipeline] {
[Pipeline] echo
20170128044540483
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
Finished: SUCCESS

stage

テスト→ビルド→デプロイ、みたいにビルドのステージを定義できる。

node {
    stage('build') {
        print "build"
    }
    
    stage ('test') {
        print "test"
    }
    
    stage ('deploy') {
        print "deploy"
    }
}

実行するとステージごとにステータスが見れる。

f:id:kimulla:20170130231201p:plain


あるステージで失敗した場合、ステージの表示が赤になる。
error という処理を途中で中断したいときに使うコマンドを使って失敗させてみる。

node {
    stage('build') {
        print "build"
    }
    
    stage ('test') {
        error "failed"
    }
    
    stage ('deploy') {
        print "deploy"
    }
}

f:id:kimulla:20170130231304p:plain

コンソール出力からもERRORが起きてることがわかる。

Started by user root
[Pipeline] node
Running on master in /var/jenkins_home/workspace/pipeline-sample
[Pipeline] {
[Pipeline] stage
[Pipeline] { (build)
[Pipeline] echo
build
[Pipeline] }
[Pipeline] // stage
[Pipeline] stage
[Pipeline] { (test)
[Pipeline] error
[Pipeline] }
[Pipeline] // stage
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
ERROR: failed
Finished: FAILURE

throw new Exceptionでも同じことができる。
ただしコンソール出力にスタックトレースが出る点がerrorコマンドと異なる。

node {
    stage('build') {
        print "build"
    }
    
    stage ('test') {
        throw new Exception("failed")
    }
    
    stage ('deploy') {
        print "deploy"
    }
}
Started by user root
[Pipeline] node
Running on master in /var/jenkins_home/workspace/pipeline-sample
[Pipeline] {
[Pipeline] stage
[Pipeline] { (build)
[Pipeline] echo
build
[Pipeline] }
[Pipeline] // stage
[Pipeline] stage
[Pipeline] { (test)
[Pipeline] }
[Pipeline] // stage
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
org.jenkinsci.plugins.scriptsecurity.sandbox.RejectedAccessException: Scripts not permitted to use new java.lang.Exception java.lang.String
	at org.jenkinsci.plugins.scriptsecurity.sandbox.whitelists.StaticWhitelist.rejectNew(StaticWhitelist.java:187)
	at org.jenkinsci.plugins.scriptsecurity.sandbox.groovy.SandboxInterceptor.onNewInstance(SandboxInterceptor.java:130)
	at org.kohsuke.groovy.sandbox.impl.Checker$3.call(Checker.java:191)

※ errorコマンドも内部ではAbortExceptionという例外をスローしてるっぽいけど、正直よくわかってない。


失敗しても処理を継続したい場合

何を失敗とするのかはstepによるけど、各stepは、失敗した場合にExceptionをスローするようになっている。

失敗しても処理を継続したい場合は、try-catchすればいい。

node {
    stage('stage1') {
        try{
            print "try"
            error "failed"
        } catch(Exception e) {
            print "catch"
        } finally {
            print "finally"
        }
    }
}

ただしこの場合は例外をにぎりつぶしてるので、ビルド結果がsuccessになってしまう。

f:id:kimulla:20170130231623p:plain

Started by user root
[Pipeline] node
Running on master in /var/jenkins_home/workspace/pipeline-sample
[Pipeline] {
[Pipeline] stage
[Pipeline] { (stage1)
[Pipeline] echo
try
[Pipeline] error
[Pipeline] echo
catch
[Pipeline] echo
finally
[Pipeline] }
[Pipeline] // stage
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
Finished: SUCCESS

currentBuild.result

例外ハンドリングしつつ全体のステータスをfailにしたい場合は、currentBuild.resultを利用する。

node {
    stage('stage1') {
        try{
            print "try"
            error "failed"
        } catch(Exception e) {
            currentBuild.result = 'FAILURE'
        } 
    }
    stage('stage2') {
        print "stage2"
    }
}

こうすると、次のステージに進みつつ全体をFAILUREにできる。

f:id:kimulla:20170130231659p:plain


ステージは進めるけどステージごとに成功、失敗を表示させる、というのはできないらしい。ただし、JenkinsのJIRAに上がってるのでそのうち対応される感はある。
https://issues.jenkins-ci.org/browse/JENKINS-26522

もしステージ内でエラーハンドリングして次のステージに進まずに失敗させたければ、例外を投げるかerrorを使うと良さそう。


sh

引数に与えられたシェルスクリプトを実行する。

node {
    sh "date"
}  

実行してコンソール出力を確かめる。

Started by user root
[Pipeline] node
Running on master in /var/jenkins_home/workspace/pipeline-sample
[Pipeline] {
[Pipeline] sh
[pipeline-sample] Running shell script
+ date
Wed Jan 18 22:57:12 UTC 2017
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
Finished: SUCCESS

shは別シェルとして実行されるため、シェル変数は引き継がれない点に注意。

node {
    sh "HOGE=xxx"
    sh "$HOGE"
}
Started by user root
[Pipeline] node
Running on master in /var/jenkins_home/workspace/pipeline-sample
[Pipeline] {
[Pipeline] sh
[pipeline-sample] Running shell script
+ HOGE=xxx
[Pipeline] }
[Pipeline] // node
[Pipeline] End of Pipeline
groovy.lang.MissingPropertyException: No such property: HOGE for class: groovy.lang.Binding
	at groovy.lang.Binding.getVariable(Binding.java:63)
	at org.jenkinsci.plugins.scriptsecurity.sandbox.groovy.SandboxInterceptor.onGetProperty(SandboxInterceptor.java:224)
        at org.kohsuke.groovy.sandbox.impl.Checker$4.call(Checker.java:241)
	at org.kohsuke.groovy.sandbox.impl.Checker.checkedGetProperty(Checker.java:238)
	at org.kohsuke.groovy.sandbox.impl.Checker.checkedGetProperty(Checker.java:221)
	at com.cloudbees.groovy.cps.sandbox.SandboxInvoker.getProperty(SandboxInvoker.java:28)

exit codeが0以外の場合、例外が投げられる。

node {
    stage('stage1') {
        sh "true"
    }
    
    stage('stage2') {
        sh "false"    
    }
}

f:id:kimulla:20170130232701p:plain


input

パイプラインに、人のチェックを組み込める。

node {
    stage('test') {
        print "this is test"
    }
    
    stage('permit'){
        input 'Ready to go?'
    }
    stage('deploy') {
        print "start"
    }
}

実行してみると、permitのstageで止まる。

f:id:kimulla:20170130232841p:plain

permitのstageにカーソルを合わせると、人のチェックができる。

f:id:kimulla:20170130232923p:plain

input ステップを実行する前に、mattermostやmailに通知しておけばチェック依頼も自動化できる。当然、mattermostやmailに通知するステップもある。


その他のStep

便利なステップが山ほどある。
https://jenkins.io/doc/pipeline/steps/

が、もっとお手軽に、Pipeline Syntaxでstepを作成できる。

f:id:kimulla:20170130233145p:plain

値を打ち込めばDSLを生成してくれる。

f:id:kimulla:20170130233204p:plain


その他のpluginを呼ぶには

調べてもよくわからなかったけど、現状、pipeline pluginに対応したものしか呼べないらしい。

stackoverflow.com

なので今時点(2017/1/30)では、emotional-jenkins-pluginを入れてJenkinsおじさんを怒らせることができないっぽい。

なんだって?jenkinsおじさんが怒ってくれないだと?使うわけないだろ!こんなもん!!!…とお怒りの皆さま、安心してください。

emotional-jenkins-pluginにpull requestが出されてるので、そのうち使えるんじゃないかと思います。
https://github.com/jenkinsci/emotional-jenkins-plugin/pull/2

またpipelineはJenkinsの標準機能なので、他のpluginもそのうち対応されていくと思いたい。

最後に

柔軟性が高いし、パイプライン中に人の承認フローが組み込めたり、実行ノードが簡単に選べるので良さげ。ただし、あまり柔軟性を持たせてもビルド職人を生むだけなので、意識的にDSLに閉じた操作にとどめたほうが無難だと思いました。


makeのshell関数の実行タイミングは直感と異なる

linux

makeのshell関数の実行タイミングは直感と異なる

Makefile作成時にshell関数の実行タイミングが直感と異なっていてハマったのでメモ。

やりたかったこと

これ。
www.rhoboro.com

sphinxドキュメントをビルドするときに、なるべくビルド環境を汚さず、CIツールへの依存も減らしたかった。

想定する流れ。

  • makeでDockerをビルド・実行しsphinxドキュメントを作成(docker build ,docker run)
  • コンテナに生成された成果物をローカルに持ってくる(docker cp)
  • コンテナを削除(docker rm)

このとき、コンテナの削除を以下のコマンドで削除できるかと思ったらできず。

build:
        docker build -t xxx .
        docker run --name yyy xxx 
        ...
        docker rm $(shell docker ps -aqf name=yyy)

以下のように $(shell docker ps -aqf name=yyy) の実行結果が空になる。なぜ…。

docker rm
docker: "rm" requires a minimum of 1 argument.
See '/usr/bin/docker-current rm --help'.

Usage:  docker rm [OPTIONS] CONTAINER [CONTAINER...]

Remove one or more containers
make: *** [build] エラー 1

原因

上から順に処理が実行されるのではなく、まず初めにshell関数が実行される。

例えば以下の場合、

task:
        touch sample.txt
        echo $(shell ls)
        rm sample.txt

上から順にコマンドが実行されないので、echo $(shell ls) の結果にsample.txtが含まれない。

$ make
touch sample.txt
echo Makefile shell.sh
Makefile shell.sh
rm sample.txt

シェルスクリプトの場合

シェルで別コマンドの実行結果を利用するといえばコマンド置換($(),``)。コマンド置換はそのコマンドが書かれた行で実行される。

#!/bin/sh

touch sample.txt
echo $(ls)
rm sample.txt

上から順にコマンドが実行されるので、 echo $(ls) の結果にsample.txtが含まれる。

$ ./shell.sh
Makefile sample.txt shell.sh

解決策

makeでlinuxと同じような順序で実行したければ、シェルのコマンド置換を利用する。
そのためには、$をエスケープすればいい。

task:
        touch sample.txt
        echo $$(ls)
        rm sample.txt
$ make
touch sample.txt
echo $(ls)
Makefile sample.txt shell.sh
rm sample.txt

はーもう悩むのめんどくさいし、シェルでいいものはシェルで書くことにする。

ブロッキングとかノンブロッキングを理解したい

Java Servlet

Spring Framework5では「Reactive」対応が目玉だと言われているが、そもそもその前段のブロッキングやノンブロッキングというのが何なのか、いまいちしっくりこなかったので基本から調べた。(今回は特にネットワーク部分に絞って調べた)

また、tomcatの実装ではブロッキングI/OやノンブロッキングI/Oをどのように使っているのか、servlet3.1のノンブロッキングI/Oとは何なのかについても調べた。

調べたら余計にわからないところが増えたけど、とりあえず今の理解をまとめる。
ご指摘ありましたら是非ともコメントお願いします。

ブロッキングI/O

I/Oをする際(read,write)に、処理がブロックされる。

実装方法(ソケット)

クライアント SocketChannel
サーバ ServerSocketChannel

コード例

8080ポートにリクエストが来たらリクエスト内容をレスポンスとして返事する、エコーサーバを作ってみる。

Socketのaccept()を実行した時点で処理がブロックされ、次の行が実行されずに待機する。(ここがブロッキング)
socketをaccept()しているスレッドでリクエストに対する処理をすると、処理中は別のリクエストが受け付けられなくなる。そのため、最大でも1つのリクエストしか同時に処理できない。

@Slf4j
public class BlockingAndSingleEchoServer {
  public void start() {
    try (ServerSocketChannel ssc = ServerSocketChannel.open();) {
      ssc.socket().bind(new InetSocketAddress(8080));
      log.info("サーバを起動しました");
      while (true) {
        try (SocketChannel sc = ssc.accept();//ブロックされる
           BufferedReader in = new BufferedReader(new InputStreamReader(sc.socket().getInputStream()));
           PrintWriter out = new PrintWriter(sc.socket().getOutputStream(), true);
        ) {
          String line;
          while ((line = in.readLine()) != null) {
            log.info("echo " + line + " to " + sc.socket().getRemoteSocketAddress());
            out.println(line);
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}


具体的な処理は別スレッドで実行すると、あるリクエストの処理中にも平行して別のリクエストを処理することができる。

public class BlockingAndMultiUserServer  {
  public void start() {
    try (ServerSocketChannel channel = ServerSocketChannel.open()) {
      channel.socket().bind(new InetSocketAddress(8080));
      while (true) {
        // Socketがcloseされるとレスポンスが書き込めないので
        // レスポンスを返却するスレッドでcloseする
        final Socket socket = channel.socket().accept();//ブロックされる
        new Thread(new HelloWorldTask(socket)).start();
      }
    } catch (IOException e) {
        e.printStackTrace();
    }
  }


レスポンスを返却する処理で忘れずにsocketをクローズする。

class EchoTask implements Runnable {
  private final Socket socket;

  EchoTask(Socket socket) {
    this.socket = socket;
  }

  @Override
  public void run() {
    try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
       PrintWriter out = new PrintWriter(socket.getOutputStream(), true);) {
      String line;
      while ((line = in.readLine()) != null) {
        log.info("recieved " + line + " from " + socket.getRemoteSocketAddress());
        log.info("echo " + line + " to " + socket.getRemoteSocketAddress());
        out.println(line);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      try {
        socket.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}

ブロッキングI/Oで何がいいか

  • read/writeすればブロックして処理が完了するのを待つため、シンプルにコーディングできる

ブロッキングI/Oで何が困るか

  • 複数リクエストを処理するためには、リクエストに対する具体的な処理は別スレッドで実行するしか方法がない。
  • スレッドを生成するためにはメモリを割り当てないといけないため、同時に処理したいリクエスト数分だけメモリが必要になる。(仮に1スレッドに1MB使うとすると10Kリクエストを同時にさばくためには10GB必要になる)
  • ソケットの読み書きが終わるまでスレッドがブロックされるため、低速なネットワークの場合に非効率。

ノンブロッキングI/O

I/Oをする際(read,write)に、処理がブロックされない。

実装方法(ソケット)

利用するクラスはブロッキングI/Oと同じだけど、実装方法が違う。

クライアント SocketChannel
サーバ ServerSocketChannel

コード例(サーバ側)

ノンブロッキングI/Oは読み書きしたときに読み書きができない状態ならば、即座にリターンされる。そのため、読み書き可能かどうかをSelectorを使って監視して、読み書きできるときにだけ処理する。

@Slf4j
public class NonBlockingEchoServer {
  public void start() {
    try (ServerSocketChannel ssc = ServerSocketChannel.open();
       Selector selector = Selector.open();) {
      // ノンブロッキングモードにしてSelectorに受付チャネルを登録する
      ssc.configureBlocking(false);
      ssc.socket().bind(new InetSocketAddress(8080));
      ssc.register(selector, SelectionKey.OP_ACCEPT);
      log.info("サーバが起動しました " + ssc.socket().getLocalSocketAddress());

      // チャネルにイベントが登録されるまで待つ
      while (selector.select() > 0) {
        for (Iterator it = selector.selectedKeys().iterator(); it.hasNext(); ) {
          SelectionKey key = (SelectionKey) it.next();
          it.remove();

          if (key.isAcceptable()) {
            doAccept((ServerSocketChannel) key.channel(), selector);
          } else if (key.isReadable()) {
            doRead((SocketChannel) key.channel(), selector);
          } else if (key.isWritable()) {
            byte[] message = (byte[]) key.attachment();
            doWrite((SocketChannel) key.channel(), selector, message);
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }


ソケットを受け付けたときのメソッド。
読み込み可能かどうかを監視するためにSelectorにchannelを登録する。

  private void doAccept(ServerSocketChannel ssc, Selector selector) {
    try {
      SocketChannel channel = ssc.accept();
      log.info("connected " + channel.socket().getRemoteSocketAddress());
      channel.configureBlocking(false);
      channel.register(selector, SelectionKey.OP_READ);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }


読み込み可能になったときのメソッド。
読み込みにはByteBufferを使う。つらい。
読み込み終わったらレスポンスに書き込むので、Selectorにchannelを登録する。

  public void doRead(SocketChannel channel, Selector selector) {
    try {
      ByteBuffer buffer = ByteBuffer.allocate(1024);

      // ソケットから入力を読み込む
      // コネクションが切れていればチャネルをクローズし、読み込めなければリターンする
      int readBytes = channel.read(buffer);
      if (readBytes == -1) {
        log.info("disconnected " + channel.socket().getRemoteSocketAddress());
        channel.close();
        return;
      }
      if (readBytes == 0) {
        return;
      }

      // 入力されたメッセージを取り出し、チャネルに登録する
      buffer.flip();
      byte[] bytes = new byte[buffer.limit()];
      buffer.get(bytes);

      String line = new String(buffer.array(), "UTF-8").replaceAll(System.getProperty("line.separator"), "");
      log.info("recieved " + line + " from " + channel.socket().getRemoteSocketAddress());

      channel.register(selector, SelectionKey.OP_WRITE, bytes);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }


書き込み可能になったときのメソッド。
書き込みにもByteBufferを使う。ほんとつらい。
全てを書き込めるとは限らないのでhasRemaining()で残りがあれば再度書き込む。
全部レスポンスに書き出したらまた読み込みを受け付ける。

  public void doWrite(SocketChannel channel, Selector selector, byte[] message) {
    try {
      ByteBuffer byteBuffer = ByteBuffer.wrap(message);
      channel.write(byteBuffer);
      ByteBuffer restByteBuffer = byteBuffer.slice();

      // ログに送信したメッセージを表示する
      byteBuffer.flip();
      byte[] sendBytes = new byte[byteBuffer.limit()];
      byteBuffer.get(sendBytes);
      String line = new String(sendBytes, "UTF-8").replaceAll(System.getProperty("line.separator"), "");
      log.info("echo " + line + " to " + channel.socket().getRemoteSocketAddress());

      // メッセージを最後まで出力したら入力を受け付ける
      if (restByteBuffer.hasRemaining()) {
        byte[] restBytes = new byte[restByteBuffer.limit()];
        restByteBuffer.get(restBytes);
        channel.register(selector, SelectionKey.OP_WRITE, restBytes);
      } else {
        channel.register(selector, SelectionKey.OP_READ);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

コード例(クライアント側)

Telnetでもいいけど今回はJavaでクライアントを用意する。
コンソールから1行読み込み、サーバに送信する。
サーバからレスポンスが返ってきたらログに出し、コンソールからまた1行読み込む。

@Slf4j
public class Client {
  public static void main(String[] args) {
     try (
        Socket socket = new Socket("localhost", 8080);
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        BufferedReader keyIn = new BufferedReader(new InputStreamReader(System.in));
    ) {
      log.info("サーバに接続しました " + socket.getLocalSocketAddress() + " to " + socket.getRemoteSocketAddress());

      String input;
      while ((input = keyIn.readLine()).length() > 0) {
        out.println(input);
        String line = in.readLine();
        if (line != null) {
          log.info("recieved " + line + " from " + socket.getRemoteSocketAddress());
        } else {
          break;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }  
  }
}


サーバを起動する。

java -jar target\server-1.0-SNAPSHOT-jar-with-dependencies.jar


クライアントを起動する。(x2)

java -jar target\client-1.0-SNAPSHOT-jar-with-dependencies.jar
aaaa //コンソールから入力する
23:24:57.086 [main] INFO Client - send aaaa to localhost/127.0.0.1:8080


1スレッドで複数クライアントの接続を同時に処理できることがわかる。
また、ソケットにデータを流すたびに(読み込み可能になるたびに)リクエストが処理される。

11:54:54.988 [main] INFO server.nonblocking.NonBlockingEchoServer - サーバが起動しました /0:0:0:0:0:0:0:0:8080
11:55:05.855 [main] INFO server.nonblocking.NonBlockingEchoServer - connected /127.0.0.1:58961
11:55:10.026 [main] INFO server.nonblocking.NonBlockingEchoServer - connected /127.0.0.1:58964
11:55:16.350 [main] INFO server.nonblocking.NonBlockingEchoServer - recieved aaaaa from /127.0.0.1:58961
11:55:16.350 [main] INFO server.nonblocking.NonBlockingEchoServer - echo aaaaa to /127.0.0.1:58961
11:55:19.156 [main] INFO server.nonblocking.NonBlockingEchoServer - recieved bbbbb from /127.0.0.1:58964
11:55:19.157 [main] INFO server.nonblocking.NonBlockingEchoServer - echo bbbbb to /127.0.0.1:58964
11:58:29.292 [main] INFO server.nonblocking.NonBlockingEchoServer - recieved ccccc from /127.0.0.1:58961
11:58:29.292 [main] INFO server.nonblocking.NonBlockingEchoServer - echo ccccc to /127.0.0.1:58961

ノンブロッキングI/Oで何がいいか

  • 1つのスレッドで複数のリクエストを処理することが可能
  • 低速なネットワークでも効率的に処理できる

1つのスレッドで複数のリクエストを処理することが"可能"と書いたのは、実際には、リクエストごとにスレッドを割り当てるアーキテクチャ(Tomcatなど)と本当に1つのスレッドで複数のリクエストを処理するアーキテクチャ(Node.jsなど)の2通りあるため。

シングルスレッドのイベントループモデルの場合、あるリクエストを処理している間は
別のリクエストが処理されないため、重たい処理をする用途には向かない(と思う)。

ノンブロッキングI/Oで何が困るか

  • 実装が難しい

Tomcatではどうなっているか

TomcatブロッキングI/OやノンブロッキングI/Oの仕組みをどう利用しているのか調べた。

動作確認環境

BIO Tomcat 8.0.33
NIO Tomcat 8.5.8
Servlet3.0 Tomcat 8.5.8
Servlet3.1 Tomcat 8.5.8

Tomcatの仕組み

Tomcatは役割ごとに複数のコンポーネントから構成されている。(外部からの接続を受け付けるコネクタや実行エンジン部分など)

この本にめちゃめちゃ詳しく載ってるので、中身を知りたい方は必読。
https://www.amazon.co.jp/%e8%a9%b3%e8%a7%a3-tomcat-%e8%97%a4%e9%87%8e-%e5%9c%ad%e4%b8%80/dp/4873117054

ブロッキングI/O

Tomcat7以前のデフォルトのHTTP1.1のコネクタであるHttp11Protocol(JIoEndpoint+Http11ConnectionHandler)はブロッキングI/Oを利用している。
https://tomcat.apache.org/tomcat-8.0-doc/config/http.html#connector_comparison

f:id:kimulla:20161210011429p:plain

JIoEndpointクラスのAcceptorでsocketをaccept()して、workerスレッドでリクエスト処理を実行する。

protected class Acceptor extends AbstractEndpoint.Acceptor {
  @Override
  public void run() {
    // Loop until we receive a shutdown command
    while (running) {
      ...
      // (socket.accept()で待ち受ける)
      socket = serverSocketFactory.acceptSocket(serverSocket);
      ...
      // 
      if (!processSocket(socket)) {
        countDownConnection();
        // Close socket right away
        closeSocket(socket);
      }
      ... 
    }

  protected boolean processSocket(Socket socket) {
    ...
    //別スレッドで実行する
    getExecutor().execute(new SocketProcessor(wrapper));
    ...
    return true;
  }
..
public class DefaultServerSocketFactory implements ServerSocketFactory {
  ...
  @Override
  public Socket acceptSocket(ServerSocket socket) throws IOException {
    return socket.accept();
  }
  ...
}
HTTP Keep alive

HTTP Keep aliveを実現するためにはSocketを開きっぱなしで次のリクエストを待たないといけないため、workerを占有して次のリクエストを待つことになる。

f:id:kimulla:20161210011859p:plain

HTTP Keep aliveを実現するために、リクエスト処理が完了したあともスレッドがRUNNINGのままになる。

f:id:kimulla:20161208232936p:plain

今までありがとうBIOコネクタ

Tomcat8.5.0からはBIOコネクタが無くなり、デフォルトはNioコネクタになった。
今までありがとうBIOコネクタ!あったばっかりだけど達者でな!

ノンブロッキングI/O

TomcatのHttp11NioProtocol(+NioEndpoint+Http11ConnectionHandler)はノンブロッキングI/O。ソースコードの抜粋が難しいので図だけ。

f:id:kimulla:20161210012011p:plain

SocketProcessorの実行タイミング

本当にSocketがReadableになったらSocketProcessorを起動するのか確かめる。

まずHTTPリクエストをラインごとに送信できるクライアントを用意する。

@Slf4j
public class Client {
  public static void main(String[] args) {
  try {
      try (
          Socket socket = new Socket("localhost", 8080);
          PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
          BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
          BufferedReader keyIn = new BufferedReader(new InputStreamReader(System.in));
      ) {
        log.info("サーバに接続しました " + socket.getLocalSocketAddress() + " to " + socket.getRemoteSocketAddress());

        String input;
        while (!(input = keyIn.readLine()).equals("enter")) {
          out.println(input);
          log.info("send " + input + " to "  + socket.getRemoteSocketAddress());
        }

        while (true) {
          input = in.readLine();
          if (input == null) {
            break;
          }
          log.info(input);
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}


コンソールから以下のHTTPリクエストを1行ずつ送信してみる。

GET / HTTP/1.1
Host: localhost:8080
Connection: keep-alive
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Encoding: gzip, deflate, sdch, br
Accept-Language: ja,en-US;q=0.8,en;q=0.6


このとき、NioEndpointのPollerにブレークを打つと、1行送信のたびにworkerがリクエスト解析処理を始めるのがわかった。

以下がSelectorの監視イベントをループして拾ってるところ。

public class Poller implements Runnable {
  @Override
  public void run() {
    ...
   Iterator<SelectionKey> iterator =
     keyCount > 0 ? selector.selectedKeys().iterator() : null;
    ...
    // Walk through the collection of ready keys and dispatch
    // any active event.
    while (iterator != null && iterator.hasNext()) {
      SelectionKey sk = iterator.next();
      NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
      // Attachment may be null if another thread has called
      // cancelledKey()
      if (attachment == null) {
        iterator.remove();
      } else {
        iterator.remove();
        processKey(sk, attachment);
      }
    }//while
    ...
  }
...
}
Tomcatはリクエストごとにworkerスレッドを消費する

PollerがSelectorを監視するスレッドは共有されるが、リクエスト解析以降の処理はリクエストごとにworkerスレッドを占有する。そのため、worker数の上限が同時処理数の上限になる。

tomcatコネクタについてはこれが参考になる。
http://events.linuxfoundation.org/sites/events/files/slides/tomcat%20connectors.pdf

HTTP Keep alive

NioはSelectorを利用すれば読み込み可能になったタイミングで通知されるため、リクエストがあったタイミングでSocketProcessorを割り当てることができる。
そのため、HTTP Keep aliveのためにworkerを占有しなくて済む。ここがNioにしてよかった一番のポイントな気がする。

f:id:kimulla:20161210012113p:plain

1つのリクエスト処理が完了したあとは、スレッドがWAITになる。

f:id:kimulla:20161208233402p:plain

同時処理数の上限はworker数のため、BIOと比較してHTTP Keep aliveして次のリクエスト待ちになっているソケット分を節約できるようになった。

Servlet3.0の非同期処理

重たい処理や長い処理をするときにworkerスレッドを解放するための仕組み。
ロングポーリングやSSEなどでHTTPのコネクションを維持しているとworkerスレッドを占有するため、同時リクエスト数の上限にひっかかる可能性がある。

f:id:kimulla:20161210012123p:plain

そのため、workerスレッドは重たくない処理だけで使うようにする。

レスポンスをまとめて書き込むパターン。

f:id:kimulla:20161210012159p:plain

chunckedで送るパターン。

f:id:kimulla:20161210013317p:plain

コード例

ServletのdoGet()の処理は何もせずにとりあえず抜ける。
別スレッドを起動してAsyncContextを利用してレスポンスを書き込む。

今回はchunckedで送る方法を試す。

@Slf4j
@WebServlet(urlPatterns = "/", asyncSupported = true)
public class EchoServlet extends HttpServlet {
  Executor executor = new ScheduledThreadPoolExecutor(10);

  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    log.info("begin doGet");
    AsyncContext ac = req.startAsync();
    executor.execute(new SlowTask(ac));
    log.info("end doGet");
  }
}

@Slf4j
class SlowTask implements Runnable {
  private AsyncContext ac;

  SlowTask(AsyncContext ac) {
    this.ac = ac;
  }

  @Override
  public void run() {
    log.info("begin AsyncContext#start");

    try {
      PrintWriter writer = ac.getResponse().getWriter();
      for (int i = 0; i < 5; i++) {
        writer.println("task :" + i);
        writer.flush();
        log.info("send chuncked data");
        TimeUnit.SECONDS.sleep(2);
      }
    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }
    ac.complete();
    log.info("end AsyncContext#start");
  }
}

動作確認

curlでリクエストを送るとちょっとずつ受信されるのがわかる。

$ curl localhost:8080/async-context/
task :0 // チャンクでちょっとずつ返ってくる
task :1 // チャンクでちょっとずつ返ってくる
task :2 // チャンクでちょっとずつ返ってくる
task :3 // チャンクでちょっとずつ返ってくる
task :4 // チャンクでちょっとずつ返ってくる


またログを見るとSlowTaskが別スレッドで実行されていることがわかる。

22:26:45.003 [http-nio-8080-exec-1] INFO example.com.echo.EchoServlet - begin doGet
22:26:45.009 [http-nio-8080-exec-1] INFO example.com.echo.EchoServlet - end doGet
22:26:45.009 [pool-1-thread-1] INFO example.com.echo.SlowTask - begin AsyncContext#start
22:26:45.013 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
22:26:47.015 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
22:26:49.016 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
22:26:51.018 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
22:26:53.019 [pool-1-thread-1] INFO example.com.echo.SlowTask - send chuncked data
22:26:55.021 [pool-1-thread-1] INFO example.com.echo.SlowTask - end AsyncContext#start

疑問

結局はアプリケーション内でスレッドを生成するんだから、APサーバ全体で見たときのスレッド生成コストは変わらない気がする。
アプリケーション内でスレッドを生成せずに、workerスレッド数の上限を上げればいいと思った。いまいち理解できてない。

Servlet APIブロッキングAPI

HttpServletRequest,HttpServletResponseのreadやwriteをするためのAPIとして
HttpServletRequest#getInputStream(),HttpServletResponse#OutputStream()がある
が、これはブロッキングAPI

なぜなら、ServletInputStreamの実装クラスはInputStream#read()を実装しないといけない。http://docs.oracle.com/javaee/7/api/javax/servlet/servletinputstream.html

This is an abstract class that a servlet container implements. Subclasses of this class must implement the java.io.InputStream.read() method.

で、InputStreamのAPIhttps://docs.oracle.com/javase/jp/8/docs/api/java/io/inputstream.html#read--

入力ストリームからデータの次のバイトを読み込みます。バイト値は、0 - 255の範囲のintとして返されます。ストリームの終わりに達したために読み込むバイトがない場合は、-1が返されます。入力データが読み込めるようになるか、ストリームの終わりが検出されるか、または例外が発生するまで、このメソッドはブロックされます。

つまりブロッキングAPI。HttpServletResponse#OutputStream()も同様。

Nioコネクタのマニュアルを見ても、HTTPヘッダを読み込むまではノンブロッキングだが、肝心のHTTPボディを読み込む箇所はブロッキングな処理らしい。
Tomcatのマニュアルhttp://tomcat.apache.org/tomcat-8.0-doc/config/http.html#connector_comparison

疑問

Tomcatサーバはリクエストを読み込み終わる前にHttpServletを実行するけど、なんでなんだろう。Servletの仕様書やTomcatのリファレンスに記載が見当たらないので正解はよくわからないけど、無限にリクエストボディを送り続けられたときに処理できなくなるからかな?リクエストボディの値が処理で必要になるとも限らないので、HttpServletで実際に使いたいときに読み込めばよい、ということかな?

Servlet3.1のノンブロッキングI/O

HTTPのボディに対する読み書きをノンブロッキングにするための仕組み。
https://blogs.oracle.com/wlc/entry/javaee_c116

実装方法

読み込み ReadListner
書き込み WriteListener

コード例

とりあえず読み込みしたらログに出すだけ。

@Slf4j
@WebServlet(urlPatterns = "/non-blocking", asyncSupported = true)
public class NonBlockingIOServlet extends HttpServlet {

  @Override
  public void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    log.info("begin doGet");
    AsyncContext ctx = req.startAsync();
    ServletInputStream input = req.getInputStream();
    input.setReadListener(new NonBlockingReadListener(input, ctx));
    log.info("end doGet");
  }
}

@Slf4j
class NonBlockingReadListener implements ReadListener {
  private final ServletInputStream inputStream;
  private final AsyncContext ctx;

  NonBlockingReadListener(ServletInputStream inputStream, AsyncContext ctx) {
    log.info("ReadListener is initialized");
    this.inputStream = inputStream;
    this.ctx = ctx;
  }

  @Override
  public void onDataAvailable() throws IOException {
    log.info("onDataAvaliable");

    StringBuilder sb = new StringBuilder();
    int len = -1;
    byte b[] = new byte[1024];

    while (!inputStream.isFinished() && inputStream.isReady() && (len = inputStream.read(b)) != -1) {
      String data = new String(b, 0, len);
      log.info("recieved : " + data);
    }
  }

  @Override
  public void onAllDataRead() throws IOException {
    log.info("onAllDataRead");
    ctx.complete();
  }

  @Override
  public void onError(Throwable throwable) {
    log.info("onError : " + throwable);
    ctx.complete();
  }
}


またクライアントを起動してリクエストを投げてみる。(Telnetでもいいです)

java -jar target\client-1.0-SNAPSHOT-jar-with-dependencies.jar
POST /servlet-api/non-blocking HTTP/1.1
Host: localhost:8080
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Encoding: gzip, deflate, sdch, br
Content-Length: 210000003

request body .... //ここを1行ずつ送る
request body .... //ここを1行ずつ送る


データ到着のたびにonDataAvailable()が実行される。

12 08, 2016 11:03:34 午後 org.apache.coyote.AbstractProtocol start
情報: Starting ProtocolHandler [http-nio-8080]
23:03:48.905 [http-nio-8080-exec-1] INFO example.com.NonBlockingIOServlet - begin doGet
23:03:48.909 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - ReadListener is initialized
23:03:48.909 [http-nio-8080-exec-1] INFO example.com.NonBlockingIOServlet - end doGet
23:03:48.910 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - onDataAvaliable
23:03:48.910 [http-nio-8080-exec-1] INFO example.com.NonBlockingReadListener - recieved : request body ....

23:04:04.735 [http-nio-8080-exec-2] INFO example.com.NonBlockingReadListener - onDataAvaliable
23:04:04.735 [http-nio-8080-exec-2] INFO example.com.NonBlockingReadListener - recieved : request body ....

何がいいのか

  • HTTPのボディも効率的に読み書きできるようになるため、でかいファイルアップロードや低速なネットワークからのアクセスを効率的に処理できる。

何が困るか

  • 実装が難しい

そして時代はSpring5へ

Controllerの引数や戻り値をReactorのAPIにしとくと、Servlet3.1のノンブロッキングI/Oを利用して読み書きしてくれるようになる。
http://www.slideshare.net/takuyaiwatsuka/spring-5

結論

IOTやモバイル端末の普及により、低速なネットワークからの大量のアクセスが増えてきた昨今、cpuを効率的に利用しつつ大量のリクエストを処理できるノンブロッキングな処理というのが注目を集めている。
ただし、ノンブロッキング処理はたいがい難易度が上がるので、必要になったら採用を検討するくらいでちょうど良さそう。

ということで俺はギョウミーアプリに戻るぞー!ジョジョー!

あとがき

これ調べるのにまるまる1ヶ月くらいかかったので、虚無感がすごい。

サンプルコード

github.com

Springの@RequestScopeや@SessionScopeは結局どこに保存されるのか?我々は真相に迫った

Java Spring

ある日の出来事

SpringのBeanのスコープ、便利ですよね。ライフサイクル管理を任せられるのはDIコンテナを利用するメリットの大きなところだと思います。

いつもは何も考えずに以下のようにコーディングして、Springコンテナにスコープ管理を任せていました。

@Component
@SessionScope
public class User {}

が、ある日、ある疑問が…。このBeanって結局どこに格納されてるんだろう…@SessionScopeっていうくらいだからセッションに格納される?

我々は真相に迫った

まずはデバッグ

HttpSession#setAttribute(String name, Object value)にブレークを打ってみた。
そうすると、name="scopedTarget.user"のメソッド呼び出しで以下のスタックトレースが取得できた。equalsで比較したら同じインスタンスだった(厳密にいうと、今回は@Autowiredで取得できるbeanでcglibがかかるので、interfaceベースのproxyに変更してproxyを外したuserインスタンスとhttpSessionから取得したインスタンスを比較した)。

これはSessionScopeクラスがあやしそう。

"http-nio-8080-exec-1"@4,828 in group "main": RUNNING
setAttribute():137, StandardSessionFacade {org.apache.catalina.session}
setAttribute():172, ServletRequestAttributes {org.springframework.web.context.request}
get():45, AbstractRequestAttributesScope {org.springframework.web.context.request}
get():93, SessionScope {org.springframework.web.context.request}
doGetBean():340, AbstractBeanFactory {org.springframework.beans.factory.support}
getBean():197, AbstractBeanFactory {org.springframework.beans.factory.support}
getTarget():35, SimpleBeanTargetSource {org.springframework.aop.target}
getTargetObject():73, PrintRestController {com.example}
printOnlyScopeBean():62, PrintRestController {com.example}
print():54, PrintRestController {com.example}
print():30, PrintRestController {com.example}

Javadocを読んでみる

SessionScope(@SessionScopeとは違う)はScopeインタフェース(@Scopeとは違う)を実装したクラスのひとつ。そのほかにもRequestScopeなど、スコープに応じたクラスが存在するらしい。

そしてScopeインタフェースは、スコープごとのCRUD操作(SessionScopeの場合はbeanをセッションに永続化したりセッションから削除したりする)を定義する、と。

public interface Scope {
  /**
   * Return the object with the given name from the underlying scope,
   * {@link org.springframework.beans.factory.ObjectFactory#getObject() creating it}
   * if not found in the underlying storage mechanism.
   * <p>This is the central operation of a Scope, and the only operation
   * that is absolutely required.
   * @param name the name of the object to retrieve
   * @param objectFactory the {@link ObjectFactory} to use to create the scoped
   * object if it is not present in the underlying storage mechanism
   * @return the desired object (never {@code null})
   * @throws IllegalStateException if the underlying scope is not currently active
   */
  Object get(String name, ObjectFactory<?> objectFactory);

  Object remove(String name);

Scope#getの具体的な処理はAbstractRequestAttributesScope#getで定義されている。永続化されてないbeanがあれば永続化する。

public abstract class AbstractRequestAttributesScope implements Scope {

  @Override
  public Object get(String name, ObjectFactory<?> objectFactory) {
    RequestAttributes attributes = RequestContextHolder.currentRequestAttributes();
    Object scopedObject = attributes.getAttribute(name, getScope());
    if (scopedObject == null) {
      scopedObject = objectFactory.getObject();
      attributes.setAttribute(name, scopedObject, getScope());
    }
    return scopedObject;
  }

永続化先への具体的な操作はRequestAttributesクラスで行われる。
デバッグしてみたら実装クラスにはServletRequestAttributesが利用されてた。

 /**
  * ..
  * <p>Relies on a thread-bound {@link RequestAttributes} instance, which
  * can be exported through {@link RequestContextListener},
  * {@link org.springframework.web.filter.RequestContextFilter} or
  * {@link org.springframework.web.servlet.DispatcherServlet}.
  * ..
  */
 public class SessionScope extends AbstractRequestAttributesScope {

確かにServletRequestAttributesでHttpSession#setAttributeが呼ばれていた。

public class ServletRequestAttributes extends AbstractRequestAttributes {
  @Override
  public void setAttribute(String name, Object value, int scope) {
    if (scope == SCOPE_REQUEST) {
      if (!isRequestActive()) {
        throw new IllegalStateException(
         "Cannot set request attribute - request is not active anymore!");
      }
      this.request.setAttribute(name, value);
    }
    else {
      HttpSession session = getSession(true);
      this.sessionAttributesToUpdate.remove(name);
      session.setAttribute(name, value);
    }
  }

@RequestScopeも似たような処理で、書き込み先はHttpServletRequestだった。

結論

Springの@RequestScopeや@SessionScopeはどこから来てどこへ行くのか?我々は真相に迫ったが、あまり驚きはなかった。

追加

調べたことがそのままspringのリファレンスに書いてた。ちゃんと読まないとダメですね。
http://docs.spring.io/spring/docs/current/spring-framework-reference/htmlsingle/#beans-factory-scopes-custom

Springの@Asyncで非同期処理をする

Java SpringBoot Spring

@Asyncアノテーション

Spring Frameworkは、@Asyncアノテーションを付与すると別スレッドで処理を実行できるようになる。

34. Task Execution and Scheduling

サンプルコード

ChildBeanのexecute()メソッドを別スレッドで実行したいとする。

@Slf4j
@Component
class ParentBean {
  @Autowired
  ChildBean child;

  public String execute(){
      log.info("hello");
      child.execute();
  }
}

非同期にしたい処理に@Asyncを付与する。
(@Asyncはクラス単位にも付与できる)

@Slf4j
@Component
class ChildBean {
  @Async
  public String execute(){
      log.info("hello");
  }
}

非同期処理はデフォルトだと有効になっていないので、@EnableAsyncで有効化する。

@EnableAsync
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Autowired
    Parent parent;

    @Bean
    public CommandLineRunner getCommandLineRunner() {
        return args -> {
            parent.execute();
        };
    }
}

メインメソッドを実行すると、ログからも別スレッドで実行されていることが読み取れる。

2016-11-01 22:32:21.273  INFO 8712 --- [  restartedMain] com.example.Controller                   : hello
2016-11-01 22:32:21.293  INFO 8712 --- [cTaskExecutor-1] com.example.Hoge                         : hello

スレッド生成ルールのカスタマイズ

デフォルトのスレッド生成クラスはSimpleAsyncTaskExecutorで、要求ごとにスレッドを生成する。そのため、ThreadPoolTaskExecutorなどを利用して、スレッドを生成しすぎないように、また、再利用できるように設定すべき。

34. Task Execution and Scheduling

WEBシステムで使うときの疑問点

WEBシステムで@Transactionalつけてるときなんかでも、@Asyncって使えるの?
よくわからなかったので、かるーく検証してみた。

検証の題材

1. 「タスクを登録する」ボタンを押すと、サーバ側でDBに0-10件のランダムなタスクを登録する。
2. サーバ側では、@Asyncアノテーションをつけたクラスを用意し、非同期にタスクを消化する。
3. 非同期処理を開始したら、クライアントにレスポンスをとりあえず返す
4. タスクは1sごとに1件消化し、その都度コミットする。このとき、@Transactionalをつけたメソッドでコミットされるかを調べる

f:id:kimulla:20161101225250p:plain

f:id:kimulla:20161101225254p:plain

dbにはh2を利用。h2のデフォルトのisolationレベルは「read commited」なので、コミットされたタイミングで別コネクションから参照可能になる。

ソースコード

登録と非同期更新部分を抜粋。
ソースコードgithubに。
github.com

@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("api/tasks")
public class TaskRestController {
  final TaskService service;
  ...

  @PostMapping
  public Task execute() {
    Task task = service.register();
    service.execute(task.getId());
    return task;
  }
}

1sごとに1タスク消化してコミットする。
そのために、トランザクションの区切れを別サービスに切り出し、メソッドを呼び出す。
(privateメソッドの呼び出しだと、@Transctionalが有効にならないため。これはそもそものAOPの制約。)

@Slf4j
@Service
@AllArgsConstructor
public class TaskServiceImpl implements TaskService {
 ...

  @Override
  @Async
  public void execute(int id) {
    Task task = taskMapper.findOne(id);
    log.info(task.toString());

    // 1sごとにtaskを1こずつ消化していく
    while (task.getDone() < task.getAmount()) {
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      service.execute(task);
    }
  }
}

メソッドに@Transactionalをつけ、コミットの境界にする。
このメソッドが実行されたタイミングでコミットされていれば、ポーリングしているコネクションからも変更内容が見えるはず。

@Slf4j
@Service
@AllArgsConstructor
public class ExecuteServiceImpl implements ExecuteService {
  final TaskMapper taskMapper;

  @Override
  @Transactional
  public void execute(Task task) {
    task.setDone(task.getDone() + 1);
    taskMapper.update(task);
    log.info(task.toString());
  }
}

検証結果

@Transactionalが意図したとおりに、ExecuteServiceImpl#executeの単位でコミットされているっぽい。(DBの変更がポーリングで別コネクションから参照できているため)

今回利用したSpringのTxマネージャはDataSourceTransactionManager。
Javadocを見ると以下のように書かれているので、@Transactionalがついたメソッドが1スレッド内ならば、例えリクエストスレッドとは違っても動く様子。
逆に、@Transactionalついたメソッドから@Asyncと@Tranasctionalがついたメソッドを呼び出しても、呼び出し元とは別のコネクションを張る(Txが分かれる)と読める。

 ...
Binds a JDBC Connection from the specified DataSource to the current thread,
potentially allowing for one thread-bound Connection per DataSource.

@Async使うときの注意点

@Asyncを付けたメソッドは別スレッドで実行されるため、スレッドローカルで管理している値は参照できない。まあ別スレッドで実行してるんだから、あたりまえっちゃあ、あたりまえな動きですが。

@RequestScopeのBeanや@SessionScopeのBeanを@Autowiredしてみようとしたけど、以下のエラーがでた。リクエストスレッドごとにBeanを管理してるんでしょう、きっと。

2016-11-01 23:23:54.308 ERROR 6720 --- [cTaskExecutor-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected error occurred invoking async method 'public void com.example.services.TaskServiceImpl.execute(int)'.

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.sessionBean': Scope 'session' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:355) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE]
        at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35) ~[spring-aop-4.3.3.RELEASE.jar:4.3.3.RELEASE]
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.getTarget(CglibAopProxy.java:687) ~[spring-aop-4.3.3.RELEASE.jar:4.3.3.RELEASE]
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:637) ~[spring-aop-4.3.3.RELEASE.jar:4.3.3.RELEASE]
        at com.example.model.SessionBean$$EnhancerBySpringCGLIB$$161bd67b.toString(<generated>) ~[classes/:na]

そのほか、スレッドローカルな値(例えばSecurityContextHolderなど)も、参照時にエラーとなるはずなので、業務で使いたいならメソッド引数として渡す必要がある。

Spring Framework で同一アプリ内でのイベントを扱う(ApplicationEvent、EventPublisher)

Java Spring SpringBoot Servlet

Spring Frameworkには、イベントを扱うための機能がある。
イベントの登場人物は以下。

ロール 役割 実装方法
Publisher イベントを発行する ApplicationEventPublisher
Listener イベントを受け取る @EventListener、ApplicationListener

なぜイベントを使うのか

イベントを利用すると、コンポーネント間を疎結合に実装することができる。
あるイベントが起きたときに、そのイベントきっかけで実施しないといけない処理が多くなるようなときに使うと、そのイベントに対してどのような処理をするべきかという責務が特定のServiceクラスにふくれあがることなく、それぞれのListenerにもっていけるので見通しがよくなる。(と思う)
また、ライブラリでイベントを発行しておけば、ライブラリ本体に手を入れなくても特定タイミングで処理をはさみこめるようになる。(Springコンテナの初期化が終わったらxxxの処理をしたい、とか)

デフォルトで提供されているイベント

Spring Frameworkにはデフォルトでいくつかのイベントが定義されているので、コンテナのCRUDに関する任意のタイミングで、アプリケーション実装者が任意の処理をはさみこめるようになっている。

イベント名 イベントが発生するタイミング
ContextRefreshedEvent ConfigurableApplicationContext のrefresh()
ContextStartedEvent ConfigurableApplicationContextのstart()
ContextStoppedEvent ConfigurableApplicationContextのstop()
ContextClosedEvent ConfigurableApplicationContextのclose()
RequestHandledEvent リクエスト処理が終わったとき(WEB限定)

Listenerの実装方法

Listenerの実装方法は2通りある。

  • ApplicationListenerを実装する方法(~spring4.1)
  • @EventListenerアノテーションを使う方法(spring4.2~)

ApplicationListenerを実装する方法(~spring4.1)

ApplicationListener<T>のTにハンドリングしたいイベントを指定する。

@Slf4j
@Component
public class BeforeSpring42Listener implements ApplicationListener<ContextClosedEvent> {
  @Override
  public void onApplicationEvent(ContextClosedEvent event) {
    log.info("bood bye");
  }
}

@Eventlistenerアノテーションを使う方法(spring4.2~)

メソッド引数にハンドリングしたいイベントを指定し、@EventListenerを付与する。

@Component
@Slf4j
public class AfterSpring42listener {
  @EventListener
  public void processContextClosedEvent(ContextClosedEvent event) {
    log.info("good bye");
  }
}

カスタムイベントの実装方法

上記以外に、任意のイベントを作成することもできる。

サンプルアプリの題材説明

カスタムイベントのサンプルを書いた。
題材はスロットマシーン。

通知するApplicationEventをカスタムイベントにする。

Publisher -> (ApplicationEvent) -> Listener
クラス 役割
SlotMachine 3回スロットを回すクラス。スロットを回すときにSlotStartEventを発行する。
Rotation スロット1回を表現するクラス
SlotStartEvent スロットが始まったことを表現するイベント
BeforeSpring42Listener ApplicationListnerで実装したリスナ
AfterSpring42listener @EventListenerで実装したリスナ
FeverEvent 大当たりが発生したことを表現するイベント


github.com

ApplicationEvent

ApplicationEventを拡張して任意のイベントを作成する。

public class SlotStartEvent extends ApplicationEvent {
  private final Rotation rotation;

  public SlotStartEvent(Object source, Rotation rotation) {
    super(source);
    this.rotation = rotation;
  }

  public Rotation getRotation() {
    return rotation;
  }
}

Publisher

ApplicationEventPublisher を使ってイベントを発行する。

@Component
@Data
@Slf4j
public class SlotMachine {
  private final ApplicationEventPublisher publisher;
  public void execute() {
    LongStream.rangeClosed(1, 3).forEach(i -> {
      log.info(">>>-------------------------");
      this.publisher.publishEvent(new SlotStartEvent(this, new Rotation(i)));
      log.info("<<<-------------------------");
    });
  }
}

Listener

ApplicationEventを受け取って処理する。

@Component
@Slf4j
public class BeforeSpring42Listener implements ApplicationListener<SlotStartEvent> {
  @Override
  public void onApplicationEvent(SlotStartEvent event) {
    log.info("before ver4.2 listen : " + event.getRotation());
  }
}
@Component
@Slf4j
public class AfterSpring42listener {
  @EventListener
  public void processExecuteStartEvent(SlotStartEvent event) {
    log.info("after ver4.2 listen : " + event.getRotation());
  }

実行結果

2016-09-23 11:50:52.290  INFO 5136 --- [           main] com.example.SlotMachine                  : >>>-------------------------
2016-09-23 11:50:52.310  INFO 5136 --- [           main] com.example.AfterSpring42listener        : after ver4.2 listen : Rotation(id=1)
2016-09-23 11:50:52.310  INFO 5136 --- [           main] com.example.BeforeSpring42Listener       : before ver4.2 listen : Rotation(id=1)
2016-09-23 11:50:52.310  INFO 5136 --- [           main] com.example.SlotMachine                  : <<<-------------------------
2016-09-23 11:50:52.310  INFO 5136 --- [           main] com.example.SlotMachine                  : >>>-------------------------
2016-09-23 11:50:52.311  INFO 5136 --- [           main] com.example.AfterSpring42listener        : after ver4.2 listen : Rotation(id=2)
2016-09-23 11:50:52.311  INFO 5136 --- [           main] com.example.BeforeSpring42Listener       : before ver4.2 listen : Rotation(id=2)
2016-09-23 11:50:52.311  INFO 5136 --- [           main] com.example.SlotMachine                  : <<<-------------------------
2016-09-23 11:50:52.311  INFO 5136 --- [           main] com.example.SlotMachine                  : >>>-------------------------
2016-09-23 11:50:52.311  INFO 5136 --- [           main] com.example.AfterSpring42listener        : after ver4.2 listen : Rotation(id=3)
2016-09-23 11:50:52.311  INFO 5136 --- [           main] com.example.BeforeSpring42Listener       : before ver4.2 listen : Rotation(id=3)
2016-09-23 11:50:52.311  INFO 5136 --- [           main] com.example.SlotMachine                  : <<<-------------------------

注意点

デフォルトの動作だと、イベントを発行したスレッドと同一のスレッドでListenerが実行される。(実行結果のログをみても、すべてmainスレッドで処理されているとわかる)
同期的に処理していればスレッドに紐づいたコンテキスト(トランザクションコンテキストやセキュリティコンテキスト)を取得できるため、ある時には有効だが、非同期に実行したい場合もある。

非同期にListenerを実行する

まず、@EnableAsyncで非同期を有効にしたうえでTaskExecutorを用意する。

@SpringBootApplication
@EnableAsync
public class EventExampleApplication {
  public static void main(String[] args) {
    SpringApplication.run(EventExampleApplication.class, args);
  }

  @Autowired
  private SlotMachine machine;

  @Bean
  public CommandLineRunner execute() {
    return args -> {
      machine.execute();
    };
  }

  @Bean
  public TaskExecutor getTaskExecutor(){
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(100);
    return executor;
  }
}

@Eventlistenerを付与したメソッドに@Asyncを付与する。
ver4.2以前の場合は、SimpleApplicationEventMulticasterにThreadPoolTaskExecutorを指定してBean定義すればいけそうだけど、めんどくさいから試してない。

@Component
@Slf4j
public class AfterSpring42listener {
  @EventListener
  @Async
  public void processExecuteStartEvent(SlotStartEvent event) {
    log.info("after ver4.2 listen : " + event.getRotation());
  }

実行結果

2016-09-23 12:10:03.511  INFO 3556 --- [tTaskExecutor-1] com.example.AfterSpring42listener        : after ver4.2 listen : Rotation(id=1)
2016-09-23 12:10:03.521  INFO 3556 --- [tTaskExecutor-4] com.example.AfterSpring42listener        : after ver4.2 listen : Rotation(id=3)
2016-09-23 12:10:03.521  INFO 3556 --- [tTaskExecutor-3] com.example.AfterSpring42listener        : after ver4.2 listen : Rotation(id=2)

@EventListenerでSpEL式を使う

Spring Framework4.3からは、@EventListenerにSpEL式が記述できるようになった。
また、メソッドの戻り値にApplicationEventを取ると、イベントを発行することができる。(ver4.2~)

  @EventListener(condition = "#event.rotation.id == 2")
  public FeverEvent conditionOn2(SlotStartEvent event) {
    log.info("fever event start");
    return new FeverEvent(event, event.getRotation());
  }  

まとめ

同一ApplicationContext上でのイベントを気軽に作れる。