線程池運用實例——一次錯誤的多線程程序設計以及修複過程

杜老師說 2022-01-07 14:44:23 阅读数:298

一次 程序

寫在前面的話 

寫下這篇文章只為了回顧之前在實際工作中犯的一個極其二逼的錯誤,用我的經曆來提示後來者,諸比特程序大神,大牛,小牛們看到此文笑笑即可,輕拍輕拍。。。

1 背景

有這麼一個需求,我們的系統(後面簡稱:A系統)需要在後臺執行一個報錶導出任務,在這個任務的執行過程中需要通過CORBA調用其他系統(後面簡稱:B系統)的一個(也有可能是多個)接口去查詢報錶,待結果返回後,將這些結果寫入Excel。這個需求是不是很簡單?套用網上一些FutureTask或者線程池的例子一兩小時就能搞定這個需求。當時我也是這樣認為的,可誰想,這是一個巨大的坑….

2 初始設計

用過CORBA的同學會知道,如同數據庫連接一樣,CORBA的連接數也是是有限的,如果一個接口調用的時間過長,就會長時間占用CORBA有限的連接數,當這種長時間的同步調用過多時就會造成整個系統CORBA調用的阻塞,進而造成系統停止響應。由於查詢操作很耗時,為了避免這種情况的發生,這個接口被設計成了一個异步接口。任務的執行流程就會是這樣:任務開始執行,接著調用這個接口並且通過CORBA向B系統訂閱一個事件,然後任務進入等待狀態,當B系統執行完成後,會向A系統發送一個事件告知執行的結果,任務收到事件後重新開始執行直到結束,如圖:

既然說到了事件,那麼很自然而然的就想到了使用回調的方式去響應事件,並且為了避免事件超時(也就是長時間沒有接收到事件)導致任務長時間等待,我還使用了一個定時的任務去檢查任務的狀態。所以我的程序看起來就像這樣:

IEventFuture.java

public interface IEventFuture { void onEventReceived(Event event);}

ExportRptTask.java

