append(T item)
из нескольких потоков сыплются некие объекты, и, раз в какой-то интервал времени, задаваемый при создании, накопившиеся объекты сбрасываются списком в заранее заданный callback. Причем, понятно, очередь должна иметь как можно большую пропускную способность, т.е. засинхронизировать метод append -- это не выход. Первая моя версия использовала ReadWriteLock -- неэксклюзивным ReadLock-ом защищался append, эксклюзивным WriteLock-ом -- flush. Но потом я взмедитнул, и подумал, что блокировки там вообще лишние. И получилась у меня lock-free версия вот такой:
public class LockFreeBufferedQueue<ItemType> { private final ICallback<Collection<ItemType>> callback; private final long delay; private final TimeUnit timeUnit; private final ScheduledExecutorService executor; private final CallableVoid> flushBufferTask = new Callable<Void>() { public Void call() { assert ( pending.get() > 0 ) : "Call without being scheduled"; flushBuffer(); return null; } }; private final AtomicInteger pending = new AtomicInteger( 0 ); /** * volatile чтобы обойтись без блокировки в методе isClosed() */ private volatile boolean closed = false; private volatile ScheduledFuture<Void> scheduledTask = null; /** * lock-free Queue implementation */ private final ConcurrentLinkedQueue<ItemType> queue = new ConcurrentLinkedQueue<ItemType>(); public LockFreeBufferedQueue( final ICallback<Collection<ItemType>> callback, final long delay, final TimeUnit timeUnit, final ScheduledExecutorService executor ) { this.callback = callback; this.delay = delay; this.timeUnit = timeUnit; this.executor = executor; } /** * Не специфицированно, что должен делать метод close(), поэтому мы делаем так: * * Уже добавленные задачи будут выполнены немедленно, в текущем потоке, добавление * новых приведет к IllegalStateException в методе append(). */ public void close() { if ( !closed ) { closed = true; if ( scheduledTask != null ) { if ( scheduledTask.cancel( false ) ) { flushBuffer(); } scheduledTask = null; } //todo but if executor is external? executor.shutdown(); } } /** * @throws IllegalStateException если очередь уже закрыта вызовом close() * @throws NullPointerException если item == null */ public void append( final ItemType item ) { if ( closed ) { throw new IllegalStateException( "Queue was closed" ); } queue.add( item ); if ( pending.getAndIncrement() == 0 ) { schedule(); } } private void flushBuffer() { final int size = pending.get(); assert ( !queue.isEmpty() ) : "Call with empty queue"; assert ( size > 0 ) : "Call with 0 pending"; final Collection<ItemType> buffer = new ArrayList<ItemType>( size ); for ( int i = 0; i < size; i++ ) { buffer.add( queue.poll() ); } if ( pending.getAndAdd( -size ) > size ) { schedule(); } //todo что делать с исключениями? callback.call( buffer ); } private void schedule() { scheduledTask = executor.schedule( flushBufferTask, delay, timeUnit ); } public boolean isClosed() { return closed; } } //============================= public interface ICallback<T> { public void call( final T arg ); }
Это первый мой опыт в самостоятельном написании lock-free алгоритмов на CAS-примитивах, поэтому написал я этот код часа за два, а медитировал над ним дня 3. Но косяков так и не нашел -- все должно работать как задумано. Увлекательное это занятие -- обдумывать корректность таких алгоритмов, мозг работает явно на повышенных оборотах
что будет если метод close() вызвать в нескольких потоках одновременно?
ОтветитьУдалитьЯ вижу что scheduledTask.cancel( false ) может быть вызван несколько раз.
Мы можем получить NPE здесь : scheduledTask.cancel, т.к. допустим первый поток уже заканселил таски и обнулил ссылку: scheduledTask = null; а после второй поток попытался вызвать scheduledTask.cancel( false )