28 сентября 2011 г.

AtomicXXX.lazySet() strikes back

Наткнулся тут на любопытную вещь -- я всегда был уверен, что в JMM все поведение всегда можно описать с помощью несложного набора правил построения ребер happens-before. Ну, то есть, я не говорю, что применение этих правил всегда просто -- законы Ньютона тоже не очень сложные, а какой мрак и ужас из них получается, скажем, в небесной механике и движении твердого тела -- но сами правила довольно просты, и применимы к любому java-коду. Но похоже это не совсем так. Этот самый AtomicXXX.lazySet() мутит воду.

Простое объяснение -- неволатильная запись в волатильную переменную -- меня только запутывает. До этого момента в джаве, вообще-то, не было волатильной/не волатильной записи в отрыве от типа поля (ну, если не трогать сатанинский Unsafe). То есть если поле волатильное -- то все операции с ним волатильные, поле не волатильное -- все операции не волатильные.

А теперь получается такая штука, что я могу записать значение в переменную не волатильно, а потом считать его волатильно. И что за отношение порядка будет у меня между этими операциями, если они в разных потоках?

Это не праздный интерес. В коде Disruptor-а встречается такая идиома:

AtomicLong sequence = new AtomicLong();
...
//поток 1: издатель
...write some values into shared objects...
sequence.lazySet(seq);

//поток 2: обработчик
while( sequence.get() != seq ){
   //spin wait
}
...read shared objects fields

По смыслу кода понятно, что автор его уверен -- последовательность sequence.lazySet()/get() обеспечивает завершение всех операций записи, идущих до нее в program order, и видимость результатов этих операций из второго потока. Другими словами -- это практически happens-before edge. Но вот с чего он так в этом уверен -- для меня совершенно не очевидно.

Как минимум, документация lazySet() не дает никаких гарантий относительно того когда запись будет реально выполнена. Eventually -- когда-нибудь. То есть возможно, что и вообще никогда. И непонятно, в какой момент запись -- если она завершится -- будет увидена вторым потоком.

А с другой стороны -- я сейчас гоняю бенчмарки, разбирая по частям disruptor и вытаскивая какие компоненты ключевые для его производительности. И вот оказывается, что если вместо lazySet использовать обычный set() -- скорость падает в 2 раза примерно (это без нагрузки -- на холостом ходу). То есть смысл определенно есть -- но вот надежен ли этот способ оптимизации -- мне как-то неочевидно.

Задал вопрос на stackoverflow -- посмотрим, может там понимают

16 сентября 2011 г.

LMAX Disruptor #2: Реализация

Общую идею, которая стоит за дизайном disruptor я описывал в предыдущей статье, теперь хочу разобрать непосредственно реализацию. Итак, из чего построен disruptor?


Компоненты


Event -- объект-сообщение, который пересылается от производителя ко всем обработчикам через disruptor. Обычно событие создается-пересылается-обрабатывается-выкидывается, а здесь же все события создаются заранее, при инициализации RingBuffer (для этого есть интерфейс фабрики событий EventFactory), и живут, как минимум, столько же, сколько и сам буфер, а все, кто работает с RingBuffer только обновляют поля уже существующих событий (и поэтому с точки зрения реализации было бы точнее называть их Entry, и так оно и было в старых версиях -- но теперь авторы стараются быть ближе к пользователям). Ранее события должны были реализовывать AbstractEvent, но последней версии (2.5) было решено от этого отказаться, и теперь в буфер можно помещать совершенно произвольные объекты -- фреймворк больше ничего конкретного от них не хочет.

Sequence По смыслу это что-то вроде volatile long с некоторыми дополнительными оптимизациями -- например, защитой от false sharing и ослабленной семантикой set(). Объекты этого класса используются для нумерации последовательности событий.

Вообще сама концепция последовательности (Sequence) довольно фундаментальна для disruptor. По-сути, смысл ее в том, что все события в рамках конкретного экземпляра disruptor имеют глобальную логическую нумерацию. Логическую в том смысле, что физически-то экземпляры события переиспользуются -- как я уже писал, их сразу создается ровно столько, сколько нужно чтобы заполнить кольцевой буфер. Т.о. один и тот же физический экземпляр события в разные моменты времени может иметь разный логический номер. Нумерация идет с момента старта конкретного экземпляра disruptor-a, и покуда не закончится long (а он закончится весьма нескоро: даже при обработке 50 млн событий в секунду его хватит на сотни лет -- для практических целей можно считать его бесконечным).

Зная размер буфера этот порядковый номер элементарно преобразовать в индекс в массиве. С другой стороны, покуда речь не заходит об отображении номера на конкретный экземпляр события, можно оперировать только самой последовательностью, и это довольно удобно. Любые два события всегда можно упорядочить -- т.е. поток событий не только сериализуем, но эта сериализация еще и задана в явном виде. Логически мы получаем (почти)бесконечную, однозначно упорядоченную последовательность событий. Внутренняя же логика disruptor-а следит за тем, чтобы физически в каждый момент времени было задействовано не более bufferSize событий.

RingBuffer -- сам кольцевой буфер.

