append(T item) из нескольких потоков сыплются некие объекты, и, раз в какой-то интервал времени, задаваемый при создании, накопившиеся объекты сбрасываются списком в заранее заданный callback. Причем, понятно, очередь должна иметь как можно большую пропускную способность, т.е. засинхронизировать метод append -- это не выход. Первая моя версия использовала ReadWriteLock -- неэксклюзивным ReadLock-ом защищался append, эксклюзивным WriteLock-ом -- flush. Но потом я взмедитнул, и подумал, что блокировки там вообще лишние. И получилась у меня lock-free версия вот такой:
public class LockFreeBufferedQueue<ItemType> {
private final ICallback<Collection<ItemType>> callback;
private final long delay;
private final TimeUnit timeUnit;
private final ScheduledExecutorService executor;
private final CallableVoid> flushBufferTask = new Callable<Void>() {
public Void call() {
assert ( pending.get() > 0 ) : "Call without being scheduled";
flushBuffer();
return null;
}
};
private final AtomicInteger pending = new AtomicInteger( 0 );
/**
* volatile чтобы обойтись без блокировки в методе isClosed()
*/
private volatile boolean closed = false;
private volatile ScheduledFuture<Void> scheduledTask = null;
/**
* lock-free Queue implementation
*/
private final ConcurrentLinkedQueue<ItemType> queue = new ConcurrentLinkedQueue<ItemType>();
public LockFreeBufferedQueue( final ICallback<Collection<ItemType>> callback,
final long delay,
final TimeUnit timeUnit,
final ScheduledExecutorService executor ) {
this.callback = callback;
this.delay = delay;
this.timeUnit = timeUnit;
this.executor = executor;
}
/**
* Не специфицированно, что должен делать метод close(), поэтому мы делаем так:
*
* Уже добавленные задачи будут выполнены немедленно, в текущем потоке, добавление
* новых приведет к IllegalStateException в методе append().
*/
public void close() {
if ( !closed ) {
closed = true;
if ( scheduledTask != null ) {
if ( scheduledTask.cancel( false ) ) {
flushBuffer();
}
scheduledTask = null;
}
//todo but if executor is external?
executor.shutdown();
}
}
/**
* @throws IllegalStateException если очередь уже закрыта вызовом close()
* @throws NullPointerException если item == null
*/
public void append( final ItemType item ) {
if ( closed ) {
throw new IllegalStateException( "Queue was closed" );
}
queue.add( item );
if ( pending.getAndIncrement() == 0 ) {
schedule();
}
}
private void flushBuffer() {
final int size = pending.get();
assert ( !queue.isEmpty() ) : "Call with empty queue";
assert ( size > 0 ) : "Call with 0 pending";
final Collection<ItemType> buffer = new ArrayList<ItemType>( size );
for ( int i = 0; i < size; i++ ) {
buffer.add( queue.poll() );
}
if ( pending.getAndAdd( -size ) > size ) {
schedule();
}
//todo что делать с исключениями?
callback.call( buffer );
}
private void schedule() {
scheduledTask = executor.schedule( flushBufferTask, delay, timeUnit );
}
public boolean isClosed() {
return closed;
}
}
//=============================
public interface ICallback<T> {
public void call( final T arg );
}
Это первый мой опыт в самостоятельном написании lock-free алгоритмов на CAS-примитивах, поэтому написал я этот код часа за два, а медитировал над ним дня 3. Но косяков так и не нашел -- все должно работать как задумано. Увлекательное это занятие -- обдумывать корректность таких алгоритмов, мозг работает явно на повышенных оборотах
что будет если метод close() вызвать в нескольких потоках одновременно?
ОтветитьУдалитьЯ вижу что scheduledTask.cancel( false ) может быть вызван несколько раз.
Мы можем получить NPE здесь : scheduledTask.cancel, т.к. допустим первый поток уже заканселил таски и обнулил ссылку: scheduledTask = null; а после второй поток попытался вызвать scheduledTask.cancel( false )