24 октября 2011 г.

taskset для бенчмарков

Экспериментировал с бенчмарками с привязыванием потоков к ядрам через taskset. И вот не дает мне покоя мысль: ведь если я привяжу процесс, тестирующий какой-то двухпоточный код к двум конкретным ядрам (taskset -c 1,2 mvn exec:java ...) то с одной стороны мои потоки не будут кочевать по другим ядрам. А с другой -- у JVM кроме моих 2 потоков еще дюжина фоновых запущена, не считая GC -- и они ведь все тоже будут за эти два ядра конкурировать. Хотя у меня еще 16 ядер свободны... Это не считая того, что никто не помешает ведь моим двум потокам периодически просто меняться ядрами :)

Похоже, все-таки надо искать способ назначать thread affinity mask изнутри, для конкретных потоков. Где-то в сети я видел код, делающий это через JNA довольно компактно.

UPD:Бросил все, и написал свою собственную утилиту, с преферансом и куртизанками: ThreadAffinity.java. Утилита использует JNA, работает только на linux -- винды у меня сейчас банально нет под рукой, а на MacOS найти нужный системный вызов мне пока не удалось.

Open Source: Retryer

У меня наконец-то дошли руки выложить что-то из своих работ в ОС. Библиотечка, созданная изначально еще во времена работы над DataGuard-ом, потом допиленная уже на нынешней работе -- Retryer.

Задача, которую она решает: часто бывает так, что какой-то код может не выполниться успешно "без причины", и если такое происходит, нужно просто попытаться выполнить его еще несколько раз -- возможно, с различными паузами между попытками. Чаще всего такое бывает с кодом, который дергает какие-то внешние сервисы -- тут может быть куча временных/случайных причин для сбоев: временная перегрузка сети, временная неработоспособность сети, временная перегрузка сервиса, перезапуск сервиса... Какую-то степень устойчивости к этим факторам предоставляют сами протоколы (например, тот же TCP какие-то сетевые сбои обрабатывает незаметно для пользователя) но этой степени часто недостаточно для целей конкретного приложения.

Вот, собственно, задачу "стучаться пока не откроют" библиотека и выполняет. Что с ней можно делать:
  1. Задать код, который нужно выполнять "пока не получится" (==он завершится без исключений)
  2. Задать стратегию задержки (back off) между отдельными попытками. Есть библиотека простых стратегий (без задержки, фиксированная пауза, линейно растущая пауза, экспоненциально растущая пауза...) + билдер/dsl для их комбинации. Например:
    Backoff
                .withExponentialGrowingDelay()
                .startingWithDelay( 1, TimeUnit.SECONDS )
                .maxTryes( 5 )
                .maxDelay( 10, TimeUnit.SECONDS )
                .build()
    
  3. Задать "фатальные ошибки" -- т.е. при некотором классе ошибок дальнейшее продолжение попыток смысла не имеет. Например, если сервис выбросил исключение ConnectionTimeout -- имеет смысл попробовать еще разок позже, авось сеть заработает. Но если сервис выкинул IncorrectProtocolVersionException, то дальше пробовать смысла нет -- мы явно ломимся куда-то не туда

Пример использования (запрос по URL-у):

private static final String URL_TO_QUERY = "http://google.com/?q=Retryer";

public static String simpleQuery( final String urlString ) throws Exception {
    final URL url = new URL( urlString );
    final InputStream is = url.openStream();
    try {
        final InputStreamReader r = new InputStreamReader( is, "ISO-8859-1" );
        try {
            return CharStreams.toString( r );
        } finally {
            r.close();
        }
    } finally {
        is.close();
    }
}

....

public static String queryRetryableComplex( final String urlQuery ) throws Exception {
    return new Retryer().doRetryable(
            new IRetryableTask<String, Exception>() {
                public String execute( final int tryNo ) throws Exception {
                    return simpleQuery( urlQuery );
                }

                public boolean isFatalReason( final int tryNo,
                                                                     final Throwable reason ) {
                    return (reason instanceof MalformedURLException);
                }
            },
            Backoff
                        .withExponentialGrowingDelay()
                        .startingWithDelay( 1, TimeUnit.SECONDS )
                        .maxTryes( 5 )
                        .maxDelay( 10, TimeUnit.SECONDS )
                        .build()
    );
}

здесь мы будем пытаться запросить гугл с экспоненциально растущей задержкой, начиная с 1 секунды, но не более 10 секунд, и не более 5 раз подряд. При этом мы не будем повторять запрос если он выбросит MalformedURLException (какой смысл-то?).