Во-первых, это сам массив записей (сообщений) с методами доступа. Размер массива должен быть степенью двойки, потому что авторы очень хотят иметь возможность вычислять остаток от деления через битовую маску. (Лично мне это стремление не очень понятно -- я где-то со времен P2 был уверен, что экономить на таких мелочах уже бессмысленно).

Во-вторых, здесь сосредоточена та самая внутренняя логика, которая позволяет иметь логически бесконечную последовательность событий, аккуратно отображенную на ограниченный буфер -- логика управления последовательностью. Здесь три части:
  1. Текущий курсор. Это логический номер последнего опубликованного сообщения. Сообщения с номерами (cursor-bufferSize, cursor] доступны для дальнейшей обработки (это не совсем точно, есть еще выделенные но не опубликованные сообщения, но про это далее)
  2. Список курсоров всех обработчиков событий. С его помощью мы следим, чтобы не переиспользовать еще кем-то не обработанную ячейку -- а именно, мы не можем опережать последний (минимальный) из этих курсоров более, чем на bufferSize.
  3. Стратегия ожидания -- WaitStrategy -- каким способом мы будем ожидать, если кончились доступные нам события (либо производитель не успел еще опубликовать новые, либо наоборот, обработчики еще не успели обработать все опубликованное). Доступны BusySpin/Yelding/Sleeping/Blocking реализации.
  4. Стратегия выделения номеров -- ClaimStrategy. Это, пожалуй, самая сложная часть, разберу ее далее.

SequenceBarrier -- это объект, инкапсулирующий в себе набор курсоров (Sequence), которые нам не нужно опрежать. Т.е. если наш обработчик зависим от обработчиков №1 и №2, то у курсора нашего обработчика (последовательности сообщений, им обрабатываемой) будет барьер сверху из курсоров обработчиков №1 и №2. И прежде , чем перейти к обработке следующего события (сдвинуть наш курсор на следующее событие) мы должны дождаться, пока курсоры, от которых мы зависим, уйдут вверх. Вот именно эта операция не лезть поперек батьки "убедиться, что наши предшественники ушли вперед" и является основной функциональностью SequenceBarrier-а. Поскольку ему иногда придется подождать, пока предшественники в самом деле уйдут, внутри него так же лежит стратегия ожидания WaitStrategy. Барьер создается RingBuffer-ом, так что его WaitStrategy та же самая, что использует и сам RingBuffer (вообще, WaitStrategy задается однажды при создании RingBuffer-а, и она используется в большинстве случаях, когда кому-то кого-то приходится ждать).


EventHandler -- это, собственно, интерфейс для обработчика событий, и ничего более.

EventProcessor -- это хелпер, который отвечает за пересылку событий из RingBuffer в EventHandler. Он реализует Runnable, и его нужно засовывать либо в Executor, либо явно в Thread.


Как это работает?



Начнем с производителя

Публикация сообщения в буфер disruptor-а происходит в три этапа: сначала мы резервируем (claim) под себя слот (а еще точнее -- номер события), в который собираемся что-то опубликовать, потом записываем в слот данные, и подтверждаем публикацию (publish). Авторы предпочитают говорить о двух этапах, намекая на то, что запись значений в слот -- внутреннее дело производителя. Пример кода с вики:
// Publishers claim events in sequence
final long sequence = ringBuffer.next();
final ValueEvent event = ringBuffer.get(sequence);

event.setValue(1234); // this could be more complex with multiple fields

// make the event available to EventProcessors
ringBuffer.publish(sequence);   

Зачем нужна такая хитрость? Ну, это обратная сторона решения использовать заранее преаллоцированные объекты событий. Т.е. если рассуждать отвлеченно, для публикации события нужно а) выделить место в памяти куда будем класть содержание б) положить содержание в) сгенерировать номер события г) сделать событие доступным для обработчиков. В случае использования, например, очереди (ABQ), выделение памяти и наполнение обычно производятся где-то в стороне (в клиентском коде), а генерация номера события и публикация объединены -- событие доступно с момента помещения его в очередь, и в этот же момент определяется его порядок относительно других событий. Поэтому в случае с очередью эти 4 этапа как бы не заметны -- первые два вообще не имеют отношения к очереди, а последние 2 объединены.

В случае disruptor ситуация иная: выделение места под данные производится через сам RingBuffer (который, в этом сценарии, выступает в роли пула/кэша/аллокатора объектов сообщений), при этом, в силу дизайна самого disruptor-а номер события однозначно определяет объект события (как buffer[sequence % bufferSize]), мы не можем выделить событие не задав его номер -- поэтому выделение и генерация номера в disruptor-е объединены в фазу 1. С другой стороны, чтобы сделать событие доступным (опубликовать его, фаза "г") нам нужно во-первых, подвинуть курсор RingBuffer-а, во-вторых -- нужен мембар, гарантирующий что все записи в поля сообщения были реально выполнены. Оба эти элемента получаются автоматом за счет volatile write при обновлении курсора.

Более подробно: функциональность выделения объектов событий, и их последующей публикации -- это как раз и есть ответственность ClaimStrategy. Реально, конечно, стратегия с объектами событий не работает -- она работает только с их номерами. Стратегия заявок держит внутри себя свой собственный курсор, указывающий какие номера событий уже выделены. При заявке на очередной номер стратегия делает внутри себя incrementAndGet(), проверяет, что объект с данным номером свободен -- т.е. все обработчики с предыдущего круга его уже обработали (если это не так, мы будем ждать -- ClaimStrategy не использует общую WaitStrategy, здесь, например используется spin->yield->sleep(1)) -- и выдает получившийся номер клиенту.

