SIerだけど技術やりたいブログ

5年目のSIerのブログです

Spring 非同期タスクの同時実行数を制限する方法

Springには非同期処理を実行するための@Asyncアノテーションがある。
34. Task Execution and Scheduling
kimulla.hatenablog.com



この非同期タスクの同時実行数を絞るための方法をメモる。

デフォルトだと非同期処理の実行にSimpleAsyncTaskExecutorが使われるため、@Asyncの呼び出しごとにスレッドが生成されて即実行されてしまう。


TaskExecutorにThreadPoolTaskExecutorを指定すると、スレッドがプールされるようになる。ThreadPoolTaskExecutorはjava標準のThreadPoolExecutorをラップしたようなクラスのため、パラメータで以下の設定ができる。

corePoolSize - アイドルであってもプール内に維持されるスレッドの数
maximumPoolSize - プール内で可能なスレッドの最大数
keepAliveTime - スレッドの数がコアよりも多い場合、これは超過したアイドル状態のスレッドが新しいタスクを待機してから終了するまでの最大時間
unit - keepAliveTime 引数の時間単位
workQueue - タスクが超過するまで保持するために使用するキュー。このキューは、execute メソッドで送信された Runnable タスクだけを保持する

corePoolSize (getCorePoolSize() を参照) と maximumPoolSize (getMaximumPoolSize() を参照) で設定された境界に従って、ThreadPoolExecutor は自動的にプールサイズを調整します (getPoolSize() を参照)。新しいタスクが execute(java.lang.Runnable) メソッドで送信され、corePoolSize よりも少ない数のスレッドが実行中である場合は、その他のワークスレッドがアイドル状態であっても、要求を処理するために新しいスレッドが作成されます。corePoolSize よりも多く、maximumPoolSize よりも少ない数のスレッドが実行中である場合、新しいスレッドが作成されるのはキューがいっぱいである場合だけです。corePoolSize と maximumPoolSize を同じ値に設定すると、固定サイズのスレッドプールが作成されます。maximumPoolSize を Integer.MAX_VALUE などの実質的にアンバウンド形式である値に設定すると、プールに任意の数の並行タスクを格納することができます。コアプールサイズと最大プールサイズは構築時にのみ設定されるのがもっとも一般的ですが、setCorePoolSize(int) および setMaximumPoolSize(int) を使用して動的に変更することもできます。

ということで、corePoolSize=maximumPoolSize=制限したい同時実行数 にしてThreadTaskExecutorを生成すれば同時実行数が制御できる。

ついでに、QueueCapacityでいくつまでタスクをキューにため込むかも制御できる。

以下、SpringBoot 1.5.6.RELEASE で動作確認する。

まずは非同期処理を有効にするために@EnableAsyncをつける。またtaskExecutorを生成し、@QualifierでBeanに名前をつける。(TaskExecutorがひとつだけなら@Qualifierは不要)

@EnableAsync
@SpringBootApplication
public class AsyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(AsyncApplication.class, args);
    }

    @Bean
    @Qualifier("heavyTaskTaskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(5);
        return executor;
    }
}


非同期にしたい処理に@Asyncアノテーションをつける。また、名前をつけたTaskExecutorを利用するために@Asyncアノテーションの引数に指定する。

@Slf4j
@Service
public class TaskServiceImpl implements TaskService {

    @Async("heavyTaskTaskExecutor")
    @Override
    public void heavyTask() {
        try {
            log.info("start heavy task");
            TimeUnit.SECONDS.sleep(5);
            log.info("end heavy task");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

非同期処理の呼び出し側は呼び出すだけ。スレッドのキューサイズよりも多く呼び出した場合、TaskRejectedExceptionが発生する。そのため、@ExceptionHandlerでキャッチしてハンドリングする。

@RestController
@AllArgsConstructor
@RequestMapping("api/tasks")
public class TaskRestController {
    private final TaskService taskService;

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Boolean createNewTask() {
        taskService.heavyTask();
        return true;
    }

    @ExceptionHandler(TaskRejectedException.class)
    @ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
    public String handle() {
        return "too busy";
    }

}

実行すると同時実行数が制御されていることがわかる。
f:id:kimulla:20170805215224p:plain

お手軽に非同期タスクの同時実行数を制限できた。