23 августа 2011 г.

LMAX trading

Статья про архитектуру трейдинговой платформы LMAX. Кажется, про нее рассказывали на JavaOne, но я пропустил этот доклад. Читать здесь

Несколько любопытных вещей. Во-первых, они ухитряются обрабатывать 6 миллионов транзакций в секунду на обычных серверных Nehalem-ах, при том, что вся бизнес-логика у них в одном потоке. А, да, все это на java.

Во-вторых, архитектура, с помощью которой они этого добились довольно необычна. Я думаю, тут мне информации еще на несколько постов, по мере того, как я буду разбираться.

Вкратце: во-первых, у них три главных части в приложении -- входная "мультиочередь" (хитрая структура данных, которую они назвали Disruptor, по-сути -- циклическая конкурентная очередь-конвейер, она одна стоит разговора), в которую приходят все заявки, обработчик бизнес-логики (тот самый, однопоточный), который берет заявки из этой очереди, и выходная такая же мультиочередь.

Обработчик бизнес-логики -- это, по-сути, message-loop, однопоточный, написаный максимально оптимизированно. Все данные ("мир") он держит исключительно в оперативке, весь код очень простой и максимально короткий, чтобы JIT его наиболее удачно оптимизировал (JIT-friendly code). Это, в частности, и позволило им получить 6 миллионов транзакций в секунду. Но здесь возникает вопрос надежности (устойчивости к сбоям), и вопрос ввода-вывода -- он, по-любому, медленный, и традиционно в одном потоке не делается.

И тут на сцену выходят как раз мультиочереди-конвейеры. Идея состоит в том, что, прежде чем попасть на вход к обработчику бизнес-логики, сообщение проходит целый конвейер обработчиков. Конвейер этот, по сути, ориентированный граф с определенной топологией. На вход конвейеру сообщение подается чуть ли не в виде массива байт, как оно принято из сети -- причем делать это могут сразу несколько потоков (т.е. они могут слушать сразу несколько сетевых интерфейсов, в частности). Дальше, по мере продвижения по конвейеру, данные могут обрабатываться различными обработчиками.

Большая часть из обработчиков read-only -- и они вполне себе работают параллельно, загружая разные ядра процессора. В частности, какие-то обработчики дублируют данные на вторичные сервера -- реально в продакшене в каждый момент времени работают 3 территориально разнесенных сервера, с полностью идентичным состоянием -- это решает проблему устойчивости к сбоям. Другие обработчики сохраняют данные в персистентное хранилище -- так что состояние системы (которое, напомню, все в оперативке) в любой момент можно восстановить заново накатив весь лог событий (реально раз в сутки делается снапшот, так что накатывать приходится только за последние сутки). Третьи отправляют часть событий для какого-то анализа и последующей генерации отчетов. И так далее.

Часть обработчиков -- read-write, в частности, те, которые собственно парсят и раскодируют сообщение, превращая его из массива байт в джава-объекты, и в конечном счете, в набор команд для процессора бизнес-логики.

Причем команда разработчиков сумела реализовать этот конвейер обработчиков событий очень эффективно, с минимальными накладными расходами, при сохранении высокого уровня конкурентности. Их Disruptor по их бенчмаркам обеспечивает примерно в 4-8 раз большую пропускную способность, чем реализация аналогичного графа с помощью стандартных ArrayBlockingQueue, и на несколько порядков меньшие задержки. Плюс намного меньшую нагрузку на GC.