Коммит (публикация) несколько сложнее -- здесь ответственность за разные фазы поделена между ClaimStrategy и самим RingBuffer. А именно: ClaimStrategy отвечает за порядок публикации сообщений -- они должны публиковаться ровно в том порядке, в котором они пронумерованы. Т.е. если сообщение №3 уже готово к публикации, но сообщение №2 еще нет -- мы будем ждать (by spin-and-yield), пока сообщение №2 таки не опубликуется, и только потом сможем перейти к публикации нашего №3. После того, как ClaimStrategy убедилась, что порядок в порядке -- RingBuffer просто обновляет свой курсор (здесь даже CAS не нужен, достаточно volatile write -- потому что мы можем быть уверены в текущем значении курсора by design). Это обновление, по совместительству, обеспечивает мембар, необходимый для публикации изменений в самом объекте сообщения (чуть более строго, по JMM: видимость изменений в полях объекта обеспечивается ребром HB между VW(cursor) при публикации, и VR(cursor) который делает каждый обработчик в ходе своего spin-wait-а). Это разделение ответственности за коммит на две части мне не очень понятно -- по-моему, было бы логичнее чтобы всю ответственность взяла на себя ClaimStrategy

Разумеется, вся эта машинерия сильно упрощается, если издатель только один -- тогда все запросы идут из одного потока, и ClaimStrategy может быть сильно проще. Разработчики это предусмотрели, и существует SingleThreadedStrategy, которая этим преимуществом пользуется

Чисто технически, логика работы с последовательностями (выделение, публикация) вынесена в базовый класс Sequencer, а RingBuffer наследует Sequencer, и добавляет функциональность хранения самих объектов сообщений, и отображение sequence -> event. Сделано это для того, чтобы можно было легко реализовать свою собственную логику хранения сообщений. Например, если сообщения у нас это просто некий ID команды -- мы можем использовать для его хранения long[] без необходимости boxing/unboxing в/из Long

Обработчики

Ок, сообщение мы опубликовали. Что насчет обработчиков?

За пересылку сообщений из RingBuffer к соответствующим EventHandler-ам отвечает реализация интерфейса EventProcessor. Основная рабочая реализация BatchEventProcessor.
BatchEventProcessor держит внутри себя курсор (Sequence) на последнее обработанное сообщение и SequenceBarrier, задающий список курсоров, от которых зависит наш курсор (==список производителей/обработчиков, которых мы должны пропустить вперед нас, таким образом задаются взаимозависимостями между разными обработчиками).

Основной цикл работы BatchEventProcessor-а состоит в том, что он ждет на SequenceBarrier доступности события, следующего за последним обработанным (как я уже писал, SequenceBarrier как раз и позволяет дожидаться, пока событие с таким-то номером будет обработано всеми впереди идущими == SequenceBarrier.waitFor(cursor+1)). Дождавшись этого, он проверяет, какие события нам на самом деле доступны (гранулярность ожидания может оказаться слишком большой, и реально окажется доступен не только cursor+1, но и, скажем, вплоть до cursor+142), и в цикле все их пересылает в EventHandler. После чего -- один раз -- обновляет курсор, сдвигая его на количество обработанных сообщений (это и есть местный batching). Ну и плюс здесь еще всякая вспомогательная машинерия, типа обработки исключений и остановки обработчика.

Эпилог

Это, собственно все. Несмотря на кажущуюся сложность, идейно все довольно-таки просто. По всему коду повторяется один и тот же шаблон: есть курсор на последнее обработанное (в том или ином смысле) сообщение, есть список курсоров, которые ни в коем случае нельзя опережать. Мы сидим в spin-wait-е на списке "необгоняемых курсоров" (возможно, делая back off через yield/sleep(1)/park), и ждем пока минимальный из них не обгонит нас. Как только он это делает -- мы проверяем насколько именно он нас обогнал, и все эти сообщения -- в нашем распоряжении. Благодаря тому, что курсор везде volatile, пара операций "обновление значения курсора -- чтение нового значения курсора в ходе spin-wait" создает ребро HB, а значит все изменения, сделанные потоком, обновившим курсор до этого обновления будут гарантированно видны всем, прочитавшим новое значение курсора после этой самой операции чтения.

Единственное место, где требуется здесь CAS -- это выделение сообщений из пула для случая многих потоков-поставщиков -- но здесь и нельзя ничего сделать, потому что это в чистом виде consensus problem, которая (для потенциально неограниченного числа участников) обычными volatile write/read не решается.

Все остальное -- это оптимизационные тонкости, где какую стратегию ожидания с каким backoff использовать, где какой padding создать.

