Сначала про задумку. Задумка возникла еще во время первого знакомства с disruptor-ом, и укрепилась в ходе дискуссии с Алексеем о его мнимых и реальных достоинствах. Дело в том, что бенчмарки disruptor-а, которые приводят разработчики, сравнивают его с j.u.c.ArrayBlockingQueue. Это достаточно справедливо в том смысле, что ABQ наиболее подходящий готовый компонент из стандартной библиотеки, которым воспользуется обычный разработчик, буде ему понадобится сконструировать конвейерную многопоточную обработку сообщений. Но ABQ это вовсе не самая производительная (ни в плане пропускной способности, ни в плане задержек) реализация конкурентной очереди. Это достаточно простая и надежная реализация, с приемлемой производительностью -- именно поэтому (как я полагаю) она и является частью стандартной библиотеки. И если уж ставить вопрос "disruptor vs queues" -- как его пытаются поставить разработчики, то надо сравнивать disruptor c сопоставимыми по степени оптимизированности реализациями очередей. Которых -- увы -- в стандартной поставке JDK нет. Но это всего лишь означает, что их надо написать :)
Таким образом и возникла идея этого теста. Вместо тестирования "disruptor vs queues" в благоприятных для него условиях попробуем тестировать в неблагоприятных. Возьмем disruptor в простейшей топологии A->B (один поток -- поставщик, один поток -- обработчик-потребитель). Возьмем простейшую реализацию single enqueuer single dequeuer очереди на основе массива (собственно, это будет очень близко к реализации самого disruptor-а -- по-сути, он и есть такая очередь, только на стероидах). Будем постепенно вносить в эту очередь различные оптимизации, или/и будем постепенно "убирать" различные оптимизации из disruptor-а. Поскольку эти две структуры данных весьма близки изначально, то на каком-то этапе они станут почти неразличимы, и производительность должна оказаться равной. То есть на выходе у нас получится последовательность шагов, каждый из которых на сколько-то улучшает очередь/ухудшает disruptor -- так, что они в конце-концов становятся неразличимы. И можно будет смотреть, какие факторы собственно являются в дизайне disruptor-а ключевыми для производительности, а какие -- второстепенными.
Дисклаймер: я провожу тесты без нагрузки. То есть измеряю пропускную способность в сферическом случае в вакууме -- она может коррелировать с производительностью в боевых условиях, а может и не коррелировать. Меня здесь интересуют именно накладные расходы на холостом ходу -- "стоимость" работы внутренней механики. Просто хотя бы потому, что рабочий ход мне не на чем исследовать -- нет конкретного примера.
Single Enqueuer, Single Dequeuer Queue
Итак, откуда начинаем плясать: вот так выглядит single enqueuer single dequeuer array-based queue:public class SESDQueue<T> { private final int length; private volatile long headCursor = 0; private volatile long tailCursor = 0; private final T[] elements; /* Elements range: [headCursor, tailCursor) * * (tailCursor - headCursor) == elements count * * 0 <= (tailCursor - headCursor) <= length => state invariant * * tailCursor - headCursor == length => queue is full * tailCursor - headCursor == 0 => queue is empty * * (headCursor % length ) is the index of first item in queue * (tailCursor % length ) is the index of _cell_ for _next last item_ */ public SESDQueue( final int length ) { checkArgument( length > 0, "length(%s) must be >0", length ); this.length = length; elements = ( T[] ) new Object[length]; } private long nextTail() { final long tail = tailCursor; waitWhileNotFull( tail ); assert ( tail == tailCursor ) : "tail=" + tail + " <> tailCursor=" + tailCursor + " -- seems like multi-threaded enqueue() detected!"; return tail + 1; } private long nextHead() { final long head = headCursor; waitWhileNotEmpty( head ); assert ( head == headCursor ) : "head=" + head + " <> headCursor=" + headCursor + " -- seems like multi-threaded dequeue() detected!"; return head + 1; } private void publishTail( final long newTail ) { assert ( newTail >= headCursor ) : "newTail(" + newTail + ") < headCursor(" + headCursor + ")"; tailCursor = newTail; } private void publishHead( final long newHead ) { assert ( newHead <= tailCursor ) : "newHead(" + newHead + ") > tailCursor(" + tailCursor + ")"; headCursor = newHead; } private void waitWhileNotFull( final long tail ) { //spin-wait: "while not full" while ( tail == headCursor + length ) { } } private void waitWhileNotEmpty( final long head ) { //spin-wait: "while not empty" while ( head == tailCursor ) { } } public void enqueue( final T item ) { final long newTail = nextTail(); final int index = ( int ) ( ( newTail - 1 ) % length ); elements[index] = item; publishTail( newTail ); } public T dequeue() { final long newHead = nextHead(); final int index = ( int ) ( ( newHead - 1 ) % length ); final T item = elements[index]; elements[index] = null;//for GC publishHead( newHead ); return item; } public int size() { return ( int ) ( tailCursor - headCursor ); } }В ней нет ни одного CAS, только volatile load/stores - это ее плюс. Писать/читать ее может только по одному потоку -- это ее минус, в данном случае несущественный, так как disruptor тоже в своем наилучшем варианте single publishing, и принципиально single subsriber.
Схема бенчей
Один поток пишет в "очередь" 300 миллионов записей (для начала -- LongValueEntry), другой поток их из "очереди" вытаскивает. Время выполнения определяется по моменту вытаскивания последней записи из "очереди".
Цикл выполняется 5 раз подряд в одном запуске -- 1-2 раз обычно выбрасывается, чтобы не учитывать разогрев JVM. Запусков обычно 2-3 -- сколько достаточно для более-менее уверенной статистики.
В бенчмарках присутствует так же ArrayBlockingQueue -- в качестве реперной точки.
Код здесь.
Поехали...
Система: MBP, i7 2GHz, MacOS X 10.6.8,
JDK 1.6.0_26-b03-377, "-da -server -Xmx512m -Xms512"
Емкость очереди -- 1024 элемента.
Базовые результаты (ops/sec == количество сообщений, прошедших "очередь" за секунду).
ABQ | SESD | Disruptor |
---|---|---|
(2.65+/-0.05)e6 | (9.0+/-0.3)e6 | (65+/-3)e6 |
Нифиговое такое различие, а? Меня, правда, немного напрягает, что разница в скорости между ABQ и Disruptor составляет 25 раз, вместо 5-6 в их собственных тестах...
Ок, начинаем оптимизацию. Шаг №1: первое, что приходит в голову -- аллокация. Disruptor использует однажды преаллоцированные объекты, обе наши очереди -- создают 300 миллионов временных объектов. Проверим какое влияние это оказывает -- сделаем пул объектов. Задача упрощается тем, что пул имеет заранее известную емкость (==емкости очереди), плюс он не должен быть потоковобезопасным, потому что мы знаем, что у нас только один поток-производитель:
class EventPool<T> implements EventFactory<T> { private final EventFactory<T> factory; private final Object[] items; private int counter = 0; private EventPool( final int size, final EventFactory<T> factory ) { this.factory = factory; this.items = new Object[size]; for ( int i = 0; i < items.length; i++ ) { items[i] = factory.newInstance(); } } @Override @SuppressWarnings( "unchecked" ) public T newInstance() { try { return ( T ) items[counter]; } finally { counter = ( counter + 1 ) % items.length; } } }
ABQ | SESD | Disruptor |
---|---|---|
(2.70+/-0.04)e6 | (10.7+/-0.4)e6 | (65+/-3)e6 |
Различие не то, чтобы особо радикальное, но процентов 15 наша SESD очередь выиграла.
Шаг №2 -- добавляем padding, чтобы избежать false sharing:
private volatile long headCursor = 0; public volatile long p11, p12, p13, p14, p15, p16, p17, p18 = 7; private volatile long tailCursor = 0; public volatile long p21, p22, p23, p24, p25, p26, p27, p28 = 8; public long sumPaddingToPreventOptimisation() { return p11 + p12 + p13 + p14 + p15 + p16 + p17 + p18 + p21 + p22 + p23 + p24 + p25 + p26 + p27 + p28; }результат SESDQueue: (10.8+/-0.2)e6 -- какие-то крохи, даже говорить стыдно. Даже как-то странно...
Шаг №3 -- оптимизируем spin-loop-ы. Опять же, подглядывая в код disruptor-а можно заметить такой прием:
private void waitWhileNotEmpty( final long head ) { //spin-wait: "while not empty" while ( head == tailCursor ) { } }Зачем нам здесь каждый раз читать tailCursor (volatile load), когда единожды обнаружив, что он на сколько-то больше head мы можем просто запомнить его текущее значение в локальной для потока переменной, и пересчитывать ее значение только когда head будет снова ему равно:
//тут опять же будет padding... private long lastTailObserved = 0; //и тут --- тоже... private void waitWhileNotEmpty( final long head ) { while ( head == lastTailObserved ) { lastTailObserved = tailCursor; } }
Результат: (12.9+/-0.1)e6 (== +20%).
Шаг №3 -- используем lazySet. Опять же подсмотренно в классе Sequence из disruptor -- похоже, что операции обновления курсоров здесь не требуют всей строгости семантики volatile store -- достаточно LoadStore+StoreStore барьеров перед ними -- то есть гарантии, что все, что произошло до обновления курсора в program order будет видимо остальным потокам до того, как станет видимым это обновление. Для этого не нужно volatile store, достаточно lazySet:
Результат: (13.9+/-0.1)e6 (== +10%).
В общем-то, прямолинейные оптимизации закончились -- а мы все еще отстаем от disruptor в 5 раз. Продолжение -- в следующих сериях :)
Клёво. Я уже месяца два как хочу написать такой же тест. Вместо этого теперь покручу твой ;)
ОтветитьУдалитьКак-то я внезапно ощутил себя голым :)
ОтветитьУдалитьТам вторая часть грядет -- через пару дней, я надеюсь. Забегая вперед -- техническими оптимизациями можно получить где-то вдвое-втрое хуже Д. Похоже, что вся оставшаяся разница в производительности -- это GC write barrier (который card table marking). Поскольку у меня на маке JDK 1.6 еще -- +UseCondCardMark у меня нету, пока проверить прямо не могу.