7. java.util.concurrentパッケージ

2005.11.16 株式会社四次元データ 宮澤了祐

JDK5.0で新たにjava.util.concurrentパッケージが追加されました。 マルチスレッドでの処理に対して有用なクラスが含まれています。

7.1. Executor

非同期で行いたい処理がいくつかあるとします。 それらを全て実行するために、一つの処理(=「タスク」、以下タスクと表記します)に対して一つのスレッドを作成していては、 リソースを多く消費することになります。

そのため、既に起動されているスレッドを再利用する等の対策を取ることが考えられます。 JDK5.0からExecutorという概念を取り入れることにより、スレッドの再利用やスケジューリングなど、並列処理を簡単に実現できるようになりました。

ExecutorはRunnableを引数に持つexecute()メソッドを持つインタフェースです。

以下のように使用します。

1
2
3
4
5
Executor ex = Executors.newSingleThreadExecutor();
Runnable r1 = ....
Runnable r2 = ....
ex.execute(r1);
ex.execute(r2);

java.util.concurrent.Executorsクラスは、Executorインタフェースなどの実装インスタンスを返す、メソッドを提供するユーティリティクラスです。

Executors.newSingleExecutor()メソッドにより、一つのスレッドでタスクを処理するExecutorを取得します。 Executorのexecute()メソッドでRunnableを実装したタスクを送信することにより、タスクを実行することが出来ます。

今までは、このような一つのスレッドで複数のタスクを処理するためには、もっと煩雑なコードになってしまいましたが、 Executorによってタスクの実行部分が分離されることで、利用者はタスクの内容と処理の順番を考慮する必要がなくなりました。

Executorsに定義されている実装インスタンスを返すメソッドは、newSingleThread()メソッドを含め次のものがあります。

  • newSingleThreadExecutor()
       一つのスレッドでタスクの処理を行います。
  • newFixedThreadPool()
       指定した数のスレッドを作成し、タスクの処理を行います。
  • newCachedThreadPool()
       必要に応じて自動的にスレッドを作成し、タスクの処理を行います。
    次の項でそれらの利用法を解説します。

7.2. ExecutorService

ExecutorServiceインタフェースはExecutorインタフェースを拡張し、状態追跡やタスク処理の中断などを可能にしたインタフェースです。 ExecutorServiceの主なメソッドには以下のものがあります。

Method Comment
execute() Executorより継承。タスクを送信する。
submit() タスクの計算結果や状態を取得するための、Futureオブジェクトを返すメソッド。
shutdown() シャットダウンを実行します。以前に送信したタスクは実行しますが、新規タスクは受け入れません。
shutdownNow() 現在実行中のタスクの中断を試み、待機中のタスクの処理を停止します。

concurrentパッケージに存在する多くの実装インスタンスは、ExecutorServiceインタフェースを実装しています。

7.3. スレッドプール

スレッドプールとはタスクをキューに追加し、プールしておいたスレッドにより、順次タスクを処理していくマルチスレッドの形式の一つです。 必要以上にスレッドを作成しないために、リソースの無駄な消費を抑えるメリットがあります。 Executorsクラスの中で、スレッドプールを可能にしているクラスを返すメソッドについて解説します。

  • newSingleThreadExecutor()

単一のスレッドで送信されたタスクを実行するExecutorを作成します。 Executorが終了した後もタスクが残っていれば処理を続行します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyTask implements Runnable{
    private String tab="";
    public MyTask(int tab){
        for(int i=0; i<=tab; i++){
            this.tab += " ";
        }
    }
    public void run(){
        for(int i=0; i<3; i++){
            System.out.println(tab + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args){
        ExecutorService ex = Executors.newSingleThreadExecutor();
        System.out.println("[Sending Tasks....]");
        for(int i=0; i<3; i++){
            ex.execute(new MyTask(i*3));
        }
        System.out.println("[Finish Sending!]");

        ex.shutdown();
        System.out.println("[ShutDown]");
    }
}

Executors.newSingleThreadExecutor()メソッドで、単一のスレッドでタスク処理を行う、 ExecutorServiceオブジェクトを作成します。 ExecutorServiceのexecute()メソッドでタスクを送信します。 送信されたタスクは0,1,2という数字を表示するものです。

これは次のように出力されます。 ※環境に依存するため、必ずしも一致するとは限りません。

[Sending Tasks....]
[Finish Sending!]
0
[ShutDown]
1
2
  0
  1
  2
    0
    1
    2

全てのタスクの処理が、1つ前のタスクの終了を待機していることが、わかると思います。

  • newFixedThreadPool()

指定した数のスレッドを使いまわしてタスクを処理します。 前項と同じタスクを使用します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class MyTask implements Runnable{
    ...
    public static void main(String[] args){
        ExecutorService ex = Executors.newFixedThreadPool(2);
        System.out.println("[Sending Tasks....]");
        for(int i=0; i<3; i++){
            ex.execute(new MyTask(i*3));
        }
        System.out.println("[Finish Sending!]");

        ex.shutdown();
        System.out.println("[ShutDown]");
    }
}

Exectors.newFixedThreadPool(int n)で、使用するスレッドの数を指定した、 ExecutorServiceオブジェクトを作成します。 引数にスレッドの数を指定します。

実行すると以下のようになります。 ※環境に依存するため、必ずしも一致するとは限りません。

[Sending Tasks....]
0
[Finish Sending!]
[ShutDown]
  0
1
  1
2
  2
    0
    1
    2

スレッドが二つで動作していることがわかります。

  • newCachedThreadPool()

複数のスレッドでタスクを処理しますが、もし処理の終了しているスレッドがあれば、そのスレッドを再利用します。 60秒使用されないスレッドは削除されます。 短時間で処理の終わるタスクを、大量に処理する場合に有用です。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyTask implements Runnable{
    private int number;
    public MyTask(int number){
        this.number = number;
    }
    public void run(){
        System.out.println("Task" + number + " Start");
        try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        System.out.println("Task" + number + " End");
   }

    public static void main(String[] args){
        ExecutorService ex = Executors.newCachedThreadPool();
        Thread.currentThread().getThreadGroup().list();
        System.out.println("Sending Tasks....");
        for(int i=0; i<3; i++){
            ex.execute(new MyTask(i));
            Thread.currentThread().getThreadGroup().list();
            try {
                Thread.sleep(600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        ex.shutdown();
    }
}

タスクの開始時、および終了時にメッセージを表示するタスクを使用します。 ThreadGroupのlist()メソッドを使用し、現在稼働中のスレッドについての情報を表示します。

これは例えば次のように出力されます。

java.lang.ThreadGroup[name=main,maxpri=10]
Thread[main,5,main]
[Sending Tasks....]
java.lang.ThreadGroup[name=main,maxpri=10]
Thread[main,5,main]
Thread[pool-1-thread-1,5,main]
Task0 Start
java.lang.ThreadGroup[name=main,maxpri=10]
Thread[main,5,main]
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Task1 Start
Task0 End
Task2 Start
java.lang.ThreadGroup[name=main,maxpri=10]
Thread[main,5,main]
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Task1 End
Task2 End|

ExecutorServiceを取得した際には、まだタスク処理用のスレッドはありません。 一つ目のタスクを送信した際にタスク処理用のスレッドが一つ作成されます。 二つ目のタスクを送信した時にはまだ処理が終わっていないので、新たにスレッドを作成しました。 三つ目のタスクを送信した時にはTask0は処理が終わっていたので、スレッドは作成されません。 このように、タスク送信時に空いているスレッドがあればそれを使いまわすことが出来ます。