《Java 7並發編程實戰手册》第四章線程執行器

杜老師說 2022-01-07 13:36:46 阅读数:322

java 手册 第四章 第四 四章

感謝人民郵電大學授權並發網發布此書樣章,新書購買傳送門=》當當網Snip20140120_1

本章將介紹下列內容:

  • 創建線程執行器
  • 創建固定大小的線程執行器
  • 在執行器中執行任務並返回結果
  • 運行多個任務並處理第一個結果
  • 運行多個任務並處理所有結果
  • 在執行器中延時執行任務
  • 在執行器中周期性執行任務
  • 在執行器中取消任務
  • 在執行器中控制任務的完成
  • 在執行器中分離任務的啟動與結果的處理
  • 處理在執行器中被拒絕的任務

4.1 簡介

通常,使用Java來開發一個簡單的並發應用程序時,會創建一些 Runnable 對象,然後創建對應的 Thread 對象來執行它們。但是,如果需要開發一個程序來運行大量的並發任務,這個方法將突顯以下劣勢:

  • 必須實現所有與 Thread 對象管理相關的代碼,比如線程的創建、結束以及結果獲取;
  • 需要為每一個任務創建一個 Thread 對象。如果需要執行大量的任務,這將大大地影響應用程序的處理能力;
  • 計算機的資源需要高效地進行控制和管理,如果創建過多的線程,將會導致系統負荷過重。