С удовольствием послушаю всяческую ругань. Обещаю активно и интересно ругаться в ответ :)

11 октября 2011 г.

LMAX Disruptor #3.1: эксперименты с производительностью

Разбирательства с lazySet и прочими тонкостями моделей памяти несколько отвлекли меня от продолжения вивисекции disruptor-а. А ведь сам пристальный интерес к lazySet у меня возник как раз после того, как бенчмарки показали, что если заменить его на обычный .set() -- пропускная способность падает почти вдвое. Так что пока суд да дело -- я напишу об этих самых бенчмарках.

Сначала про задумку. Задумка возникла еще во время первого знакомства с disruptor-ом, и укрепилась в ходе дискуссии с Алексеем о его мнимых и реальных достоинствах. Дело в том, что бенчмарки disruptor-а, которые приводят разработчики, сравнивают его с j.u.c.ArrayBlockingQueue. Это достаточно справедливо в том смысле, что ABQ наиболее подходящий готовый компонент из стандартной библиотеки, которым воспользуется обычный разработчик, буде ему понадобится сконструировать конвейерную многопоточную обработку сообщений. Но ABQ это вовсе не самая производительная (ни в плане пропускной способности, ни в плане задержек) реализация конкурентной очереди. Это достаточно простая и надежная реализация, с приемлемой производительностью -- именно поэтому (как я полагаю) она и является частью стандартной библиотеки. И если уж ставить вопрос "disruptor vs queues" -- как его пытаются поставить разработчики, то надо сравнивать disruptor c сопоставимыми по степени оптимизированности реализациями очередей. Которых -- увы -- в стандартной поставке JDK нет. Но это всего лишь означает, что их надо написать :)

Таким образом и возникла идея этого теста. Вместо тестирования "disruptor vs queues" в благоприятных для него условиях попробуем тестировать в неблагоприятных. Возьмем disruptor в простейшей топологии A->B (один поток -- поставщик, один поток -- обработчик-потребитель). Возьмем простейшую реализацию single enqueuer single dequeuer очереди на основе массива (собственно, это будет очень близко к реализации самого disruptor-а -- по-сути, он и есть такая очередь, только на стероидах). Будем постепенно вносить в эту очередь различные оптимизации, или/и будем постепенно "убирать" различные оптимизации из disruptor-а. Поскольку эти две структуры данных весьма близки изначально, то на каком-то этапе они станут почти неразличимы, и производительность должна оказаться равной. То есть на выходе у нас получится последовательность шагов, каждый из которых на сколько-то улучшает очередь/ухудшает disruptor -- так, что они в конце-концов становятся неразличимы. И можно будет смотреть, какие факторы собственно являются в дизайне disruptor-а ключевыми для производительности, а какие -- второстепенными.

Дисклаймер: я провожу тесты без нагрузки. То есть измеряю пропускную способность в сферическом случае в вакууме -- она может коррелировать с производительностью в боевых условиях, а может и не коррелировать. Меня здесь интересуют именно накладные расходы на холостом ходу -- "стоимость" работы внутренней механики. Просто хотя бы потому, что рабочий ход мне не на чем исследовать -- нет конкретного примера.

Single Enqueuer, Single Dequeuer Queue

Итак, откуда начинаем плясать: вот так выглядит single enqueuer single dequeuer array-based queue:
public class SESDQueue<T> {
    private final int length;
    private volatile long headCursor = 0;
    private volatile long tailCursor = 0;

    private final T[] elements;
    /* Elements range: [headCursor, tailCursor)
    *
    * (tailCursor - headCursor) == elements count
    *
    * 0 <= (tailCursor - headCursor) <= length  => state invariant
    *
    * tailCursor - headCursor == length         => queue is full
    * tailCursor - headCursor == 0              => queue is empty
    *
    * (headCursor % length ) is the index of first item in queue
    * (tailCursor % length ) is the index of _cell_ for _next last item_
    */

    public SESDQueue( final int length ) {
        checkArgument( length > 0, "length(%s) must be >0", length );
        this.length = length;
        elements = ( T[] ) new Object[length];
    }

    private long nextTail() {
        final long tail = tailCursor;
        waitWhileNotFull( tail );

        assert ( tail == tailCursor ) : "tail=" + tail + " <> tailCursor=" + tailCursor + " -- seems like multi-threaded enqueue() detected!";
        return tail + 1;
    }

    private long nextHead() {
        final long head = headCursor;
        waitWhileNotEmpty( head );
        assert ( head == headCursor ) : "head=" + head + " <> headCursor=" + headCursor + " -- seems like multi-threaded dequeue() detected!";
        return head + 1;
    }