Интересно, что в disruptor производитель ничем принципиально от обработчика не отличается. В самом деле, обработчики же вполне свободны модифицировать сообщения, так что можно сделать "производителя" в виде заглушки
while(!Thread.currentThread().interrupted()){
  // Publisher claim events in sequence
  final long sequence = ringBuffer.next();
  // make the event available to EventProcessors
  ringBuffer.publish(sequence);   
}
Такой производитель (он будет однопоточным, кстати) большую часть времени будет проводить в ожидании, чтобы не "съесть свой собственный хвост" -- зато все сообщения будут доступны для обработчиков, следующих за ним. Почему так не сделано? Потому что это не позволяет эффективно использовать batching для производителей:
while(!Thread.currentThread().interrupted()){
  final byte[] message = network.receiveEvents(...);
  final int eventsCount = findEventsCountIn(message);
  // Publisher claim events in batch 
  final SequenceBatch sequenceBatch = new SequenceBatch(eventsCount);
  ringBuffer.next(sequenceBatch);
  //parse and copy event content from message to
  for( int i=sequenceBatch.getStart();i<=sequenceBatch.getEnd();i++){
    final Event event = ringBuffer.get(i);
    //copy i-th event content from message to event object
    ...
  }

  // make the events available to EventProcessors
  ringBuffer.publish(sequenceBatch);   
}
То есть размер "пачки" для публикации зависит от текущей ситуации на рынке в сети -- от текущего уровня нагрузки. Если мы реализуем производителя как обработчика -- мы вынуждены публиковать сообщения по одному -- каждый раз неся накладные расходы на volatile read/write (реально, скорее, на cache coherence traffic вызванный выполнением соответствующих барьеров). Если же мы подбираем размер пачки под текущую нагрузку -- мы амортизируем накладные расходы на передачу данных от ядра к ядру на количество этих сообщений, тем самым мы лучше адаптируемся к изменениям в трафике, сохраняя задержку обработки сообщений на минимальном уровне.

14 сентября 2011 г.

False sharing

Размышляю на досуге про false sharing в его простейшем смысле -- про данные, принадлежащие одной строке кэша. Обычно предлагается с этим бороться введением полей (padding) -- добить объект фиктивными полями типа, скажем, long до обычного размера строки в 64 байта. Проблем с этим методом -- не оберешься. Даже если забыть на минутку про то, насколько такая вещь как padding (тем более -- "в рукопашную") не вписывается в концепцию java как высокоуровневого языка, остается еще масса вопросов:

Переносимость: cache-line имеет разный размер на разных процессорах -- от 32 до 128 байт. Можно, конечно, закладываться на максимальный размер, но тратить 128 байт на каждую конкурентную переменную может оказаться многовато. Дело не в том, что это много само по себе -- память нынче дешевая. Дело в том, что это же априори горячие в кэше данные. Т.е. это не 128 байт из основной памяти, которой десятки гигабайт, это 128 байт из кэша первого уровня -- который все еще весьма невелик. Но даже если и так -- кто гарантирует, что завтра не появится процессор со строкой 256 байт? Если приблизиться к реалиям -- по факту везде, где я встречал padding он сделан из расчета на строку в 64 байта. То есть даже на существующих процессорах со строкой кэша длиннее этой -- производительность резко просядет. А ведь я еще нигде не видел, чтобы размер полей можно было хоть как-то "настраивать" -- да и сделать это довольно непросто.

Надежность: padding нынче делается просто набором неиспользуемых примитивных полей. Еще когда я впервые увидел этот трюк где-то в недрах JDK у меня возник вопрос -- а почему собственно JIT не выкинет эти поля нафиг? Ведь даже моя IDE очевидно мне подчеркивала их как неиспользуемые, уж JIT-то должен был бы это увидеть. И если он на данный момент этого не делает -- что будет, если в очередном обновлении он таки научится? Собственно, похоже это произошло: Martin Thompson, один из авторов disruptor уже жалуется, что используемый ими ранее простой padding в очередном обновлении Java 7 работать перестал. Чтобы таки надурить компилятор им пришлось наследовать PaddedAtomicLong от AtomicLong, и делать padding через 6 штук public volatile long (Кстати, почему их 6 а не 7 -- для меня загадка. Могу только предположить, что Мартин заложился на то, что еще 8 байт уйдет на object header. Надо ли говорить, что закладываться на такие детали внутренней реализации JVM не очень надежно?). Пока что это работает -- непонятно только, насколько долго будет.

Или вот такой вопрос -- ну хорошо, добили мы свой объект полями до размера строки. Но ведь мы не можем (из java) гарантировать, по какому адресу будет расположен наш объект. Нам ведь никто не гарантирует, что в памяти наш объект будет выравнен по границе строк. Реально он может оказаться половиной на одной строке, половиной -- на второй. Да, мы можем быть уверены, что если у нас есть два точно таких же объекта, то их одинаковые поля (в предположении что они всегда идут в одном и том же порядке) точно на одной строке не окажутся. То есть, если взять этот самый PaddedAtomicLong -- покуда мы реально используем только "первый" из составляющих его 7 long-ов , мы можем быть уверены, что false sharing это не про нас. Но если бы мы хотели использовать уже 2 его long-а ("первый" и "второй") -- отсутствие false sharing уже не гарантируется, поскольку "второй" может оказаться уже на следующей строке, и на этой же строке может уместиться начало уже другого PaddedAtomicLong, и наш "второй" будет конфликтовать с его "первым". Поэтому, если мы в самом деле хотим избежать false sharing здесь, нам нужен padding с обеих сторон -- и до "защищаемых" нами полей, и после них, размером заведомо больше половины строки в каждом случае.

