Disruptor入門

杜老師說 2022-01-07 11:54:04 阅读数:887

disruptor

翻譯自Disruptor git庫教程   英文地址

獲得Disruptor

可以通過Maven或者下載jar來安裝Disruptor。只要把對應的jar放在Java classpath就可以了。

基本的事件生產和消費

我們從一個簡單的例子開始學習Disruptor:生產者傳遞一個long類型的值給消費者,而消費者消費這個數據的方式僅僅是把它打印出來。首先聲明一個Event來包含需要傳遞的數據:

public class LongEvent { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } } 

由於需要讓Disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象。

public class LongEventFactory implements EventFactory { 
 @Override public Object newInstance() { return new LongEvent(); } } 

我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:

/** */public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println(longEvent.getValue()); } } 

事件都會有一個生成事件的源,這個例子中假設事件是由於磁盤IO或者network讀取數據的時候觸發的,事件源使用一個ByteBuffer來模擬它接受到的數據,也就是說,事件源會在IO讀取到一部分數據的時候觸發事件(觸發事件不是自動的,程序員需要在讀取到數據的時候自己觸發事件並發布):

 

public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } /** * onData用來發布事件,每調用一次就發布一次事件事件 * 它的參數會通過事件傳遞給消費者 * * @param bb */public void onData(ByteBuffer bb) { //可以把ringBuffer看做一個事件隊列,那麼next就是得到下面一個事件槽 long sequence = ringBuffer.next();try { //用上面的索引取出一個空的事件用於填充 LongEvent event = ringBuffer.get(sequence);// for the sequence event.setValue(bb.getLong(0)); } finally { //發布事件 ringBuffer.publish(sequence); } } } 

很明顯的是:當用一個簡單隊列來發布事件的時候會牽涉更多的細節,這是因為事件對象還需要預先創建。發布事件最少需要兩步:獲取下一個事件槽並發布事件(發布事件的時候要使用try/finnally保證事件一定會被發布)。如果我們使用RingBuffer.next()獲取一個事件槽,那麼一定要發布對應的事件。如果不能發布事件,那麼就會引起Disruptor狀態的混亂。尤其是在多個事件生產者的情况下會導致事件消費者失速,從而不得不重啟應用才能會恢複。

Disruptor 3.0提供了lambda式的API。這樣可以把一些複雜的操作放在Ring Buffer,所以在Disruptor3.0以後的版本最好使用Event Publisher或者Event Translator來發布事件。

 

public class LongEventProducerWithTranslator { //一個translator可以看做一個事件初始化器,publicEvent方法會調用它 //填充Event private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { public void translateTo(LongEvent event, long sequence, ByteBuffer bb) { event.setValue(bb.getLong(0)); } }; private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer bb) { ringBuffer.publishEvent(TRANSLATOR, bb); } } 

上面寫法的另一個好處是,Translator可以分離出來並且更加容易單元測試。Disruptor提供了不同的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, 等等)去產生一個Translator對象。很明顯,Translator中方法的參數是通過RingBuffer來傳遞的。

 

最後一步就是把所有的代碼組合起來完成一個完整的事件處理系統。Disruptor在這方面做了簡化,使用了DSL風格的代碼(其實就是按照直觀的寫法,不太能算得上真正的DSL)。雖然DSL的寫法比較簡單,但是並沒有提供所有的選項。如果依靠DSL已經可以處理大部分情况了。

 

public class LongEventMain { public static void main(String[] args) throws InterruptedException { // Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); // The factory for the event LongEventFactory factory = new LongEventFactory(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor); // Connect the handler disruptor.handleEventsWith(new LongEventHandler()); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); producer.onData(bb); Thread.sleep(1000); } } } 

使用Java 8

Disruptor在自己的接口裏面添加了對於Java 8 Lambda的支持。大部分Disruptor中的接口都符合Functional Interface的要求(也就是在接口中僅僅有一個方法)。所以在Disruptor中,可以廣泛使用Lambda來代替自定義類。

public class LongEventMainJava8 { /** * 用lambda錶達式來注册EventHandler和EventProductor * @param args * @throws InterruptedException */public static void main(String[] args) throws InterruptedException { // Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024;// Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor); // 可以使用lambda來注册一個EventHandler disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue())); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb); Thread.sleep(1000); } } } 

在上面的代碼中,有很多自定義類型可以被省略了。還有注意的是:publishEvent方法中僅調用傳遞給它的參數,並不是直接調用對應的對象。如果把這段代碼換成下面的代碼:

 

ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){ bb.putLong(0, l); ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0))); Thread.sleep(1000);}

這段代碼中有一個捕獲參數的lambda,意味著在lambda錶達式生成的內部類中會生成一個對象來存儲這個捕獲的bb對象。這會增加不必要的GC。所以在需要較低GC水平的情况下最好把所有的參數都通過publishEvent傳遞。

 

由於在Java 8中方法引用也是一個lambda,因此還可以把上面的代碼改成下面的代碼:

public class LongEventWithMethodRef { public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println(event.getValue()); } public static void translate(LongEvent event, long sequence, ByteBuffer buffer) { event.setValue(buffer.getLong(0)); } public static void main(String[] args) throws Exception { // Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor); // Connect the handler disruptor.handleEventsWith(LongEventWithMethodRef::handleEvent); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent(LongEventWithMethodRef::translate, bb); Thread.sleep(1000); } } } 

 

基本調整選項

上面的代碼已經可以處理大多數的情况了,但是在有的時候還是會需要根據不同的軟件或者硬件來調整選項以獲得更高的性能。基本的選項有兩個:單或者多生產者模式和可選的等待策略。

單或多 事件生產者

在並發系統中提高性能最好的方式之一就是單一寫者原則,對Disruptor也是適用的。如果在你的代碼中僅僅有一個事件生產者,那麼可以設置為單一生產者模式來提高系統的性能。

 

public class singleProductorLongEventMain { public static void main(String[] args) throws Exception { //.....// Construct the Disruptor with a SingleProducerSequencer Disruptor<LongEvent> disruptor = new Disruptor(factory, bufferSize, ProducerType.SINGLE, // Single producernew BlockingWaitStrategy(), executor);//..... } } 

為了證明,下面的數據是從Mac Air i7上面測試的結果:

多生產者:

 

Run 0, Disruptor=26,553,372 ops/secRun 1, Disruptor=28,727,377 ops/secRun 2, Disruptor=29,806,259 ops/secRun 3, Disruptor=29,717,682 ops/secRun 4, Disruptor=28,818,443 ops/secRun 5, Disruptor=29,103,608 ops/secRun 6, Disruptor=29,239,766 ops/sec

單生產者:

Run 0, Disruptor=89,365,504 ops/secRun 1, Disruptor=77,579,519 ops/secRun 2, Disruptor=78,678,206 ops/secRun 3, Disruptor=80,840,743 ops/secRun 4, Disruptor=81,037,277 ops/secRun 5, Disruptor=81,168,831 ops/secRun 6, Disruptor=81,699,346 ops/sec

 

 可選的等待策略

Disruptor默認的等待策略是BlockingWaitStrategy。這個策略的內部適用一個鎖和條件變量來控制線程的執行和等待(Java基本的同步方法)。BlockingWaitStrategy是最慢的等待策略,但也是CPU使用率最低和最穩定的選項。然而,可以根據不同的部署環境調整選項以提高性能。

SleepingWaitStrategy

和BlockingWaitStrategy一樣,SpleepingWaitStrategy的CPU使用率也比較低。它的方式是循環等待並且在循環中間調用LockSupport.parkNanos(1)來睡眠,(在Linux系統上面睡眠時間60µs).然而,它的優點在於生產線程只需要計數,而不執行任何指令。並且沒有條件變量的消耗。但是,事件對象從生產者到消費者傳遞的延遲變大了。SleepingWaitStrategy最好用在不需要低延遲,而且事件發布對於生產者的影響比較小的情况下。比如异步日志功能。

YieldingWaitStrategy

YieldingWaitStrategy是可以被用在低延遲系統中的兩個策略之一,這種策略在减低系統延遲的同時也會增加CPU運算量。YieldingWaitStrategy策略會循環等待sequence增加到合適的值。循環中調用Thread.yield()允許其他准備好的線程執行。如果需要高性能而且事件消費者線程比邏輯內核少的時候,推薦使用YieldingWaitStrategy策略。例如:在開啟超線程的時候。

BusySpinWaitStrategy

BusySpinWaitStrategy是性能最高的等待策略,同時也是對部署環境要求最高的策略。這個性能最好用在事件處理線程比物理內核數目還要小的時候。例如:在禁用超線程技術的時候。

 

原創文章,轉載請注明: 轉載自並發編程網 – ifeve.com本文鏈接地址: Disruptor入門

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071154042052.html