Java並發編程【1.2時代】

杜老師說 2022-01-07 16:17:40 阅读数:33

java

         本文介紹了Java原生的多線程技術(1.2),通過詳細介紹waitnotify相關的機制、基礎的多線程技術以及基於這些技術的等待超時、線程間的通信技術和線程池高階技術,最後通過一個基於線程池的簡單文本web服務器—MollyServer,來闡明多線程帶來好處。通過介紹這些技術,展示了在沒有使用Java並發包的時代(1.5-)是如何完成Java的多線程編程,為理解Java5提供了良好幫助。

線程簡介1

       Java從誕生開始就明智的選擇內置對多線程的支持,這將Java語言同其他同一時期的語言相比,具有明顯優勢。線程作為操作系統最小的調度單元,多個線程同時執行,將會改善我們的代碼,在多核環境中具有更加明顯的好處,但是過多的創建線程和對線程的不當管理也容易造成問題。

啟動線程

構造線程

       Java中啟動線程必須要先行的構造一個Thread對象,然後調用這個對象的start方法。

this.group = g; this.daemon = parent.isDaemon(); this.priority = parent.getPriority(); this.name = name.toCharArray(); if (security == null || isCCLOverridden(parent.getClass())) this.contextClassLoader = parent.getContextClassLoader(); else this.contextClassLoader = parent.contextClassLoader; this.inheritedAccessControlContext = AccessController.getContext(); this.target = target; setPriority(priority); if (parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); /* Stash the specified stack size in case the VM cares */ this.stackSize = stackSize; /* Set thread ID */ tid = nextThreadID();

線程的構造,最主要或者說也就是線程對象的初始化過程,在上述過程中,一個新構造的線程對象是由其parent線程來進行分配空間的,而child線程繼承了parent的是否Daemon,優先級和加載資源的classloader,棧空間的大小並且還會分配一個唯一的ID來標識這個child線程,至此一個能够運行的線程對象就初始化好了,在堆內存中等待著運行。

啟動線程

          調用Thread對象的start方法,就可啟動一個新的線程,parent線程同步告知Java VM,只要線程規劃器空閑,應立即啟動這個線程。

2

         而啟動線程,也是交給操作系統來完成,這裏就是一個本地方法了。

         啟動一個線程時,最好設置名稱,這樣在jstack分析時,就會好很多,自定義的線程最好能够起個名字。

/** * @author weipeng * */public class ThreadName { /** * @param args */ public static void main(String[] args) { Thread t = new Thread(new Job()); t.setName("ThreadNameJob"); t.start(); } static class Job implements Runnable { @Override public void run() { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }}

        上述代碼直接運行,可以通過jstack pid來觀察棧信息,結果如下:

2012-05-05 23:50:07Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.1-b02 mixed mode):"Attach Listener" daemon prio=10 tid=0x00007f4c38001000 nid=0x30b5 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE"DestroyJavaVM" prio=10 tid=0x00007f4c60007800 nid=0x3086 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE"ThreadNameJob" prio=10 tid=0x00007f4c600a2800 nid=0x3097 waiting on condition [0x00007f4c37cfb000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at com.murdock.books.multithread.example.ThreadName$Job.run(ThreadName.java:26) at java.lang.Thread.run(Thread.java:662)"Low Memory Detector" daemon prio=10 tid=0x00007f4c60091800 nid=0x3095 runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE"C2 CompilerThread1" daemon prio=10 tid=0x00007f4c6008f000 nid=0x3094 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE"C2 CompilerThread0" daemon prio=10 tid=0x00007f4c6008c000 nid=0x3093 waiting on condition [0x0000000000000000] java.lang.Thread.State: RUNNABLE"Signal Dispatcher" daemon prio=10 tid=0x00007f4c6008a000 nid=0x3092 runnable [0x0000000000000000] java.lang.Thread.State: RUNNABLE"Finalizer" daemon prio=10 tid=0x00007f4c6006e000 nid=0x3091 in Object.wait() [0x00007f4c5c860000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x00000000ec6b1300> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118) - locked <0x00000000ec6b1300> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134) at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)"Reference Handler" daemon prio=10 tid=0x00007f4c6006c000 nid=0x3090 in Object.wait() [0x00007f4c5c961000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x00000000ec6b11d8> (a java.lang.ref.Reference$Lock) at java.lang.Object.wait(Object.java:485) at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116) - locked <0x00000000ec6b11d8> (a java.lang.ref.Reference$Lock)"VM Thread" prio=10 tid=0x00007f4c60065800 nid=0x308f runnable"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f4c6001a800 nid=0x3087 runnable"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f4c6001c800 nid=0x3088 runnable"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f4c6001e800 nid=0x3089 runnable"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f4c60020000 nid=0x308a runnable"VM Periodic Task Thread" prio=10 tid=0x00007f4c6009c000 nid=0x3096 waiting on conditionJNI global references: 882

         可以看到一個Java程序在運行時,後臺創建了很多的線程,所以一個Java程序,縱使只有main,它也是多線程的,其中可以看到ThreadNameJob這個線程,也可以看到本地以吞吐量優先的ParallelGC的線程,它的數量默認是和CPU相同的,其中有4個對新生代進行GC的線程。

終止線程

3

       線程從執行Runnalbe開始到結束。

理解中斷

        中斷是一種狀態,它使一個運行中的線程能够感知到其他線程對自身作出了中斷操作,也就是影響到了自己。線程工作檢查自身是否被中斷來作出響應的行為。而該狀態並沒有維護在Thread中,是通過native方法獲得。

         可以通過當前線程對象的isInterrupted來判斷是否被中斷了。

