定制並發類(七)實現ThreadFactory接口生成自定義的線程給Fork/Join框架

杜老師說 2022-01-07 14:24:20 阅读数:408

定制 threadfactory 接口 生成 自定

聲明:本文是《 Java 7 Concurrency Cookbook 》的第七章,作者: Javier Fernández González     譯者:許巧輝

實現ThreadFactory接口生成自定義的線程給Fork/Join框架

Fork/Join框架是Java7中最有趣的特征之一。它是Executor和ExecutorService接口的一個實現,允許你執行Callable和Runnable任務而不用管理這些執行線程。

這個執行者面向執行能被拆分成更小部分的任務。主要組件如下:

  • 一個特殊任務,實現ForkJoinTask類
  • 兩種操作,將任務劃分成子任務的fork操作和等待這些子任務結束的join操作
  • 一個算法,優化池中線程的使用的work-stealing算法。當一個任務正在等待它的子任務(結束)時,它的執行線程將執行其他任務(等待執行的任務)。

ForkJoinPool類是Fork/Join的主要類。在它的內部實現,有如下兩種元素:

  • 一個存儲等待執行任務的列隊。
  • 一個執行任務的線程池

在這個指南中,你將學習如何實現一個在ForkJoinPool類中使用的自定義的工作者線程,及如何使用一個工廠來使用它。

准備工作…

這個指南的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,打開它並創建一個新的Java項目。

如何做…

按以下步驟來實現的這個例子:

1.創建一個繼承ForkJoinWorkerThread類的MyWorkerThread類。

public class MyWorkerThread extends ForkJoinWorkerThread {

2.聲明和創建一個參數化為Integer類的ThreadLocal屬性,名為taskCounter。

private static ThreadLocal<Integer> taskCounter=new ThreadLocal<Integer>();

3.實現這個類的構造器。

protected MyWorkerThread(ForkJoinPool pool) {super(pool);}

4.重寫onStart()方法。調用父類的這個方法,寫入一條信息到控制臺。設置當前線程的taskCounter屬性值為0。

@Overrideprotected void onStart() {super.onStart();System.out.printf("MyWorkerThread %d: Initializing taskcounter.\n",getId());taskCounter.set(0);}

5.重寫onTermination()方法。寫入當前線程的taskCounter屬性值到控制臺。

@Overrideprotected void onTermination(Throwable exception) {System.out.printf("MyWorkerThread %d:%d\n",getId(),taskCounter.get());super.onTermination(exception);}

6.實現addTask()方法。遞增taskCounter屬性值。

public void addTask(){int counter=taskCounter.get().intValue();counter++;taskCounter.set(counter);}

7.創建一個實現ForkJoinWorkerThreadFactory接口的MyWorkerThreadFactory類。實現newThread()方法,創建和返回一個MyWorkerThread對象。

@Overridepublic ForkJoinWorkerThread newThread(ForkJoinPool pool) {return new MyWorkerThread(pool);}}

8.創建MyRecursiveTask類,它繼承一個參數化為Integer類的RecursiveTask類。

public class MyRecursiveTask extends RecursiveTask<Integer> {

9.聲明一個私有的、int類型的屬性array。

private int array[];

10.聲明兩個私有的、int類型的屬性start和end。

private int start, end;

11.實現這個類的構造器,初始化它的屬性。

public MyRecursiveTask(int array[],int start, int end) {this.array=array;this.start=start;this.end=end;}

12.實現compute()方法,用來合計數組中在start和end比特置之間的所有元素。首先,將執行這個任務的線程轉換成一個MyWorkerThread對象,然後使用addTask()方法來增長這個線程的任務計數器。

@Overrideprotected Integer compute() {Integer ret;MyWorkerThread thread=(MyWorkerThread)Thread.currentThread();thread.addTask();}

13.實現addResults()方法。計算和返回兩個任務(接收參數)的結果的總和。

private Integer addResults(Task task1, Task task2) {int value;try {value = task1.get().intValue()+task2.get().intValue();} catch (InterruptedException e) {e.printStackTrace();value=0;} catch (ExecutionException e) {e.printStackTrace();value=0;}

14.令這個線程睡眠10毫秒,然後返回任務的結果。

try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}return value;}

15.實現這個例子的主類,通過創建Main類,並實現main()方法。

public class Main {public static void main(String[] args) throws Exception {

16.創建一個名為factory的MyWorkerThreadFactory對象。

MyWorkerThreadFactory factory=new MyWorkerThreadFactory();

17.創建一個名為pool的ForkJoinPool對象,將前面創建的factory對象作為參數傳給它的構造器。

ForkJoinPool pool=new ForkJoinPool(4, factory, null, false);

18.創建一個大小為100000的整數數組,將所有元素初始化為值1。

int array[]=new int[100000];for (int i=0; i<array.length; i++){array[i]=1;}

19.創建一個新的Task對象,用來合計數組中的所有元素。

MyRecursiveTask task=new MyRecursiveTask(array,0,array.length);

20.使用execute()方法,將這個任務提交給池。

pool.execute(task);

21.使用join()方法,等待這個任務的結束。

task.join();

22.使用shutdown()方法,關閉這個池。

pool.shutdown();

23.使用awaitTermination()方法,等待這個執行者的結束。

pool.awaitTermination(1, TimeUnit.DAYS);

24.使用get()方法,將任務的結束寫入到控制臺。

System.out.printf("Main: Result: %d\n",task.get());

25.寫入一條信息到控制臺,錶明程序的結束。

System.out.printf("Main: End of the program\n");

它是如何工作的…

Fork/Join框架使用的線程叫工作者線程。Java包含繼承Thread類的ForkJoinWorkerThread類和使用Fork/Join框架實現工作者線程。

在這個指南中,你已實現了繼承ForkJoinWorkerThread類的MyWorkerThread類,並重寫這個類的兩個方法。你的目標是實現每個工作者線程的任務計數器,以至於你可以知道每個工作者線程執行多少個任務。你已經通過一個ThreadLocal屬性實現計數器。這樣,每個線程都擁有它自己的計數器,對於來你說是透明的。

你已重寫ForkJoinWorkerThread類的onStart()方法來實現任務的計數器。當工作者線程開始它的執行時,這個方法將被調用。你也重寫了onTermination()方法,將任務計數器的值寫入到控制臺。當工作者線程結束它的執行時,這個方法將被調用。你也在MyWorkerThread類中實現addTask()方法,用來增加每個線程的任務計數器。

對於ForkJoinPool類,與Java並發API中的所有執行者一樣,使用工廠來創建它。所以,如果你想在ForkJoinPool類中使用MyWorkerThread線程,你必須實現自己的線程工廠。對於Fork/Join框架,這個工廠必須實現ForkJoinPool.ForkJoinWorkerThreadFactory類。為此,你已實現MyWorkerThreadFactory類。這個類只有一個用來創建一個新的MyWorkerThread對象的方法。

最後,你只要使用已創建的工廠來初始化ForkJoinPool類。你已在Main類中通過使用ForkJoinPool的構造器實現了。

以下截圖顯示了這個程序的部分輸出:

4

你可以看出ForkJoinPool對象如何執行4個工作者線程及每個工作者線程執行多少個任務。

不止這些…

考慮一下,當一個線程正常結束或拋出一個Exception异常時,調用的ForkJoinWorkerThread提供的onTermination()方法。這個方法接收一個Throwable對象作為參數。如果這個參數值為null時,錶明這個工作者線程正常結束。但是,如果這個參數的值不為null,錶明這個線程拋出一個异常。你必須包含必要的代碼來處理這種情况。

參見

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071424202089.html