8. java.util.concurrentパッケージ2

2005.12.14 株式会社四次元データ 宮澤了祐

8.1. スケジューリング

JDK5.0より指定した時間後に処理を実行したり、指定した周期で処理を実行するスケジューリングが可能になりました。 スケジューリングを可能にするクラスは、java.util.concurrent.ScheduledExecutorServiceインタフェースを実装しています。

java.util.concurrent.Executorsクラスにある、ScheduledExecutorServiceを実装したクラスを返す、 スタティックなメソッドを利用することで、スケジューリングを行うことができます。

Executorsクラスに用意されている、スケジューリングのためのExecutorは、次の二種類です。

  • newSingleThreadScheduledExecutor()
    一つのスレッドでタスク処理を行う。
  • newScheduledThreadPool()
    タスクを処理するスレッドの数を指定出来ます。

スケジューリングを行うメソッドを以下で解説します。

  • schedule()
1
schedule(Runnable command, long delay, TimeUnit unit)

指定した時間が経過したのちに第一引数で指定したタスクを実行します。 時間の指定はTimeUnitクラスによって、秒、ミリ秒、マイクロ秒、ナノ秒の単位で定めることが出来ます。

タスクを送信した時刻、タスクを開始した時刻、タスクが終了した時刻を表示するタスクを定義し、schedule()メソッドで送信します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MyTask2 implements Runnable {
    int number;
    long start;
    public MyTask2(int number, long start){
        this.number = number;
        this.start = start;
    }
    public void run() {
        System.out.print("Task" + number + " Start");
        System.out.println("(" + (System.currentTimeMillis() - start) + ")");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.print("Task" + number + " End  ");
        System.out.println("(" + (System.currentTimeMillis() - start) + ")");
    }

    public static void main(String[] args){
        ScheduledExecutorService ex = Executors.newSingleThreadScheduledExecutor();
        long start = System.currentTimeMillis();
        for(int i=0; i<3; i++){
            System.out.print("Task"+i+" Send ");
            System.out.println("(" + (System.currentTimeMillis() - start) + ")");
            ex.schedule(new MyTask2(i,start),1000,TimeUnit.MILLISECONDS);
        }
    }
}

Task0 Send (15)
Task1 Send (15)
Task2 Send (15)
Task0 Start(1031)
Task0 End (2031)
Task1 Start(2031)
Task1 End (3031)
Task2 Start(3031)
Task2 End (4031)

タスクが送信されてから1000ミリ秒後になって、初めて処理が開始されていることがわかります。

  • scheduleAtFixedRate()
1
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

タスクが送信されたのち、第二引数で指定した長さが経過すれば、タスク処理を開始します。 その後、タスクは順に処理されます。

タスク開始後、第三引数で指定した時間が経過していれば、タスク処理を再び開始します。 Executorが取り消されるか、アプリケーションが終了するまでタスク処理を周期的に実行します。

前項と同じタスクを使用します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public static void main(String[] args){
        ScheduledExecutorService ex = Executors.newSingleThreadScheduledExecutor();

        long start = System.currentTimeMillis();
        for(int i=0; i<3; i++){
            System.out.print("Task"+i+" Send ");
            System.out.println("(" + (System.currentTimeMillis() - start) + ")");
            ex.scheduleAtFixedRate(new MyTask2(i,start),1000,10000,TimeUnit.MILLISECONDS);
        }
}

これは次のように出力されます。

Task0 Send (0)
Task1 Send (31)
Task2 Send (31)
Task0 Start(1031)
Task0 End (2031)
Task1 Start(2031)
Task1 End (3030)
Task2 Start(3030)
Task2 End (4030)
Task0 Start(11029)
Task0 End (12029)
Task1 Start(12029)
Task1 End (13028)
Task2 Start(13028)
...

Task2が終了したのちに、Task0が再び実行されています。 実行されるのは、タスクを送信してからinitialDelay+Delay秒後です。 もしこの段階でタスクが終了していなければ、タスクが終了したのちに、新たにタスクが実行されます。

newScheduledThreadPool()を用いた場合でも、 全てのタスクが終了していれば、initial+Delay秒後にタスクを再び処理します。 その段階でタスクの処理が終了していなければ、全てのタスクを終了した後に、タスクの再処理を行います。 処理中でないスレッドがあった場合でも、全てのタスクが終了するまで待機します。

  • scheduleWithFixedDelay()
1
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

