ExecutorService-10個要訣和技巧

杜老師說 2022-01-07 06:10:01 阅读数:980

executorservice-10 executorservice 技巧

原文鏈接 作者:Tomasz Nurkiewicz    譯者:simonwang

ExecutorService抽象概念自Java5就已經提出來了,現在是2014年。順便提醒一下:Java5和Java6都已不被支持,Java7在半年內也將會這樣。我提出這個的原因是許多Java程序員仍然不能完全明白ExecutorService到底是怎樣工作的。還有很多地方要去學習,今天我會分享一些很少人知道的特性和實踐。然而這篇文章仍然是面向中等程序員的,沒什麼特別高級的地方。

1. Name pool threads

我想强調一點的是,當在運行JVM或調試期間創建線程時,默認的線程池命名規則是pool-N-thread-M,這裏N代錶線程池的序列數(每一次你創建一個線程池的時候,全局計數N就加1),而M則是某一個線程池的線程序列數。例如,pool-2-thread-3就意味著JVM生命周期中第2線程池的第3線程。具體可以查看:Executors.defaultThreadFactory()。這樣不具備描述性,JDK使得線程命名的過程有些微的複雜,因為命名的方法隱藏在ThreadFactory內部。幸運地是Guava有一個很有用的類:

import com.google.common.util.concurrent.ThreadFactoryBuilder;final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("Orders-%d") .setDaemon(true) .build();final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);

線程池默認創造的是非守護線程,由你來决定是否合適。

2. Switch names according to context

有一個我從 Supercharged jstack: How to Debug Your Servers at 100mph學到的小技巧。一旦我們記住了線程的名字,那麼在任何時刻我們都能够改變它們!這是有道理的,因為線程轉儲顯示了類名和方法名,沒有參數和局部變量。通過調整線程名保留一些必要的事務標識符,我們可以很容易追踪某一條運行緩慢或者造成死鎖的信息/記錄/查詢等。例如:

private void process(String messageId) { executorService.submit(() -> { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName("Processing-" + messageId); try { //real logic here... } finally { currentThread.setName(oldName); } });}

在try-finally塊內部,當前線程被命名為Processing-WHATEVER-MESSAGE-ID-IS,當通過系統追踪信息流時這可能會派上用場。

3. Explicit and safe shutdown

在客戶端線程和線程池之間有一個任務隊列,當你的應用關閉時,你必須關心兩件事:任務隊列會發生什麼;正在運行的任務會怎樣(這個時候將詳細介紹)。令人感到吃驚的是許多程序員並不會適當地或有意識地關閉線程池。這有兩個方法:要麼讓所有的任務隊列全都執行完(shutdown()),要麼舍弃它們(shutdownNow()),這依賴你使用的具體情况。例如如果我們提交一連串的任務並且想要它們在完成後盡可能快的返回,可以使用shutdown():

private void sendAllEmails(List<String> emails) throws InterruptedException { emails.forEach(email -> executorService.submit(() -> sendEmail(email))); executorService.shutdown(); final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES); log.debug("All e-mails were sent so far? {}", done);}

在這個例子中我們發送了一堆e-mail,每一個都作為一個獨立的任務交給線程池。在提交了所有的任務之後我們執行shutdown使線程池不再接收新的任務。然後最多等待1minute直到所有的任務都完成。然而如果有些任務仍然處於掛起狀態,awaitTermination()將返回false,而那些在等待的任務會繼續執行。我知道一些人會使用新潮的用法:

emails.parallelStream().forEach(this::sendEmail);

你可能會覺得我太保守,但我喜歡去控制並行線程的數量。不用介意,還有一種優雅的shutdown()方法shutdownNow():

final List<Runnable> rejected = executorService.shutdownNow();log.debug("Rejected tasks: {}", rejected.size());

這樣一來隊列中還在等待的任務將會被舍弃並被返回,但已經在運行的任務將會繼續。

4. Handle interruption with care

很少人知道Future接口的cancel,這裏我不想重複說明,你可以去看我以前的文章:

InterruptedException and interrupting threads explained

5. Monitor queue length and keep it bounded

不合適的線程池大小可能會造成運行緩慢、不穩定以及內存泄漏。如果你配置太少的線程,那麼任務隊列就會變大,消耗太多內存。另一方面太多的線程又會由於過度頻繁的上下文切換而造成整個系統運行緩慢。所以觀察隊列的長度並將其限定在一定範圍內是很重要的,這樣的話過載的線程池會短暫拒絕新任務的提交:

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);

上面的代碼和Executors.newFixedThreadPool(n)是等價的,然而不同的是默認情况下固定線程池使用的是無限制的LinkedBlockingQueue ,我們使用的是固定容量100的ArrayBlockingQueue。這就意味著如果已經有100個任務在排隊(其中有n個任務正在執行),那麼新的任務就將被駁回並拋出RejectedExecutionException。一旦在外部可以訪問queue ,那麼我們就可以周期性地調用size(),並把它提交到logs/JMX或其他任何你使用的監視器中。

6. Remember about exception handling

下面代碼段的結果是什麼?

executorService.submit(() -> { System.out.println(1 / 0);});

我深受其苦:它不會打印任何東西。不會拋出java.lang.ArithmeticException: / by zero,什麼也沒有。線程池將忽略這個异常,就像它從來沒發生過。如果上面的代碼是用java.lang.Thread偶然創造的,那麼UncaughtExceptionHandler可能會起作用。但在線程池裏你就要多加小心了。如果你正在提交Runnable (沒有返回結果,就像上面),那麼你必須將整個代碼塊用try-catch包起來,至少要log一下。如果你提交的是Callable,確保你總是使用阻塞的get()方法來重拋异常:

final Future<Integer> division = executorService.submit(() -> 1 / 0);//below will throw ExecutionException caused by ArithmeticExceptiondivision.get();

有趣的是就算是Spring框架在處理這個bug的時候會使用@Async,詳細: SPR-8995SPR-12090

7. Monitor waiting time in a queue

監控工作隊列深度又是一個層面,在排除單個事務或任務的故障時,有必要了解從任務的提交到實際執行耗時多長。這種等待時間最好趨近於零(當線程池中有空閑的線程時),但任務又不得不在隊列中排隊導致等待時間變長。而且如果池內沒有一定數量的線程,在運行新任務時可能需要創造新的線程,而這個過程也是要消耗少量時間的。為了能够清楚地監測這個時間,我們使用類似下面的代碼包裝原始的ExecutorService :

public class WaitTimeMonitoringExecutorService implements ExecutorService { private final ExecutorService target; public WaitTimeMonitoringExecutorService(ExecutorService target) { this.target = target; } @Override public <T> Future<T> submit(Callable<T> task) { final long startTime = System.currentTimeMillis(); return target.submit(() -> { final long queueDuration = System.currentTimeMillis() - startTime; log.debug("Task {} spent {}ms in queue", task, queueDuration); return task.call(); } ); } @Override public <T> Future<T> submit(Runnable task, T result) { return submit(() -> { task.run(); return result; }); } @Override public Future<?> submit(Runnable task) { return submit(new Callable<Void>() { @Override public Void call() throws Exception { task.run(); return null; } }); } //...}

這並不是完整的實現,但你得知道這個基本概念。當我們向線程池提交任務的那一刻,就立馬開始測量時間,而任務一開始被執行就停止測量。不要被上面源碼中很接近的startTime 和queueDuration 所迷惑了,事實上這兩行是在不同的線程中執行的,可能有數毫秒甚至數秒的差別,例如:

Task [email protected] spent 9883ms in queue

8. Preserve client stack trace

響應式編程這段日子似乎比較火,Reactive manifesto,reactive streams,RxJava(剛剛發布1.0),Clojure agents,scala.rx…,這些東西都挺好的,但它們的堆棧跟踪將不再友好,大多數堆棧跟踪沒有什麼卵用。舉個例子,當線程池中的任務拋出了一個异常:

java.lang.NullPointerException: null at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na] at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0] at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]