Что в итоге: в итоге main code path для обработки входящей транзакции включает в себя начальную предобработку (парсинг/проверка/конвертация в команды), которая делается конвейером параллельных обработчиков, работающих на разных ядрах, и передающих друг другу данные с минимальными накладными расходами (через Disruptor), потом отработку команд центральным процессором бизнес-логики -- который максимально короткий, и работает только с оперативкой, и выдачу результата в выходной Disruptor -- что опять же почти даром. Т.е. этот центральный путь -- максимально короткий, и максимально предсказуемый -- на нем нет ничего, что могло бы вызвать непредсказуемые задержки, ни аллокации больших объемов памяти (что приближает full stop-the-world GC), ни блокировок, ни сетевого ввода-вывода в базу данных. Практически, этот центральный путь не вполне real-time, но что-то очень близкое к тому. Все тяжелые вещи -- типа анализа, без которого все равно не обойтись -- можно вынести на отдельные машины, получающие данные с входного/выходного конвейера -- без малейшей задержки центральной части.

Вот такая вот красота. По-моему, просто шедевр инженерного искусства -- всего пара-тройка сравнительно простых и понятных центральных идей, и возможность приклеивать практически все, что угодно -- но "сбоку", не задевая работоспособность и производительность ядра. Ядро можно вылизать, заоптимизировать и затестировать по самое нехочу -- благо, оно сравнительно небольшое.

15 августа 2011 г.

Highly Scalable Java: 3

Интереснейшая еще идея была в презентации Клиффа на JavaOne 2008 -- высококонкурентная lock-free очередь, best-effort FIFO, для использования в архитектуре producer-consumer.

Идея такая: что очередь -- это простой линейный массив (универсальная штука этот массив). Каждый поток-производитель выбирает себе произвольный (случайный) стартовый индекс в этом массиве, и кладет в очередь элементы так:
while( !CAS( items[producerLocalIndex], null, newItem ) ){

  producerLocalIndex = ( producerLocalIndex + 1 ) % items.length;

}

Т.е. просто пытаемся положить элемент в массив, предполагая что очередная ячейка пуста. Если мы не угадываем (или кто-то уже успел перед нами) -- просто продвигаемся к следующей ячейке.

Берем мы элементы из очереди аналогично:
do{
  consumerLocalIndex = ( consumerLocalIndex + 1 ) % items.length;
  item = items[consumerLocalIndex];
} while( !CAS( items[consumerLocalIndex], item, null ) );
return item;

(consumerLocalIndex изначально каждый потребитель тоже выбирает случайно).

Клифф замечает, что тут важно грамотно управлять размером массива -- если массив слишком большой, потребители будут долго сканировать пустые ячейки в поисках заполненных. Если слишком маленький -- производители будут долго искать пустую ячейку для очередной записи.

Мне нравится идея. Во-первых, она простая и красивая. Я несколько месяцев назад думал о чем-то похожем в контексте своих идей про синхронизацию без синхронизации, но у меня не было ничего про изменение размера (так было проще), и я больше акцентировался на минимальных накладных расходах, чем на масштабировании до 1000 процессоров, как Клифф. Тем не менее, приятно, когда у умных людей мысли сходятся :)

А еще мне очень понравилось как именно Клифф пришел к этой идее. Логика рассуждений (по крайней мере, как она изложена в презентации) была такой: нам нужна очередь, в которую могут писать/читать много агентов параллельно. Логичная идея -- взять несколько очередей (в идеале -- сколько писателей/производителей) -- массив очередей, и при записи/чтении перебирать очереди. Теперь возьмем число агентов очень большим -- 1000. Во что превратится наш "массив очередей"? Будет очень много очень маленьких (коротких) очередей. Возьмем совсем крайний случай -- очередь может быть либо пустой, либо содержать один элемент, и таких очередей -- дофига. Получаем как раз простой массив, где в ячейке либо null (пусто), либо что-то лежит.

Эта манера рассуждения мне очень напомнила ТРИЗ. Есть там очень похожий прием обострения технического противоречия, доведения до крайности -- "оператор РВС" (размер-время-стоимость).

