22 августа 2010 г.

Lock-free BufferedQueue

Дали тестовое задание перед собеседованием -- написать буфферизированную очередь. Т.е. на вход метода append(T item) из нескольких потоков сыплются некие объекты, и, раз в какой-то интервал времени, задаваемый при создании, накопившиеся объекты сбрасываются списком в заранее заданный callback. Причем, понятно, очередь должна иметь как можно большую пропускную способность, т.е. засинхронизировать метод append -- это не выход.

Первая моя версия использовала ReadWriteLock -- неэксклюзивным ReadLock-ом защищался append, эксклюзивным WriteLock-ом -- flush. Но потом я взмедитнул, и подумал, что блокировки там вообще лишние. И получилась у меня lock-free версия вот такой:
public class LockFreeBufferedQueue<ItemType> {

private final ICallback<Collection<ItemType>> callback;
private final long delay;
private final TimeUnit timeUnit;

private final ScheduledExecutorService executor;

private final CallableVoid> flushBufferTask = new Callable<Void>() {
public Void call() {
assert ( pending.get() > 0 ) : "Call without being scheduled";
flushBuffer();
return null;
}
};

private final AtomicInteger pending = new AtomicInteger( 0 );

/**
* volatile чтобы обойтись без блокировки в методе isClosed()
*/
private volatile boolean closed = false;

private volatile ScheduledFuture<Void> scheduledTask = null;

/**
* lock-free Queue implementation
*/
private final ConcurrentLinkedQueue<ItemType> queue = new ConcurrentLinkedQueue<ItemType>();

public LockFreeBufferedQueue( final ICallback<Collection<ItemType>> callback,
final long delay,
final TimeUnit timeUnit,
final ScheduledExecutorService executor ) {        
this.callback = callback;
this.delay = delay;
this.timeUnit = timeUnit;
this.executor = executor;
}

/**
* Не специфицированно, что должен делать метод close(), поэтому мы делаем так:
* 
* Уже добавленные задачи будут выполнены немедленно, в текущем потоке, добавление
* новых приведет к IllegalStateException в методе append().
*/
public void close() {
if ( !closed ) {
closed = true;
if ( scheduledTask != null ) {
if ( scheduledTask.cancel( false ) ) {
flushBuffer();
}
scheduledTask = null;
}
//todo but if executor is external?
executor.shutdown();
}
}

/**
* @throws IllegalStateException если очередь уже закрыта вызовом close()
* @throws NullPointerException  если item == null
*/
public void append( final ItemType item ) {
if ( closed ) {
throw new IllegalStateException( "Queue was closed" );
}

queue.add( item );

if ( pending.getAndIncrement() == 0 ) {
schedule();
}
}

private void flushBuffer() {
final int size = pending.get();

assert ( !queue.isEmpty() ) : "Call with empty queue";
assert ( size > 0 ) : "Call with 0 pending";

final Collection<ItemType> buffer = new ArrayList<ItemType>( size );

for ( int i = 0; i < size; i++ ) {
buffer.add( queue.poll() );
}

if ( pending.getAndAdd( -size ) > size ) {
schedule();
}

//todo что делать с исключениями?
callback.call( buffer );
}

private void schedule() {
scheduledTask = executor.schedule( flushBufferTask, delay, timeUnit );
}

public boolean isClosed() {
return closed;
}
}
//=============================
public interface ICallback<T> {
public void call( final T arg );
}


Это первый мой опыт в самостоятельном написании lock-free алгоритмов на CAS-примитивах, поэтому написал я этот код часа за два, а медитировал над ним дня 3. Но косяков так и не нашел -- все должно работать как задумано. Увлекательное это занятие -- обдумывать корректность таких алгоритмов, мозг работает явно на повышенных оборотах

1 комментарий:

  1. что будет если метод close() вызвать в нескольких потоках одновременно?
    Я вижу что scheduledTask.cancel( false ) может быть вызван несколько раз.
    Мы можем получить NPE здесь : scheduledTask.cancel, т.к. допустим первый поток уже заканселил таски и обнулил ссылку: scheduledTask = null; а после второй поток попытался вызвать scheduledTask.cancel( false )

    ОтветитьУдалить