Oracle官方並發教程之高級並發對象

杜老師說 2022-01-07 13:33:50 阅读数:476

oracle 官方 教程 程之 之高

原文地址譯文地址

譯者:李任

目前為止,該教程重點講述了最初作為Java平臺一部分的低級別API。這些API對於非常基本的任務來說已經足够,但是對於更高級的任務就需要更高級的API。特別是針對充分利用了當今多處理器和多核系統的大規模並發應用程序。 本節,我們將著眼於Java 5.0新增的一些高級並發特征。大多數特征已經在新的java.util.concurrent包中實現。Java集合框架中也定義了新的並發數據結構。

  • 鎖對象提供了可以簡化許多並發應用的鎖的慣用法。
  • Executors為加載和管理線程定義了高級API。Executors的實現由java.util.concurrent包提供,提供了適合大規模應用的線程池管理。
  • 並發集合簡化了大型數據集合管理,且極大的减少了同步的需求。
  • 原子變量有减小同步粒度和避免內存一致性錯誤的特征。
  • 並發隨機數(JDK7)提供了高效的多線程生成偽隨機數的方法。

鎖對象

譯者:李任

同步代碼依賴於一種簡單的可重入鎖。這種鎖使用簡單,但也有諸多限制。java.util.concurrent.locks包提供了更複雜的鎖。我們不會詳細考察這個包,但會重點關注其最基本的接口,鎖。 鎖對象作用非常類似同步代碼使用的隱式鎖。如同隱式鎖,每次只有一個線程可以獲得鎖對象。通過關聯Condition對象,鎖對象也支持wait/notify機制。 鎖對象之於隱式鎖最大的優勢在於,它們有能力收回獲得鎖的嘗試。如果當前鎖對象不可用,或者鎖請求超時(如果超時時間已指定),tryLock方法會收回獲取鎖的請求。如果在鎖獲取前,另一個線程發送了一個中斷,lockInterruptibly方法也會收回獲取鎖的請求。 讓我們使用鎖對象來解决我們在活躍度中見到的死鎖問題。Alphonse和Gaston已經把自己訓練成能注意到朋友何時要鞠躬。我們通過要求Friend對象在雙方鞠躬前必須先獲得鎖來模擬這次改善。下面是改善後模型的源代碼,Safelock。為了展示其用途廣泛,我們假設Alphonse和Gaston對於他們新發現的穩定鞠躬的能力是如此入迷,以至於他們無法不相互鞠躬。

import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import java.util.Random;public class Safelock { static class Friend { private final String name; private final Lock lock = new ReentrantLock(); public Friend(String name) { this.name = name; } public String getName() { return this.name; } public boolean impendingBow(Friend bower) { Boolean myLock = false; Boolean yourLock = false; try { myLock = lock.tryLock(); yourLock = bower.lock.tryLock(); } finally { if (! (myLock && yourLock)) { if (myLock) { lock.unlock(); } if (yourLock) { bower.lock.unlock(); } } } return myLock && yourLock; } public void bow(Friend bower) { if (impendingBow(bower)) { try { System.out.format("%s: %s has" + " bowed to me!%n", this.name, bower.getName()); bower.bowBack(this); } finally { lock.unlock(); bower.lock.unlock(); } } else { System.out.format("%s: %s started" + " to bow to me, but saw that" + " I was already bowing to" + " him.%n", this.name, bower.getName()); } } public void bowBack(Friend bower) { System.out.format("%s: %s has" + " bowed back to me!%n", this.name, bower.getName()); } }
 static class BowLoop implements Runnable { private Friend bower; private Friend bowee; public BowLoop(Friend bower, Friend bowee) { this.bower = bower; this.bowee = bowee; } public void run() { Random random = new Random(); for (;;) { try { Thread.sleep(random.nextInt(10)); } catch (InterruptedException e) {} bowee.bow(bower); } } } public static void main(String[] args) { final Friend alphonse = new Friend("Alphonse"); final Friend gaston = new Friend("Gaston"); new Thread(new BowLoop(alphonse, gaston)).start(); new Thread(new BowLoop(gaston, alphonse)).start(); }}