    private void publishTail( final long newTail ) {
        assert ( newTail >= headCursor ) : "newTail(" + newTail + ") < headCursor(" + headCursor + ")";
        tailCursor = newTail;
    }

    private void publishHead( final long newHead ) {
        assert ( newHead <= tailCursor ) : "newHead(" + newHead + ") > tailCursor(" + tailCursor + ")";
        headCursor = newHead;
    }


    private void waitWhileNotFull( final long tail ) {
        //spin-wait: "while not full"
        while ( tail == headCursor + length ) {
        }
    }

    private void waitWhileNotEmpty( final long head ) {
        //spin-wait: "while not empty"
        while ( head == tailCursor ) {
        }
    }

    public void enqueue( final T item ) {
        final long newTail = nextTail();

        final int index = ( int ) ( ( newTail - 1 ) % length );
        elements[index] = item;

        publishTail( newTail );
    }

    public T dequeue() {
        final long newHead = nextHead();

        final int index = ( int ) ( ( newHead - 1 ) % length );
        final T item = elements[index];
        elements[index] = null;//for GC

        publishHead( newHead );
        return item;
    }

    public int size() {
        return ( int ) ( tailCursor - headCursor );
    }
}
В ней нет ни одного CAS, только volatile load/stores - это ее плюс. Писать/читать ее может только по одному потоку -- это ее минус, в данном случае несущественный, так как disruptor тоже в своем наилучшем варианте single publishing, и принципиально single subsriber.

Схема бенчей


Один поток пишет в "очередь" 300 миллионов записей (для начала -- LongValueEntry), другой поток их из "очереди" вытаскивает. Время выполнения определяется по моменту вытаскивания последней записи из "очереди".

Цикл выполняется 5 раз подряд в одном запуске -- 1-2 раз обычно выбрасывается, чтобы не учитывать разогрев JVM. Запусков обычно 2-3 -- сколько достаточно для более-менее уверенной статистики.

В бенчмарках присутствует так же ArrayBlockingQueue -- в качестве реперной точки.

Код здесь.

Поехали...


Система: MBP, i7 2GHz, MacOS X 10.6.8,
JDK 1.6.0_26-b03-377, "-da -server -Xmx512m -Xms512"

Емкость очереди -- 1024 элемента.

Базовые результаты (ops/sec == количество сообщений, прошедших "очередь" за секунду).
ABQ SESD Disruptor
(2.65+/-0.05)e6 (9.0+/-0.3)e6 (65+/-3)e6

Нифиговое такое различие, а? Меня, правда, немного напрягает, что разница в скорости между ABQ и Disruptor составляет 25 раз, вместо 5-6 в их собственных тестах...

Ок, начинаем оптимизацию. Шаг №1: первое, что приходит в голову -- аллокация. Disruptor использует однажды преаллоцированные объекты, обе наши очереди -- создают 300 миллионов временных объектов. Проверим какое влияние это оказывает -- сделаем пул объектов. Задача упрощается тем, что пул имеет заранее известную емкость (==емкости очереди), плюс он не должен быть потоковобезопасным, потому что мы знаем, что у нас только один поток-производитель:
class EventPool<T> implements EventFactory<T> {
        private final EventFactory<T> factory;
        private final Object[] items;

        private int counter = 0;

        private EventPool( final int size,
                           final EventFactory<T> factory ) {
            this.factory = factory;
            this.items = new Object[size];
            for ( int i = 0; i < items.length; i++ ) {
                items[i] = factory.newInstance();
            }
        }

        @Override
        @SuppressWarnings( "unchecked" )
        public T newInstance() {
            try {
                return ( T ) items[counter];
            } finally {
                counter = ( counter + 1 ) % items.length;
            }
        }        
}

ABQ SESD Disruptor
(2.70+/-0.04)e6 (10.7+/-0.4)e6 (65+/-3)e6

Различие не то, чтобы особо радикальное, но процентов 15 наша SESD очередь выиграла.

Шаг №2 -- добавляем padding, чтобы избежать false sharing:
private volatile long headCursor = 0;
    
    public volatile long p11, p12, p13, p14, p15, p16, p17, p18 = 7;
    
    private volatile long tailCursor = 0;
    
    public volatile long p21, p22, p23, p24, p25, p26, p27, p28 = 8;

    public long sumPaddingToPreventOptimisation() {
        return p11 + p12 + p13 + p14 + p15 + p16 + p17 + p18
                + p21 + p22 + p23 + p24 + p25 + p26 + p27 + p28;
    }
результат SESDQueue: (10.8+/-0.2)e6 -- какие-то крохи, даже говорить стыдно. Даже как-то странно...