自從Java 5開始,Java並發API提供了一套意在解决這些問題的機制。這套機制稱之為執行器框架(Executor Framework,圍繞著 Executor 接口和它的子接口 ExecutorService,以及實現這兩個接口的 ThreadPoolExecutor 類展開。

這套機制分離了任務的創建和執行。通過使用執行器,僅需要實現 Runnable 接口的對象,然後將這些對象發送給執行器即可。執行器通過創建所需的線程,來負責這些 Runnable 對象的創建、實例化以及運行。但是執行器功能不限於此,它使用了線程池來提高應用程序的性能。當發送一個任務給執行器時,執行器會嘗試使用線程池中的線程來執行這個任務,避免了不斷地創建和銷毀線程而導致系統性能下降。

執行器框架另一個重要的優勢是 Callable 接口。它類似於 Runnable 接口,但是卻提供了兩方面的增强。

  • 這個接口的主方法名稱為 call() ,可以返回結果。
  • 當發送一個 Callable 對象給執行器時,將獲得一個實現了 Future 接口的對象。可以使用這個對象來控制 Callable 對象的狀態和結果。

本章接下來將使用上述由Java並發API提供的類及其變體來展示如何使用執行器框架。

4.2 創建線程執行器

使用執行器框架(Executor Framework)的第一步是創建 ThreadPoolExecutor 對象。可以 ThreadPoolExecutor類提供的四個構造器或者使用Executors工廠類來創建 ThreadPoolExecutor 對象。一旦有了執行器,就可以將RunnableCallable對象發送給它去執行了。

在本節,我們將學習如何使用兩種操作來實現一個範例,這個範列將模擬一個Web服務器來應對來自不同客戶端的請求。

准備工作

請先行閱讀1.2節來學習用Java創建線程的基本機制。然後比較這兩種機制,並根據不同的問題來選擇最佳的一種。

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.實現將被Web服務器執行的任務。創建一個名為 Task 的類,並實現 Runnable 接口。

public class Task implements Runnable {

2.聲明一個名為 initDate 的私有 Date 屬性,用來存儲任務的創建時間,然後創建一個名為 name 的私有 String 屬性,用來存儲任務的名稱。

private Date initDate;private String name;

3.實現類的構造器,用來初始化這兩個屬性。

public Task(String name){initDate=new Date();this.name=name;}

4.實現 run() 方法。

@Overridepublic void run() {

5.在控制臺上輸出 initDate 屬性和實際時間,即任務的開始時間。

System.out.printf("%s: Task %s: Created on: %s\n",Thread.currentThread().getName(),name,initDate);System.out.printf("%s: Task %s: Started on: %s\n",Thread.currentThread().getName(),name,new Date());

6.將任務休眠一段隨機時間。

try {Long duration=(long)(Math.random()*10);System.out.printf("%s: Task %s: Doing a task during %dseconds\n",Thread.currentThread().getName(),name,duration);TimeUnit.SECONDS.sleep(duration);} catch (InterruptedException e) {e.printStackTrace();}

7.在控制臺輸入任務的完成時間。

System.out.printf("%s: Task %s: Finished on: %s\n",Thread.currentThread().getName(),name,new Date());

8.創建一個名為 Server的類,它將執行通過執行器接收到的每一個任務。

public class Server {

9.聲明一個名為executorThreadPoolExecutor屬性。

private ThreadPoolExecutor executor;

10.實現類的構造器,通過 Executors 類來初始化 ThreadPoolExecutor 對象。

public Server(){executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();}

11.實現 executeTask() 方法。它接收一個 Task 對象作為參數,並將 Task 對象發送給執行器。在控制臺輸出一條信息錶示新的任務已經到達。

public void executeTask(Task task){System.out.printf("Server: A new task has arrived\n");

12.調用執行器的 execute() 方法將任務發送給Task

executor.execute(task);

13.在控制臺輸出一些執行器相關的數據來觀察執行器的狀態。

System.out.printf("Server: Pool Size: %d\n",executor.getPoolSize());System.out.printf("Server: Active Count: %d\n",executor.getActiveCount());System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount());

14.實現 endServer() 方法。在這個方法裏,調用執行器的 shutdown() 方法來結束它的執行。

public void endServer() {executor.shutdown();}

15.實現範例的主類,創建 Main 主類,並實現 main() 方法。

public class Main {public static void main(String[] args) {Server server=new Server();for (int i=0; i<100; i++){Task task=new Task("Task "+i);server.executeTask(task);}server.endServer();}}

工作原理

這個範例的核心在於 Server 類,這個類創建和使用 ThreadPoolExecutor 執行器來執行任務。

第一個關鍵點是在 Server 類的構造器中創建 ThreadPoolExecutor 對象。ThreadPoolExecutor 類有4個不同的構造器,但是,由於這些構造器在使用上的複雜性,Java並發API提供 Executors 工廠類來構造執行器和其他相關的對象。雖然可以直接通過 ThreadPoolExecutor 其中之一的構造器來創建 ThreadPoolExecutor 對象,但是推薦使用 Executors 工廠類來創建它。

在這個示例中,通過使用 Executors 工廠類的 newCachedThreadPool() 方法創建了一個緩存線程池。這個方法返回一個 ExecutorService 對象,因此它將被强制轉換為 ThreadPoolExecutor 類型,並擁有所有的方法。如果需要執行新任務,緩存線程池就會創建新線程;如果線程所運行的任務執行完成後並且這個線程可用,那麼緩存線程池將會重用這些線程。線程重用的優點是减少了創建新線程所花費的時間。然而,新任務固定會依賴線程來執行,因此緩存線程池也有缺點,如果發送過多的任務給執行器,系統的負荷將會過載。

備注:僅當線程的數量是合理的或者線程只會運行很短的時間時,適合采用 Executors 工廠類的 newCachedThreadPool() 方法來創建執行器。

一旦創建了執行器,就可以使用執行器的 execute() 方法來發送 RunnableCallable 類型的任務。這個範例發送實現了 Runnable 接口的 Task 類型的對象給執行器。

範例中也打印了一些執行器相關的日志信息,專門使用了如下方法。

  • getPoolSize() :返回執行器線程池中實際的線程數。
  • getActiveCount() :返回執行器中正在執行任務的線程數。
  • getCompletedTaskCount() :返回執行器已經完成的任務數。

執行器以及 ThreadPoolExecutor 類一個重要的特性是,通常需要顯示地去結束它。如果不這樣做,那麼執行器將繼續執行,程序也不會結束。如果執行器沒有任務可執行了,它將繼續等待新任務的到來,而不會結束執行。Java應用程序不會結束直到所有非守護線程結束它們的運行,因此,如果有終止執行器,應用程序將永遠不會結束。

為了完成執行器的執行,可以使用 ThreadPoolExecutor 類的 shutdown() 方法。當執行器執行完成所有待運行的任務後,它將結束執行。調用 shutdown() 方法之後,如果嘗試再發送另一個任務給執行器,任務將被拒絕,並且執行器也將拋出 RejectedExecutionException 异常。

下面的截圖展示了範例執行的部分結果。

Java Concurrency Cook Book 4.1

當最後一個任務到達服務器時,執行器擁有由100項任務和90個活動線程組成的池。

更多信息

ThreadPoolExecutor 類提供了許多方法來獲取自身狀態的信息。在範例中,已經使用了 getPoolSize() 方法來獲取線程池的大小,用 getActiveCount() 方法來獲取線程池中活動線程的數量,用 getCompletedTaskCount() 方法來獲取執行器完成的任務數量。也可以使用 getLargestPoolSize() 方法來返回曾經同時比特於線程池中的最大線程數。

ThreadPoolExecutor 類也提供了結束執行器的相關方法。

  • shutdownNow() :這個方法會立即關閉執行器。執行器將不再執行那些正在等待執行的任務。這個方法將返回等待執行的任務列錶。調用時,正在運行的任務將繼續運行,但是這個方法並不等待這些任務完成。
  • isTerminated():如果調用了shutdown()shutdownNow()方法,並且執行器完成了關閉的過程,那麼這個方法將返回 true
  • isShutdown():如果調用了shutdown()方法,那麼這個方法將返回true
  • awaitTermination(long timeout, TimeUnit unit):這個方法將阻塞所調用的線程,直到執行器完成任務或者達到所指定的 timeout值。

TimeUnit是一個枚舉類,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。

備注:如果想等待任務的結束,而不管任務的持續時間,可以使用一個大的超時時間,比如DAYS

參見

  • 參見4.12節。
  • 參見8.4節。

4.3 創建固定大小的線程執行器

當使用Executors類的newCachedThreadPool()方法創建基本的 ThreadPoolExecutor 時,執行器運行過程中將碰到線程數量的問題。如果線程池裏沒有空閑的線程可用,那麼執行器將為接收到的每一個任務創建一個新線程,當發送大量的任務給執行器並且任務需要持續較長的時間時,系統將會超負荷,應用程序也將隨之性能不佳。

為了避免這個問題,Executors 工廠類提供了一個方法來創建一個固定大小的線程執行器。這個執行器有一個線程數的最大值,如果發送超過這個最大值的任務給執行器,執行器將不再創建額外的線程,剩下的任務將被阻塞直到執行器有空閑的線程可用。這個特性可以保證執行器不會給應用程序帶來性能不佳的問題。

在本節,我們將通過修改本章4.2節的範例來學習如何創建固定大小的線程執行器。

准備工作

請先行閱讀本章的4.2節,並實現其中所闡述的範例,因為本節將對其繼續修改。

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.實現本章4.2節所描述的範例。打開 Server 類並修改它的構造器,使用 newFixedThreadPool() 方法來創建執行器,並傳遞數字 5 作為它的參數。

public Server(){executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);}

2.修改 executeTask() 方法,增加一行打印日志信息。調用 getTaskCount() 方法來獲取已發送到執行器上的任務數。

System.out.printf("Server: Task Count: %d\n",executor.getTaskCount());

工作原理

在這個示例中,使用 Executors 工廠類的 newFixedThreadPool() 方法來創建執行器。這個方法創建了具有線程最大數量值的執行器。如果發送超過線程數的任務給執行器,剩餘的任務將被阻塞直到線程池裏有空閑的線程來處理它們。newFixedThreadPool() 方法接收執行器將擁有的線程數量的最大值作為參數。這個例子創建了一個線程數量的最大值為 5 的執行器。

下面的截圖展示了範例執行的部分結果。

Java Concurrency Cook Book 4.2

為了在程序中輸出相關信息,已經使用的 ThreadPoolExecutor 類的一些方法如下。

  • getPoolSize():返回執行器中線程的實際數量。
  • getActiveCount():返回執行器正在執行任務的線程數量。

將看到,控制臺輸出的信息是 5,錶示執行器擁有 5 個線程,並且執行器不會超過這個最大的線程連接數。

當發送最後一個任務給執行器時,由於執行器只有 5 個活動的線程,所以剩餘的 95 個任務只能等待空閑線程。getTaskCount() 方法可以用來顯示有多少個任務已經發送給執行器。

更多信息

Executors 工廠類也提供 newSingleThreadExecutor() 方法。這是一個創建固定大小線程執行器的極端場景,它將創建一個只有單個線程的執行器。因此,這個執行器只能在同一時間執行一個任務。

參見

  • 參見4.12節。
  • 參見8.4節。

4.4 在執行器中執行任務並返回結果

執行器框架(Executor Framework的優勢之一是,可以運行並發任務並返回結果。Java並發API通過以下兩個接口來實現這個功能。

Callable:這個接口聲明了 call() 方法。可以在這個方法裏實現任務的具體邏輯操作。Callable 接口是一個泛型接口,這就意味著必須聲明 call() 方法返回的數據類型。

Future:這個接口聲明了一些方法來獲取由 Callable 對象產生的結果,並管理它們的狀態。

在本節,我們將學習如何實現任務的返回結果,並在執行器中運行任務。

准備工作

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.創建名為FactorialCalculator的類,並實現Callable接口,接口的泛型參數為Integer 類型。

public class FactorialCalculator implements Callable<Integer> {

2.聲明一個名為 number 的私有 Integer 屬性,存儲任務即將用來計算的數字。

private Integer number;

3.實現類的構造器,用來初始化類的屬性。

public FactorialCalculator(Integer number){this.number=number;}

4.實現call()方法。這個方法返回FactorialCalculator類的 number 屬性的階乘(Factorial)。

@Overridepublic Integer call() throws Exception {

5.創建並初始化在call()方法內使用的內部變量。

int result = 1;

6.如果number值是0或1,則返回1;否則計算number的階乘。為了演示效果,在兩個乘法之間,將任務休眠20毫秒。

if ((num==0)||(num==1)) {result=1;} else {for (int i=2; i<=number; i++) {result*=i;TimeUnit.MILLISECONDS.sleep(20);}}

7.在控制臺輸出操作的結果。

System.out.printf("%s: %d\n",Thread.currentThread().getName(),result);

8.返回操作的結果。

return result;

9.實現範例的主類,創建 Main 主類,並實現 main() 方法。

public class Main {public static void main(String[] args) {

10.通過Executors工廠類的newFixedThreadPool()方法創建ThreadPoolExecutor執行器來運行任務。傳遞參數2給newFixedThreadPool()方法錶示執行器將最多創建兩個線程。

ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(2);

11.創建一個 Future<Integer> 類型的列錶對象 resultList

List<Future<Integer>> resultList=new ArrayList<>();

12.通過 Random 類創建一個名 random 的隨機數字生成器。

Random random=new Random();

13.生成 10 個介於 0~10 之間的隨機整數。

for (int i=0; i<10; i++){Integer number= random.nextInt(10);

14.創建 FactorialCaculator 對象,並將隨機數 number 傳遞給它作為參數。

FactorialCalculator calculator=newFactorialCalculator(number);

15.調用執行器的 submit() 方法發送 FactorialCalculator 任務給執行器。這個方法返回一個 Future<Integer> 對象來管理任務和得到的最終結果。

Future<Integer> result=executor.submit(calculator);

16.將 Future 對象添加到前面創建的 resultList 列錶中。

resultList.add(result);}

17.創建一個 do 循環來監控執行器的狀態。

do {

18.通過執行器的 getCompletedTaskNumber() 方法,在控制臺輸出信息錶示任務完成的數量。

System.out.printf("Main: Number of Completed Tasks:%d\n",executor.getCompletedTaskCount());

19.遍曆 resultList 列錶中的 10 個 Future 對象,通過調用 isDone() 方法來輸出錶示任務是否完成的信息。

for (int i=0; i<resultList.size(); i++) {Future<Integer> result=resultList.get(i);System.out.printf("Main: Task %d: %s\n",i,result.isDone());}

20.將線程休眠 50 毫秒。

try {TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}

21.若執行器中完成的任務數量小於 10 ,則一直重複執行這個循環。

} while (executor.getCompletedTaskCount()<resultList.size());

22.在控制臺上輸出每一個任務得到的結果。對於每一個 Future 對象來講,通過調用 get() 方法將得到由任務返回的 Integer 對象。

System.out.printf("Main: Results\n");for (int i=0; i<resultList.size(); i++) {Future<Integer> result=resultList.get(i);Integer number=null;try {number=result.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}

23.在控制臺上打印出數字number

System.out.printf("Main: Task %d: %d\n",i,number);}

24.調用執行器的 shutdown() 方法結束執行。

executor.shutdown();

工作原理

在本節,我們學習了如何使用 Callable 接口來啟動並發任務並返回結果。我們編寫了 FactorialCalculator 類,它實現了帶有泛型參數 Integer 類型的 Callable 接口。因此,這個 Integer 類型將作為在調用 call() 方法時返回的類型。

範例的另一個關鍵點在 Main 主類中。我們通過 submit() 方法發送一個 Callable 對象給執行器去執行,這個 submit() 方法接收 Callable 對象作為參數,並返回 Future 對象。Future 對象可以用於以下兩個主要目的。

  • 控制任務的狀態:可以取消任務和檢查任務是否已經完成。為了達到這個目的,可使用 isDone() 方法來檢查任務是否已經完成。
  • 通過 call() 方法獲取返回的結果。為了達到這個目的,可使用 get() 方法。這個方法一直等待直到 Callable 對象的 call() 方法執行完成並返回結果。如果 get() 方法在等待結果時線程中斷了,則將拋出一個 InterruptedException异常。如果 call() 方法拋出异常那麼 get() 方法將隨之拋出 ExecutionException 异常。

更多信息

在調用Future對象的get()方法時,如果Future對象所控制的任務並未完成,那麼這個方法將一直阻塞到任務完成。Future 接口也提供了get()方法的其他調用方式。

  • get(long timeout,TimeUnit unit):如果調用這個方法時,任務的結果並未准備好,則方法等待所指定的timeout時間。如果等待超過了指定的時間而任務的結果還沒有准備好,那麼這個方法將返回null

TimeUnit是一個枚舉類,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDSSECONDS

參見

  • 參見4.2節。
  • 參見4.5節。
  • 參見4.6節。

4.5 運行多個任務並處理第一個結果

並發編程比較常見的一個問題是,當采用多個並發任務來解决一個問題時,往往只關心這些任務中的第一個結果。比如,對一個數組進行排序有很多種算法,可以並發啟動所有算法,但是對於一個給定的數組,第一個得到排序結果的算法就是最快的排序算法。

在本節,我們將學習如何使用 ThreadPoolExecutor 類來實現這個場景。範例允許用戶可以通過兩種驗證機制進行驗證,但是,只要有一種機制驗證成功,那麼這個用戶就被驗證通過了。

准備工作

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.創建一個名為 UserValidator 的類,它將實現用戶驗證的過程。

public class UserValidator {

2.聲明一個名為 name 的私有 String 屬性,用來存儲用戶驗證系統的名稱。

private String name;

3.實現類的構造器,用來初始化類的屬性。

public UserValidator(String name) {this.name=name;}

4.實現 validate() 方法。它接收兩個 String 參數,分別取名為用戶名name 和密碼 password,這兩個參數也將被用來進行用戶驗證。

public boolean validate(String name, String password) {

5.創建一個名為 randomRandom  類型的隨機對象。

Random random=new Random();

6.等待一段隨機時間來模擬用戶驗證的過程。

try {long duration=(long)(Math.random()*10);System.out.printf("Validator %s: Validating a user during %dseconds\n",this.name,duration);TimeUnit.SECONDS.sleep(duration);} catch (InterruptedException e) {return false;}

7.返回隨機的 boolean 值。當用戶通過驗證時,這個方法返回 true 值,如果用戶沒有通過驗證則返回 false 值。

return random.nextBoolean();}

8.實現 getName() 方法。這個方法返回 name 屬性值。

public String getName(){return name;}

9.創建一個名為 TaskValidator 的類,它將通過 UserValidation 對象作為並發任務來執行用戶驗證的過程。這個類實現了帶有 String 泛型參數的 Callable 接口。

public class TaskValidator implements Callable<String> {

10.聲明一個名為 validator 的私有 UserValidator 屬性。

private UserValidator validator;

11.聲明兩個私有的 String 屬性,分別為用戶名 user 和密碼 password

private String user;private String password;

12.實現類的構造器,用來初始化類的屬性。

public TaskValidator(UserValidator validator, String user,String password){this.validator=validator;this.user=user;this.password=password;}

13.實現call()方法,並返回String對象。

@Overridepublic String call() throws Exception {

14.如果用戶沒有通過 UserValidator 對象的驗證,就在控制臺輸出沒有找到這個用戶,錶明該用戶未通過驗證,並拋出 Exception 類型的异常。

if (!validator.validate(user, password)) {System.out.printf("%s: The user has not been found\n",validator.getName());throw new Exception("Error validating user");}

15.否則,就在控制臺輸出用戶已經找到,錶明該用戶已經通過驗證,然後返回 UserValidator 對象的名稱。

System.out.printf("%s: The user has been found\n",validator.getName());return validator.getName();

16.實現範例的主類,創建 Main 主類,並實現 main() 方法。

public class Main {public static void main(String[] args) {

17.創建兩個 String 對象,分別取名為 usernamepassword,並初始化這兩個屬性值為test。

String username="test";String password="test";

18.創建兩個 UserValidator 對象,分別取名為 ldapValidatordbValidator

UserValidator ldapValidator=new UserValidator("LDAP");UserValidator dbValidator=new UserValidator("DataBase");

19.創建兩個TaskValidator對象,分別取名為ldapTask和dbTask,並分別用ldapValidator 和dbValidator來初始化他們。

TaskValidator ldapTask=new TaskValidator(ldapValidator,username, password);TaskValidator dbTask=new TaskValidator(dbValidator,username,password);

20.創建一個名為 taksList 的 TaskValidator 類型列錶,並將 ldapTask 和 dbTask 添加到列錶中。

List<TaskValidator> taskList=new ArrayList<>();taskList.add(ldapTask);taskList.add(dbTask);

21.通過Executors工廠類的newCachedThreadPool()方法創建一個新的 ThreadPoolExecutor 執行器對象,並創建一個名為 result 的 String 對象。

ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();String result;

22.調用執行器的 invokeAny() 方法。這個方法接收 taskList 作為參數,並返回String 對象。然後,在控制臺上輸出這個方法返回的 String 對象。

try {result = executor.invokeAny(taskList);System.out.printf("Main: Result: %s\n",result);} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}

23.通過shutdown()方法來終止執行器,並在控制臺輸出信息錶示程序已經執行結束。

executor.shutdown();System.out.printf("Main: End of the Execution\n");

工作原理

這個範例的關鍵點在 Main 主類中。ThreadPoolExecutor 類的 invokeAny() 方法接收到一個任務列錶,然後運行任務,並返回第一個完成任務並且沒有拋出异常的任務的執行結果。這個方法返回的類型與任務裏的 call() 方法返回的類型相同,在這個範例中,它將返回 String 類型值。

下面的截圖展示了當範例運行後,有一個任務成功地驗證了用戶後的運行結果。

Java Concurrency Cook Book 4.3

範例中有兩個UserValidator對象,它們返回隨機的boolean值。每一個UserValidator對象被TaskValidator對象使用,TaskValidator對象實現了Callable接口。如果 UserValidator類的validate()方法返回false值,那麼TaskValidator類將拋出Exception异常。否則,返回true值。

因此,我們有兩個任務可以返回true值或拋出Exception异常。從而,可以有如下4種可能性。

  • 如果兩個任務都返回true值,那麼invokeAny()方法的結果就是首先完成任務的名稱。
  • 如果第一個任務返回true值,第二個任務拋出Exception异常,那麼invokeAny() 方法的結果就是第一個任務的名稱。
  • 如果第一個任務拋出Exception异常,第二個任務返回true值,那麼invokeAny() 方法的結果就是第二個任務的名稱。
  • 如果兩個任務都拋出Exception异常,那麼invokeAny()方法將拋出 ExecutionException异常。

將這個範例多運行幾次,那麼將得到如上所述的四種可能的結果。以下截圖則顯示當兩個任務同時拋出异常時,應用程序得到的結果。

Java Concurrency Cook Book 4.4

更多信息

ThreadPoolExecutor 類還提供了 invokeAny() 方法的其他版本:

invokeAny(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit):這個方法執行所有的任務,如果在給定的超時期滿之前某個任務已經成功完成(也就是未拋出异常),則返回其結果。

TimeUnit是一個枚舉類,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。

參見

  • 參見4.6節。

4.6 運行多個任務並處理所有結果

執行器框架(Executor Framework允許執行並發任務而不需要去考慮線程創建和執行。它還提供了可以用來控制在執行器中執行任務的狀態和獲取任務運行結果的 Future 類。

如果想要等待任務結束,可以使用如下兩種方法。

  • 如果任務執行結束,那麼Future接口的isDone()方法將返回true
  • 在調用shutdown()方法後,ThreadPoolExecutor類的awaitTermination()方法會將線程休眠,直到所有的任務執行結束。

這兩個方法有一些缺點:第一個方法,僅可以控制任務的完成與否;第二個方法,必須關閉執行器來等待一個線程,否則調用這個方法線程將立即返回。

ThreadPoolExecutor 類還提供一個方法,它允許發送一個任務列錶給執行器,並等待列錶中所有任務執行完成。在本節,我們將編寫範例,執行三個任務,當它們全部執行結束後打印出結果信息,用來學習如何使用這個特性。

准備工作

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.創建一個名為 Result 的類,用來存儲範例中並發任務產生的結果。

public class Result {

2.聲明兩個私有屬性。一個名為 nameString 屬性,一個名為 valueint 屬性。

private String name;private int value;

3.實現對應的 get()set() 方法來設置和返回 namevalue 屬性。

public String getName() {return name;}public void setName(String name) {this.name = name;}public int getValue() {return value;}public void setValue(int value) {this.value = value;}

4.創建一個名為Task的類,並實現Callable接口,接口的泛型參數為Result類型。

public class Task implements Callable<Result> {

5.聲明一個名為 name 的私有 String 屬性。

private String name;

6.實現類的構造器,用來初始化類的屬性。

public Task(String name) {this.name=name;}

7.實現call()方法。在這個範例中,這個方法將返回一個Result類型的對象。

@Overridepublic Result call() throws Exception {

8.在控制臺輸出錶示任務開始的信息。

System.out.printf("%s: Staring\n",this.name);

9.等待一段隨機時間。

try {long duration=(long)(Math.random()*10);System.out.printf("%s: Waiting %d seconds for results.\n",this.name,duration);TimeUnit.SECONDS.sleep(duration);} catch (InterruptedException e) {e.printStackTrace();}

10.生成一個int值,准備作為返回Result對象中的int屬性,這個int值為5個隨機數的總和。

int value=0;for (int i=0; i<5; i++){value+=(int)(Math.random()*100);}

11.創建一個Result對象,並用任務的名稱和上一步計算的int值來對其進行初始化。

Result result=new Result();result.setName(this.name);result.setValue(value);

12.在控制臺輸出信息錶示任務執行結束。

System.out.println(this.name+": Ends");

13.返回Result對象。

return result;}

14.實現範例的主類,創建Main主類,並實現main()方法。

public class Main {public static void main(String[] args) {

15.通過Executors工廠類的newCachedThreadPool()方法創建一個ThreadPoolExecutor 執行器對象。

ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();

16.創建一個Task類型的任務列錶taskList。創建3個Task任務並將它們添加到任務列錶taskList中。

List<Task> taskList=new ArrayList<>();for (int i=0; i<3; i++){Task task=new Task(i);taskList.add(task);}

17.創建一個 Future 類型的結果列錶 resultList。這些對象泛型參數為 Result 類型。

List<Future<Result>>resultList=null;

18.調用 ThreadPoolExecutor 類的 invokeAll() 方法。這個方法將返回上一步所創建的 Future 類型的列錶。

try {resultList=executor.invokeAll(taskList);} catch (InterruptedException e) {e.printStackTrace();}

19.調用shutdown()方法結束執行器。

executor.shutdown();

20.在控制臺輸出任務處理的結果,即Future類型列錶中的Result結果。

System.out.println("Main: Printing the results");for (int i=0; i<resultList.size(); i++){Future<Result> future=resultList.get(i);try {Result result=future.get();System.out.println(result.getName()+": "+result.getValue());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}

工作原理

在本節,我們學習了如何發送任務列錶給執行器,並且通過invokeAll()方法等待所有任務的完成。這個方法接收一個Callable對象列錶,並返回一個Future對象列錶。在這個列錶中,每一個任務對應一個Future對象。Future對象列錶中的第一個對象控制Callable列錶中第一個任務,以此類推。

需要注意的一點是,在存儲結果的列錶聲明中,用在Future接口中的泛型參數的數據類型必須與Callable接口的泛型數據類型相兼容。在這個例子中,我們使用的是相同的數據類型:Result類。

另一個關於invokeAll()方法重要的地方是,使用Future對象僅用來獲取任務的結果。當所有的任務執行結束時這個方法也執行結束了,如果在返回的Future對象上調用isDone()方法,那麼所有的調用將返回true值。

更多信息

ExecutorService 接口還提供了 invokeAll() 方法的另一個版本:

  • invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit):當所有任務執行完成,或者超時的時候(無論哪個首先發生),這個方法將返回保持任務狀態和結果的Future列錶。

TimeUnit是一個枚舉類,有如下的常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS和SECONDS。

參見

  • 參見4.4節。
  • 參見4.5節。

4.7 在執行器中延時執行任務

執行器框架(Executor Framework提供了 ThreadPoolExecutor 類並采用線程池來執行 CallableRunnable 類型的任務,采用線程池可以避免所有線程的創建操作而提高應用程序的性能。當發送一個任務給執行器時,根據執行器的相應配置,任務將盡可能快地被執行。但是,如果並不想讓任務馬上被執行,而是想讓任務在過一段時間後才被執行,或者任務能够被周期性地執行。為了達到這個目的,執行器框架提供了 ScheduledThreadPoolExecutor 類。

在本節,我們將學習如何創建 ScheduledThreadPoolExecutor 執行器,以及如何使用它在經過一個給定的時間後開始執行任務。

准備工作

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.創建一個名為Task的類,並實現Callable接口,接口的泛型參數為String類型。

public class Task implements Callable<String> {

2.聲明一個名為name的私有String屬性,用來存儲任務的名稱。

private String name;

3.實現類的構造器,並初始化 name 屬性。

public Task(String name) {this.name=name;}

4.實現call()方法。在控制臺輸出實際的時間,並返回一個文本信息,比如“Hello,world”。

public String call() throws Exception {System.out.printf("%s: Starting at : %s\n",name,new Date());return "Hello, world";}

5.實現範例的主類,創建 Main 主類,並實現 main() 方法。

public class Main {public static void main(String[] args) {

6.通過Executors工廠類的newScheduledThreadPool()方法創建一個 ScheduledThreadPoolExecutor 執行器,並傳遞 1 作為參數。

ScheduledThreadPoolExecutor executor=(ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1);

7.初始化一些任務(在我們的示例中是 5 個),然後通過ScheduledThreadPoolExecutor 實例的 schedule() 方法來啟動這些任務。

System.out.printf("Main: Starting at: %s\n",new Date());for (int i=0; i<5; i++) {Task task=new Task("Task "+i);executor.schedule(task,i+1 , TimeUnit.SECONDS);}

8.調用執行器的 shutdown() 方法來結束執行器。

executor.shutdown();

9.調用執行器的 awaitTermination() 方法等待所有任務結束。

try {executor.awaitTermination(1, TimeUnit.DAYS);} catch (InterruptedException e) {e.printStackTrace();}

10.在控制臺輸出信息錶示程序執行結束的時間。

System.out.printf("Main: Ends at: %s\n",new Date());

工作原理

這個範例的關鍵點在於 Main 主類和 ScheduledThreadPoolExecutor 執行器的管理。雖然可以通過 ThreadPoolExecutor 類來創建定時執行器,但是在Java並發API中則推薦利用 Executors 工廠類來創建。在這個範例中,必須使用 newScheduledThreadPool() 方法,並且傳遞數字 1 作為方法的參數,這個參數就是線程池裏擁有的線程數。

為了在定時執行器中等待一段給定的時間後執行一個任務,需要使用 schedule() 方法。這個方法接收如下的參數:

  • 即將執行的任務;
  • 任務執行前所要等待的時間;
  • 等待時間的單比特,由 TimeUnit 類的一個常量來指定。

在這個示例中,每個任務將等待 N 秒(TimeUnit.SECONDS),這個 N 值則等於任務在數組中的比特置加 1。

備注:如果想在一個給定的時間點來定時執行任務,那就需要計算這個給定時間點和當前時間的差异值,然後用這個差异值作為任務的延遲值。

通過下面的截圖,可以看到範例運行的部分結果。

Java Concurrency Cook Book 4.5

從結果可知,每隔 1 秒鐘就有一個任務開始執行;這是因為所有的任務被同時發送到執行器,但每個任務都比前一個任務延遲了 1 秒鐘。

更多信息

也可以使用Runnable接口來實現任務,因為ScheduledThreadPoolExecutor類的 schedule()方法可以同時接受這兩種類型的任務。

雖然ScheduledThreadPoolExecutor 類是 ThreadPoolExecutor 類的子類,因而繼承了 ThreadPoolExecutor 類所有的特性。但是,Java推薦僅在開發定時任務程序時采用 ScheduledThreadPoolExecutor 類。

最後,在調用shutdown()方法而仍有待處理的任務需要執行時,可以配置 ScheduledThreadPoolExecutor的行為。默認的行為是不論執行器是否結束,待處理的任務仍將被執行。但是,通過調用ScheduledThreadPoolExecutor類的 setExecuteExisting
DelayedTasksAfterShutdownPolicy()方法則可以改變這個行為。傳遞false參數給這個方法,執行shutdown()方法後,待處理的任務將不會被執行。

參見

  • 參見4.4節。

4.8 在執行器中周期性執行任務

執行器框架(Executor Framework提供了 ThreadPoolExecutor 類,通過線程池來執行並發任務從而避免了所有線程的創建操作。當發送一個任務給執行器後,根據執行器的配置,它將盡快地執行這個任務。當任務執行結束後,這個任務就會從執行器中删除;如果想再次執行這個任務,則需要再次發送這個任務到執行器。

但是,執行器框架提供了 ScheduledThreadPoolExecutor 類來執行周期性的任務。在本節,我們將學習如何使用這個類的功能來計劃執行周期性的任務。

准備工作

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.創建一個名為 Task 的類,並實現 Runnable 接口。

public class Task implements Runnable {

2.聲明一個名為 name 的私有 String 屬性,用來存儲任務的名稱。

private String name;

3.實現類的構造器,用來初始化類的屬性。

public Task(String name) {this.name=name;}

4.實現 run() 方法。在控制臺輸出實際的時間,用來檢驗任務將在指定的一段時間內執行。

@Overridepublic String call() throws Exception {System.out.printf("%s: Starting at : %s\n",name,new Date());return "Hello, world";}

5.實現範例的主類,創建 Main 主類,並實現 main() 方法。

public class Main {public static void main(String[] args) {

6.通過調用Executors工廠類的newScheduledThreadPool()方法創建ScheduledThreadPoolExecutor 執行器對象,傳遞 1 作為這個方法的參數。

ScheduledExecutorService executor=Executors.newScheduledThreadPool(1);

7.在控制臺輸出實際時間。

System.out.printf("Main: Starting at: %s\n",new Date());

8.創建一個新的Task對象。

Task task=new Task("Task");

9.調用scheduledAtFixRate()方法將這個任務發送給執行器。傳遞給這個方法的參數分別為上一步創建的task對象、數字1、數字2,以及TimeUnit.SECONDS常量。這個方法返回一個用來控制任務狀態的ScheduledFuture對象。

ScheduledFuture<?> result=executor.scheduleAtFixedRate(task,1, 2, TimeUnit.SECONDS);

10.創建一個10步的循環,在控制臺輸出任務下一次將要執行的剩餘時間。在循環體內,用ScheduledFuture類的getDelay()方法來獲取任務下一次將要執行的毫秒數,然後將線程休眠500毫秒。

for (int i=0; i<10; i++){System.out.printf("Main: Delay: %d\n",result.getDelay(TimeUnit.MILLISECONDS));Sleep the thread during 500 milliseconds.try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}

11.調用 shutdown() 方法結束執行器。

executor.shutdown();

12.將線程休眠 5 秒,等待周期性的任務全部執行完成。

try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}

13.在控制臺輸出信息錶示程序結束。

System.out.printf("Main: Finished at: %s\n",new Date());

工作原理

想要通過執行器框架來執行一個周期性任務時,需要一個ScheduledExecutorService 對象。同創建執行器一樣,在Java中推薦使用Executors工廠類來創建 ScheduledExecutorService對象。Executors類就是執行器對象的工廠。在這個例子中,可以使用newScheduledThreadPool()方法來創建一個ScheduledExecutorService對象。這個方法接收一個錶示線程池中的線程數來作為參數。在這個範例中,因為僅有一個任務,所以只需要傳遞數字 1 作為參數即可。

一旦有了可以執行周期性任務的執行器,就可以發送任務給這個執行器。在範例中,我們使用scheduledAtFixedRate()方法發送任務。這個方法接收4個參數,分別為將被周期性執行的任務,任務第一次執行後的延時時間,兩次執行的時間周期,以及第2個和第3個參數的時間單比特。這個單比特是TimeUnit枚舉的常量。TimeUnit是一個枚舉類,有如下的常量:DAYSHOURSMICROSECONDSMILLISECONDSMINUTESNANOSECONDSSECONDS

另一個需要注意的是,兩次執行之間的周期是指任務在兩次執行開始時的時間間隔。如果有一個周期性的任務需要執行 5 秒鐘,但是卻讓它每 3 秒鐘執行一次,那麼,在任務執行的過程中將會有兩個任務實例同時存在。

scheduleAtFixedRate()方法返回一個ScheduledFuture對象,ScheduledFuture接口則擴展了Future接口,於是它帶有了定時任務的相關操作方法。ScheduledFuture是一個泛型參數化的接口。在這個示例中,任務是Runnable對象,並沒有泛型參數化,必須通過 ? 符號作為參數來泛型化它們。

我們已經使用過 ScheduledFuture 接口中的一個方法。getDelay()方法返回任務到下一次執行時所要等待的剩餘時間。這個方法接收一個 TimeUnit 常量作為時間單比特。

下面的截圖顯示了範例的部分運行結果。

Java Concurrency Cook Book 4.6

通過控制上面的信息,可以看到任務是每 2 秒執行一次;剩餘的延遲時間會每隔 500 毫秒在控制臺上輸出,這個 500 毫秒則是主線程將被休眠的時間。當關閉執行器時,定時任務將結束執行,然後在控制臺上也看不到更多的信息了。

更多信息

ScheduledThreadPoolExecutor 類還提供了其他方法來安排周期性任務的運行,比如,scheduleWithFixedRate()方法。這個方法與 scheduledAtFixedRate() 方法具有相同的參數,但是略有一些不同需要引起注意。在 scheduledAtFixedRate() 方法中,第 3 個參數錶示任務兩次執行開始時間的間隔,而在 schedulledWithFixedDelay () 方法中,第 3 個參數則是錶示任務上一次執行結束的時間與任務下一次開始執行的時間的間隔。

也可以配置ScheduledThreadPoolExecutor實現shutdown()方法的行為,默認行為是當調用shutdown()方法後,定時任務就結束了。可以通過 ScheduledThreadPoolExecutor類的setContinueExistingPeriodicTasksAfterShutdownPolicy() 方法來改變這個行為,傳遞參數true給這個方法,這樣調用shutdown()方法後,周期性任務仍將繼續執行。

參見

  • 參見4.2節。
  • 參見4.7節。

4.9 在執行器中取消任務

使用執行器時,不需要管理線程,只需要實現 RunnableCallable 任務並發送任務給執行器即可。執行器負責創建線程,管理線程池中的線程,當線程不再需要時就銷毀它們。有時候,我們可能需要取消已經發送給執行器的任務。在這種情况下,可以使用 Future 接口的 cancel() 方法來執行取消操作。在本節,我們將學習如何使用這個方法來取消已經發送給執行器的任務。

准備工作

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.創建一個名為Task的類,並實現Callable接口,接口的泛型參數為String類型。接著實現call()方法,構造一個無限循環,先在控制臺輸出信息,然後休眠100毫秒。

public class Task implements Callable<String> {@Overridepublic String call() throws Exception {while (true){System.out.printf("Task: Test\n");Thread.sleep(100);}}

2.實現範例主類,創建 Main 主類,並實現 main() 方法。

public class Main {public static void main(String[] args) {

3.通過Executors工廠類的newCachedThreadPool()方法創建一個ThreadPoolExecutor執行器對象。

ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();

4.創建一個新的Task對象。

Task task=new Task();

5.調用submit()方法將任務發送給執行器。

System.out.printf("Main: Executing the Task\n");Future<String> result=executor.submit(task);

6.讓主線程休眠2秒。

try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}

7.執行器的submit()方法返回名為resultFuture對象,調用Future對象的cancel() 方法來取消任務的執行。傳遞參數true給這個cancel()方法。

System.out.printf("Main: Canceling the Task\n");result.cancel(true);

8.在控制臺輸出調用isCancelled() 方法和isDone()方法的結果,來驗證任務已經被取消和已完成。

System.out.printf("Main: Cancelled: %s\n",result.isCancelled());System.out.printf("Main: Done: %s\n",result.isDone());

9.調用shutdown()方法結束執行器,然後在控制臺輸出信息錶示程序執行結束。

executor.shutdown();System.out.printf("Main: The executor has finished\n");

工作原理

如果想取消一個已經發送給執行器的任務,可以使用 Future 接口的 cancel() 方法。根據調用 cancel() 方法時所傳遞的參數以及任務的狀態,這個方法的行為有些不同。

  • 如果任務已經完成,或者之前已經被取消,或者由於某種原因而不能被取消,那麼方法將返回 false 並且任務也不能取消。
  • 如果任務在執行器中等待分配 Thread 對象來執行它,那麼任務被取消,並且不會開始執行。如果任務已經在運行,那麼它依賴於調用 cancel() 方法時所傳遞的參數。如果傳遞的參數為 true 並且任務正在運行,那麼任務將被取消。如果傳遞的參數為 false 並且任務正在運行,那麼任務不會被取消。

下面的截圖展示了範例執行的結果。

Java Concurrency Cook Book 4.7

更多信息

如果Future對象所控制任務已經被取消,那麼使用Future對象的get()方法時將拋出 CancellationException异常。

參見

  • 參見4.4節。

4.10 在執行器中控制任務的完成

FutureTask 類提供了一個名為 done() 的方法,允許在執行器中的任務執行結束之後,還可以執行一些代碼。這個方法可以被用來執行一些後期處理操作,比如:產生報錶,通過郵件發送結果或釋放一些系統資源。當任務執行完成是受 FutureTask 類控制時,這個方法在內部被 FutureTask 類調用。在任務結果設置後以及任務的狀態已改變為 isDone之後,無論任務是否被取消或者正常結束,done()方法才被調用。

默認情况下,done()方法的實現為空,即沒有任何具體的代碼實現。我們可以覆蓋 FutureTask 類並實現done()方法來改變這種行為。在本節,我們將學習如何覆蓋這個方法,並在任務結束後執行這些代碼。

准備工作

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.創建名為ExecutableTask的類,並實現Callable接口,接口的泛型參數為String類型。

public class ExecutableTask implements Callable<String> {

2.聲明一個名為name的私有String屬性,用來存儲任務的名稱,用 getName() 方法來返回這個屬性值。

private String name;public String getName(){return name;}

3.實現類的構造器,並初始化任務的名稱。

public ExecutableTask(String name){this.name=name;}

4.實現 call() 方法。將任務休眠一段隨機時間,並返回帶有任務名稱的消息。

@Overridepublic String call() throws Exception {try {long duration=(long)(Math.random()*10);System.out.printf("%s: Waiting %d seconds for results.\n",this.name,duration);TimeUnit.SECONDS.sleep(duration);} catch (InterruptedException e) {}return "Hello, world. I'm "+name;}

5.實現一個名為ResultTask的類,並繼承FutureTask類。FutureTask類的泛型參數為String類型。

public class ResultTask extends FutureTask<String> {

6.聲明一個名為name的私有String屬性,用來存儲任務的名稱。

private String name;

7.實現類構造器。它接收一個Callable對象作為參數,調用父類構造器,並用接收到的任務屬性來初始化name屬性。

public ResultTask(Callable<String> callable) {super(callable);this.name=((ExecutableTask)callable).getName();}

8.覆蓋done()方法。檢查isCancelled()方法的返回值,然後根據這個返回值在控制臺輸出不同的信息。

Overrideprotected void done() {if (isCancelled()) {System.out.printf("%s: Has been canceled\n",name);} else {System.out.printf("%s: Has finished\n",name);}}

9.實現範例的主類,創建Main主類,然後實現main()方法。

public class Main {public static void main(String[] args) {

10.調用Executors工廠類的newCachedThreadPool()方法創建一個 ExecutorService 執行器對象。

ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();

11.創建一個數組用來存儲5ResultTask對象。

ResultTask resultTasks[]=new ResultTask[5];

12.初始化ResultTask對象。在數組的每一個比特置上,必須創建 ExecutorTask 對象,然後創建ResultTask對象來使用ExecutorTask對象,最後調用submit()方法將 ResultTask任務發送給執行器。

for (int i=0; i<5; i++) {ExecutableTask executableTask=new ExecutableTask("Task "+i);resultTasks[i]=new ResultTask(executableTask);executor.submit(resultTasks[i]);}

13.將主線程休眠5秒鐘。

try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e1) {e1.printStackTrace();}

14.取消已經發送給執行器的所有任務。

for (int i=0; i<resultTasks.length; i++) {resultTasks[i].cancel(true);}

15.通過調用ResultTask對象的get()方法,在控制臺上輸出還沒有被取消的任務結果。

for (int i=0; i<resultTasks.length; i++) {try {if (!resultTasks[i].isCancelled()){System.out.printf("%s\n",resultTasks[i].get());}} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}   }

16.調用shutdown()方法結束執行器。

executor.shutdown();}}

工作原理

當任務執行結束時, FutureTask類就會調用done()方法。在這個範例中,我們實現了一個Callable類、一個ExecutableTask類以及一個FutureTask類的子類ResultTask,這個子類用來控制ExecutableTask對象的執行。

在創建好返回值以及改變任務狀態為isDone之後,FutureTask類就會在內部調用 done()方法。雖然我們無法改變任務的結果值,也無法改變任務的狀態,但是可以通過任務來關閉系統資源、輸出日志信息、發送通知等。

參見

  • 參見4.4節。

4.11 在執行器中分離任務的啟動與結果的處理

通常情况下,使用執行器來執行並發任務時,將RunnableCallable任務發送給執行器,並獲得Future對象來控制任務。此外,還會碰到如下情形,需要在一個對象裏發送任務給執行器,然後在另一個對象裏處理結果。對於這種情况,Java並發API提供了 CompletionService 類。

CompletionService類有一個方法用來發送任務給執行器,還有一個方法為下一個已經執行結束的任務獲取Future對象。從內部實現機制來看,CompletionService類使用 Executor對象來執行任務。這個行為的優勢是可以共享CompletionService對象,並發送任務到執行器,然後其他的對象可以處理任務的結果。第二個方法有一個不足之處,它只能為已經執行結束的任務獲取Future對象,因此,這些Future對象只能被用來獲取任務的結果。

在本節,我們將學習如何使用CompletionService類,在執行器中來分離任務的啟動與結果的處理。

准備工作

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.創建名為 ReportGenerator 的類,並實現 Callable 接口,接口的泛型參數為 String 類型。

public class ReportGenerator implements Callable<String> {

2.聲明兩個私有的String屬性,分別命名為sendertitle,將用來錶示報告的數據。

private String sender;private String title;

3.實現類的構造器,用來初始化這兩個屬性。

public ReportGenerator(String sender, String title){this.sender=sender;this.title=title;}

4.實現call()方法。讓線程休眠一段隨機時間。

@Overridepublic String call() throws Exception {try {long duration=(long)(Math.random()*10);System.out.printf("%s_%s: ReportGenerator: Generating areport during %d seconds\n",this.sender,this.title,duration);TimeUnit.SECONDS.sleep(duration);} catch (InterruptedException e) {e.printStackTrace();}

5.生成包含了sendertitle屬性的字符串並返回該字符串。

String ret=sender+": "+title;return ret;}

6.創建一個名為ReportRequest的類,實現Runnable接口。這個類將模擬請求獲取報告。

public class ReportRequest implements Runnable {

7.聲明一個名為name的私有String屬性,用來存儲ReportRequest的名稱。

private String name;

8.聲明一個名為 service 的私有 CompletionService 屬性,這個 CompletionService 接口是泛型接口。在這個示例中,我們采用 String 類作為泛型參數。

private CompletionService<String> service;

9.實現類的構造器,並初始化這兩個屬性。

public ReportRequest(String name, CompletionService<String>service){this.name=name;this.service=service;}

10.實現run()方法。創建ReportGenerator對象,然後調用CompletionService對象的submit()方法將ReportGenerator對象發送給CompletionService對象。

@Overridepublic void run() {ReportGenerator reportGenerator=new ReportGenerator(name,"Report");service.submit(reportGenerator);}

11.創建名為ReportProcessor的類,並實現Runnable接口。這個類將獲取到 ReportGenerator任務的結果。

public class ReportProcessor implements Runnable {

12.聲明一個名為 service 的私有CompletionService屬性。由於CompletionService接口是一個泛型接口,在這個示例中,我們采用String類作為泛型參數。

private CompletionService<String> service;

13.聲明一個名為end的私有boolean屬性。

private boolean end;

14.實現類的構造器,並初始化這兩個屬性。

public ReportProcessor (CompletionService<String> service){this.service=service;end=false;}

15.實現run()方法。如果end屬性值為false,則調用CompletionService接口的poll() 方法,來獲取下一個已經完成任務的Future對象;當然,這個任務是采用 CompletionService來完成的。

@Overridepublic void run() {while (!end){try {Future<String> result=service.poll(20, TimeUnit.SECONDS);

16.通過調用Future對象的get()方法來獲取任務的結果,並在控制臺輸出這些結果。

if (result!=null) {String report=result.get();System.out.printf("ReportReceiver: Report Received:%s\n",report);}} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}System.out.printf("ReportSender: End\n");}

17.實現setEnd()設置方法,修改end的屬性值。

public void setEnd(boolean end) {this.end = end;}

18.實現範例的主類,創建 Main 主類,並實現 main() 方法。

public class Main {public static void main(String[] args) {

19.調用Executors工廠類的newCachedThreadPool()方法創建ThreadPoolExecutor執行器對象。

ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();

20.創建CompletionService對象,並將上一步創建的executor對象作為構造器的參數。

CompletionService<String> service=new ExecutorCompletionService<>(executor);

21.創建兩個ReportRequest對象,然後創建兩個線程Thread對象分別來執行它們。

ReportRequest faceRequest=new ReportRequest("Face", service);ReportRequest onlineRequest=new ReportRequest("Online",service);Thread faceThread=new Thread(faceRequest);Thread onlineThread=new Thread(onlineRequest);

22.創建1個ReportProcessor對象,然後創建1個線程Thread對象來執行它。

ReportProcessor processor=new ReportProcessor(service);Thread senderThread=new Thread(processor);

23.啟動這3個線程。

System.out.printf("Main: Starting the Threads\n");faceThread.start();onlineThread.start();senderThread.start();

24.等待ReportRequest線程的結束。

try {System.out.printf("Main: Waiting for the reportgenerators.\n");faceThread.join();onlineThread.join();} catch (InterruptedException e) {e.printStackTrace();}

25.調用shutdown() 方法結束執行器,然後調用awaitTermination()方法等待所有的任務執行結束。

System.out.printf("Main: Shutting down the executor.\n");executor.shutdown();try {executor.awaitTermination(1, TimeUnit.DAYS);} catch (InterruptedException e) {e.printStackTrace();}

26.調用 setEnd() 方法,設置 end 屬性為 true,來結束 ReportSender 的執行。

processor.setEnd(true);System.out.println("Main: Ends");

工作原理

在範例的主類中,我們調用Executors工廠類的newCachedThreadPool()方法創建了 ThreadPoolExecutor執行器對象。然後,使用這個對象初始化了CompletionService對象,因為完成服務(Completion Service)使用執行器來執行任務。然後,調用ReportRequest 類中的submit()方法,利用“完成服務”來執行任務。

當“完成服務”任務結束,這些任務中一個任務就執行結束了,“完成服務”中存儲著Future對象,用來控制它在隊列中的執行。

調用poll()方法訪問這個隊列,查看是否有任務已經完成,如果有,則返回隊列中的第一個元素(即一個任務執行完成後的Future對象)。當poll()方法返回Future對象後,它將從隊列中删除這個Future對象。在這個示例中,我們在調用poll()方法時傳遞了兩個參數,錶示當隊列裏完成任務的結果為空時,想要等待任務執行結束的時間。

一旦創建了CompletionService對象,還要創建兩個ReportRequest對象,用來執行在CompletionService中的兩個ReportGenerator任務。ReportProcessor任務則將處理兩個被發送到執行器裏的ReportRequest 任務所產生的結果。

更多信息

CompletionService類可以執行CallableRunnable類型的任務。在這個範例中,使用的是Callable類型的任務,但是,也可以發送Runnable對象給它。由於Runnable對象不能產生結果,因此CompletionService的基本原則不適用於此。

CompletionService類提供了其他兩種方法來獲取任務已經完成的Future對象。這些方法如下。

  • poll():無參數的poll()方法用於檢查隊列中是否有Future對象。如果隊列為空,則立即返回null。否則,它將返回隊列中的第一個元素,並移除這個元素。
  • take():這個方法也沒有參數,它檢查隊列中是否有Future對象。如果隊列為空,它將阻塞線程直到隊列中有可用的元素。如果隊列中有元素,它將返回隊列中的第一個元素,並移除這個元素。

參見

  • 參見4.4節。

4.12 處理在執行器中被拒絕的任務

當我們想結束執行器的執行時,調用 shutdown() 方法來錶示執行器應當結束。但是,執行器只有等待正在運行的任務或者等待執行的任務結束後,才能真正結束。

如果在shutdown()方法與執行器結束之間發送一個任務給執行器,這個任務會被拒絕,因為這個時間段執行器已不再接受任務了。ThreadPoolExecutor類提供了一套機制,當任務被拒絕時調用這套機制來處理它們。

在本節,我們將學習如何處理執行器中被拒絕的任務,這些任務實現了RejectedExecutionHandler 接口。

准備工作

本節的範例是在Eclipse IDE裏完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以打開這個IDE並且創建一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。

1.創建一個名為 RejectedTaskController 的類,並實現 RejectedExecutionHandler 接口,然後實現接口的 rejectedExecution() 方法,在控制臺輸出已被拒絕的任務的名稱和執行器的狀態。

public class RejectedTaskController implementsRejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutorexecutor) {System.out.printf("RejectedTaskController: The task %s hasbeen rejected\n",r.toString());System.out.printf("RejectedTaskController: %s\n",executor.toString());System.out.printf("RejectedTaskController: Terminating:%s\n",executor.isTerminating());System.out.printf("RejectedTaksController: Terminated:%s\n",executor.isTerminated());}

2.創建一個名為Task的類,並實現Runnable接口。

public class Task implements Runnable{

3.聲明一個名為name的私有String屬性,用來存儲任務的名稱。

private String name;

4.實現類的構造器,用來初始化類的屬性。

public Task(String name){this.name=name;}

5.實現run()方法。在控制臺輸出信息錶示方法開始執行。

@Overridepublic void run() {System.out.println("Task "+name+": Starting");

6.讓線程休眠一段隨機時間。

try {long duration=(long)(Math.random()*10);System.out.printf("Task %s: ReportGenerator: Generating areport during %d seconds\n",name,duration);TimeUnit.SECONDS.sleep(duration);} catch (InterruptedException e) {e.printStackTrace();}

7.在控制臺輸出信息錶示方法執行結束。

System.out.printf("Task %s: Ending\n",name);}

8.覆蓋toString()方法,返回任務的名稱。

public String toString() {return name;}

9.實現範例的主類,創建 Main 主類,並實現 main() 方法。

public class Main {public static void main(String[] args) {

10.創建 RejectedTaskController 對象來管理被拒絕的任務。

RejectecTaskController controller=newRejectecTaskController();

11.調用 Executors 工廠類的 newCachedThreadPool() 方法創建 ThreadPoolExecutor 執行器對象。

ThreadPoolExecutor executor=(ThreadPoolExecutor) Executors.newCachedThreadPool();

12.設置用於被拒絕的任務的處理程序。

executor.setRejectedExecutionHandler(controller);

13.創建 3 個任務並發送給執行器。

System.out.printf("Main: Starting.\n");for (int i=0; i<3; i++) {Task task=new Task("Task"+i);executor.submit(task);}

14.調用 shutdown() 方法關閉執行器。

System.out.printf("Main: Shutting down the Executor.\n");executor.shutdown();

15.創建另一個任務並發送給執行器。

System.out.printf("Main: Sending another Task.\n");Task task=new Task("RejectedTask");executor.submit(task);

16.在控制臺輸出信息錶示程序結束。

System.out.println("Main: End");System.out.printf("Main: End.\n");

工作原理

通過下面的截圖,可以看到範例運行的結果。

Java Concurrency Cook Book 4.8

我們可以看到被拒絕的任務,當執行已經關閉,RejectecedTaskController 在控制臺輸出任務和執行器的信息。

為了處理在執行器中被拒絕的任務,需要創建一個實現RejectedExecutionHandler接口的處理類。這個接口有一個rejectedExecution()方法,其中有兩個參數:

  • 一個Runnable對象,用來存儲被拒絕的任務;
  • 一個Executor對象,用來存儲任務被拒絕的執行器。

被執行器拒絕的每一個任務都將調用這個方法。需要先調用Executor類的 setRejectedExecutionHandler()方法來設置用於被拒絕的任務的處理程序。

更多信息

當執行器接收一個任務並開始執行時,它先檢查shutdown()方法是否已經被調用了。如果是,那麼執行器就拒絕這個任務。首先,執行器會尋找通過setRejectedExecutionHandler()方法設置的用於被拒絕的任務的處理程序,如果找到一個處理程序,執行器就調用其rejectedExecution()方法;否則就拋出 RejectedExecutionExeption异常。這是一個運行時异常,因此並不需要catch語句來對其進行處理。

參見

  • 參見4.2節。

原創文章,轉載請注明: 轉載自並發編程網 – ifeve.com本文鏈接地址: 《Java 7並發編程實戰手册》第四章線程執行器

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071336461177.html