執行器(Executors)

譯者:Greenster

在之前所有的例子中,Thread對象錶示的線程和Runnable對象錶示的線程所執行的任務之間是緊耦合的。這對於小型應用程序來說沒問題,但對於大規模並發應用來說,合理的做法是將線程的創建與管理和程序的其他部分分離開。封裝這些功能的對象就是執行器,接下來的部分將講詳細描述執行器。

 Executor接口

譯者:Greenster java.util.concurrent中包括三個Executor接口:

  • Executor,一個運行新任務的簡單接口。
  • ExecutorService,擴展了Executor接口。添加了一些用來管理執行器生命周期和任務生命周期的方法。
  • ScheduledExecutorService,擴展了ExecutorService。支持Future和定期執行任務。

通常來說,指向Executor對象的變量應被聲明為以上三種接口之一,而不是具體的實現類

Executor接口

Executor接口只有一個execute方法,用來替代通常創建(啟動)線程的方法。例如:r是一個Runnable對象,e是一個Executor對象。可以使用

 e.execute(r);

來代替

 (new Thread(r)).start();

但execute方法沒有定義具體的實現方式。對於不同的Executor實現,execute方法可能是創建一個新線程並立即啟動,但更有可能是使用已有的工作線程運行r,或者將r放入到隊列中等待可用的工作線程。(我們將在線程池一節中描述工作線程。)

ExecutorService接口

ExecutorService接口在提供了execute方法的同時,新加了更加通用的submit方法。submit方法除了和execute方法一樣可以接受Runnable對象作為參數,還可以接受Callable對象作為參數。使用Callable對象可以能使任務返還執行的結果。通過submit方法返回的Future對象可以讀取Callable任務的執行結果,或是管理Callable任務和Runnable任務的狀態。 ExecutorService也提供了批量運行Callable任務的方法。最後,ExecutorService還提供了一些關閉執行器的方法。如果需要支持即時關閉,執行器所執行的任務需要正確處理中斷。

ScheduledExecutorService接口

ScheduledExecutorService擴展ExecutorService接口並添加了schedule方法。調用schedule方法可以在指定的延時後執行一個Runnable或者Callable任務。ScheduledExecutorService接口還定義了按照指定時間間隔定期執行任務的scheduleAtFixedRate方法和scheduleWithFixedDelay方法。

線程池

譯者:Greenster

在java.util.concurrent包中多數的執行器實現都使用了由工作線程組成的線程池,工作線程獨立於所它所執行的Runnable任務和Callable任務,並且常用來執行多個任務。 使用工作線程可以使創建線程的開銷最小化。在大規模並發應用中,創建大量的Thread對象會占用占用大量系統內存,分配和回收這些對象會產生很大的開銷。 一種最常見的線程池是固定大小的線程池。這種線程池始終有一定數量的線程在運行,如果一個線程由於某種原因終止運行了,線程池會自動創建一個新的線程來代替它。需要執行的任務通過一個內部隊列提交給線程,當沒有更多的工作線程可以用來執行任務時,隊列保存額外的任務。 使用固定大小的線程池一個很重要的好處是可以實現優雅退化。例如一個Web服務器,每一個HTTP請求都是由一個單獨的線程來處理的,如果為每一個HTTP都創建一個新線程,那麼當系統的開銷超出其能力時,會突然地對所有請求都停止響應。如果限制Web服務器可以創建的線程數量,那麼它就不必立即處理所有收到的請求,而是在有能力處理請求時才處理。 創建一個使用線程池的執行器最簡單的方法是調用java.util.concurrent.ExecutorsnewFixedThreadPool方法。Executors類還提供了下列一下方法:

  • newCachedThreadPool方法創建了一個可擴展的線程池。適合用來啟動很多短任務的應用程序。
  • newSingleThreadExecutor方法創建了每次執行一個任務的執行器。
  • 還有一些創建ScheduledExecutorService執行器的方法。

如果上面的方法都不滿足需要,可以嘗試java.util.concurrent.ThreadPoolExecutor或者java.util.concurrent.ScheduledThreadPoolExecutor

Fork/Joint

譯者:Zach