Что мне в этой идее не очень нравится -- неочевидно, как здесь обеспечить локальность. Т.е. если поток-производитель и поток-потребитель сидят физически на одном ядре, то хотелось бы, чтобы они с большей вероятностью передавали данные друг другу, и только если такой возможности нет -- использовали бы данные других ядер. Так, например, реализованы очереди задач в Fork/Join у Daug Lea. Как реализовать такую штуку в этой модели мне пока неочевидно. Еще более важным это становится если архитектура памяти не SMP, как у AzulBox, а NUMA, что у действительно многоядерных систем встречается гораздо чаще -- тогда, мне кажется, все будет вообще плохо, потому что массив будет скорее всего выделен в локальной памяти инициализирующего потока, а использоваться будет всеми потоками.

12 августа 2011 г.

Highly Scalable Java: 2

Давно собирался разобраться в устройстве highly scalable java от Cliff Click -- наконец дошли руки.

Итак, Клифф Клик, Главный Инженер (Distinguished Engineer) в компании Azul System. Компания специализируется на готовых решениях для запуска java-приложений. Наисовременейший Azul Box имеет на борту 768 ядер -- специально заточенных под исполнение JVM. Разумеется, JVM у них тоже своя, специальная -- хотя, очевидно, разработана на базе Сановской. Такое количество ядер, конечно, загрузить непросто -- большинство популярных даже высококонкурентных алгоритмов настолько не масштабируются. Клифф предлагает Новый Подход к разработке таких алгоритмов -- на примере своей реализации NonBlockingHashMap (ну и, до кучи, NonBlockingHashSet, и прочих производных).

Подход вкратце описан в презентации на JavaOne 2008, чуть более подробно -- и зубодробительно -- в исходном коде самой библиотеки, которую легко найти в сети. Реализация, которую предлагает Клифф -- lock-free, то есть на каждый шаг кто-нибудь из задействованных потоков прогрессирует.

Идея вкратце: данные храним в простом массиве. Определяем для каждого слота граф состояний, в которых он может находиться в процессе работы -- конечный автомат для каждого слота. Перемещения по ребрам графа состояний выполняются атомарными CAS-ами. CAS либо удался -- тогда мы сменили состояние, либо провалился -- значит, он удался у какого-то другого потока -- отсюда мы получаем общий прогресс и lock freedom. Ключевой момент здесь -- что все, необходимое для определения/изменения текущего состояния слота находится в нем самом -- т.е. все переходы получаются локальными.

Важно: даже традиционно глобальные операции -- такие как resize/rehash/compaction -- здесь организованы так, что распадаются на локальные операции с каждым слотом в отдельности. Соответственно, даже они почти идеально параллелятся между всеми потоками, оперирующими над таблицей.

Собственно, на мой взгляд, это самая интересная часть. В самом деле, реализовать идеально параллелящийся только-на-чтение HashMap совершенно тривиально -- обычный j.u.HashMap вполне сойдет при условии корректной начальной публикации ссылки (safe publishing). Реализовать обновление значений для уже существующих ключей тоже несложно -- один CAS и дело с концом. Удаление в стиле mark removed реализуется так же, как обновление. Все эти операции требуют только локального CAS-а над конкретной ячейкой массива -- ничего сложного.

Но вот изменение размера таблицы -- как следствие роста количества записей, как следствие высокого количества попыток (reprobes count) из-за плохой хэш-функции ключей, для компактификации при большом проценте удаленных записей -- гораздо более сложная операция. Нужно ведь создать новую таблицу, скопировать туда все самые свежие данные старой, и подменить старую таблицу на новую. Главная сложность -- посторонний поток может влезть с обновлением уже после того, как значение какой-то ячейки уже скопировано в новую таблицу, но еще до того, как ссылка на новую таблицу подменит старую -- и тогда его обновление останется в старой таблице, и будет потеряно. Избежать такой ситуации -- огранизовать идеально параллелящееся копирование, не мешая (не блокируя) при этом конкурирующим чтениям/обновлениям/удалениям -- это и есть основной вызов, с которым Клифф работал, и с которым он красиво справился.