我們很容易就發現MyTask在76行拋出了空指針异常,但我們並不知道是誰提交了這個任務,因為堆棧跟踪僅僅只是告訴你Thread和 ThreadPoolExecutor的信息。我們能通過源碼從技術上定比特MyTask被創造的比特置,不需要線程(更不必說事件驅動、響應式編程)我們就能够馬上看到全面信息。如果我們保留客戶端代碼(提交任務的代碼)的堆棧跟踪並在出現故障的時候將其打印出來會怎麼樣?這不是什麼新想法,例如Hazelcast會將當前點發生的异常傳送回客戶端代碼,下面就看看保持客戶端堆棧跟踪是怎樣實現的:

public class ExecutorServiceWithClientTrace implements ExecutorService { protected final ExecutorService target; public ExecutorServiceWithClientTrace(ExecutorService target) { this.target = target; } @Override public <T> Future<T> submit(Callable<T> task) { return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName())); } private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) { return () -> { try { return task.call(); } catch (Exception e) { log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack); throw e; } }; } private Exception clientTrace() { return new Exception("Client stack trace"); } @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return tasks.stream().map(this::submit).collect(toList()); } //...}

這次一旦出現异常我們將檢索任務被提交地方的所有堆棧跟踪和線程名,和標准异常相比下面的异常信息更有價值:

Exception java.lang.NullPointerException in task submitted from thrad main here:java.lang.Exception: Client stack trace at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na] at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na] at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0] at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0] at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]

9. Prefer CompletableFuture

Java 8提出了强大的CompletableFuture,請盡可能的使用它。ExecutorService並沒有擴展支持這個强大的抽象,所以你要小心使用它。用:

final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService);

代替:

final Future<BigDecimal> future = executorService.submit(this::calculate);

CompletableFuture繼承了Future及其所有功能,而且CompletableFuture所提供的擴展功能極大地豐富了我們的API。

10. Synchronous queue

SynchronousQueue是一種有趣的BlockingQueue但真正意義上並不是queue,事實上它連數據結構都算不上。要解釋的話它算是0容量的隊列,引用JavaDoc:

each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. […]

Synchronous queues are similar to rendezvous channels used in CSP and Ada.

這和線程池有什麼關系呢?試著在ThreadPoolExecutor中使用SynchronousQueue:

BlockingQueue<Runnable> queue = new SynchronousQueue<>();ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, queue);

我們創造了有兩個線程的線程池和一個SynchronousQueue,因為SynchronousQueue本質上是零容量的隊列,因此如果有空閑線程,ExecutorService只會執行新的任務。如果所有的線程都被占用,新任務會被立刻拒絕不會等待。當進程背景要求立刻啟動或者被丟弃時,這種機制是可取的。
以上,希望你們能够找到至少一個有用的!

原創文章,轉載請注明: 轉載自並發編程網 – ifeve.com本文鏈接地址: ExecutorService-10個要訣和技巧

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201070610005502.html