Но даже это еще не решает проблемы окончательно, потому что у нас ведь в приложении не только одни PaddedAtomicLong-и существуют. Если мы возьмем, для примера, тот же disruptor -- есть еще и объекты сообщений, которые тоже модифицируются (потенциально) многими потоками. И вполне может так случиться, что на одной строке кэша окажется кусок от MyEvent, и начало от PaddedAtomicLong, как раз где самое важное для нас поле value и лежит. И все усилия Мартина и команды по мегаоптимизации своего dsiruptor-а пойдут прахом моментально и внезапно.

...А самое поганое здесь, на мой взгляд, это то, что сам-то disruptor пишет уже упомянутый Мартин Томпсон, который и знает много, и может себе позволить потратить недели на тонкую оптимизацию и выяснение причин каких-либо тормозов. А вот MyEvent будет создавать обычный прикладной программист, у которого скорее всего ни квалификации такой нет, ни времени на исследования. И вероятность того, что этот программист будет свои MyEvent защищать от false sharing мала.

...Но даже если он и будет это делать -- он ведь еще должен это делать таким же способом, как Мартин. Про что я говорю -- я говорю про то, что если, скажем, наш гипотетический программист Вася будет защищать свои MyEvent от false sharing с помощью двустороннего padding-а размером чуть больше полстроки (как я описывал выше) -- то все равно он не избежит кары небесной, ибо Мартин свои объекты защищает односторонним padding-ом. Легко понять, что в этом случае на одной строке не могут оказаться используемые поля от двух разных MyEvent, точно так же как и от двух разных PaddedAtomicLong, но могут оказаться используемые поля от одного MyEvent и другого PaddedAtomicLong. По-сути получается, что страдает инкапсуляция.

Получается, что либо надо рассчитывать на самый наихудший случай -- т.е. с обеих сторон защищать каждый свой объект padding-ом размером в строку, причем максимальную строку (128 байт). Либо мириться с тем, что отлаженная и заоптимизированная тобой структура данных может легко "лечь" из-за сложнопредсказуемой интерференции с совершенно посторонним кодом.

Чего бы хотелось, чтобы жизнь была безоблачной и радостной? Очевидно, хочется всю эту магию возложить на виртуальную машину -- которая, как минимум, знает размер строки, и имеет возможность управлять расположением объектов в памяти.

Идеальный вариант, разумеется, чтобы JIT сам определял, какие объекты испытывают наибольшее давление false sharing, и прямо в рантайме перемещал их в памяти так, чтобы это давление уменьшить. Мне этот вариант не очень нравится просто потому, что непонятно, за что тогда будут платить деньги мне вряд ли это будет реализовано скоро, и еще более вряд ли скоро и качественно.

Более реальный вариант: хочется стандартную аннотацию типа @PreventFalseSharing (хотелось бы для нее какое-нибудь название, больше отражающее идею, нежели реализацию), особым образом интерпретируемую JIT-ом -- он для таких объектов использует отдельный аллокатор, как минимум размещающий их строго по границам строк, и не позволяющий никому более использовать место, остающееся до целого количества строк (хотя это уже и не обязательно -- первой части уже будет достаточно, чтобы такие объекты гарантированно не конфликтовали хотя бы между собой). Сюда же можно добавить и включение для ссылочных полей таких объектов оптимизацию write barrier-а в виде test-and-mark вместо только mark. Ну и так далее -- думаю, там еще много чего можно добавить.

Причем на первое время такую штуку можно сделать вообще отдельно от Oracle -- сделать byte-code preprocessor, который при загрузке классов ищет соответствующую аннотацию, и модифицирует байт-код соответствующих классов, добавляя фиктивные поля. Можно даже сделать автоматическое определение размера строки кэша -- через нативный вызов, например. Потом можно прикрутить обработку к javac -- опять же под worst-case, но хотя бы исходный код не будет загрязнен всякими непонятными полями. И уж на последнем этапе можно перевести обработку на уровень JVM/JIT.

Вот такие вот эротические фантазии меня последнее время посещают.

UPD: По сообщениям из заслуживающих доверия источников, в казематах оракла суровыми инженерами сана, там заточенными, обсуждается аннотация @Contended, реализующая что-то вроде варианта 2. Так что не пройдет и 10 лет, как, возможно, мы узрим свет в окошке...

5 сентября 2011 г.

"История одной оптимизации"

На хабре промелькнула статья Как убрать все управляющие символы из строки -- история одной бурной оптимизации.

Каждый раз жалею, что там еще не зарегистрирован -- полдня бы точно потратил на веселье в комментах :)

По-правде говоря, статья произвела фиговое впечатление. Я не хочу вдаваться в подробности оптимизаций, дело не в этом -- мне не понравилось отсутствие какой-то системы. Т.е. если первые 5 шагов оптимизации еще как-то понятны -- это переделка алгоритма с использованием априорной информации о задаче -- то дальше идет почти в чистом виде попытка переплюнуть JIT. И вот уже эта часть -- просто какая-то каша. Раз за разом все более странный код, все более в C-style, но главное -- нет никакой системы, просто набор слепых попыток "а если вот эдак?".