public class ExportRptTask implements Callable<Void>, IEventFuture { private static final int INITIALIZED = 0; private static final int RUNNING = 1; private static final int COMPLETED = 2; private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L; private Date lastUpdate = new Date(); private volatile int state = INITIALIZED; private Timer timer = new Timer(); private SystemBSer systemBSer = new SystemBSer(); private int eventId = -1; @Override public Void call() throws Exception { this.state = RUNNING; try { systemBSer.doQuery(); subscribeEvent(); startTaskTimeoutMonitorTask(); Future future = createEventFuture(); future.get(); } catch (Throwable t) { onTaskError(t); } finally { EventManager.unsubscribe(this.eventId); timer.cancel(); } return null; } @Override public void onEventReceived(Event event) { this.lastUpdate = new Date();// start to write excel// .....// end to write excel this.state = COMPLETED; } private void subscribeEvent() { this.eventId = EventManager.subscribe(this); } private Future createEventFuture() { FutureTask<Void> listenFuture = new FutureTask<Void>(new Callable<Void>() { @Override public Void call() throws Exception { while (state != COMPLETED) { } return null; } }); new Thread(listenFuture).start(); return listenFuture; } private void startTaskTimeoutMonitorTask() { timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { if (state != COMPLETED || new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) { onTaskTimeout(); } } }, 0, 15 * 60 * 1000); } private void onTaskTimeout() { // do something on task timeout. //   .... // end // set task to completed to end task. this.state = COMPLETED; } private void onTaskError(Throwable t) {// do something to handle error. }}

3 昇級改進

由於做這個需求的關系,我開始閱讀一些關於JAVA多線程編程的一下教程,在閱讀到關於閉鎖的內容時,我突然靈光一現,這玩意不正好可以代替我那個醜陋的使用循環來讓任務進入等待狀態的實現麼?然後我的程序就變成了這樣:

ExportRptTask.java

public class ExportRptTask implements Callable<Void>, IEventFuture { private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L; private Date lastUpdate = new Date(); private CountDownLatch endGate = new CountDownLatch(1); private Timer timer = new Timer(); private SystemBSer systemBSer = new SystemBSer(); private int eventId = -1; @Override public Void call() throws Exception { try { systemBSer.doQuery(); subscribeEvent(); endGate.await(); startTaskTimeoutMonitorTask(); } catch (Throwable t) { onTaskError(t); } finally { EventManager.unsubscribe(this.eventId); timer.cancel(); } return null; } @Override public void onEventReceived(Event event) { this.lastUpdate = new Date();// start to write excel// .....// end to write excel this.endGate.countDown(); } private void subscribeEvent() { this.eventId = EventManager.subscribe(this); } private void startTaskTimeoutMonitorTask() { timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { if (new Date().getTime() - lastUpdate.getTime() > TASK_TIME_OUT_TIME) { onTaskTimeout(); } } }, 0, 15 * 60 * 1000); } private void onTaskTimeout() {// do something on task timeout.//   ....// end// set task to completed to end task. this.endGate.countDown(); } private void onTaskError(Throwable t) {// do something to handle error. }}

4 問題浮現

正在我為我使用高大上的閉鎖代替循環沾沾自喜的時候,測試大爺告訴我,任務經常莫名其妙的失敗,並且日志中沒有任何异常。開始,這讓我覺得很不可思議,因為我已經在call()方法處處理了所有的异常,任務失敗時至少也應該有個日志啥的吧。這個問題一直困擾著我,直到有一天分析日志我突然發現任務執行的工作線程(也就是call()方法所在的線程)和接收到事件後的回調並不是同一個線程。這就意味著在查詢到報錶結果後,所有寫Excel,分發結果等等的操作都是在事件回調的線程中執行的,那麼一旦這裏發生异常原來call()中的catch塊自然無法捕獲,然後异常就被莫名其妙的吞掉了。好吧,我承認我之前對線程池也就了解點皮毛,對多線程也僅僅是有個概念,想當然的認為在線程池中可以Hold住任務的一切,包括響應這個任務在執行過程中創建的其他線程運行時發生的异常。而且更嚴重的是按照原來的實現,只有當整個任務執行完成(包括寫完Excel)後,才會釋放那個閉鎖,所以一旦事件回調發生异常,那麼整個任務都無法終止。在線程池中發生一個任務永遠無法終止的後果,你懂的。

5 重新設計

痛定思痛,我决定重新梳理這個任務的流程。這個需求的難點就是在如何監聽並響應B系統給我們發送的事件,實際上,這是一個很經典的生產者–消費者問題,而阻塞隊列正好是解决這類問題的利器。重新設計的事件響應流程就變成:當B系統發送事件的時候,事件回調線程會往阻塞隊列裏面填充一個事件。在另一方面,任務調用完B系統的查詢接口後,就開始從阻塞隊列中取事件,當事件隊列為空的時候,取事件的線程(也就是線程池執行任務的工作線程)會被阻塞。並且,阻塞隊列的取操作可以設置超時時間,所以當取到的事件對象為空時,就意味著事件超時了,這樣就省去了使用定時任務定時檢查任務狀態的工作。重新設計的程序是這樣的:

EventProxy.java

public class EventProxy implements IEventFuture { private static final BlockingQueue<Event> eventQueue = new ArrayBlockingQueue<Event>(10); private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L; @Override public void onEventReceived(Event event) { eventQueue.offer(event); } public Event getEvent() throws InterruptedException { return eventQueue.poll(TASK_TIME_OUT_TIME, TimeUnit.MILLISECONDS); }}

ExportRptTask.java

public class ExportRptTask3 implements Callable<Void> { private SystemBSer systemBSer = new SystemBSer(); private EventProxy eventProxy = new EventProxy(); private int eventId = -1; @Override public Void call() throws Exception { try { systemBSer.doQuery(); subscribeEvent(); Event event = eventProxy.getEvent(); if (event != null) { processEvent(event); } else { onTaskTimeout(); } } catch (Throwable t) { onTaskError(t); } finally { EventManager.unsubscribe(this.eventId); } return null; } private void subscribeEvent() { this.eventId = EventManager.subscribe(eventProxy); } private void processEvent(Event event) {// do something on receive event. } private void onTaskTimeout() {// do something on task timeout.//   ....// end } private void onTaskError(Throwable t) {// do something to handle error. }}

6 總結

相信各比特並發編程的大牛們能在一瞬間就可以把我的程序(包括改進後的)批得體無完膚,不過我還是想分享下我在這個過程中的收獲。

  • 在動手寫程序前,請先理解你的需求,特別是要注意用已有的模型去識別問題,在本例中,我就是沒有識別響應事件的流程其實是個生產者–消費者問題導致了後面的錯誤
  • 請充分的了解你需要使用的技術和工具。比如,使用線程池你就要了解線程池的工作原理,這樣你才能正確的使用這些技術。做技術切忌想當然。
  • 在使用線程池時,重要的操作盡量放在任務的主線程中執行(也就是call()/run()方法所在的線程),否則線程池本身難以對任務進行控制。
  • 如果一定要在任務中再創建新的線程,請確保任務主線程是任務最後退出的線程。切忌不要使用外部線程直接調用任務類的方法,在本例中我就犯了這樣的錯誤。

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071444231398.html