/** * @author weipeng * */public class Interrupted { /** * @param args */ public static void main(String[] args) throws Exception { InterruptedJob ij = new InterruptedJob(); ij.setName("InterruptedJobThread "); ij.start(); Thread.sleep(2000); // 中斷 ij.interrupt(); System.out.println("INTERRUPTED IJ"); Thread.sleep(2000); } static class InterruptedJob extends Thread { @Override public void run() { try { while (true) { Thread.sleep(1000); } } catch (InterruptedException e) { System.out.println("CURRENT INTERRUPT STATUS IS " + Thread.currentThread().getName() + Thread.currentThread().isInterrupted()); // 再次進行中斷 Thread.currentThread().interrupt(); System.out.println("CURRENT INTERRUPT STATUS IS " + Thread.currentThread().getName() + Thread.currentThread().isInterrupted()); } } }}

上述程序輸出:

INTERRUPTED IJ

CURRENT INTERRUPT STATUS IS InterruptedJobThread false

CURRENT INTERRUPT STATUS IS InterruptedJobThread true

可以看出一旦拋出InterruptedException,當前線程的中斷狀態就被清除,但是也可以調用Thread.interrupted()來清除當前的中斷狀態。

線程屬性

4

        Java中創建的線程均會映射為操作系統層面的線程,在Java線程對象中有部分屬性可以提供訪問。線程狀態是理解線程運行的關鍵。

線程優先級

publicclass Thread implements Runnable { /* Make sure registerNatives is the first thing <clinit> does. */ private static native void registerNatives(); static { registerNatives(); } private char name[]; private int priority;

         可以看到priority,這個代錶著優先級,優先級的範圍從110,優先級高的線程占有CPU時間長一些,這當然是在長時間運行時體現出來的,但是不能做為程序執行的依據。

         對priority可以通過對線程對象進行設置,使用setPriority來完成對線程優先級的設定。

下面的例子中,構建了三個不同的線程,它們的優先級不一樣,從110,然後運行,優先級高的線程對times++執行的會多一些。

/** * @author weipeng * */public class Priority { private static CountDownLatch countDownLatch = new CountDownLatch(10000000); private static CountDownLatch start = new CountDownLatch(1); public static void main(String[] args) { CountJob job1 = new CountJob(); Thread lingdao = new Thread(job1); lingdao.setPriority(10); lingdao.start(); CountJob job2 = new CountJob(); Thread pming = new Thread(job2); pming.setPriority(1); pming.start(); CountJob job3 = new CountJob(); Thread zhongchan = new Thread(job3); zhongchan.setPriority(5); zhongchan.start(); start.countDown(); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("lingdao : have " + job1.getTimes()); System.out.println("pming : have" + job2.getTimes()); System.out.println("zhongchan : have" + job3.getTimes()); } static class CountJob implements Runnable { private int times = 0; @Override public void run() { // 等待開始 try { start.await(); } catch (InterruptedException e) { e.printStackTrace(); } while (countDownLatch.getCount() > 0) { synchronized (CountJob.class) { if (countDownLatch.getCount() > 0) { countDownLatch.countDown(); times++; } } } } public int getTimes() { return times; } }}

      執行結果如下:

lingdao : have 4347635

pming : have2661562

zhongchan : have2990803

       每次執行的可能都不一樣,但是總的趨勢是高優先級的線程對CPU的占用時間會多一些。

線程狀態

        線程在運行的生命周期中可能處於下面的6種不同的狀態,在一個時刻,線程可能處於CPU上處於運行,或者暫時的沒有分配到CPU資源而處於就緒(准備運行),或者處於阻塞的狀態。具體內容如下面的錶格所示:

狀態名稱

阻塞

可以中斷

說明

運行中 N N

正在CPU上進行執行

准備運行(就緒) N N 暫時的失去CPU資源處於就緒隊列中,可能隨時被線程調度器調度執行
休眠 Y Y 讓出CPU資源的就緒隊列,等待一段時間後再次被放入隊列,可以被中斷提前進入就緒隊列
等待 Y Y 接受到通知或者等待超時會進入到就緒隊列,可以被中斷
阻塞於I/O Y N I/O條件滿足後,例如讀入了一些字符,准備運行
阻塞於同步 Y N

當獲得同步鎖後准備運行

        可以使用如下狀態遷移來描述線程的狀態:

5

        線程在一個時刻將會處於上述的三種狀態之一,這個模型將有效的理解Java線程對象,但是其中處於等待狀態的線程可能會在等待I/O和等待同步時無法被中斷,雖然運行的線程已經被中斷標識,但是不會像休眠和等待一樣通過InterruptedException來直接返回。

/** * <pre> * 處於同步讀取的線程被中斷,不會拋出异常 * * </pre> * * @author weipeng * */public class ReadInterrupted { /** * @param args */ public static void main(String[] args) { // 使用父線程,也就是main-thread Thread thread = new Thread(new InterruptedJob(Thread.currentThread())); thread.start(); InputStream is = System.in; try { is.read(); } catch (IOException e) { e.printStackTrace(); } System.out.println("Main Thread is interrupted ? " + Thread.currentThread().isInterrupted()); } static class InterruptedJob implements Runnable { Thread interruptedThread; public InterruptedJob(Thread thread) { this.interruptedThread = thread; } @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } interruptedThread.interrupt(); } }}

       運行的結果是:

      這時整個線程掛在is.read上,這時隨意從控制臺輸入一個字符,主線程退出:

     123

     Main Thread is interrupted ? true

       可以看出對阻塞於同步I/O的線程被中斷後,中斷標識被打上,但是不會拋出异常退出。

線程規劃

        對高I/O的線程盡量給予高優先級的設定,對於低I/OCPU運算為主的線程盡量降低優先級,避免過多的占用CPU。因此,不能依據線程優先級的高低來運行程序,需要保證每個線程都有運行的機會。

並發訪問對象

6

      Java支持多個線程同時的訪問一個對象,或者對象的變量,由於每個線程可以擁有這個變量的拷貝(這麼做的目的是能够快速的執行,雖然變量分配的內存在共享內存中,但是每個執行的線程還是可以擁有一份拷貝,這樣做的目的是加速程序的執行,這是現代多核處理器的一個顯著特性)。因此,程序在執行過程中,可能一個線程看到的變量並不一定是最新的。

Volatile

     Volatile關鍵字,就是告知任何對該變量的訪問均需要從共享內存中獲取,而對它的改變必須同步刷新會共享內存。

       比如,錶示一個程序是否運行的變量,boolean on = true,那麼可能是另一個線程來對它進行關閉動作,因此將其設置成為volatile boolean on,這樣就會再其他線程對它進行改變時,能够讓原有的線程立刻感知到。

       但是過多的使用volatile是不必要的,相反它會降低程序執行的效率。

Synchronized

        同步,在帶來可見性的同時,它主要是對多個線程在同一個時刻,只能有一個處於方法或者塊中。

        可以通過將synchronized關鍵字加在方法前面或者采用同步快的方式來進行錶現:

static synchronized void m() { System.out.println("T"); } public static void main(String[] args) { m(); synchronized(Synchronized.class) { m(); } }

}

        Java同步是針對普通的Java對象而言的,每個Java對象均有一把“鎖”,這個鎖在一個線程進入時會排斥其他線程進入,是一個排他鎖。通過javap來觀察字節碼,可以看到:

public static void main(java.lang.String[]); Code: Stack=2, Locals=2, Args_size=1 0: invokestatic #31; //Method m:()V 3: ldc #1; //class com/murdock/books/multithread/example/Synchronized 5: dup 6: astore_1 7: monitorenter 8: invokestatic #31; //Method m:()V 11: aload_1 12: monitorexit 13: goto 19 16: aload_1 17: monitorexit 18: athrow 19: return

          當出現命令monitorenter時代獲得了該對象的鎖,當運行命令monitorexit時代錶釋放了該對象的鎖。

同步化集合

同步化訪問

        在Java的集合api中有非常多的同步集合,比如:VectorHashtable,這些集合的所有方法都是synchronized,也就是說對這些集合的訪問是同步的,但是如果每個接口都有一個專屬的同步集合實現是非常不現實的,因此用過使用Collections.synchronizedXxx方法,可以包裝一個同步的集合對象進行使用。

        比如,摘自Collections

public static <T> List<T> synchronizedList(List<T> list) { return (list instanceof RandomAccess ? new SynchronizedRandomAccessList<T>(list) : new SynchronizedList<T>(list)); }

        該方法返回的就是一個實現了List接口的同步數據結構,這個同步的數據結構每個方法均是同步的,但是如果需要對其進行額外的操作,需要將其加入到同步塊中。

SynchronizedCollection(Collection<E> c) { if (c==null) throw new NullPointerException(); this.c = c; mutex = this; }

        上面可以看到同步集合均是對自身進行同步。

public class Synchronized { static synchronized void m() { System.out.println("T"); } public static void main(String[] args) throws Exception { List<String> s = new ArrayList<String>(); s.add("1"); List<String> synchronizedList = Collections.synchronizedList(s); Thread t = new Thread(new AccessSynchronizedCollections( synchronizedList)); t.start(); synchronized (synchronizedList) { Thread.sleep(5000); System.out.println("Main-thread" + synchronizedList.size()); } } /** * 這個線程將會首先休息2000ms,然後喚醒後去請求鎖,並執行操作 */ static class AccessSynchronizedCollections implements Runnable { List<String> list; public AccessSynchronizedCollections(List<String> list) { this.list = list; } @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("AccessSynchronizedCollections" + list.size()); list.add("2"); } }}

         上述執行的結果:

       Main-thread1

       AccessSynchronizedCollections1

         可以看到,在自定義對集合操作,比如缺少就添加,就需要將集合進行同步,然後在進行操作,否則很容易在判定過程中加入了其他線程對集合的操作。

安全複制集合

         有時一個集合對象是進程內共享的,可能會發生一些變化,因此在作出一些操作的時候,希望能够拿到一份瞬時的拷貝,這個拷貝可能和執行中的這一時刻的集合有了變化,但是能够保證是穩定的。就像我們出門買了一份報紙,我們回家閱讀報紙的時候,上面的新聞可能隨時會發生變化,但是這並不妨礙我們去閱讀它。

第一種複制的方式:

List<String> synchronizedList = Collections.synchronizedList(list); long currentTime = System.currentTimeMillis(); for (int i = 0; i < 10000; i++) { String[] array = synchronizedList.toArray(new String[0]); } System.out.println(System.currentTimeMillis() - currentTime);

第二種複制的方式:

for (int i = 0; i < 10000; i++) { synchronized (synchronizedList) { int size = synchronizedList.size(); String[] array = new String[size]; synchronizedList.toArray(array); } }

         第一種比較簡單,第二種對於new String[0]沒有做過多的浪費,但是時間測算,第二種沒有第一種好,因為主要比拼的是toArray的實現,在給定的數組大於等於列錶時,將會使用給定的數組,否則將會通過反射構造一個數組,而這個還是很高效的。

         因此對於集合的數組複制,使用第一種方式是比較適合的。

死鎖

          兩個線程或者多個線程在請求其永遠無法獲取資源的鎖時,就是死鎖狀態。這裏不演示死鎖產生的範例。

          避免死鎖的主要原則:

          首先,對於資源的加鎖時間必須足够短,也就是必要時進行鎖;

          其次,訪問資源過程中的鎖需要按照一致的順序進行獲取,否則需要提昇出一個更大的鎖來確保資源的獲取;

          最後,盡量通過封裝的形式,避免將鎖暴露給外部,從而造成不必要的資源死鎖。

線程間通信

7

         線程開始運行,就如同一個脚本一樣,有自己的棧空間,按照既定的代碼一步一步的執行,直到最後的終結。但是每個運作中的線程,如果僅僅是孤立的運作,那麼沒有一點用處,或者說用處很少,但是多個運作的線程能够相互配合,各司其職將會帶來巨大的好處。

線程間通信的必要性

         一個運作的脚本(線程)修改了一個對象的值,另一個線程捕獲到這個對象的變化,然後進行對應的操作,這個過程事件的觸發啟於一個線程,而最終的執行又是一個線程。因此前者好比生產者,後者就是消費者,這樣的模式隔開了生產和消費,在功能上和架構上具有良好的伸縮性。但是在Java語言中怎樣能够做到上述的過程呢?

         當然,簡單的辦法是不斷的循環去查看,比如:

while (value != desire) {

Thread.sleep(1000);

}

doXxx

        這段偽碼就是相當與如果值不是這個消費線程所要的,那麼就睡眠一段時間,這樣的方式看似能够解决這個問題,但是有兩個矛盾的問題。

        第一個,在睡眠時,基本不消耗CPU,但是如果睡得久,那麼就不能及時的發現value已經變化,也就是及時性難以保證;

        第二個,如果降低睡眠的時間,比如睡1毫秒,這樣消費者能更加迅速的捕獲出變化,但是它卻占用了更多的CPU時間,造成了無端的浪費。

        面對這個矛盾,Java通過固有的wait/notify機制能够很好的實現這個模式。

等待/通知機制

         等待通知機制,是指一個線程調用了對象A上的wait方法,而另外的一個線程在進行了某些操作後,在對象A上的notify或者notifyAll方法,這樣完成了兩個線程之間的交互。而這個waitnotify之間的關系就像一個信號量一樣來完成二者之間的交互工作。

        一個標准的waitnotify的例子,這個例子有兩個線程,第一個等待共享的一個值為false,當為false時它進行print,另外一個在睡眠了一段時間後,將這個值由原有的true改為falsenotify

/** * @author weipeng */public class WaitNotify { static boolean flag = true; static Object OBJ = new Object(); public static void main(String[] args) { Thread t1 = new Thread(new Waiter()); t1.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Thread t2 = new Thread(new Notifier()); t2.start(); } /** * 等待,如果flag為false則打印 */ static class Waiter implements Runnable { @Override public void run() { // 加鎖,擁有OBJ的Monitor synchronized (OBJ) { // 當條件不滿足時,繼續wait,同時釋放了OBJ的鎖 while (flag) { try { System.out.println(Thread.currentThread() + " still true. wait......"); OBJ.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 條件滿足時,完成工作 System.out .println(Thread.currentThread() + " is false. doXXX."); } } } static class Notifier implements Runnable { @Override public void run() { synchronized (OBJ) { // 獲取OBJ的鎖,然後進行通知,通知時不會釋放OBJ的鎖 // 這也類似於過早通知 OBJ.notifyAll(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } flag = false; OBJ.notifyAll(); } } }}

      從上面的例子中能够提煉出經典的等待和通知機制,對於等待的一方,遵循如下的原則:

1)獲得對象的鎖;

2)如果條件不滿足,那麼調用對象的wait,釋放鎖,被通知後繼續檢查(2

3)條件已經滿足,執行對應的邏輯。

synchronized(OBJ) {

while(Condition not hold) {

OBJ.wait();

}

// Condition hold

do XXX;

}

       通知的一方,遵循如下原則:

1)獲得對象的鎖;

2)更新變量或者條件,然後通知。

synchronized(OBJ) {

value = newvalue;

OBJ.notifyAll();

}

等待/通知的API

等待和通知機制被深深植入了Java語言中,在Object方法中有5final的方法,也就是子類不能複寫的方法。

方法名稱

簡介

notify() 隨機通知調用notify對象上正在等待的線程,注意這個通知沒有放弃對對象的鎖,僅在通知notify完成之後直到釋放了對象的鎖才在對方線程的wait方法處返回;
notifyAll() 這個方法會依次通知所有的正在等待在該對象上的線程,是一種比較保險的做法;
wait() 該方法會讓調用線程進入休眠狀態,只有等待另外線程的notify或者被中斷才會返回,注意的是,調用wait後,會釋放對象的鎖;
wait(long) 等待,這裏的參數時間是毫秒,也就是等待長達n毫秒,如果沒有通知就超時返回,但是這裏很難區分出是其他線程的notify還是超時返回;
wait(long, int) 對於超時更細粒度的控制,達到納秒,但是這個方法用的不多。

        這裏要說明notify方法不會釋放對象的鎖,而也只有釋放了對象的鎖,另一個線程才能從wait中競爭獲得對象的鎖並從wait方法中返回。

/** * @author weipeng */public class WaitNotify { static boolean flag = true; static Object OBJ = new Object(); public static void main(String[] args) { Thread t1 = new Thread(new Waiter()); t1.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Thread t2 = new Thread(new Notifier()); t2.start(); } /** * 等待,如果flag為false則打印 */ static class Waiter implements Runnable { @Override public void run() { // 加鎖,擁有OBJ的Monitor synchronized (OBJ) { // 當條件不滿足時,繼續wait,同時釋放了OBJ的鎖 while (flag) { try { System.out.println(Thread.currentThread() + " still true. wait......" + new Date()); OBJ.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 條件滿足時,完成工作 System.out .println(Thread.currentThread() + " is false. doXXX." + new Date()); } } } static class Notifier implements Runnable { @Override public void run() { synchronized (OBJ) { // 獲取OBJ的鎖,然後進行通知,不會在notify調用中,釋放OBJ的鎖 // 這也類似於過早通知 // 直到當前線程釋放了OBJ後,Waiter才能從wait方法中返回 OBJ.notifyAll(); flag = false; try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }}

       程序的輸出:

Thread[Thread-0,5,main] still true. wait……Sun Jun 24 20:53:03 CST 2012

Thread[Thread-0,5,main] is false. doXXX.Sun Jun 24 20:53:14 CST 2012

        可以看到,二者之間相差了10秒,也就是Thread.sleep(10000)這段代碼造成的,可以看出Notifier沒有釋放OBJ的鎖,而Waiter在對方沒有釋放前是不會返回的。

PipedStream管道

      Piped這個詞就是管道,相當於從一端入一端出的輸入輸出流。只是不是從網絡和文件上讀入內容,而是在線程之間傳遞數據,而傳輸的媒介為內存。

       管道主要包括了:

PipedOutputStreamPipedInputStreamPipedReaderPipedWriter四個,面向的處理內容為字節和字符。

public class PipedTest { static class Print implements Runnable { private PipedInputStream in; public Print(PipedInputStream in) { this.in = in; } @Override public void run() { int receive = 0; try { while ((receive = in.read()) != -1) { System.out.println(receive); } } catch (IOException ex) { ex.printStackTrace(); } } } /** * @param args */ public static void main(String[] args) throws Exception { PipedOutputStream out = new PipedOutputStream(); PipedInputStream in = new PipedInputStream(); // Out ==> In out.connect(in); Thread t = new Thread(new Print(in)); t.start(); int receive = 0; while ((receive = System.in.read()) != -1) { out.write(receive); } }}

        上述程序,以main線程作為輸入,而另外的Print作為輸出。對於Piped類型的流,必須要進行connect,如果沒有綁定,對於該流的訪問會拋出异常。

ThreadLocal

       ThreadLocal線程變量,這是一個以ThreadLocal對象為Key,一個Objectvalue的存儲結構。它被附帶在線程上,也就是說一個線程可以根據一個ThreadLocal擁有一個變量。

       在線程對象中,有一個成員變量,類型如下:

static class ThreadLocalMap { /** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ static class Entry extends WeakReference<ThreadLocal> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal k, Object v) { super(k); value = v; } }

        可以看到線程對象中的這個ThreadLocalMap是以ThreadLocal作為Key的。那麼對於一個ThreadLocal在線程對其調用get方法時,會獲取對應的Object,下面是get方法。

public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) return (T)e.value; } return setInitialValue(); }

       下面對這些代碼做些說明:

       首先調用方會獲得掉用線程Thread t = Thread.currentThread();

      其次會獲得線程對象的ThreadLocalMap對象;

      然後在ThreadLocalMap對象上,以this,也就是ThreadLocalkey去獲得對應的值;

      如果ThreadLocalMap這個對象為NULL,這裏做延遲加載,通過setInitialValue()方法來初始化線程對象的ThreadLocalMap變量。

      可以看出只有線程執行了任意ThreadLocalget方法後,才會擁有ThreadLocalMap這個對象,而該變量又是包訪問級別的,所以不會擔心被其他類修改。

完全等待超時

8

             有時我們需要在調用一個方法時等待一段時間(一般來說是設置一個值,有更改),等待條件的滿足,而等待是有時限的,比如:1000ms,如果在1000ms後無法滿足條件那麼返回,否則在時限內如果成功則立刻返回。

模式

        之前提到了基於wait的經典模式,即:同步,whilewaitdoXxx的邏輯,那麼這種模式無法做到一點,就是能够讓客戶端超時返回。

        如果加入超時的話,對於經典模式的修改其實不會很複雜,假設超時時間是t ms,那麼可以推知在now + t之後就會超時,則定義:

remaining = t;

future = now + t;

          這時僅需要wait(remaining)即可,在醒來之後會將future – now,這個會設置到remaining上,但是如果remaining為負數,則直接退出。

public synchronized Object get(long mills) throws InterruptedException { long future = System.currentTimeMillis() + mills; long remained = mills; // 當結果為空並沒有超時 while ((result == null) && remained > 0) { wait(remained); remained = future - System.currentTimeMillis(); } return result; }

         在while的判斷中加入了remained > 0的約束。這個模式就可以實現等待超時,在mills毫秒內無法獲取到result或者result已經獲取到了,都會返回。

使用實例與場景

        這裏我們模擬一個數據庫鏈接獲取的過程,這是一個消費者和生產者的案例。

         生產者每1000ms生產一個鏈接到池子中,每個消費者從池子中獲取一個鏈接,如果在800ms獲取不到,那麼就返回,並告知獲取鏈接超時。初始的池子裏有10個鏈接,消費者有5個,生產者有2個。

Connection的定義

public class Connection { public void sendStatement() { try { Thread.sleep(10); System.out.println(Thread.currentThread() + " Send Statement"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }}

ConnectionPool的定義

public class ConnectionPool { private LinkedList<Connection> pool = new LinkedList<Connection>(); private static final int MAX_SIZE = 20; public ConnectionPool(int initialSize){ if (initialSize > 0) { for (int i = 0; i < initialSize; i++) { pool.addLast(new Connection()); } } } public void releaseConnection() throws InterruptedException { synchronized (pool) { while (pool.size() >= MAX_SIZE) { pool.wait(); } // 添加後需要進行通知,這樣其他消費者能够感知到鏈接池中已經增加了一個鏈接 pool.addLast(new Connection()); pool.notifyAll(); } } public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool) { // 完全超時 if (mills <= 0) { while (pool.isEmpty()) { pool.wait(); } return pool.removeFirst(); } else { long futureTime = System.currentTimeMillis() + mills; long deltaTime = mills; while (pool.isEmpty() && deltaTime > 0) { pool.wait(deltaTime); deltaTime = futureTime - System.currentTimeMillis(); } Connection result = null; if (!pool.isEmpty()) { result = pool.removeFirst(); } return result; } } }}

          這裏主要看一下fecthConnection,它提供了完全超時的實現,主要是通過計算出將要超時的時間點futureTime,和超時的時間距離deltaTime,在這個基礎上複用了僅點的同步、whiledo的結構,只不過是在while的不通過條件中增加了時間距離的消耗判斷,如果小於0直接返回,當然面對過早通知,將會更新deltaTime

           當執行從pool.wait方法中返回後,有可能是超時,也有可能是已經滿足了池中有連接的狀况,因此如果有連接則直接返回,否則返回空。

測試用例

public class ConnectionPoolTest { static ConnectionPool pool = new ConnectionPool(10); static CountDownLatch latch = new CountDownLatch(1); /** * <pre> * Thread[Thread-5,5,main] put a connection. * Thread[Thread-6,5,main] put a connection. * Thread[Thread-4,5,main] got a connection * Thread[Thread-3,5,main] got a connection * Thread[Thread-5,5,main] put a connection. * Thread[Thread-6,5,main] put a connection. * Thread[Thread-1,5,main] got a connection * Thread[Thread-4,5,main] got a connection * </pre> * * @param args */ public static void main(String[] args) { for (int i = 0; i < 5; i++) { Consumer p = new Consumer(latch); Thread t = new Thread(p); t.start(); } for (int i = 0; i < 2; i++) { Producer p = new Producer(latch); Thread t = new Thread(p); t.start(); } latch.countDown(); } static class Producer implements Runnable { private CountDownLatch latch; public Producer(CountDownLatch latch){ this.latch = latch; } public void run() { try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { pool.releaseConnection(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + " put a connection."); } } } static class Consumer implements Runnable { private CountDownLatch latch; public Consumer(CountDownLatch latch){ this.latch = latch; } public void run() { try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { Connection connection = pool.fetchConnection(0); if (connection == null) { System.out.println(Thread.currentThread() + " can not got a connection"); } else { System.out.println(Thread.currentThread() + " got a connection"); } } catch (InterruptedException e) { e.printStackTrace(); } } } }}

這是一個執行了一段時間的結果:

Thread[Thread-5,5,main] put a connection.Thread[Thread-0,5,main] got a connectionThread[Thread-6,5,main] put a connection.Thread[Thread-0,5,main] got a connectionThread[Thread-6,5,main] put a connection.Thread[Thread-5,5,main] put a connection.Thread[Thread-4,5,main] got a connectionThread[Thread-5,5,main] put a connection.Thread[Thread-6,5,main] put a connection.Thread[Thread-4,5,main] got a connectionThread[Thread-0,5,main] got a connection

          可以看到,因為生產者少,所以每次生產連接後,都被等待的消費者取走,而超時是完全超時,如果我們吧等待的時間長度調整到2000ms,就可以看到如下結果:

Thread[Thread-6,5,main] put a connection.Thread[Thread-0,5,main] got a connectionThread[Thread-2,5,main] got a connectionThread[Thread-1,5,main] can not got a connectionThread[Thread-5,5,main] put a connection.Thread[Thread-6,5,main] put a connection.

         有部分消費者,等待了2000ms沒有得到連接後,就返回了,這裏就非常類似數據庫鏈接池的實現。

阻塞隊列(FIFO

         阻塞隊列是對於資源獲取和釋放的一個良好數據結構,比如:作為資源的生產方,如果生產方生產的數據沒有比特置存放,那麼生產方將會阻塞在生產的這個方法上,當然也可以選擇阻塞多少毫秒。消費方也是同樣的道理。

阻塞隊列

/** * @author weipeng 2012-7-24 下午4:34:22 */public class BlockingQueue<E> { /** * 默認隊列長度 */ private static final int DEFAULT_SIZE = 10; /** * 隊列數組 */ private Object[] array; /** * 當前的長度 */ private int size; /** * 將要放置的比特置 */ private int head; /** * 將要移除的比特置 */ private int tail; public BlockingQueue(int size){ array = size > 0 ? new Object[size] : new Object[DEFAULT_SIZE]; } public BlockingQueue(){ this(DEFAULT_SIZE); } public int getCapacity() { return array.length; } /** * @return */ public int getSize() { synchronized (array) { return size; } } @SuppressWarnings("unchecked") public E take(long millis) throws InterruptedException { long waitTime = millis > 0 ? millis : 0; synchronized (array) { Object result = null; if (waitTime == 0) { while (size <= 0) { array.wait(); } result = array[tail]; size--; tail = (tail + 1) % getCapacity(); } else { long future = System.currentTimeMillis() + waitTime; long remain = waitTime; while (size <= 0 && remain > 0) { array.wait(remain); remain = future - System.currentTimeMillis(); } if (size > 0) { result = array[tail]; size--; tail = (tail + 1) % getCapacity(); } } array.notifyAll(); return (E) result; } } public E take() throws InterruptedException { return take(0); } public boolean offer(E e, long mills) throws InterruptedException { long waitTime = mills > 0 ? mills : 0; boolean result = false; if (e != null) { synchronized (array) { if (waitTime <= 0) { while (size >= getCapacity()) { array.wait(); } array[head] = e; size++; head = (head + 1) % getCapacity(); result = true; } else { long future = System.currentTimeMillis() + waitTime; long remain = waitTime; while (size >= getCapacity() && remain > 0) { array.wait(remain); remain = future - System.currentTimeMillis(); } if (size < getCapacity()) { array[head] = e; size++; head = (head + 1) % getCapacity(); result = true; } } array.notifyAll(); } } return result; } public boolean offer(E e) throws InterruptedException { return offer(e, 0); } public void printQueue() { synchronized (array) { System.out.println("======================"); for (int i = 0; i < size; i++) { System.out.println("[" + i + "]" + array[i]); } System.out.println("[head]" + head); System.out.println("[tail] " + tail); System.out.println("[size]" + size); System.out.println("======================"); } }}

        其中 head是插入的比特置,tail是移除的比特置。下面是測試用例:

@Test public void offer() throws InterruptedException { for (int i = 0; i < 10; i++) { queue.offer(new Object()); } queue.printQueue(); System.out.println(queue.offer(new Object(), 1000)); }

輸出結果:

======================[0][email protected][1][email protected][2][email protected][3][email protected][4][email protected][5][email protected][6][email protected][7][email protected][8][email protected][9][email protected][head]0[tail] 0[size]10======================false

         可以看到第11次添加被阻塞了,在1秒內沒有添加成功,那麼直接返回false

@Test public void take() throws InterruptedException { Thread t = new Thread() { Thread thread; { thread = Thread.currentThread(); } @Override public void run() { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } thread.interrupt(); } }; t.start(); System.out.println(queue.take(2000)); }

      結果是在2秒內,還沒有獲取到,主線程被中斷,而take能够感知到中斷,就提前返回了。

@Test public void interactive() throws Exception { final AtomicLong offer = new AtomicLong(); final AtomicLong take = new AtomicLong(); final AtomicLong notTake = new AtomicLong(); Thread t = new Thread() { public void run() { while (true) { try { queue.offer(new Object()); offer.incrementAndGet(); } catch (InterruptedException e) { e.printStackTrace(); } } } }; t.start(); Thread t1 = new Thread() { public void run() { while (true) { try { if (queue.take(1) == null) { notTake.incrementAndGet(); } else { take.incrementAndGet(); } } catch (InterruptedException e) { e.printStackTrace(); } } } }; t1.start(); Thread t2 = new Thread() { public void run() { while (true) { try { if (queue.take(1) == null) { notTake.incrementAndGet(); } else { take.incrementAndGet(); } } catch (InterruptedException e) { e.printStackTrace(); } } } }; t2.start(); Thread.sleep(10000); t.interrupt(); t1.interrupt(); t2.interrupt(); System.out.println(offer.get()); System.out.println(take.get()); System.out.println(notTake.get()); queue.printQueue(); }

           運行了10秒鐘,1個生產方,2個消費方,每個消費者在1ms內沒有獲取到的時候,就會將notTake1

           結果輸出:

java.lang.InterruptedException at java.lang.Object.wait(Native Method) at com.murdock.controller.BlockingQueue.take(BlockingQueue.java:74) at com.murdock.controller.BlockingQueueTest$3.run(BlockingQueueTest.java:81)java.lang.InterruptedException at java.lang.Object.wait(Native Method) at com.murdock.controller.BlockingQueue.take(BlockingQueue.java:74) at com.murdock.controller.BlockingQueueTest$4.run(BlockingQueueTest.java:99)java.lang.InterruptedException at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:485) at com.murdock.controller.BlockingQueue.offer(BlockingQueue.java:103) at com.murdock.controller.BlockingQueue.offer(BlockingQueue.java:137) at com.murdock.controller.BlockingQueueTest$2.run(BlockingQueueTest.java:65)

8828338

8828338

6283

======================

[head]8

[tail] 8

[size]0

======================

          可以看到有6283次沒有獲取到,生產了8828338次,消費了8828338次,一致的,但是有6283次沒有獲取到數據,因為超時返回了。

線程池(ThreadPool)

9

線程池技術簡介

       對於服務端的程序,經常處理的場景是:

       面對客戶端傳入的短小任務,快速的處理並返回。

       如果每次接受到一個任務,創建一個線程,然後進行執行,這種模式在原型階段是個不錯的選擇,但是如果面對的是成千上萬的任務遞交進服務器時,如果還是采用一個任務一個線程的方式,那麼將會創建數以萬記的線程,從而是操作系統進入到頻繁上下文切換的狀態,而如文中第一章所述,線程的創建和消亡是需要耗費系統資源的,這樣無疑是無法滿足要求的。

        而線程池技術能够很好的解决這個問題,它預先的創建了若幹的線程,也就是說線程的創建是托管的,並不能由用戶直接完全控制,從而使用固定或較為固定數目的線程來完成任務的執行,一方面消除了頻繁創建和消亡線程的開銷,另一方面,隨著任務的請求多少能够平緩的進行響應。

        在最優的狀態下,系統面臨大量的請求和較小的請求時,總體線程數量水平波動不大,當請求的規模變大時,響應處於平緩的劣化。

線程池的實現

線程池接口的定義

/** * @author weipeng */public interface ThreadPool<Job extends Runnable> { /** * <pre> * 執行一個Job,這個Job需要實現Runnable * * </pre> * * @param job */ void execute(Job job); /** * <pre> * 關閉線程池 * * </pre> */ void shutdown(); /** * <pre> * 增加工作線程 * * </pre> * * @param workerNum */ void addWorkers(int workerNum); /** * <pre> * 减少工作線程 * * </pre> * * @param workerNum */ void removeWorker(int workerNum); /** * <pre> * 得到Jobs的列錶 * * </pre> * * @return */ int getJobSize();}

       可以看到上面的接口可以完成一個Runnable的執行,並且能够將線程池中的工作線程進行增加和减少,同時可以支持優雅的關閉。

線程池的實現

/** * <pre> * 默認的線程池實現,可以新增工作線程也可以减少工作線程 * * 當然提交JOB後會進入隊列中,而Worker進行消費 * * 這是一個簡單的生產和消費者模式 * * </pre> * * @author weipeng * */public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { /** * 線程池最大限制數 */ private static final int MAX_WORKER_NUMBERS = 10; /** * 線程池默認的數量 */ private static final int DEFAULT_WORKER_NUMBERS = 5; /** * 線程池最小的數量 */ private static final int MIN_WORKER_NUMBERS = 1; /** * 這是一個工作列錶,將會向裏面插入工作 */ private final LinkedList<Job> jobs = new LinkedList<Job>(); /** * 工作者列錶 */ private final List<Worker> workers = Collections .synchronizedList(new ArrayList<Worker>()); /** * 工作者線程的數量 */ private int workerNum = DEFAULT_WORKER_NUMBERS; public DefaultThreadPool() { initializeWokers(DEFAULT_WORKER_NUMBERS); } public DefaultThreadPool(int num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num; initializeWokers(workerNum); } /* * (non-Javadoc) * * @see * com.murdock.books.multithread.example.ThreadPool#execute(java.lang.Runnable * ) */ @Override public void execute(Job job) { if (job != null) { // 添加一個工作,然後進行通知 synchronized (jobs) { jobs.addLast(job); jobs.notify(); } } } /* * (non-Javadoc) * * @see com.murdock.books.multithread.example.ThreadPool#shutdown() */ @Override public void shutdown() { for (Worker worker : workers) { worker.shutdown(); } } @Override public void addWorkers(int workerNum) { int addedNum = workerNum; if (workerNum + this.workerNum > MAX_WORKER_NUMBERS) { addedNum = MAX_WORKER_NUMBERS - this.workerNum; } synchronized (jobs) { initializeWokers(addedNum); this.workerNum = this.workerNum + addedNum; } } @Override public void removeWorker(int workerNum) { if (workerNum >= this.workerNum) { throw new IllegalArgumentException( "can not remove beyond workerNum. now num is " + this.workerNum); } synchronized (jobs) { int count = 0; while (count < workerNum) { workers.get(count).shutdown(); count++; } this.workerNum = this.workerNum - count; } } @Override public int getJobSize() { return jobs.size(); } /** * 初始化線程工作者 */ private void initializeWokers(int num) { for (int i = 0; i < num; i++) { Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker); thread.start(); } } /** * <pre> * 工作者,負責消費任務 * * </pre> */ class Worker implements Runnable { /** * 工作 */ private volatile boolean running = true; @Override public void run() { while (running) { Job job = null; synchronized (jobs) { // 如果工作者列錶是空的,那麼就wait,放弃cpu執行占用 while (jobs.isEmpty()) { try { jobs.wait(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); return; } } // 取出一個Job job = jobs.removeFirst(); } if (job != null) { try { job.run(); } catch (Exception ex) { ex.printStackTrace(); } } } } public void shutdown() { running = false; } }}

          上面的邏輯中,客戶端調用execute時,會不斷的向jobs中添加工作,而每個Worker在不斷將jobs取出並執行,當jobs為空時,Worker進行阻塞狀態。

          這裏有一點需要注意,也就是execute時,使用了notify,而不是notifyAll,因為我能够確定有消費者Worker被喚醒,這時使用notify將會比notifyAll獲得更小的開銷,這在高性能的並發處理中是非常重要的。

測試用例

測試提交工作
@Testpublic void testExe() { for (int i = 0; i < 1000; i++) { threadPoolNoPrint.execute(new NoPrint()); } sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(5000); System.out.println(threadPoolNoPrint.getJobSize()); }

執行結果:

991

985

980

        可以看到提交後,每個20ms,查看已經堆積的任務,發現在不斷的减少。

測試增加工作線程
@Test public void addExe() { for (int i = 0; i < 1000; i++) { threadPoolNoPrint.execute(new NoPrint()); } sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); System.out.println("============Add Worker============"); threadPoolNoPrint.addWorkers(5); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(5000); System.out.println(threadPoolNoPrint.getJobSize()); }

執行結果:

990

985

980

============Add Worker============

980

967

955

           在起初的5個線程運作時,可以看到每隔一段時間,消耗了5個工作,而增加了線程(並發度增加)後,沒個間隔消耗量12個左右工作,提昇了1倍多。

减少工作線程
@Test public void reduceExe() { for (int i = 0; i < 1000; i++) { threadPoolNoPrint.execute(new NoPrint()); } sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); System.out.println("============Add Worker============"); threadPoolNoPrint.addWorkers(5); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); System.out.println("==============Reduce Worker=============="); threadPoolNoPrint.removeWorker(7); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(20); System.out.println(threadPoolNoPrint.getJobSize()); sleep(5000); System.out.println(threadPoolNoPrint.getJobSize()); }

執行結果:

990

985

980

============Add Worker============

980

965

955

==============Reduce Worker==============

955

952

949

         可以看到5個線程開始執行,然後增加到了10個,最後减少到了3個,執行的單比特時間完成工作出現了先上揚再回落的過程。

關閉線程池
@Test public void gracefulShutdown() { for (int i = 0; i < 1000; i++) { threadPoolPrint.execute(new Print()); } sleep(50); threadPoolPrint.shutdown(); }

執行結果:

Thread[Thread-1,5,main], time=1347615521118Thread[Thread-3,5,main], time=1347615521118Thread[Thread-0,5,main], time=1347615521118Thread[Thread-4,5,main], time=1347615521118Thread[Thread-2,5,main], time=1347615521118Thread[Thread-1,5,main], time=1347615521124Thread[Thread-4,5,main], time=1347615521124Thread[Thread-0,5,main], time=1347615521124Thread[Thread-3,5,main], time=1347615521124Thread[Thread-2,5,main], time=1347615521124Thread[Thread-1,5,main], time=1347615521129Thread[Thread-3,5,main], time=1347615521129Thread[Thread-0,5,main], time=1347615521129Thread[Thread-4,5,main], time=1347615521129Thread[Thread-2,5,main], time=1347615521129Thread[Thread-1,5,main], time=1347615521134Thread[Thread-3,5,main], time=1347615521134Thread[Thread-0,5,main], time=1347615521135Thread[Thread-4,5,main], time=1347615521135Thread[Thread-2,5,main], time=1347615521135Thread[Thread-1,5,main], time=1347615521140Thread[Thread-3,5,main], time=1347615521140Thread[Thread-0,5,main], time=1347615521140Thread[Thread-4,5,main], time=1347615521140Thread[Thread-2,5,main], time=1347615521140Thread[Thread-1,5,main], time=1347615521145Thread[Thread-3,5,main], time=1347615521145Thread[Thread-0,5,main], time=1347615521145Thread[Thread-4,5,main], time=1347615521145Thread[Thread-2,5,main], time=1347615521145Thread[Thread-1,5,main], time=1347615521150Thread[Thread-3,5,main], time=1347615521150Thread[Thread-0,5,main], time=1347615521150Thread[Thread-4,5,main], time=1347615521151Thread[Thread-2,5,main], time=1347615521151Thread[Thread-1,5,main], time=1347615521155Thread[Thread-3,5,main], time=1347615521156Thread[Thread-0,5,main], time=1347615521156Thread[Thread-4,5,main], time=1347615521156Thread[Thread-2,5,main], time=1347615521156Thread[Thread-1,5,main], time=1347615521161Thread[Thread-3,5,main], time=1347615521161Thread[Thread-0,5,main], time=1347615521161Thread[Thread-2,5,main], time=1347615521161Thread[Thread-4,5,main], time=1347615521161Thread[Thread-1,5,main], time=1347615521166Thread[Thread-3,5,main], time=1347615521166Thread[Thread-0,5,main], time=1347615521166Thread[Thread-4,5,main], time=1347615521167Thread[Thread-2,5,main], time=1347615521166

          可以看到1000個工作,在50ms後消耗了上圖所示的工作,而非1000個全部,整個關閉過程沒有异常發生,俗稱“優雅關閉”。

一個基於線程池的簡單文本web服務器

          我們將一個Http請求作為一個工作,提交到線程池中,然後由線程池的工作者來完成對請求的分析以及響應的回複,這樣做能够極大的提昇服務的效率,這也是傳統、經典的Web服務器運作方式。

/** * */package com.murdock.books.multithread.example;import java.io.BufferedReader;import java.io.FileInputStream;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.ServerSocket;import java.net.Socket;/** * <pre> * 請求: * GET /p/1845211588 HTTP/1.1 * * 響應: * HTTP/1.1 200 OK * Date: Fri, 14 Sep 2012 11:39:26 GMT * Content-Type: text/html; charset=GBK * Transfer-Encoding: chunked * Connection: Keep-Alive * Vary: Accept-Encoding * tracecode: 23665957650539960842091419, 23665874971177305354091419 * Content-Encoding: gzip * Server: Apache * </pre> * * @author weipeng * */public class HttpTextServer { static ThreadPool<TextHandler> threadPool = new DefaultThreadPool<TextHandler>( 10); static String basePath = "/home/weipeng/project/multithread"; public static void main(String[] args) throws Exception { ServerSocket ss = new ServerSocket(8080); Socket socket = null; while ((socket = ss.accept()) != null) { threadPool.execute(new TextHandler(socket)); } ss.close(); } static class TextHandler implements Runnable { private Socket socket; public TextHandler(Socket socket) { this.socket = socket; } @Override public void run() { String line = null; BufferedReader br = null; BufferedReader reader = null; PrintWriter out = null; try { reader = new BufferedReader(new InputStreamReader( socket.getInputStream())); String header = reader.readLine(); String filePath = basePath + header.split(" ")[1]; br = new BufferedReader(new InputStreamReader( new FileInputStream(filePath))); out = new PrintWriter(socket.getOutputStream()); out.println("HTTP/1.1 200 OK"); out.println("Content-Type: text/html; charset=UTF-8"); out.println("Server: SimpleMolly"); out.println(""); while ((line = br.readLine()) != null) { out.println(line); } out.println("CURRENT-THREAD ===> " + Thread.currentThread()); out.flush(); } catch (Exception ex) { ex.printStackTrace(); } finally { if (br != null) { try { br.close(); } catch (Exception ex) { ex.printStackTrace(); } finally { br = null; } } if (reader != null) { try { reader.close(); } catch (Exception ex) { ex.printStackTrace(); } finally { reader = null; } } if (out != null) { try { out.close(); } catch (Exception ex) { ex.printStackTrace(); } finally { out = null; } } if (socket != null) { try { socket.close(); } catch (Exception ex) { ex.printStackTrace(); } finally { socket = null; } } } } }}

        實現簡介:

1)服務端監聽8080端口;

2)當一個socket鏈接上來後,將其放置入線程池;

3)線程池中的worker也就是TextHandlersocket中獲取需要訪問的資源;

4)根據資源的路徑找到資源並讀取同時輸出到socket的輸出流;

5)關閉輸出流和相關資源。

       訪問效果:

第一次訪問:

10

第二次訪問:

11

          可以看到一個線程2提供的服務,一個是線程3的,證明是多個線程交替的提供服務。

原創文章,轉載請注明: 轉載自並發編程網 – ifeve.com本文鏈接地址: Java並發編程【1.2時代】

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071617389032.html