fork/join框架是ExecutorService接口的一種具體實現,目的是為了幫助你更好地利用多處理器帶來的好處。它是為那些能够被遞歸地拆解成子任務的工作類型量身設計的。其目的在於能够使用所有可用的運算能力來提昇你的應用的性能。 類似於ExecutorService接口的其他實現,fork/join框架會將任務分發給線程池中的工作線程。fork/join框架的獨特之處在與它使用工作竊取(work-stealing)算法。完成自己的工作而處於空閑的工作線程能够從其他仍然處於忙碌(busy)狀態的工作線程處竊取等待執行的任務。 fork/join框架的核心是ForkJoinPool類,它是對AbstractExecutorService類的擴展。ForkJoinPool實現了工作偷取算法,並可以執行ForkJoinTask任務。

基本使用方法

使用fork/join框架的第一步是編寫執行一部分工作的代碼。你的代碼結構看起來應該與下面所示的偽代碼類似:

if (當前這個任務工作量足够小) 直接完成這個任務else 將這個任務或這部分工作分解成兩個部分 分別觸發(invoke)這兩個子任務的執行,並等待結果

你需要將這段代碼包裹在一個ForkJoinTask的子類中。不過,通常情况下會使用一種更為具體的的類型,或者是RecursiveTask(會返回一個結果),或者是RecursiveAction。 當你的ForkJoinTask子類准備好了,創建一個代錶所有需要完成工作的對象,然後將其作為參數傳遞給一個ForkJoinPool實例的invoke()方法即可。

要清晰,先模糊

想要了解fork/join框架的基本工作原理,接下來的這個例子會有所幫助。假設你想要模糊一張圖片。原始的source圖片由一個整數的數組錶示,每個整數錶示一個像素點的顏色數值。與source圖片相同,模糊之後的destination圖片也由一個整數數組錶示。 對圖片的模糊操作是通過對source數組中的每一個像素點進行處理完成的。處理的過程是這樣的:將每個像素點的色值取出,與周圍像素的色值(紅、黃、藍三個組成部分)放在一起取平均值,得到的結果被放入destination數組。因為一張圖片會由一個很大的數組來錶示,這個流程會花費一段較長的時間。如果使用fork/join框架來實現這個模糊算法,你就能够借助多處理器系統的並行處理能力。下面是上述算法結合fork/join框架的一種簡單實現:

public class ForkBlur extends RecursiveAction {private int[] mSource;private int mStart;private int mLength;private int[] mDestination;// Processing window size; should be odd.private int mBlurWidth = 15;public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst;}protected void computeDirectly() { int sidePixels = (mBlurWidth - 1) / 2; for (int index = mStart; index < mStart + mLength; index++) { // Calculate average. float rt = 0, gt = 0, bt = 0; for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1); int pixel = mSource[mindex]; rt += (float)((pixel & 0x00ff0000) >> 16) / mBlurWidth; gt += (float)((pixel & 0x0000ff00) >> 8) / mBlurWidth; bt += (float)((pixel & 0x000000ff) >> 0) / mBlurWidth; } // Reassemble destination pixel. int dpixel = (0xff000000 ) | (((int)rt) << 16) | (((int)gt) << 8) | (((int)bt) << 0); mDestination[index] = dpixel; }}

接下來你需要實現父類中的compute()方法,它會直接執行模糊處理,或者將當前的工作拆分成兩個更小的任務。數組的長度可以作為一個簡單的閥值來判斷任務是應該直接完成還是應該被拆分。

protected static int sThreshold = 100000;protected void compute() { if (mLength < sThreshold) { computeDirectly(); return; } int split = mLength / 2; invokeAll(new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination));}

如果前面這個方法是在一個RecursiveAction的子類中,那麼設置任務在ForkJoinPool中執行就再直觀不過了。通常會包含以下一些步驟:

  1. 創建一個錶示所有需要完成工作的任務。
    // source image pixels are in src// destination image pixels are in dstForkBlur fb = new ForkBlur(src, 0, src.length, dst);
  2. 創建將要用來執行任務的ForkJoinPool
    ForkJoinPool pool = new ForkJoinPool();
  3. 執行任務。
    pool.invoke(fb);

