《Netty 權威指南》—— NIO創建的TimeServer源碼分析

杜老師說 2022-01-07 12:21:22 阅读数:225

netty 指南 nio timeserver 分析

聲明:本文是《Netty 權威指南》的樣章,感謝博文視點授權並發編程網站發布樣章,禁止以任何形式轉載此文。

我們將在TimeServer例程中給出完整的NIO創建的時間服務器源碼:

public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); New Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); }}

 

我們對NIO創建的TimeServer進行下簡單分析,8-15行跟之前的一樣,設置監聽端口。16-17行創建了一個被稱為MultiplexerTimeServer的多路複用類,它是個一個獨立的線程,負責輪詢多路複用器Selctor,可以處理多個客戶端的並發接入,現在我們繼續看MultiplexerTimeServer的源碼:

public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路複用器、綁定監聽端口 * * @param port */ public MultiplexerTimeServer(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路複用器關閉後,所有注册在上面的Channel和Pipe等資源都會被自動去注册並關閉,所以不需要重複釋放資源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 處理新接入的請求消息 if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else ; // 讀到0字節,忽略 } } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } }}

由於這個類相比於傳統的Socket編程稍微複雜一些,在此我們進行詳細分析,我們從如下幾個關鍵步驟講解多路複用處理類:

14-26行為構造方法,在構造方法中進行資源初始化,創建多路複用器Selector、ServerSocketChannel,對Channel和TCP參數進行配置,例如將ServerSocketChannel設置為异步非阻塞模式,它的backlog設置為1024。系統資源初始化成功後將ServerSocketChannel注册到Selector,監聽SelectionKey.OP_ACCEPT操作比特;如果資源初始化失敗,例如端口被占用則退出

39-61行在線程的run方法的while循環體中循環遍曆selector,它的休眠時間為1S,無論是否有讀寫等事件發生,selector每隔1S都被喚醒一次,selector也提供了一個無參的select方法。當有處於就緒狀態的Channel時,selector將返回就緒狀態的Channel的SelectionKey集合,我們通過對就緒狀態的Channel集合進行迭代,就可以進行網絡的异步讀寫操作

76-83行處理新接入的客戶端請求消息,根據SelectionKey的操作比特進行判斷即可獲知網絡事件的類型,通過ServerSocketChannel的accept接收客戶端的連接請求並創建SocketChannel實例,完成上述操作後,相當於完成了TCP的三次握手,TCP物理鏈路正式建立。注意,我們需要將新創建的SocketChannel設置為异步非阻塞,同時也可以對其TCP參數進行設置,例如TCP接收和發送緩沖區的大小等,作為入門的例子,例程沒有進行額外的參數設置

84-109行用於讀取客戶端的請求消息,首先創建一個ByteBuffer,由於我們事先無法得知客戶端發送的碼流大小,作為例程,我們開辟一個1M的緩沖區。然後調用SocketChannel的read方法讀取請求碼流,注意,由於我們已經將SocketChannel設置為异步非阻塞模式,因此它的read是非阻塞的。使用返回值進行判斷,看讀取到的字節數,返回值有三種可能的結果:

1)      返回值大於0:讀到了字節,對字節進行編解碼;

2)      返回值等於0:沒有讀取到字節,屬於正常場景,忽略;

3)      返回值為-1:鏈路已經關閉,需要關閉SocketChannel,釋放資源。

當讀取到碼流以後,我們進行解碼,首先對readBuffer進行flip操作,它的作用是將緩沖區當前的limit設置為position,position設置為0,用於後續對緩沖區的讀取操作。然後根據緩沖區可讀的字節個數創建字節數組,調用ByteBuffer的get操作將緩沖區可讀的字節數組拷貝到新創建的字節數組中,最後調用字符串的構造函數創建請求消息體並打印。如果請求指令是”QUERY TIME ORDER”則把服務器的當前時間編碼後返回給客戶端,下面我們看看如果异步發送應答消息給客戶端。

111-119行將應答消息异步發送給客戶端,我們看下關鍵代碼,首先將字符串編碼成字節數組,根據字節數組的容量創建ByteBuffer,調用ByteBuffer的put操作將字節數組拷貝到緩沖區中,然後對緩沖區進行flip操作,最後調用SocketChannel的write方法將緩沖區中的字節數組發送出去。需要指出的是,由於SocketChannel是异步非阻塞的,它並不保證一次能够把需要發送的字節數組發送完,此時會出現“寫半包”問題,我們需要注册寫操作,不斷輪詢Selector將沒有發送完的ByteBuffer發送完畢,可以通過ByteBuffer的hasRemain()方法判斷消息是否發送完成。此處僅僅是個簡單的入門級例程,沒有演示如何處理“寫半包”場景,後續的章節會有詳細說明。

使用NIO創建TimeServer服務器完成之後,我們繼續學習如何創建NIO客戶端。首先還是通過時序圖了解關鍵步驟和過程,然後結合代碼進行詳細分析。

原創文章,轉載請注明: 轉載自並發編程網 – ifeve.com本文鏈接地址: 《Netty 權威指南》—— NIO創建的TimeServer源碼分析

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071221216688.html