一個通用並發對象池的實現

杜老師說 2022-01-07 13:10:33 阅读数:234

通用

原文鏈接譯文鏈接,原文作者: Sarma Swaranga,本文最早發錶於deepinmind,校對:鄭旭東

這篇文章裏我們主要討論下如何在Java裏實現一個對象池。最近幾年,Java虛擬機的性能在各方面都得到了極大的提昇,因此對大多數對象而言,已經沒有必要通過對象池來提高性能了。根本的原因是,創建一個新的對象的開銷已經不像過去那樣昂貴了。

然而,還是有些對象,它們的創建開銷是非常大的,比如線程,數據庫連接等這些非輕量級的對象。在任何一個應用程序裏面,我們肯定會用到不止一個這樣的對象。如果有一種很方便的創建管理這些對象的池,使得這些對象能够動態的重用,而客戶端代碼也不用關心它們的生命周期,還是會很給力的。

在真正開始寫代碼前,我們先來梳理下一個對象池需要完成哪些功能。

  • 如果有可用的對象,對象池應當能返回給客戶端。
  • 客戶端把對象放回池裏後,可以對這些對象進行重用。
  • 對象池能够創建新的對象來滿足客戶端不斷增長的需求。
  • 需要有一個正確關閉池的機制來確保關閉後不會發生內存泄露。

不用說了,上面幾點就是我們要暴露給客戶端的連接池的接口的基本功能。

我們的聲明的接口如下:

package com.test.pool;/** * Represents a cached pool of objects. * * @author Swaranga * * @param <T> the type of object to pool. */public interface Pool<T>{ /** * Returns an instance from the pool. * The call may be a blocking one or a non-blocking one * and that is determined by the internal implementation. * * If the call is a blocking call, * the call returns immediately with a valid object * if available, else the thread is made to wait * until an object becomes available. * In case of a blocking call, * it is advised that clients react * to {@link InterruptedException} which might be thrown * when the thread waits for an object to become available. * * If the call is a non-blocking one, * the call returns immediately irrespective of * whether an object is available or not. * If any object is available the call returns it * else the call returns < code >null< /code >. * * The validity of the objects are determined using the * {@link Validator} interface, such that * an object < code >o< /code > is valid if * < code > Validator.isValid(o) == true < /code >. * * @return T one of the pooled objects. */ T get(); /** * Releases the object and puts it back to the pool. * * The mechanism of putting the object back to the pool is * generally asynchronous, * however future implementations might differ. * * @param t the object to return to the pool */ void release(T t); /** * Shuts down the pool. In essence this call will not * accept any more requests * and will release all resources. * Releasing resources are done * via the < code >invalidate()< /code > * method of the {@link Validator} interface. */ void shutdown();}

為了能够支持任意對象,上面這個接口故意設計得很簡單通用。它提供了從池裏獲取/返回對象的方法,還有一個關閉池的機制,以便釋放對象。

現在我們來實現一下這個接口。開始動手之前,值得一提的是,一個理想的release方法應該先嘗試檢查下這個客戶端返回的對象是否還能重複使用。如果是的話再把它扔回池裏,如果不是,就舍弃掉這個對象。我們希望這個Pool接口的所有實現都能遵循這個規則。在開始具體的實現類前,我們先創建一個抽象類,以便限制後續的實現能遵循這點。我們實現的抽象類就叫做AbstractPool,它的定義如下:

package com.test.pool;/** * Represents an abstract pool, that defines the procedure * of returning an object to the pool. * * @author Swaranga * * @param <T> the type of pooled objects. */abstract class AbstractPool <T> implements Pool <T>{ /** * Returns the object to the pool. * The method first validates the object if it is * re-usable and then puts returns it to the pool. * * If the object validation fails, * some implementations * will try to create a new one * and put it into the pool; however * this behaviour is subject to change * from implementation to implementation * */ @Override public final void release(T t) { if(isValid(t)) { returnToPool(t); } else { handleInvalidReturn(t); } } protected abstract void handleInvalidReturn(T t); protected abstract void returnToPool(T t); protected abstract boolean isValid(T t);}

在上面這個類裏,我們讓對象池必須得先驗證對象後才能把它放回到池裏。具體的實現可以自由選擇如何實現這三種方法,以便定制自己的行為。它們根據自己的邏輯來决定如何判斷一個對象有效,無效的話應該怎麼處理(handleInvalidReturn方法),怎麼把一個有效的對象放回到池裏(returnToPool方法)。

有了上面這幾個類,我們就可以著手開始具體的實現了。不過還有個問題,由於上面這些類是設計成能支持通用的對象池的,因此具體的實現不知道該如何驗證對象的有效性(因為對象都是泛型的)。因此我們還需要些別的東西來幫助我們完成這個。