Две вещи мне особенно понравились. Автор сам задает, что 99.9% строк у него в изменении не нуждаются -- они "правильные". Казалось бы отсюда прямо-таки очевидно следует, что оптимизировать нужно не исправление неправильных строк, а поиск правильных -- т.е. маршрут, по которому проходят строки без единого управляющего символа должен быть самым быстрым и самым коротким. Ан нет -- самое лучшее решение, которое предложил автор выглядит так:

public static char[] buf = new char[1024];
...
int length = s.length();
char[] oldChars = (length < 1024) ? buf : new char[length];
s.getChars(0, length, oldChars, 0);
int newLen = 0;
for (int j = 0; j < length; j++) {
    char ch = oldChars[j];
    if (ch >= ' ') {
        oldChars[newLen] = ch;
        newLen++;
    }
}
if (newLen != length)
    s = new String(oldChars, 0, newLen);

Потрясающе -- на этом самом коротком маршруте мы дважды копируем символы исходной строки -- первый раз внутри getChars(), второй руками, внутри цикла -- и все это для того, чтобы в 999 случаях из 1000 обнаружить, что ни первое, ни второе копирование нам не нужно, потому что newLen == length. Мне это кажется немного странным.

Я не поленился прогнать бенчмарки дюжины разных реализаций. Вот лидер:

public class PlainScanLazyReplacer {

    public String replaceControls( final String str ) {
        final int length = str.length();
        for ( int i = 0; i < length; i++ ) {
            final char ch = str.charAt( i );
            if ( Util.isControlChar( ch ) ) {
                return realReplace( str, i );
            }
        }
        return str;
    }

    protected String realReplace( final String s,
                                  final int startWithIndex ) {
        final int length = s.length();
        final StringBuilder sb = new StringBuilder( s );
        int newLen = startWithIndex;
        for(int i=startWithIndex;i < length;i++){
            final char ch = sb.charAt( i );
            if ( Util.isNonControlChar( ch ) ) {
                sb.setCharAt( newLen, ch );
                newLen++;
            }
        }
        sb.setLength( newLen );
        return sb.toString();
    }

Метод realReplace() можно оптимизировать, но смысла в этом нет (я проверял) -- он ведь вызывается 1 раз из 1000. Обратите внимание, что основной цикл -- самый классический прогон по строке с помощью charAt(). Ничего больше -- не нужно.

Версия с reflection:
private static final Field buffField;
    private static final Field offsetField;

    static {

        try {
            buffField = String.class.getDeclaredField( "value" );
            buffField.setAccessible( true );
            offsetField = String.class.getDeclaredField( "offset" );
            offsetField.setAccessible( true );
        } catch ( NoSuchFieldException e ) {
            throw new RuntimeException( e );
        }

    }

    public String replaceControls( final String s ) {
        try {
            final char[] buff = ( char[] ) buffField.get( s );
            final int offset = ( Integer ) offsetField.get( s );
            final int length = s.length();

            for ( int i = 0; i < length; i++ ) {
                final char ch = buff[offset + i];
                if ( Util.isControlChar( ch ) ) {
                    return realReplace( s, i );
                }
            }
            return s;
        } catch ( IllegalAccessException e ) {
            throw new RuntimeException( e );
        }
    }
работает (у меня) за то же время, что и более простая и очевидна версия выше. Лично меня это даже и не удивляет -- потому что charAt() будет почти наверняка встроен JIT-ом, и получится тот же самый проход по массиву -- только уже безо всякой магии reflection.

Мораль сей басни: чтобы быть умнее JIT-a надо в самом деле быть очень, очень умным :)

Вторая вещь еще интереснее. "Важно не то, почему код работает быстро, важно то, почему он не может работать быстрее -- во что мы здесь уперлись?" (Алексей Шипилев). Во что мы упираемся в коде из листинга 1? Мне пришла в голову такая вот идея: когда я генерирую входные данные для бенчмарка, обычно я создаю строки каждую по-отдельности -- со своим внутренним массивом char[] value. А что будет, если я солью все строки в одну большую строку, а потом "нарежу" ее на отдельные строки -- так, что все строки будут разделять один-единственный массив символов (и, более того, их данные будут идти в этом массиве по порядку)? По-идее, это должно давать идеальный шаблон доступа к памяти. Если мы упираемся в какие-то вычисления (в скорость CPU) -- изменение шаблона доступа к памяти не должно особо влиять на производительность. А вот если мы упираемся в память -- должно.

Результат бенчмарка: cache-friendly код выполняется на треть быстрее обычного. Это, конечно, не 100% доказательство, но уже серьезный повод подумать, имеет ли смысл дальше пытаться оптимизировать код разными трюками типа изменения порядка обхода массива, если мы, похоже, упираемся в скорость доступа к памяти?

2 сентября 2011 г.

LMAX Disruptor #1: Идеи

Я уже упоминал в предыдущем посте о Disruptor -- замечательном фреймворке для организации высокопроизводительного многопоточного конвейера. Поскольку я такие вещи обожаю -- хлебом не корми -- я прочитал все статьи авторов, скачал его исходный код, и медитировал над ним в свободное и рабочее время последние пару недель. Могу сказать, что это и в самом деле очень достойный пример для подражания. Интересные архитектурные решения, достаточно прозрачный код, и весьма нетривиальные хитрости, которые позволяют ему не просто работать, а работать очень быстро. У меня есть большое желание попробовать описать, что показалось мне интересным, важным или просто заслуживающим внимания.