Шаг №3 -- оптимизируем spin-loop-ы. Опять же, подглядывая в код disruptor-а можно заметить такой прием:
private void waitWhileNotEmpty( final long head ) {
        //spin-wait: "while not empty"
        while ( head == tailCursor ) {
        }
    }
Зачем нам здесь каждый раз читать tailCursor (volatile load), когда единожды обнаружив, что он на сколько-то больше head мы можем просто запомнить его текущее значение в локальной для потока переменной, и пересчитывать ее значение только когда head будет снова ему равно:
//тут опять же будет padding...
    private long lastTailObserved = 0;
    //и тут --- тоже...
    private void waitWhileNotEmpty( final long head ) {
        while ( head == lastTailObserved ) {
            lastTailObserved = tailCursor;
        }
    }

Результат: (12.9+/-0.1)e6 (== +20%).

Шаг №3 -- используем lazySet. Опять же подсмотренно в классе Sequence из disruptor -- похоже, что операции обновления курсоров здесь не требуют всей строгости семантики volatile store -- достаточно LoadStore+StoreStore барьеров перед ними -- то есть гарантии, что все, что произошло до обновления курсора в program order будет видимо остальным потокам до того, как станет видимым это обновление. Для этого не нужно volatile store, достаточно lazySet:

Результат: (13.9+/-0.1)e6 (== +10%).

В общем-то, прямолинейные оптимизации закончились -- а мы все еще отстаем от disruptor в 5 раз. Продолжение -- в следующих сериях :)



7 октября 2011 г.

AtomicXXX.lazySet() -- разъяснения

По совету друга я вознес мольбу Господу о ниспослании мудрости, и Господь послал мне этого. Немного. Зато несколько раз -- с первого раза я не понял. Произошло это посредством листа рассылки concurrency-interest -- ну, нечего придираться, пути господни неисповедимы. Начало треда я в архивах не нашел почему-то, так что могу дать ссылку только на продолжение: вот.

Вкратце:

Что касается спецификации:
lazySet был введен после последней редакции JMM, поэтому в JMM нет описания такого отношения порядка, который он порождает. Т.е. по сути javadoc специфицирующий lazySet можно рассматривать как расширение JMM. Аналогичным свойством обладает weakCompareAndSet -- его ordering constraints тоже не формализуются в текущей редакции JMM. Идет обсуждение того, как включить такие вещи в очередную редакцию JMM, но пока в попутчиках согласья нет, и вряд ли очередная редакция JMM выйдет в ближайшее время. Вообще варианты более "тонких" мембаров, частным случаем которых является lazySet (== Fences.orderWrites + store) планируется внедрять в специальный класс Fences, соответственно все обсуждение подобных вещей обычно сосредоточено вокруг него. И черновики будущей спецификации можно посмотреть в его class-javadoc. Я там пока нифига не понял, но попытки буду продолжать.

Что касается текущего состояния:

Да, то, что написано в lazySet javadoc действительно означает, что все записи, произведенные до lazySet() гарантированно станут видимы до того, как станет видимым запись lazySet(). При этом чтения, идущие до lazySet() в program order вовсе не обязаны реально происходить до него -- они могут быть переупорядочены и отложены, и завершиться уже после завершения lazySet() -- в этом, собственно, отличие от обычного volatile store.

При этом момент выполнения записи действительно не определен -- "когда нибудь". Означает это то, что запись может быть отложена процессором или компилятором хоть навечно. На практике это вряд ли произойдет, если поток, выполнивший lazySet продолжает что-то писать -- и у компиляторов и у процессора ограничен размер буфера отложенных операций, так что когда-нибудь запись все-таки будет выполнена.

На самом деле я здесь не вижу существенной разницы с обычным volatile store -- нигде в JMM не прописано, что volatile store должен завершиться немедленно. JMM задает лишь ограничения на то, что все записи и чтения идущие до vstore в program order действительно будут исполнены и их результаты видимы до того, как будет исполнена и видима vstore.

Что касается текущей реализации:

Хотя формально запись может быть отложена на сколько угодно, фактически все существующие JVM реализуют lazySet() консервативно -- выполняют запись немедленно. То есть запись может быть отложена уже только процессором, задержавшись в его store buffer.

Таким образом, использование lazySet в disruptor, похоже, в целом легально, и ускорение, им создаваемое -- вполне объяснимо.

P.S. В процессе разбирательств выяснилось, что некоторые вещи в JMM я до сих пор понимаю больше интуитивно. Кажется, в ближайшее время у меня будут посты на эту тему