我們需要一個通用的方法來完成對象的校驗,而具體的實現不必關心對象是何種類型。因此我們引入了一個新的接口,Validator,它定義了驗證對象的方法。這個接口的定義如下:

package com.test.pool; /** * Represents the functionality to * validate an object of the pool * and to subsequently perform cleanup activities. * * @author Swaranga * * @param < T > the type of objects to validate and cleanup. */ public static interface Validator < T > { /** * Checks whether the object is valid. * * @param t the object to check. * * @return <code>true</code> * if the object is valid else <code>false</code>. */ public boolean isValid(T t); /** * Performs any cleanup activities * before discarding the object. * For example before discarding * database connection objects, * the pool will want to close the connections. * This is done via the * <code>invalidate()</code> method. * * @param t the object to cleanup */ public void invalidate(T t); }

上面這個接口定義了一個檢驗對象的方法,以及一個把對象置為無效的方法。當准備廢弃一個對象並清理內存的時候,invalidate方法就派上用場了。值得注意的是這個接口本身沒有任何意義,只有當它在對象池裏使用的時候才有意義,所以我們把這個接口定義到Pool接口裏面。這和Java集合庫裏的Map和Map.Entry是一樣的。所以我們的Pool接口就成了這樣:

package com.test.pool;/** * Represents a cached pool of objects. * * @author Swaranga * * @param < T > the type of object to pool. */public interface Pool< T >{ /** * Returns an instance from the pool. * The call may be a blocking one or a non-blocking one * and that is determined by the internal implementation. * * If the call is a blocking call, * the call returns immediately with a valid object * if available, else the thread is made to wait * until an object becomes available. * In case of a blocking call, * it is advised that clients react * to {@link InterruptedException} which might be thrown * when the thread waits for an object to become available. * * If the call is a non-blocking one, * the call returns immediately irrespective of * whether an object is available or not. * If any object is available the call returns it * else the call returns < code >null< /code >. * * The validity of the objects are determined using the * {@link Validator} interface, such that * an object < code >o< /code > is valid if * < code > Validator.isValid(o) == true < /code >. * * @return T one of the pooled objects. */ T get(); /** * Releases the object and puts it back to the pool. * * The mechanism of putting the object back to the pool is * generally asynchronous, * however future implementations might differ. * * @param t the object to return to the pool */ void release(T t); /** * Shuts down the pool. In essence this call will not * accept any more requests * and will release all resources. * Releasing resources are done * via the < code >invalidate()< /code > * method of the {@link Validator} interface. */ void shutdown(); /** * Represents the functionality to * validate an object of the pool * and to subsequently perform cleanup activities. * * @author Swaranga * * @param < T > the type of objects to validate and cleanup. */ public static interface Validator < T > { /** * Checks whether the object is valid. * * @param t the object to check. * * @return <code>true</code> * if the object is valid else <code>false</code>. */ public boolean isValid(T t); /** * Performs any cleanup activities * before discarding the object. * For example before discarding * database connection objects, * the pool will want to close the connections. * This is done via the * <code>invalidate()</code> method. * * @param t the object to cleanup */ public void invalidate(T t); }}

准備工作已經差不多了,在最後開始前我們還需要一個終極武器,這才是這個對象池的殺手鐧。就是“能够創建新的對象”。我們的對象池是泛型的,因此它們得知道如何去生成新的對象來填充這個池子。這個功能不能依賴於對象池本身,必須要有一個通用的方式來創建新的對象。通過一個ObjectFactory的接口就能完成這個,它只有一個“如何創建新的對象”的方法。我們的ObjectFactory接口如下:

package com.test.pool;/** * Represents the mechanism to create * new objects to be used in an object pool. * * @author Swaranga * * @param < T > the type of object to create. */public interface ObjectFactory < T >{ /** * Returns a new instance of an object of type T. * * @return T an new instance of the object of type T */ public abstract T createNew();}

我們的工具類都已經搞定了,現在可以開始真正實現我們的Pool接口了。因為我們希望這個池能在並發程序裏面使用,所以我們會創建一個阻塞的對象池,當沒有對象可用的時候,讓客戶端先阻塞住。我們的阻塞機制是讓客戶端一直阻塞直到有對象可用為止。這樣的話導致我們還需要再增加一個只阻塞一定時間的方法,如果在超時時間到來前有對象可用則返回,如果超時了就返回null而不是一直等待下去。這樣的實現有點類似Java並發庫裏的LinkedBlockingQueue,因此真正實現前我們再暴露一個接口,BlockingPool,類似於Java並發庫裏的BlockingQueue接口。

這裏是BlockingQueue的聲明:

package com.test.pool;import java.util.concurrent.TimeUnit;/** * Represents a pool of objects that makes the * requesting threads wait if no object is available. * * @author Swaranga * * @param < T > the type of objects to pool. */public interface BlockingPool < T > extends Pool < T >{ /** * Returns an instance of type T from the pool. * * The call is a blocking call, * and client threads are made to wait * indefinitely until an object is available. * The call implements a fairness algorithm * that ensures that a FCFS service is implemented. * * Clients are advised to react to InterruptedException. * If the thread is interrupted while waiting * for an object to become available, * the current implementations * sets the interrupted state of the thread * to <code>true</code> and returns null. * However this is subject to change * from implementation to implementation. * * @return T an instance of the Object * of type T from the pool. */ T get(); /** * Returns an instance of type T from the pool, * waiting up to the * specified wait time if necessary * for an object to become available.. * * The call is a blocking call, * and client threads are made to wait * for time until an object is available * or until the timeout occurs. * The call implements a fairness algorithm * that ensures that a FCFS service is implemented. * * Clients are advised to react to InterruptedException. * If the thread is interrupted while waiting * for an object to become available, * the current implementations * set the interrupted state of the thread * to <code>true</code> and returns null. * However this is subject to change * from implementation to implementation. * * * @param time amount of time to wait before giving up, * in units of <tt>unit</tt> * @param unit a <tt>TimeUnit</tt> determining * how to interpret the * <tt>timeout</tt> parameter * * @return T an instance of the Object * of type T from the pool. * * @throws InterruptedException * if interrupted while waiting */ T get(long time, TimeUnit unit) throws InterruptedException;}

BoundedBlockingPool的實現如下:

package com.test.pool;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public final class BoundedBlockingPool extends <AbstractPool> implements <BlockingPool>{ private int size; private BlockingQueue objects; private Validator validator; private ObjectFactory objectFactory; private ExecutorService executor = Executors.newCachedThreadPool(); private volatile boolean shutdownCalled; public BoundedBlockingPool( int size, Validator validator, ObjectFactory objectFactory) { super(); this.objectFactory = objectFactory; this.size = size; this.validator = validator; objects = new LinkedBlockingQueue (size); initializeObjects(); shutdownCalled = false; } public T get(long timeOut, TimeUnit unit) { if(!shutdownCalled) { T t = null; try { t = objects.poll(timeOut, unit); return t; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return t; } throw new IllegalStateException( 'Object pool is already shutdown'); } public T get() { if(!shutdownCalled) { T t = null; try { t = objects.take(); } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return t; } throw new IllegalStateException( 'Object pool is already shutdown'); } public void shutdown() { shutdownCalled = true; executor.shutdownNow(); clearResources(); } private void clearResources() { for(T t : objects) { validator.invalidate(t); } } @Override protected void returnToPool(T t) { if(validator.isValid(t)) { executor.submit(new ObjectReturner(objects, t)); } } @Override protected void handleInvalidReturn(T t) { } @Override protected boolean isValid(T t) { return validator.isValid(t); } private void initializeObjects() { for(int i = 0; i < size; i++) { objects.add(objectFactory.createNew()); } } private class ObjectReturner implements <Callable> { private BlockingQueue queue; private E e; public ObjectReturner(BlockingQueue queue, E e) { this.queue = queue; this.e = e; } public Void call() { while(true) { try { queue.put(e); break; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } } return null; } }}

上面是一個非常基本的對象池,它內部是基於一個LinkedBlockingQueue來實現的。這裏唯一比較有意思的方法就是returnToPool。因為內部的存儲是一個LinkedBlockingQueue實現的,如果我們直接把返回的對象扔進去的話,如果隊列已滿可能會阻塞住客戶端。不過我們不希望客戶端因為把對象放回池裏這麼個普通的方法就阻塞住了。所以我們把最終將對象插入到隊列裏的任務作為一個异步的的任務提交給一個Executor來執行,以便讓客戶端線程能立即返回。

現在我們將在自己的代碼中使用上面這個對象池,用它來緩存數據庫連接。我們需要一個校驗器來驗證數據庫連接是否有效。

下面是這個JDBCConnectionValidator:

package com.test;import java.sql.Connection;import java.sql.SQLException;import com.test.pool.Pool.Validator;public final class JDBCConnectionValidator implements Validator < Connection >{ public boolean isValid(Connection con) { if(con == null) { return false; } try { return !con.isClosed(); } catch(SQLException se) { return false; } } public void invalidate(Connection con) { try { con.close(); } catch(SQLException se) { } }}

還有一個JDBCObjectFactory,它將用來生成新的數據庫連接對象:

package com.test;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import com.test.pool.ObjectFactory;public class JDBCConnectionFactory implements ObjectFactory < Connection >{ private String connectionURL; private String userName; private String password; public JDBCConnectionFactory( String driver, String connectionURL, String userName, String password) { super(); try { Class.forName(driver); } catch(ClassNotFoundException ce) { throw new IllegalArgumentException('Unable to find driver in classpath', ce); } this.connectionURL = connectionURL; this.userName = userName; this.password = password; } public Connection createNew() { try { return DriverManager.getConnection( connectionURL, userName, password); } catch(SQLException se) { throw new IllegalArgumentException('Unable to create new connection', se); } }}

現在我們用上述的Validator和ObjectFactory來創建一個JDBC的連接池:

package com.test;import java.sql.Connection;import com.test.pool.Pool;import com.test.pool.PoolFactory;public class Main{ public static void main(String[] args) { Pool < Connection > pool = new BoundedBlockingPool < Connection > ( 10, new JDBCConnectionValidator(), new JDBCConnectionFactory('', '', '', '') ); //do whatever you like }}

為了犒勞下能讀完整篇文章的讀者,我這再提供另一個非阻塞的對象池的實現,這個實現和前面的唯一不同就是即使對象不可用,它也不會讓客戶端阻塞,而是直接返回null。具體的實現在這:

package com.test.pool;import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.Semaphore;public class BoundedPool < T > extends AbstractPool < T >{ private int size; private Queue < T > objects; private Validator < T > validator; private ObjectFactory < T > objectFactory; private Semaphore permits; private volatile boolean shutdownCalled; public BoundedPool( int size, Validator < T > validator, ObjectFactory < T > objectFactory) { super(); this.objectFactory = objectFactory; this.size = size; this.validator = validator; objects = new LinkedList < T >(); initializeObjects(); shutdownCalled = false; } @Override public T get() { T t = null; if(!shutdownCalled) { if(permits.tryAcquire()) { t = objects.poll(); } } else { throw new IllegalStateException('Object pool already shutdown'); } return t; } @Override public void shutdown() { shutdownCalled = true; clearResources(); } private void clearResources() { for(T t : objects) { validator.invalidate(t); } } @Override protected void returnToPool(T t) { boolean added = objects.add(t); if(added) { permits.release(); } } @Override protected void handleInvalidReturn(T t) { } @Override protected boolean isValid(T t) { return validator.isValid(t); } private void initializeObjects() { for(int i = 0; i < size; i++) { objects.add(objectFactory.createNew()); } }}

考慮到我們現在已經有兩種實現,非常威武了,得讓用戶通過工廠用具體的名稱來創建不同的對象池了。工廠來了:

package com.test.pool;import com.test.pool.Pool.Validator;/*** Factory and utility methods for* {@link Pool} and {@link BlockingPool} classes* defined in this package.* This class supports the following kinds of methods:**<ul>*<li> Method that creates and returns a default non-blocking*        implementation of the {@link Pool} interface.*   </li>**<li> Method that creates and returns a*        default implementation of*        the {@link BlockingPool} interface.*   </li>*</ul>** @author Swaranga*/public final class PoolFactory{ private PoolFactory() { }/*** Creates a and returns a new object pool,* that is an implementation of the {@link BlockingPool},* whose size is limited by* the <tt> size </tt> parameter.** @param size the number of objects in the pool.* @param factory the factory to create new objects.* @param validator the validator to* validate the re-usability of returned objects.** @return a blocking object pool* bounded by <tt> size </tt>*/public static < T > Pool < T >newBoundedBlockingPool(int size,ObjectFactory < T > factory,Validator < T > validator){ return new BoundedBlockingPool < T > ( size, validator, factory);}/** Creates a and returns a new object pool,* that is an implementation of the {@link Pool}* whose size is limited* by the <tt> size </tt> parameter.** @param size the number of objects in the pool.* @param factory the factory to create new objects.* @param validator the validator to validate* the re-usability of returned objects.** @return an object pool bounded by <tt> size </tt>*/public static < T > Pool < T > newBoundedNonBlockingPool( int size, ObjectFactory < T > factory, Validator < T > validator){ return new BoundedPool < T >(size, validator, factory);}}

現在我們的客戶端就能用一種可讀性更强的方式來創建對象池了:

package com.test;import java.sql.Connection;import com.test.pool.Pool;import com.test.pool.PoolFactory;public class Main{ public static void main(String[] args) { Pool < Connection > pool = PoolFactory.newBoundedBlockingPool( 10, new JDBCConnectionFactory('', '', '', ''), new JDBCConnectionValidator()); //do whatever you like }}

好吧,終於寫完了,拖了這麼久了。盡情使用和完善它吧,或者再多加幾種實現。

快樂編碼,快樂分享!

原創文章,轉載請注明: 轉載自並發編程網 – ifeve.com本文鏈接地址: 一個通用並發對象池的實現

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071310333367.html