Для начала исходные данные. Сам проект -- open-source, хостится на google-code, вот здесь http://code.google.com/p/disruptor/. На вики проекта есть страничка, где аггрегируются ссылки на все статьи его разработчиков, где проясняются многие моменты его дизайна и использования. Если вы хотите сами поразбираться -- добро пожаловать. Между прочим один из основных разработчиков, и популяризаторов -- Триша -- женского пола А-а-а, крокодил в ванной женщины в программировании!!! :)

Важное замечание: disruptor нынче уже дорос до версии 2.0, причем в ходе взросления система именования основных интерфейсов тоже взрослела -- разработчики пытались подобрать названия, максимально проясняющие суть дела (я тоже обожаю этот naming poker :). Я к тому, что в статьях разной давности некоторые сущности могут называться иначе, чем здесь (я стараюсь использовать нотацию версии 2.0) -- как правило, это не принципиально, но может иногда запутывать.

Я буду стараться следовать логике разработчиков, комментируя места, которые мне не очевидны. Поехали:

Disruptor -- фреймворк для создания многопоточных конвейеров обработки событий. Из схожих вещей -- Акторы, или просто граф (ориентированный) Producers-Consumers. Классический способ организовать такой граф -- запустить код поставщиков-потребителей в различных потоках, и использовать bounded concurrent/thread-safe queue для создания ребер (неклассический вариант -- упоминавшийся мной фреймворк JetLang) . Именно этот подход и тестировали разработчики disruptor прежде, чем начать изобретать свой собственный велосипед. Производительность этого варианта их не устроила. Почему производительность системы очередей может быть неудовлетворительной (точнее -- значительно ниже теоретического предела), и что мы с этим можем сделать?

Общие принципы


Мы априори принимаем, что любая очередь используемая в production должна быть ограниченной емкости (bounded). В самом деле, если мы допускаем неограниченный рост -- мы допускаем, что наше приложение может в любой момент упасть с OutOfMemory. Если этот вариант нас не устраивает -- нужно задавать явные ограничения.

Во-первых, если мы хотим получить максимальную пропускную способность и минимальные задержки в высококонкурентном случае -- нам нужно стараться уйти от блокировок к более мелкозернистым операциям, типа CAS.

Во-вторых, если мы хотим минимальные задержки -- нам нужно минимизировать нагрузку на GC. Как следствие -- очереди основанные на связных списках уже в проигрыше, поскольку они генерируют много мусорных узлов.

В-третьих, желательно, чтобы текущие положения головы и хвоста очереди перемещались по памяти предсказуемым образом. Имеется в виду, что современные процессоры пытаются угадывать паттерн обращений к памяти, которые делает программа, и упреждающе загружать соответствующие данные в кэш. В частности, на данный момент процессоры умеют распознавать паттерн доступа с фиксированным шагом (fixed strides) размером <2Kb. То есть, если наша программа обращается к ячейкам памяти с постоянным шагом, скажем, 128 байт, то через несколько таких обращений процессор начнет понимать, чего мы от него хотим, и заранее запрашивать загрузку очередной такой ячейки (точнее -- строки, ее содержащей) в кэш. Если же наша программа бежит по памяти каким-то более хитрым образом -- процессор уже не осилит его угадать, и мы будем регулярно получать кэш-промахи со всеми вытекающими последствиями для производительности. Мы опять же приходим к выводу, что организация очереди на основе связанных списков менее предпочтительна, чем на основе массива. Это несколько спорные утверждения. Далеко не каждое приложение имеет настолько низкую "фоновую" нагрузку на GC, чтобы дополнительная нагрузка в виде узлов очереди стала заметна -- раз. Можно организовать быстрый пул узлов (тем более раз очередь ограниченного размера -- можно этот пул заранее наполнить) -- два. Можно даже постараться организовать выдачу узлов из пула в некотором определенном порядке, согласованном с их порядком в памяти -- три. С другой стороны все эти обходные пути, конечно, усложняют дело -- трудно судить, насколько они эффективны. Очевидно, имеет смысл рассматривать сначала более простые решения (массив с преаллоцированными ячейками), и только если они не дают нужного результата -- рыть глубже.

Далее -- нам надо стараться минимизировать трафик на межядерной шине interconnect. Для этого нужно стараться избегать параллельной записи в одну ячейку памяти. Более того, в силу существования эффекта false sharing нужно избегать параллельной записи даже просто в близкие ячейки памяти. Кроме того, нужно минимизировать число содаваемых мембаров.
Я еще об этом буду писать подробнее, но предварительно могу сказать, что большая часть хитростей, примененных разработчиками посвящена именно этому пункту -- игре вокруг уменьшения трафика на шине. По-видимому, это один из ключевых моментов в том, как disruptor зарабатывает свою рекордную производительность.

Архитектурные идеи