Что он делает? При изменении размера создается новый массив, и ссылка на него записывается в специальное поле, доступное всем потокам. Потом данные из старого массива копируются в новый -- причем перед копированием каждого слота его помечают как "в процессе копирования", и только потом реально копируют. После завершения копирования делается мембар, и слот в старой таблице помечается как "скопированный".

Что делают при этом конкурирующие операции?

Операции чтения: если видят обычный (непомеченный) слот в исходном массиве -- его и возвращают, поскольку это все еще актуальное значение слота (и это fast path, реализующийся чаще всего -- очень быстрый). Если видят слот "в процессе копирования" -- значит таблица в процессе ресайза, и нужно во-первых, постараться вписаться -- скопировать значение слота в новый массив: CAS(newArray[index], null, oldArray[index]) (почему это обязательно -- потому что тот поток, что пометил старый слот как "в процессе копирования" мог еще не успеть реально его скопировать -- и тогда это за него сделаем мы). Дальше -- независимо от успеха предыдущей операции -- мы просто читаем актуальное значение слота из нового массива.

Операции обновления/удаления (пометки): опять же, если слот в основном массиве еще не помечен -- можно свободно работать с ним, поскольку его еще не скопировали. Если же пометка есть -- мы, так же, как и при чтении, впрягаемся и сами копируем текущее значение -- и потом меняем его на свое новое.

Получается, что "частично скопированная" таблица вполне себе рабочая -- ее можно читать/писать без проблем, лишь немного медленнее, чем в обычном состоянии. При этом "ответственный" за копирование поток -- тот, чья очередная операция первая наткнулась на необходимость ресайза (строго говоря их может быть несколько -- но маловероятно, чтобы много). Но поскольку все остальные потоки ему помогают, реальная нагрузка будет поделена между всеми читателями и писателями таблицы.

Выглядит это очень круто, хотя в реальном коде все значительно сложнее и запутаннее. Например, Клифф очень аккуратно (экономно) расставляет мембары -- а в джаве нет явных мембаров, поэтому в нужных местах ему приходится читать/писать волатильные переменные, иногда совсем ненужные по смыслу кода -- только ради memory ordering effects. Далее -- он не использует j.u.c.AtomicXXX, вместо этого он активно использует s.m.Unsafe, что не добавляет коду краткости. Зачем? Из-за одной тонкости, о которой я недавно писал -- AtomicXXX.CAS() по контракту обязан делать full fence -- хотя здесь он Клиффу часто излишен. А вот относительно s.m.Unsafe.compareAndSwapObject() контракт ничего такого не требует, хотя сановская реализация его по факту делает. И здесь, видимо, Клифф использует финт ушами -- реализация JVM на Azul Box мембара там не делает -- и поэтому на азулах его таблица будет работать еще быстрее.

...В качестве пометок/флагов он использует врапперы -- реальные значения оборачиваются в экземпляры специального класса Prime -- работа с обертыванием-развертыванием-проверкой добавляет веселья. Во многих местах у него data races, с которыми он обходится аккуратным (и не всегда очевидным) рассмотрением всех возможных вариантов. Еще есть куча "вторичных" оптимизаций -- таких, как сохранение однажды вычисленных хэшей для всех ключей, разные эвристики для ресайза, другие эвристики -- для уменьшения конкуренции за особо горячие данные... И так далее

3 августа 2011 г.

Всю свою жизнь я писал статические классы-утилиты таким образом:
public final class Helpers{
  private Helpers(){
  }
  public static String veryHelpfullMethod(){
  ....
  }
}

Недавно заметил у коллеги в коде такой вариант:
public final class Helpers{
  private Helpers(){
    throw new AssertionError("Not for instantiation!");
  }
}


Взял себе на вооружение. И правда, с нынешним reflection-ом закрытый конструктор уже не гарантирует, что класс невозможно будет создать. А вот так -- final-класс, с единственным конструктором и выбросом исключения -- я уже не могу придумать способа (кроме byte-code-modification, конечно -- но против лома нет приема).