SIerだけど技術やりたいブログ

5年目のSIerのブログです

Springの@Asyncで非同期処理をする

@Asyncアノテーション

Spring Frameworkは、@Asyncアノテーションを付与すると別スレッドで処理を実行できるようになる。

34. Task Execution and Scheduling

サンプルコード

ChildBeanのexecute()メソッドを別スレッドで実行したいとする。

@Slf4j
@Component
class ParentBean {
  @Autowired
  ChildBean child;

  public String execute(){
      log.info("hello");
      child.execute();
  }
}

非同期にしたい処理に@Asyncを付与する。(@Asyncはクラス単位にも付与できる)

@Slf4j
@Component
class ChildBean {
  @Async
  public String execute(){
      log.info("hello");
  }
}

非同期処理はデフォルトだと有効になっていないため、@EnableAsync※で有効化する。

@EnableAsync
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Autowired
    Parent parent;

    @Bean
    public CommandLineRunner getCommandLineRunner() {
        return args -> {
            parent.execute();
        };
    }
}

メインメソッドを実行すると、ログからも別スレッドで実行されていることが読み取れる。

2016-11-01 22:32:21.273  INFO 8712 --- [  restartedMain] com.example.Controller                   : hello
2016-11-01 22:32:21.293  INFO 8712 --- [cTaskExecutor-1] com.example.Hoge                         : hello

※ @EnableAsyncアノテーションをつけると有効になる仕組みは、以下に書きました。
kimulla.hatenablog.com

スレッド生成ルールのカスタマイズ

デフォルトのスレッド生成クラスはSimpleAsyncTaskExecutorで、要求ごとにスレッドを生成する。そのため、ThreadPoolTaskExecutorなどを利用して、スレッドを生成しすぎないように、また、再利用できるように設定すべき。

34. Task Execution and Scheduling

WEBシステムで使うときの疑問点

WEBシステムで@Transactionalつけてるときなんかでも、@Asyncって使えるの?
よくわからなかったので、かるーく検証してみた。

検証の題材

f:id:kimulla:20170825094617p:plain


1. 「タスクを登録する」ボタンを押すと、サーバ側でDBに0-10件のランダムなタスクを登録する。
2. サーバ側では、@Asyncアノテーションをつけたクラスを用意し、非同期にタスクを消化する。
3. 非同期処理を開始したら、クライアントにレスポンスをとりあえず返す
4. タスクは1sごとに1件消化し、その都度コミットする。このとき、@Transactionalをつけたメソッドでコミットされるかを調べる


dbにはh2を利用。h2のデフォルトのisolationレベルは「read commited」なので、コミットされたタイミングで別コネクションから参照可能になる。

ソースコード

登録と非同期更新部分を抜粋。
ソースコードgithubに。
github.com

@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("api/tasks")
public class TaskRestController {
  final TaskService service;
  ...

  @PostMapping
  public Task execute() {
    Task task = service.register();
    service.execute(task.getId());
    return task;
  }
}

1sごとに1タスク消化してコミットする。
そのために、トランザクションの区切れを別サービスに切り出し、メソッドを呼び出す。
(privateメソッドの呼び出しだと、@Transctionalが有効にならないため。これはそもそものAOPの制約。)

@Slf4j
@Service
@AllArgsConstructor
public class TaskServiceImpl implements TaskService {
 ...

  @Override
  @Async
  public void execute(int id) {
    Task task = taskMapper.findOne(id);
    log.info(task.toString());

    // 1sごとにtaskを1こずつ消化していく
    while (task.getDone() < task.getAmount()) {
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      service.execute(task);
    }
  }
}

メソッドに@Transactionalをつけ、コミットの境界にする。
このメソッドが実行されたタイミングでコミットされていれば、ポーリングしているコネクションからも変更内容が見えるはず。

@Slf4j
@Service
@AllArgsConstructor
public class ExecuteServiceImpl implements ExecuteService {
  final TaskMapper taskMapper;

  @Override
  @Transactional
  public void execute(Task task) {
    task.setDone(task.getDone() + 1);
    taskMapper.update(task);
    log.info(task.toString());
  }
}

検証結果

@Transactionalが意図したとおりに、ExecuteServiceImpl#executeの単位でコミットされているっぽい。(DBの変更がポーリングで別コネクションから参照できているため)

f:id:kimulla:20170825095811g:plain

今回利用したSpringのTxマネージャはDataSourceTransactionManager。Javadocを見ると以下のように書かれている。

 ...
Binds a JDBC Connection from the specified DataSource to the current thread,
potentially allowing for one thread-bound Connection per DataSource.

@Transactionalがついたメソッドが1スレッド内ならば、例えリクエストスレッドとは違っても動く様子。
逆に、@Transactionalついたメソッドから@Asyncと@Tranasctionalがついたメソッドを呼び出しても、呼び出し元とは別のコネクションを張る(Txが分かれる)と読める。

@Async使うときの注意点

@Asyncを付けたメソッドは別スレッドで実行されるため、スレッドローカルで管理している値は参照できない。まあ別スレッドで実行してるんだから、あたりまえっちゃあ、あたりまえな動きですが。

@RequestScopeのBeanや@SessionScopeのBeanを@Autowiredしてみようとしたけど、以下のエラーがでた。リクエストスレッドごとにBeanを管理してるんでしょう、きっと。

2016-11-01 23:23:54.308 ERROR 6720 --- [cTaskExecutor-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected error occurred invoking async method 'public void com.example.services.TaskServiceImpl.execute(int)'.

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.sessionBean': Scope 'session' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet/DispatcherPortlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:355) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE]
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.3.RELEASE.jar:4.3.3.RELEASE]
        at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35) ~[spring-aop-4.3.3.RELEASE.jar:4.3.3.RELEASE]
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.getTarget(CglibAopProxy.java:687) ~[spring-aop-4.3.3.RELEASE.jar:4.3.3.RELEASE]
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:637) ~[spring-aop-4.3.3.RELEASE.jar:4.3.3.RELEASE]
        at com.example.model.SessionBean$$EnhancerBySpringCGLIB$$161bd67b.toString(<generated>) ~[classes/:na]

そのほか、スレッドローカルな値(例えばSecurityContextHolderなど)も、参照時にエラーとなるはずなので、処理で使いたいならメソッド引数として渡す必要がある。

同時実行数の制御

調べました。

kimulla.hatenablog.com

@Asyncがついているときは呼び出し元とは別トランザクションを貼るか、試してみる

Springの@TransactionalのpropagationはREQUIRED。そのため、通常は呼び出し元でトランザクションが張られていれば呼び出し元のトランザクションに参加する。

17. Transaction Management

ただし、@Transactional + @Async がついている場合は、呼び出し元の@Transactionalとは別のコネクションを張る(Txが分かれる)はず。

public class TaskServiceImpl implements TaskService {
    final ExecuteService service;

    @Override
    @Transactional
    public void execute(int id) {
        Task task = taskMapper.findOne(id);
        log.info(task.toString());

        // 1sごとにtaskを1こずつ消化していく
        while (task.getDone() < task.getAmount()) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            service.execute(task);
        }
    }
public class ExecuteServiceImpl implements ExecuteService {
    final TaskMapper taskMapper;

    @Override
    @Async
    @Transactional
    public void execute(Task task) {
        task.setDone(task.getDone() + 1);
        taskMapper.update(task);
        log.info(task.toString());
    }
}

トランザクションが呼び出し元とは分かれてる。

f:id:kimulla:20170825101817g:plain