指定した遅延時刻によって周期的に実行される点など、 scheduleAtFixedRate()メソッドと基本的には変わりませんが、タスクの再実行のタイミングが違います。

1
2
3
4
5
6
7
8
9
public static void main(String[] args){
        ScheduledExecutorService ex = Executors.newSingleThreadScheduledExecutor();
        long start = System.currentTimeMillis();
        for(int i=0; i<3; i++){
            System.out.print("Task"+i+" Send ");
            System.out.println("(" + (System.currentTimeMillis() - start) + ")");
            ex.scheduleWithFixedDelay(new MyTask2(i,start),1000,10000,TimeUnit.MILLISECONDS);
        }
}

次のように出力されます。

Task0 Send (15)
Task1 Send (15)
Task2 Send (15)
Task0 Start(1031)
Task0 End (2032)
Task1 Start(2032)
Task1 End (3032)
Task2 Start(3032)
Task2 End (4032)
Task0 Start(12034)
Task0 End (13035)
Task1 Start(13035)
Task1 End (14035)
Task2 Start(14035)
Task2 End (15035)
...

引数のDelayは同じですが、実行されているのはTask0が終了してから10000ミリ秒後です。 つまりタスクが終了してから、再実行されるまでの期間を指定することが出来ます。

newScheduledThreadPool()を用いた場合でも、全てのタスクの処理が終了した後から、 指定した時間後にタスクの再処理を開始します。

8.2. Callableインタフェース

JDK5.0より、新たにjava.util.concurrent.Callableインタフェースが追加されました。 Runnableとは違い、値を返したり例外を投げることが出来ます。

ExecutorServiceのsubmit()メソッドにより、Callableタスクを送信します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;

public class MyCallableTask implements Callable<Date> {
    public Date call() throws Exception {
        Thread.sleep(1000);
        return new Date();
    }

    public static void main(String[] args){
        ExecutorService ex = Executors.newSingleThreadExecutor();
        Future<Date> future = ex.submit(new MyCallableTask());
        try {
             Date date = future.get();
             System.out.println(date);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        ex.shutdown();
    }
}

現在の時刻を返すタスクを設定します。 ExecutorServiceのsubmit()メソッドの返り値はFutureオブジェクトです。 Futureのget()メソッドによって、指定した型で値が返って来ます。 get()メソッドは、値が返ってくるまで待機します。 次のような形で出力されれば成功です。

Tue Jul 26 21:07:01 JST 2005

8.3. Futureインタフェース

Futureオブジェクトは、非同期計算のためのインタフェースです。 計算が完了したかどうかをチェックし、計算を待機し、計算結果を取得するためのメソッドが提供されます。

Futureインタフェースが提供するメソッドは、以下があります。

  • boolean cancel(boolean mayInterruptIfRunning)
    このタスクの実行の取り消しを試みます。
  • V get()
    必要に応じて、計算が完了するまで待機し、その後、結果を取得します。
  • V get(long timeout, TimeUnit unit)
    必要に応じて、計算が完了するのを、指定された時間まで待機します。
  • boolean isCancelled()
    このタスクが通常どおり完了する前に、取り消された場合は true を返します。
  • boolean isDone()
    このタスクが完了した場合は、true を返します。

get()メソッドは値が返って来るまで待機しますが、 待機する時間を指定することも可能です。 待機時間を指定するには次のメソッドを使います。

1
get(long timeout, TimeUnit unit);

指定した時間まで待機しても処理が完了しない場合は、java.util.concurrent.TimeoutExceptionが投げられます。

以下のように使用します。

1
2
3
4
5
6
7
8
Future<Date> future = ex.submit(new MyCallableTask());
try {
    System.out.println(future.get(1000,TimeUnit.MILLISECONDS));
} catch (TimeoutException e) {
    System.out.println("タイムアウトしました。タスクを終了します。");
    future.cancel(true);
    if(future.isCancelled())System.out.println("タスクを終了しました。");
}

Futureのget()メソッドを呼んだ瞬間より、以下のように並列処理が同期実行されます。

Futureのget()メソッドを呼び出す際に待機する時間を指定した場合、 その時間が経過するまでタスク(を処理するスレッド)との同期を行います。 指定した時間が経過すれば、呼び出し元に処理が戻り、並列で処理が行われます。

指定した時間内に処理が完了すれば、値が返ってきます。

待機する時間を指定しなければ、計算が終了するまで同期を行います。 もし呼び出した時に既に計算が終了してればすぐに値が返ってきます。