想要瀏覽完成的源代碼,請查看ForkBlur,其中還包含一些創建destination圖片文件的額外代碼。

標准實現

除了能够使用fork/join框架來實現能够在多處理系統中被並行執行的定制化算法(如前文中的ForkBlur.java例子),在Java SE中一些比較常用的功能點也已經使用fork/join框架來實現了。在Java SE 8中,java.util.Arrays類的一系列parallelSort()方法就使用了fork/join來實現。這些方法與sort()系列方法很類似,但是通過使用fork/join框架,借助了並發來完成相關工作。在多處理器系統中,對大數組的並行排序會比串行排序更快。這些方法究竟是如何運用fork/join框架並不在本教程的討論範圍內。想要了解更多的信息,請參見Java API文檔。 其他采用了fork/join框架的方法還包括java.util.streams包中的一些方法,此包是作為Java SE 8發行版中Project Lambda的一部分。想要了解更多信息,請參見Lambda Expressions一節。

並發集合

譯者:李任

java.util.concurrent包囊括了Java集合框架的一些附加類。它們也最容易按照集合類所提供的接口來進行分類:

  • BlockingQueue定義了一個先進先出的數據結構,當你嘗試往滿隊列中添加元素,或者從空隊列中獲取元素時,將會阻塞或者超時。
  • ConcurrentMapjava.util.Map的子接口,定義了一些有用的原子操作。移除或者替換鍵值對的操作只有當key存在時才能進行,而新增操作只有當key不存在時。使這些操作原子化,可以避免同步。ConcurrentMap的標准實現是ConcurrentHashMap,它是HashMap的並發模式。
  • ConcurrentNavigableMap是ConcurrentMap的子接口,支持近似匹配。ConcurrentNavigableMap的標准實現是ConcurrentSkipListMap,它是TreeMap的並發模式。

所有這些集合,通過 在集合裏新增對象和訪問或移除對象的操作之間,定義一個happens-before的關系,來幫助程序員避免內存一致性錯誤

原子變量

譯者:李任

java.util.concurrent.atomic包定義了對單一變量進行原子操作的類。所有的類都提供了get和set方法,可以使用它們像讀寫volatile變量一樣讀寫原子類。就是說,同一變量上的一個set操作對於任意後續的get操作存在happens-before關系。原子的compareAndSet方法也有內存一致性特點,就像應用到整型原子變量中的簡單原子算法。 為了看看這個包如何使用,讓我們返回到最初用於演示線程幹擾的Counter類:

class Counter { private int c = 0; public void increment() { c++; } public void decrement() { c--; } public int value() { return c; }}

使用同步是一種使Counter類變得線程安全的方法,如SynchronizedCounter

class SynchronizedCounter {private int c = 0;public synchronized void increment() {c++;}public synchronized void decrement() {c--;}public synchronized int value() {return c;}}

對於這個簡單的類,同步是一種可接受的解决方案。但是對於更複雜的類,我們可能想要避免不必要同步所帶來的活躍度影響。將int替換為AtomicInteger允許我們在不進行同步的情况下阻止線程幹擾,如AtomicCounter

import java.util.concurrent.atomic.AtomicInteger;class AtomicCounter {private AtomicInteger c = new AtomicInteger(0);public void increment() {c.incrementAndGet();}public void decrement() {c.decrementAndGet();}public int value() {return c.get();}

並發隨機數

譯者:李任

在JDK7中,java.util.concurrent包含了一個相當便利的類,ThreadLocalRandom,當應用程序期望在多個線程或ForkJoinTasks中使用隨機數時。

對於並發訪問,使用TheadLocalRandom代替Math.random()可以减少競爭,從而獲得更好的性能。

你只需調用ThreadLocalRandom.current(), 然後調用它的其中一個方法去獲取一個隨機數即可。下面是一個例子:

 int r = ThreadLocalRandom.current().nextInt(4,77);

 

原創文章,轉載請注明: 轉載自並發編程網 – ifeve.com本文鏈接地址: Oracle官方並發教程之高級並發對象

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071333503871.html