До сих пор мы рассуждали, как можно сделать максимально быструю конкурентную очередь. Но нам нужна не очередь сама по себе -- нам нужна возможность организовывать графы производителей-обработчиков произвольной топологии, и очереди -- это лишь один из способов их реализации. Посмотрим, можем ли мы получить что-то дополнительное из того, что мы знаем, каким примерно образом мы собираемся использовать проектируемую нами структуру данных?

Можно заметить, что если использовать очереди для соединения узлов графа, то в каждом узле мы вынуждены делать столько операций dequeue/enqueue, сколько выходящих/выходящих ребер у этого узла. Каждая операция enq/deq на конкурентной очереди -- это операция изменения разделяемого состояния -- а это барьеры памяти и трафик на межпроцессорной шине. Однако enc/deq редко когда бывают в самом деле нужны -- обычно очередной обработчик вовсе не "забирает" очередное событие себе, а лишь "берет попользоваться" и вскоре "отдает назад" (иногда -- модифицировав его как-то). И вся топология графа обработчиков состоит в том, что одни обработчики должны отработать гарантированно прежде других -- т.е. должен соблюдаться какой-то порядок вызова обработчиков.

Эти рассуждения приводят к мысли, что конвейер обработки произвольной топологии можно реализовать в виде одного единственного общего буфера событий. Производители и обработчики имеют каждый свой курсор, указывающий индекс текущего для них события в этом буфере. Чтобы гарантировать порядок обработки (==топологию конвейера) достаточно знать, какие производители/обработчики идут впереди тебя, и следить за их курсорами, чтобы не забегать вперед паровоза.

Эта идея не кажется чем-то особенным, пока мы не заметим одну тонкость. Предположим пока, что производитель у нас один-единственный. У нас есть набор курсоров, по одному на каждого участника конвейера. Эти курсоры по сути, и составляют разделяемое состояние конвейера. Любой шаг -- что производителя, что обработчика -- изменяет один из курсоров. Но каждый участник меняет только свой курсор -- чужие курсоры он только читает! То есть у нас получается лучший возможный паттерн работы с разделяемым состоянием -- полное состояние конвейера распадается на несколько частей, и у каждой части один писатель и много читателей.

Если производителей больше одного (что не очень типично -- обычно говна на вентилятор подбрасывают в одну лопату) ситуация, конечно, малость ухудшается -- разрулить несколько производителей, желающих одновременно что-то вписать в буфер без write contention тут не получится, но кое-какие оптимизации и здесь можно сделать -- хотя и без них получается неплохо, ибо производителей (даже если их несколько), чаще всего все равно меньше, чем обработчиков -- а значит мы все равно в плюсе.

Есть у этой схемы еще одна чудесная особенность -- пакетная обработка событий. В самом деле, пусть какой-то обработчик по своим внутренним причинам вдруг взял да ушел в себя на пару миллисекунд. Затем он возвращается, и проверяет курсоры всех производителей, и всех обработчиков, которых он должен пропустить вперед -- и обнаруживает, что самый последний из них убежал от него уже аж на 9000 элементов. Делать нечего, надо догонять -- но ведь нет никакой необходимости делать это элемент за элементом, каждый раз обновляя свой курсор и генерируя трафик на шине -- можно обработать все 9000 одним махом -- ведь гарантированно они все ему доступны -- и обновить курсор лишь однажды! Получается очень удобная штука: автоматическое сглаживание неоднородностей в потоке событий. Если кто-то из участников конвейера вдруг резко рванул вперед, а остальные от него отстали -- они естественным образом ускорятся, потому что обработка событий получается тем дешевле, чем больше их можно обработать за раз, ведь стоимость межпроцессорной коммуникации при обновлении курсора будет амортизироваться на большее число обработанных элементов. От сильных неоднородностей это, конечно, не спасет -- в этом случае затраты на коммуникацию будут каплей в море -- но это уж работа прикладного программиста, составить конвейер из максимально мелкозернистых обработчиков.

Собственно говоря, про идею это все. Вокруг этой аккуратной реализации идеи -- со всеми возможными оптимизациями -- и постоен весь Disruptor. Про эту реализацию -- в следующих сериях, а то и так что-то я сегодня больно дохрена написал, пора и честь знать...

P.S. Чтобы не казалось, что все вообще радужно -- не все, что можно реализовать очередями, можно реализовать эффективно с помощью Disruptor. Первый же попавшийся мне на глаза пример -- автоматическая балансировка нагрузки. Эту штуку элементарно сделать с помощью одной-единственной очереди -- в один конец сколько угодно поставщиков кладут некие задачи, с другого конца N потоков-обработчиков (где N = числу ядер в системе) их разбирают. Нагрузка балансируется по отдельным потокам автоматически -- если кому-то почему-то достаются более легкие задачи, этот кто-то просто быстрее возвращается за новыми. С помощью Disruptor такую штуку не реализуешь -- для каждого обработчика явно, при конструировании конвейера, задано, какие события он получит. Полученные им события никак не будут зависеть от относительной загрузки различных элементов конвейера -- отстающих просто будут ждать. Конечно, никто не помешает создать 10 одинаковых обработчиков, каждый из которых будет обрабатывать лишь одно событие из 10, и пропускать остальные -- но это не будет автоматическим балансированием нагрузки, это будет балансированием числа событий.