Learn Kafka with Me: The NIO Communication Mechanism

杜老師說 2022-01-07 06:48:10

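It has been a while since I last shared anything technical. I have some free time today, so here is an article on how Kafka handles network communication, for us to study together.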

I. Overall structure of Kafka's communication mechanism

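The architecture in this figure follows the SEDA multithreaded model we covered earlier; link:
http://www.jianshu.com/p/e184fdc0ade4
1. From the broker's point of view, the number of client connections is limited, and clients do not open large numbers of new connections frequently. A single Acceptor thread is therefore more than enough to handle new connections.
2. Kafka's high throughput requires the broker to receive and send data quickly. Network reads and writes are therefore handled by a pool of processor threads, which hand the data read from clients over to a buffer so that client requests do not pile up.
3. Kafka performs disk operations frequently, and those operations can block or wait on IO. The number of IO threads is generally set to twice the number of processor threads and can be tuned for the runtime environment.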

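II. Sequence diagram of the overall SocketServer design

Figure: Kafka communication sequence diagram (Kafka 通信時序圖.jpg)

Notes:

Kafka's SocketServer is built on Java NIO and follows the Reactor pattern. It consists of 1 Acceptor that accepts client connections, N Processor threads that read and write data, and M Handlers that run the business logic. Queues sit between the Acceptor and the Processors, and between the Processors and the Handlers, to buffer requests.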
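To make the division of labour concrete, here is a minimal sketch of the three stages wired together with queues. It is not Kafka code: the object name StagedPipelineSketch, the Stop marker, and the use of plain integers in place of connections and requests are all assumptions made for illustration, and no sockets are involved.

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

// Illustrative only: integers stand in for connections and requests.
object StagedPipelineSketch {
  // Hypothetical marker that tells a stage to stop.
  private val Stop = -1

  private def thread(body: => Unit): Thread =
    new Thread(new Runnable { def run(): Unit = body })

  /** Pushes `connections` items through acceptor -> processors -> handlers
    * and returns how many items the handlers processed. */
  def run(numProcessors: Int, numHandlers: Int, connections: Int): Int = {
    // One queue per processor (acceptor -> processor hand-off).
    val newConnections = Vector.fill(numProcessors)(new ArrayBlockingQueue[Int](1024))
    // One shared queue between all processors and all handlers.
    val requestQueue = new ArrayBlockingQueue[Int](1024)
    val handled = new AtomicInteger(0)

    val processors = newConnections.map { queue =>
      thread {
        var c = queue.take()
        while (c != Stop) { requestQueue.put(c); c = queue.take() }
      }
    }
    val handlers = Vector.fill(numHandlers)(thread {
      var r = requestQueue.take()
      while (r != Stop) { handled.incrementAndGet(); r = requestQueue.take() }
    })
    (processors ++ handlers).foreach(_.start())

    // Acceptor role, played by the calling thread: round-robin distribution.
    for (c <- 0 until connections) newConnections(c % numProcessors).put(c)

    // Shut down stage by stage so that no item is lost.
    newConnections.foreach(_.put(Stop))
    processors.foreach(_.join())
    for (_ <- 0 until numHandlers) requestQueue.put(Stop)
    handlers.foreach(_.join())
    handled.get()
  }
}
```

Calling StagedPipelineSketch.run(3, 6, 100) pushes 100 items through 3 processors and 6 handlers. The point of the queues is that a slow stage only slows its own queue down; the stage in front keeps accepting work until the queue fills up.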

Below, we walk through the source code of each part of this design in turn.

2.1 Startup and initialization

    def startup() {
      val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
      for(i <- 0 until numProcessorThreads) {
        processors(i) = new Processor(i,
                                      time,
                                      maxRequestSize,
                                      aggregateIdleMeter,
                                      newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
                                      numProcessorThreads,
                                      requestChannel,
                                      quotas,
                                      connectionsMaxIdleMs)
        Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
      }

      newGauge("ResponsesBeingSent", new Gauge[Int] {
        def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
      })

      // register the processor threads for notification of responses
      requestChannel.addResponseListener((id:Int) => processors(id).wakeup())

      // start accepting connections
      this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
      Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
      acceptor.awaitStartup
      info("Started")
    }

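Notes:

The ConnectionQuotas object manages the number of connections per IP. startup() initializes N Processor threads and creates one Acceptor listener thread. processors is an array of Processor threads that serves as a thread pool; its default size is three. The Acceptor thread and each of the N Processor threads open their own multiplexer with Selector.open(). The relevant code is below: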

    val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
    val serverChannel = openServerSocket(host, port)

The value of num.network.threads can be set anywhere from 1 to Int.MaxValue.
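The range check can be pictured with the small sketch below. It is a stand-in written for this article, not Kafka's own implementation: the object ConfigRangeSketch and its exact behaviour on bad input (throwing IllegalArgumentException through require) are assumptions. Only the parameter shape (name, default, range) mirrors the getIntInRange call shown above.

```scala
import java.util.Properties

// Hypothetical stand-in for a "read an int and check its range" helper.
object ConfigRangeSketch {
  def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = {
    val (low, high) = range
    // Fall back to the default when the property is absent.
    val value = Option(props.getProperty(name)).map(_.trim.toInt).getOrElse(default)
    require(value >= low && value <= high,
      s"$name has value $value which is not in the range [$low, $high]")
    value
  }
}
```

With an empty Properties object the call returns the default of 3; setting num.network.threads to 0 makes the sketch fail fast at startup instead of creating an empty processor pool.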

2.2 The Acceptor thread

    def run() {
      serverChannel.register(selector, SelectionKey.OP_ACCEPT);
      startupComplete()
      var currentProcessor = 0
      while(isRunning) {
        val ready = selector.select(500)
        if(ready > 0) {
          val keys = selector.selectedKeys()
          val iter = keys.iterator()
          while(iter.hasNext && isRunning) {
            var key: SelectionKey = null
            try {
              key = iter.next
              iter.remove()
              if(key.isAcceptable)
                accept(key, processors(currentProcessor))
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      debug("Closing server socket and selector.")
      swallowError(serverChannel.close())
      swallowError(selector.close())
      shutdownComplete()
    }

2.2.1 Registering the OP_ACCEPT event

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

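2.2.2 Internal logic

This is synchronous non-blocking logic: the loop polls the selector with selector.select(500), which waits at most 500 ms per round. Background on synchronous non-blocking IO is at http://www.jianshu.com/p/e9c6690c0737
When a connection request arrives, the Acceptor picks a Processor thread in round-robin fashion to handle it. The code is as follows: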

currentProcessor = (currentProcessor + 1) % processors.length
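The effect of that one line is easiest to see by running it over a few connections. The sketch below is illustrative only; RoundRobinSketch and assign are names made up for this article.

```scala
// Illustrative only: shows which processor index each new connection would get.
object RoundRobinSketch {
  def assign(connections: Int, numProcessors: Int): Vector[Int] = {
    require(numProcessors > 0, "need at least one processor")
    var currentProcessor = 0
    val chosen = Vector.newBuilder[Int]
    for (_ <- 0 until connections) {
      chosen += currentProcessor
      // Same update rule as in the Acceptor loop.
      currentProcessor = (currentProcessor + 1) % numProcessors
    }
    chosen.result()
  }
}
```

For 7 connections and 3 processors the assignment is 0, 1, 2, 0, 1, 2, 0, so connections are spread evenly regardless of how busy each processor currently is. Note that in the Acceptor code above the index only advances after accept returns without throwing.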

The accepted SocketChannel is then added to the chosen Processor's newConnections queue and the call returns. The code is as follows:

    def accept(socketChannel: SocketChannel) {
      newConnections.add(socketChannel)
      wakeup()
    }

    // newConnections is a thread-safe queue that holds SocketChannel instances
    private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
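Why does accept call wakeup() after adding to the queue? The Processor may be blocked inside select, and without a wakeup it would only notice the new connection when its timeout expires. The sketch below demonstrates this with a real java.nio Selector; the object name WakeupSketch, the string "conn-1" standing in for a SocketChannel, and the 50 ms and 5000 ms timings are assumptions chosen for the demonstration.

```scala
import java.nio.channels.Selector
import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative only: a string stands in for the SocketChannel being handed over.
object WakeupSketch {
  /** Returns the item handed over and how long select() blocked, in ms. */
  def demo(): (String, Long) = {
    val selector = Selector.open()
    val newConnections = new ConcurrentLinkedQueue[String]()

    // Plays the Acceptor: enqueue first, then wake the selector up.
    val acceptor = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(50)
        newConnections.add("conn-1")
        selector.wakeup()
      }
    })

    val start = System.nanoTime()
    acceptor.start()
    // Plays the Processor: would block for 5 s if nobody called wakeup().
    selector.select(5000)
    val blockedMs = (System.nanoTime() - start) / 1000000L

    acceptor.join()
    selector.close()
    (newConnections.poll(), blockedMs)
  }
}
```

Because the item is enqueued before wakeup() is called, the Processor side is guaranteed to find it in the queue once select returns.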

2.3 kafka.network.Processor

    override def run() {
      startupComplete()
      while(isRunning) {
        // setup any new connections that have been queued up
        configureNewConnections()
        // register any new responses for writing
        processNewResponses()
        val startSelectTime = SystemTime.nanoseconds
        val ready = selector.select(300)
        currentTimeNanos = SystemTime.nanoseconds
        val idleTime = currentTimeNanos - startSelectTime
        idleMeter.mark(idleTime)
        // We use a single meter for aggregate idle percentage for the thread pool.
        // Since meter is calculated as total_recorded_value / time_window and
        // time_window is independent of the number of threads, each recorded idle
        // time should be discounted by # threads.
        aggregateIdleMeter.mark(idleTime / totalProcessorThreads)

        trace("Processor id " + id + " selection time = " + idleTime + " ns")
        if(ready > 0) {
          val keys = selector.selectedKeys()
          val iter = keys.iterator()
          while(iter.hasNext && isRunning) {
            var key: SelectionKey = null
            try {
              key = iter.next
              iter.remove()
              if(key.isReadable)
                read(key)
              else if(key.isWritable)
                write(key)
              else if(!key.isValid)
                close(key)
              else
                throw new IllegalStateException("Unrecognized key state for processor thread.")
            } catch {
              case e: EOFException => {
                info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
                close(key)
              }
              case e: InvalidRequestException => {
                info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
                close(key)
              }
              case e: Throwable => {
                error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
                close(key)
              }
            }
          }
        }
        maybeCloseOldestConnection
      }
      debug("Closing selector.")
      closeAll()
      swallowError(selector.close())
      shutdownComplete()
    }

Let us first take a close look at the configureNewConnections method:

    private def configureNewConnections() {
      while(newConnections.size() > 0) {
        val channel = newConnections.poll()
        debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
        channel.register(selector, SelectionKey.OP_READ)
      }
    }

The method loops while newConnections is non-empty, polls each channel off the queue, and registers it with the selector for the OP_READ event.
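A small variation on this loop is worth knowing. ConcurrentLinkedQueue.size() is not a constant-time operation (it walks the queue), so a drain loop can instead call poll() until it returns null. The sketch below shows that alternative; it is not how the code above is written, and DrainSketch and drain are hypothetical names.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative alternative to the size()-based loop: poll until the queue is empty.
object DrainSketch {
  def drain[A <: AnyRef](queue: ConcurrentLinkedQueue[A]): Vector[A] = {
    val drained = Vector.newBuilder[A]
    var item = queue.poll()          // poll() returns null when the queue is empty
    while (item != null) {
      drained += item                // in the Processor this would be the register call
      item = queue.poll()
    }
    drained.result()
  }
}
```

Either form is safe here because only one thread (the Processor itself) removes items from its own queue.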
Now back to the main loop, and a look at the read method.

    def read(key: SelectionKey) {
      lruConnections.put(key, currentTimeNanos)
      val socketChannel = channelFor(key)
      var receive = key.attachment.asInstanceOf[Receive]
      if(key.attachment == null) {
        receive = new BoundedByteBufferReceive(maxRequestSize)
        key.attach(receive)
      }
      val read = receive.readFrom(socketChannel)
      val address = socketChannel.socket.getRemoteSocketAddress();
      trace(read + " bytes read from " + address)
      if(read < 0) {
        close(key)
      } else if(receive.complete) {
        val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
        requestChannel.sendRequest(req)
        key.attach(null)
        // explicitly reset interest ops to not READ, no need to wake up the selector just yet
        key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
      } else {
        // more reading to be done
        trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
        key.interestOps(SelectionKey.OP_READ)
        wakeup()
      }
    }

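Notes

1. The current SelectionKey and the event-loop timestamp are put into an LRU map, which is checked later to reclaim idle connections.
2. A BoundedByteBufferReceive object is created and attached to the key. Its readFrom method does the actual reading and returns the number of bytes read.

  • If the read has finished (receive.complete is true), the data is wrapped in a Request object and placed on the request queue through requestChannel.sendRequest(req); OP_READ is then cleared on the key.
  • If the read has not finished, the selector keeps listening for the OP_READ event.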
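The "complete or not yet complete" distinction exists because a non-blocking read may deliver only part of a request. The sketch below shows one way such an incremental receive can work, assuming a frame made of a 4-byte size prefix followed by the payload. The class FrameReceiveSketch is written for this article and only imitates the shape of the readFrom and complete members used above; its internals are an assumption, not a copy of BoundedByteBufferReceive.

```scala
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

// Illustrative only: accumulates one size-prefixed frame across several reads.
final class FrameReceiveSketch(maxSize: Int) {
  private val sizeBuffer = ByteBuffer.allocate(4)
  private var content: ByteBuffer = null

  /** True once the size prefix and the whole payload have arrived. */
  def complete: Boolean = content != null && !content.hasRemaining

  /** Reads whatever is available; returns bytes read, or -1 on end of stream. */
  def readFrom(channel: ReadableByteChannel): Int = {
    var read = 0
    if (sizeBuffer.hasRemaining) {
      val n = channel.read(sizeBuffer)
      if (n < 0) return -1
      read += n
      if (!sizeBuffer.hasRemaining) {
        sizeBuffer.rewind()
        val size = sizeBuffer.getInt()
        require(size >= 0 && size <= maxSize, s"invalid frame size $size (max $maxSize)")
        content = ByteBuffer.allocate(size)
      }
    }
    if (content != null && content.hasRemaining) {
      val n = channel.read(content)
      if (n < 0) return -1
      read += n
    }
    read
  }

  /** The payload bytes; only meaningful once complete is true. */
  def payload: Array[Byte] = {
    require(complete, "frame not fully received yet")
    content.array()
  }
}
```

Because the partially filled object stays attached to the SelectionKey between calls, the Processor can return to its select loop after every partial read and pick up where it left off when more bytes arrive. The size limit plays the role of maxRequestSize: an oversized frame is rejected before any payload buffer is allocated.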

2.4 kafka.server.KafkaRequestHandler

    def run() {
      while(true) {
        try {
          var req : RequestChannel.Request = null
          while (req == null) {
            // We use a single meter for aggregate idle percentage for the thread pool.
            // Since meter is calculated as total_recorded_value / time_window and
            // time_window is independent of the number of threads, each recorded idle
            // time should be discounted by # threads.
            val startSelectTime = SystemTime.nanoseconds
            req = requestChannel.receiveRequest(300)
            val idleTime = SystemTime.nanoseconds - startSelectTime
            aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
          }

          if(req eq RequestChannel.AllDone) {
            debug("Kafka request handler %d on broker %d received shut down command".format(
              id, brokerId))
            return
          }
          req.requestDequeueTimeMs = SystemTime.milliseconds
          trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
          apis.handle(req)
        } catch {
          case e: Throwable => error("Exception when handling request", e)
        }
      }
    }

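Notes

KafkaRequestHandler is also an event-processing thread. It loops continuously, reading Request objects from the requestQueue with a timeout of 300 ms, passes each request to the apis.handle method for processing, and the resulting response is placed on the responseQueue.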
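The handler loop boils down to "poll with a timeout, stop on a sentinel, otherwise handle". Here is a minimal sketch of that pattern on top of java.util.concurrent. The names HandlerLoopSketch, Request and runHandler are hypothetical, and using an ArrayBlockingQueue as the request queue is an assumption made for the example.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Illustrative only: a tiny stand-in for the request queue and handler loop.
object HandlerLoopSketch {
  final case class Request(payload: String)

  // Shutdown sentinel, compared by reference (eq) just like AllDone above.
  val AllDone: Request = Request("shutdown")

  def runHandler(queue: ArrayBlockingQueue[Request], handle: Request => Unit): Unit = {
    var running = true
    while (running) {
      var req: Request = null
      // poll returns null on timeout, so the loop wakes up every 300 ms at the latest.
      while (req == null) req = queue.poll(300, TimeUnit.MILLISECONDS)
      if (req eq AllDone) {
        running = false
      } else {
        try {
          handle(req)
        } catch {
          // One failing request must not kill the handler thread.
          case e: Exception => println(s"Exception when handling request: $e")
        }
      }
    }
  }
}
```

Comparing with eq rather than == matters: a client request that merely looks like the sentinel is still handled, and only the sentinel instance itself stops the thread.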
The dispatch code in apis.handle is as follows:

    def handle(request: RequestChannel.Request) {
      try{
        trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
        request.requestId match {
          case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
          case RequestKeys.FetchKey => handleFetchRequest(request)
          case RequestKeys.OffsetsKey => handleOffsetRequest(request)
          case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
          case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
          case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
          case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
          case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
          case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
          case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
          case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
          case requestId => throw new KafkaException("Unknown api code " + requestId)
        }
      } catch {
        case e: Throwable =>
          request.requestObj.handleError(e, requestChannel, request)
          error("error when handling request %s".format(request.requestObj), e)
      } finally
        request.apiLocalCompleteTimeMs = SystemTime.milliseconds
    }

The request keys are explained below:

Request key                          Description                          Request type
RequestKeys.ProduceKey               producer request                     ProducerRequest
RequestKeys.FetchKey                 consumer request                     FetchRequest
RequestKeys.OffsetsKey               topic offset request                 OffsetRequest
RequestKeys.MetadataKey              topic metadata request               TopicMetadataRequest
RequestKeys.LeaderAndIsrKey          leader and ISR update request        LeaderAndIsrRequest
RequestKeys.StopReplicaKey           stop-replica request                 StopReplicaRequest
RequestKeys.UpdateMetadataKey        metadata update request              UpdateMetadataRequest
RequestKeys.ControlledShutdownKey    controlled shutdown request          ControlledShutdownRequest
RequestKeys.OffsetCommitKey          offset commit request                OffsetCommitRequest
RequestKeys.OffsetFetchKey           consumer offset fetch request        OffsetFetchRequest

2.5 Response handling in the Processor

    private def processNewResponses() {
      var curr = requestChannel.receiveResponse(id)
      while(curr != null) {
        val key = curr.request.requestKey.asInstanceOf[SelectionKey]
        curr.responseAction match {
          case RequestChannel.SendAction => {
            key.interestOps(SelectionKey.OP_WRITE)
            key.attach(curr)
          }
        }
        curr = requestChannel.receiveResponse(id)
      }
    }

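Back in the Processor thread class: read() is the path that hands requests over to the Handlers, and processNewResponses() is the path that deals with the Responses the Handlers produce for clients. It takes each Response for this Processor out of the responseQueue in requestChannel, attaches it to the key, and registers the OP_WRITE event so that the data is written back to the client.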
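The routing of responses back to the right Processor can be sketched as one queue per Processor plus a listener that wakes the Processor up, which is what the addResponseListener call in startup() sets up. The code below is an illustration under assumptions: ResponseRoutingSketch and Channel are hypothetical names, strings stand in for Response objects, and LinkedBlockingQueue is simply a convenient queue for the example.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Illustrative only: per-processor response queues with wake-up listeners.
object ResponseRoutingSketch {
  final class Channel(numProcessors: Int) {
    private val responseQueues =
      Vector.fill(numProcessors)(new LinkedBlockingQueue[String]())
    @volatile private var listeners: List[Int => Unit] = Nil

    def addResponseListener(onResponse: Int => Unit): Unit =
      synchronized { listeners ::= onResponse }

    /** Called by a handler: enqueue first, then notify the owning processor. */
    def sendResponse(processor: Int, response: String): Unit = {
      responseQueues(processor).put(response)
      listeners.foreach(listener => listener(processor))
    }

    /** Called by a processor: non-blocking, returns null when nothing is pending. */
    def receiveResponse(processor: Int): String = responseQueues(processor).poll()
  }
}
```

A response has to go back through the Processor that read the request, because that Processor's selector owns the connection's SelectionKey. This is why the Request carries the processor id and why there is one response queue per Processor rather than a single shared one.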

Original article. When reposting, please credit: reposted from 並發編程網 (ifeve.com). Link to this article: 跟我學Kafka之NIO通信機制

Copyright notice: this article was written by [杜老師說]. Please include a link to the original when reposting. Thank you. https://gsmany.com/2022/01/202201070648099299.html