Общую идею, которая стоит за дизайном disruptor я описывал в предыдущей статье, теперь хочу разобрать непосредственно реализацию. Итак, из чего построен disruptor?
Компоненты
Event -- объект-сообщение, который пересылается от производителя ко всем обработчикам через disruptor. Обычно событие создается-пересылается-обрабатывается-выкидывается, а здесь же
все события создаются заранее, при инициализации RingBuffer (для этого есть интерфейс фабрики событий
EventFactory), и живут, как минимум, столько же, сколько и сам буфер, а все, кто работает с RingBuffer только обновляют поля уже существующих событий (и поэтому с точки зрения реализации было бы точнее называть их Entry, и так оно и было в старых версиях -- но теперь авторы стараются быть ближе к пользователям). Ранее события должны были реализовывать AbstractEvent, но последней версии (2.5) было решено от этого отказаться, и теперь в буфер можно помещать совершенно произвольные объекты -- фреймворк больше ничего конкретного от них не хочет.
Sequence По смыслу это что-то вроде volatile long с некоторыми дополнительными оптимизациями -- например, защитой от false sharing и ослабленной семантикой set(). Объекты этого класса используются для нумерации последовательности событий.
Вообще сама концепция последовательности (Sequence) довольно фундаментальна для disruptor. По-сути, смысл ее в том, что все события в рамках конкретного экземпляра disruptor имеют глобальную логическую нумерацию. Логическую в том смысле, что физически-то экземпляры события переиспользуются -- как я уже писал, их сразу создается ровно столько, сколько нужно чтобы заполнить кольцевой буфер. Т.о. один и тот же физический экземпляр события в разные моменты времени может иметь разный логический номер. Нумерация идет с момента старта конкретного экземпляра disruptor-a, и покуда не закончится long (а он закончится весьма нескоро: даже при обработке 50 млн событий в секунду его хватит на сотни лет -- для практических целей можно считать его бесконечным).
Зная размер буфера этот порядковый номер элементарно преобразовать в индекс в массиве. С другой стороны, покуда речь не заходит об отображении номера на конкретный экземпляр события, можно оперировать только самой последовательностью, и это довольно удобно. Любые два события всегда можно упорядочить -- т.е. поток событий не только сериализуем, но эта сериализация еще и задана в явном виде. Логически мы получаем (почти)бесконечную, однозначно упорядоченную последовательность событий. Внутренняя же логика disruptor-а следит за тем, чтобы физически в каждый момент времени было задействовано не более bufferSize событий.
RingBuffer -- сам кольцевой буфер.
Во-первых, это сам массив записей (сообщений) с методами доступа. Размер массива должен быть степенью двойки, потому что авторы очень хотят иметь возможность вычислять остаток от деления через битовую маску. (
Лично мне это стремление не очень понятно -- я где-то со времен P2 был уверен, что экономить на таких мелочах уже бессмысленно).
Во-вторых, здесь сосредоточена та самая внутренняя логика, которая позволяет иметь логически бесконечную последовательность событий, аккуратно отображенную на ограниченный буфер -- логика управления последовательностью. Здесь три части:
- Текущий курсор. Это логический номер последнего опубликованного сообщения. Сообщения с номерами (cursor-bufferSize, cursor] доступны для дальнейшей обработки (это не совсем точно, есть еще выделенные но не опубликованные сообщения, но про это далее)
- Список курсоров всех обработчиков событий. С его помощью мы следим, чтобы не переиспользовать еще кем-то не обработанную ячейку -- а именно, мы не можем опережать последний (минимальный) из этих курсоров более, чем на bufferSize.
- Стратегия ожидания -- WaitStrategy -- каким способом мы будем ожидать, если кончились доступные нам события (либо производитель не успел еще опубликовать новые, либо наоборот, обработчики еще не успели обработать все опубликованное). Доступны BusySpin/Yelding/Sleeping/Blocking реализации.
- Стратегия выделения номеров -- ClaimStrategy. Это, пожалуй, самая сложная часть, разберу ее далее.
SequenceBarrier -- это объект, инкапсулирующий в себе набор курсоров (Sequence), которые нам не нужно опрежать. Т.е. если наш обработчик зависим от обработчиков №1 и №2, то у курсора нашего обработчика (последовательности сообщений, им обрабатываемой) будет барьер сверху из курсоров обработчиков №1 и №2. И прежде , чем перейти к обработке следующего события (сдвинуть наш курсор на следующее событие) мы должны дождаться, пока курсоры, от которых мы зависим, уйдут вверх. Вот именно эта операция
не лезть поперек батьки "убедиться, что наши предшественники ушли вперед" и является основной функциональностью SequenceBarrier-а. Поскольку ему иногда придется подождать, пока предшественники в самом деле уйдут, внутри него так же лежит стратегия ожидания WaitStrategy. Барьер создается RingBuffer-ом, так что его WaitStrategy та же самая, что использует и сам RingBuffer (вообще, WaitStrategy задается однажды при создании RingBuffer-а, и она используется в большинстве случаях, когда кому-то кого-то приходится ждать).
EventHandler -- это, собственно, интерфейс для обработчика событий, и ничего более.
EventProcessor -- это хелпер, который отвечает за пересылку событий из RingBuffer в EventHandler. Он реализует Runnable, и его нужно засовывать либо в Executor, либо явно в Thread.
Как это работает?
Начнем с производителя
Публикация сообщения в буфер disruptor-а происходит в три этапа: сначала мы резервируем (claim) под себя слот (а еще точнее -- номер события), в который собираемся что-то опубликовать, потом записываем в слот данные, и подтверждаем публикацию (publish). Авторы предпочитают говорить о двух этапах, намекая на то, что запись значений в слот -- внутреннее дело производителя. Пример кода с вики:
// Publishers claim events in sequence
final long sequence = ringBuffer.next();
final ValueEvent event = ringBuffer.get(sequence);
event.setValue(1234); // this could be more complex with multiple fields
// make the event available to EventProcessors
ringBuffer.publish(sequence);
Зачем нужна такая хитрость? Ну, это обратная сторона решения использовать заранее преаллоцированные объекты событий. Т.е. если рассуждать отвлеченно, для публикации события нужно а) выделить место в памяти куда будем класть содержание б) положить содержание в) сгенерировать номер события г) сделать событие доступным для обработчиков. В случае использования, например, очереди (ABQ), выделение памяти и наполнение обычно производятся где-то в стороне (в клиентском коде), а генерация номера события и публикация объединены -- событие доступно с момента помещения его в очередь, и в этот же момент определяется его порядок относительно других событий. Поэтому в случае с очередью эти 4 этапа как бы не заметны -- первые два вообще не имеют отношения к очереди, а последние 2 объединены.
В случае disruptor ситуация иная: выделение места под данные производится через сам RingBuffer (который, в этом сценарии, выступает в роли пула/кэша/аллокатора объектов сообщений), при этом, в силу дизайна самого disruptor-а номер события однозначно определяет объект события (как buffer[sequence % bufferSize]), мы не можем выделить событие не задав его номер -- поэтому выделение и генерация номера в disruptor-е объединены в фазу 1. С другой стороны, чтобы сделать событие доступным (опубликовать его, фаза "г") нам нужно во-первых, подвинуть курсор RingBuffer-а, во-вторых -- нужен мембар, гарантирующий что все записи в поля сообщения были реально выполнены. Оба эти элемента получаются автоматом за счет volatile write при обновлении курсора.
Более подробно: функциональность выделения объектов событий, и их последующей публикации -- это как раз и есть ответственность ClaimStrategy. Реально, конечно, стратегия с объектами событий не работает -- она работает только с их
номерами. Стратегия заявок держит внутри себя свой собственный курсор, указывающий какие номера событий уже выделены. При заявке на очередной номер стратегия делает внутри себя incrementAndGet(), проверяет, что объект с данным номером свободен -- т.е. все обработчики с предыдущего круга его уже обработали (если это не так, мы будем ждать -- ClaimStrategy не использует общую WaitStrategy, здесь, например используется spin->yield->sleep(1)) -- и выдает получившийся номер клиенту.
Коммит (публикация) несколько сложнее -- здесь ответственность за разные фазы поделена между ClaimStrategy и самим RingBuffer. А именно: ClaimStrategy отвечает за
порядок публикации сообщений -- они должны публиковаться ровно в том порядке, в котором они пронумерованы. Т.е. если сообщение №3 уже готово к публикации, но сообщение №2 еще нет -- мы будем ждать (by spin-and-yield), пока сообщение №2 таки не опубликуется, и только потом сможем перейти к публикации нашего №3. После того, как ClaimStrategy убедилась, что порядок в порядке -- RingBuffer просто обновляет свой курсор (здесь даже CAS не нужен, достаточно volatile write -- потому что мы можем быть уверены в текущем значении курсора by design). Это обновление, по совместительству, обеспечивает мембар, необходимый для публикации изменений в самом объекте сообщения (чуть более строго, по JMM: видимость изменений в полях объекта обеспечивается ребром HB между VW(cursor) при публикации, и VR(cursor) который делает каждый обработчик в ходе своего spin-wait-а).
Это разделение ответственности за коммит на две части мне не очень понятно -- по-моему, было бы логичнее чтобы всю ответственность взяла на себя ClaimStrategy
Разумеется, вся эта машинерия сильно упрощается, если издатель только один -- тогда все запросы идут из одного потока, и ClaimStrategy может быть сильно проще. Разработчики это предусмотрели, и существует SingleThreadedStrategy, которая этим преимуществом пользуется
Чисто технически, логика работы с последовательностями (выделение, публикация) вынесена в базовый класс Sequencer, а RingBuffer наследует Sequencer, и добавляет функциональность хранения самих объектов сообщений, и отображение sequence -> event. Сделано это для того, чтобы можно было легко реализовать свою собственную логику хранения сообщений. Например, если сообщения у нас это просто некий ID команды -- мы можем использовать для его хранения long[] без необходимости boxing/unboxing в/из Long
Обработчики
Ок, сообщение мы опубликовали. Что насчет обработчиков?
За пересылку сообщений из RingBuffer к соответствующим EventHandler-ам отвечает реализация интерфейса
EventProcessor. Основная рабочая реализация
BatchEventProcessor.
BatchEventProcessor держит внутри себя курсор (Sequence) на последнее обработанное сообщение и SequenceBarrier, задающий список курсоров, от которых зависит наш курсор (==список производителей/обработчиков, которых мы должны пропустить вперед нас, таким образом задаются взаимозависимостями между разными обработчиками).
Основной цикл работы BatchEventProcessor-а состоит в том, что он ждет на SequenceBarrier доступности события, следующего за последним обработанным (как я уже писал, SequenceBarrier как раз и позволяет дожидаться, пока событие с таким-то номером будет обработано всеми впереди идущими == SequenceBarrier.waitFor(cursor+1)). Дождавшись этого, он проверяет, какие события нам на самом деле доступны (гранулярность ожидания может оказаться слишком большой, и реально окажется доступен не только cursor+1, но и, скажем, вплоть до cursor+142), и в цикле все их пересылает в EventHandler. После чего -- один раз -- обновляет курсор, сдвигая его на количество обработанных сообщений (это и есть местный batching). Ну и плюс здесь еще всякая вспомогательная машинерия, типа обработки исключений и остановки обработчика.
Эпилог
Это, собственно все. Несмотря на кажущуюся сложность, идейно все довольно-таки просто. По всему коду повторяется один и тот же шаблон: есть курсор на последнее обработанное (в том или ином смысле) сообщение, есть список курсоров, которые ни в коем случае нельзя опережать. Мы сидим в spin-wait-е на списке "необгоняемых курсоров" (возможно, делая back off через yield/sleep(1)/park), и ждем пока минимальный из них не обгонит нас. Как только он это делает -- мы проверяем насколько именно он нас обогнал, и все эти сообщения -- в нашем распоряжении. Благодаря тому, что курсор везде volatile, пара операций "обновление значения курсора -- чтение нового значения курсора в ходе spin-wait" создает ребро HB, а значит все изменения, сделанные потоком, обновившим курсор
до этого обновления будут гарантированно видны всем, прочитавшим новое значение курсора
после этой самой операции чтения.
Единственное место, где требуется здесь CAS -- это выделение сообщений из пула для случая многих потоков-поставщиков -- но здесь и нельзя ничего сделать, потому что это в чистом виде consensus problem, которая (для потенциально неограниченного числа участников) обычными volatile write/read не решается.
Все остальное -- это оптимизационные тонкости, где какую стратегию ожидания с каким backoff использовать, где какой padding создать.
Интересно, что в disruptor производитель ничем принципиально от обработчика не отличается. В самом деле, обработчики же вполне свободны модифицировать сообщения, так что можно сделать "производителя" в виде заглушки
while(!Thread.currentThread().interrupted()){
// Publisher claim events in sequence
final long sequence = ringBuffer.next();
// make the event available to EventProcessors
ringBuffer.publish(sequence);
}
Такой производитель (он будет однопоточным, кстати) большую часть времени будет проводить в ожидании, чтобы не "съесть свой собственный хвост" -- зато все сообщения будут доступны для обработчиков, следующих за ним. Почему так не сделано? Потому что это не позволяет эффективно использовать batching для производителей:
while(!Thread.currentThread().interrupted()){
final byte[] message = network.receiveEvents(...);
final int eventsCount = findEventsCountIn(message);
// Publisher claim events in batch
final SequenceBatch sequenceBatch = new SequenceBatch(eventsCount);
ringBuffer.next(sequenceBatch);
//parse and copy event content from message to
for( int i=sequenceBatch.getStart();i<=sequenceBatch.getEnd();i++){
final Event event = ringBuffer.get(i);
//copy i-th event content from message to event object
...
}
// make the events available to EventProcessors
ringBuffer.publish(sequenceBatch);
}
То есть размер "пачки" для публикации зависит от текущей ситуации
на рынке в сети -- от текущего уровня нагрузки. Если мы реализуем производителя как обработчика -- мы вынуждены публиковать сообщения по одному -- каждый раз неся накладные расходы на volatile read/write (реально, скорее, на cache coherence traffic вызванный выполнением соответствующих барьеров). Если же мы подбираем размер пачки под текущую нагрузку -- мы амортизируем накладные расходы на передачу данных от ядра к ядру на количество этих сообщений, тем самым мы лучше адаптируемся к изменениям в трафике, сохраняя задержку обработки сообщений на минимальном уровне.