Компоненты
Event -- объект-сообщение, который пересылается от производителя ко всем обработчикам через disruptor. Обычно событие создается-пересылается-обрабатывается-выкидывается, а здесь же все события создаются заранее, при инициализации RingBuffer (для этого есть интерфейс фабрики событий EventFactory), и живут, как минимум, столько же, сколько и сам буфер, а все, кто работает с RingBuffer только обновляют поля уже существующих событий (и поэтому с точки зрения реализации было бы точнее называть их Entry, и так оно и было в старых версиях -- но теперь авторы стараются быть ближе к пользователям). Ранее события должны были реализовывать AbstractEvent, но последней версии (2.5) было решено от этого отказаться, и теперь в буфер можно помещать совершенно произвольные объекты -- фреймворк больше ничего конкретного от них не хочет.
Sequence По смыслу это что-то вроде volatile long с некоторыми дополнительными оптимизациями -- например, защитой от false sharing и ослабленной семантикой set(). Объекты этого класса используются для нумерации последовательности событий.
Вообще сама концепция последовательности (Sequence) довольно фундаментальна для disruptor. По-сути, смысл ее в том, что все события в рамках конкретного экземпляра disruptor имеют глобальную логическую нумерацию. Логическую в том смысле, что физически-то экземпляры события переиспользуются -- как я уже писал, их сразу создается ровно столько, сколько нужно чтобы заполнить кольцевой буфер. Т.о. один и тот же физический экземпляр события в разные моменты времени может иметь разный логический номер. Нумерация идет с момента старта конкретного экземпляра disruptor-a, и покуда не закончится long (а он закончится весьма нескоро: даже при обработке 50 млн событий в секунду его хватит на сотни лет -- для практических целей можно считать его бесконечным).
Зная размер буфера этот порядковый номер элементарно преобразовать в индекс в массиве. С другой стороны, покуда речь не заходит об отображении номера на конкретный экземпляр события, можно оперировать только самой последовательностью, и это довольно удобно. Любые два события всегда можно упорядочить -- т.е. поток событий не только сериализуем, но эта сериализация еще и задана в явном виде. Логически мы получаем (почти)бесконечную, однозначно упорядоченную последовательность событий. Внутренняя же логика disruptor-а следит за тем, чтобы физически в каждый момент времени было задействовано не более bufferSize событий.
RingBuffer -- сам кольцевой буфер.
Во-первых, это сам массив записей (сообщений) с методами доступа. Размер массива должен быть степенью двойки, потому что авторы очень хотят иметь возможность вычислять остаток от деления через битовую маску. (Лично мне это стремление не очень понятно -- я где-то со времен P2 был уверен, что экономить на таких мелочах уже бессмысленно).
Во-вторых, здесь сосредоточена та самая внутренняя логика, которая позволяет иметь логически бесконечную последовательность событий, аккуратно отображенную на ограниченный буфер -- логика управления последовательностью. Здесь три части:
- Текущий курсор. Это логический номер последнего опубликованного сообщения. Сообщения с номерами (cursor-bufferSize, cursor] доступны для дальнейшей обработки (это не совсем точно, есть еще выделенные но не опубликованные сообщения, но про это далее)
- Список курсоров всех обработчиков событий. С его помощью мы следим, чтобы не переиспользовать еще кем-то не обработанную ячейку -- а именно, мы не можем опережать последний (минимальный) из этих курсоров более, чем на bufferSize.
- Стратегия ожидания -- WaitStrategy -- каким способом мы будем ожидать, если кончились доступные нам события (либо производитель не успел еще опубликовать новые, либо наоборот, обработчики еще не успели обработать все опубликованное). Доступны BusySpin/Yelding/Sleeping/Blocking реализации.
- Стратегия выделения номеров -- ClaimStrategy. Это, пожалуй, самая сложная часть, разберу ее далее.
SequenceBarrier -- это объект, инкапсулирующий в себе набор курсоров (Sequence), которые нам не нужно опрежать. Т.е. если наш обработчик зависим от обработчиков №1 и №2, то у курсора нашего обработчика (последовательности сообщений, им обрабатываемой) будет барьер сверху из курсоров обработчиков №1 и №2. И прежде , чем перейти к обработке следующего события (сдвинуть наш курсор на следующее событие) мы должны дождаться, пока курсоры, от которых мы зависим, уйдут вверх. Вот именно эта операция
EventHandler -- это, собственно, интерфейс для обработчика событий, и ничего более.
EventProcessor -- это хелпер, который отвечает за пересылку событий из RingBuffer в EventHandler. Он реализует Runnable, и его нужно засовывать либо в Executor, либо явно в Thread.
Как это работает?
Начнем с производителя
Публикация сообщения в буфер disruptor-а происходит в три этапа: сначала мы резервируем (claim) под себя слот (а еще точнее -- номер события), в который собираемся что-то опубликовать, потом записываем в слот данные, и подтверждаем публикацию (publish). Авторы предпочитают говорить о двух этапах, намекая на то, что запись значений в слот -- внутреннее дело производителя. Пример кода с вики:// Publishers claim events in sequence final long sequence = ringBuffer.next(); final ValueEvent event = ringBuffer.get(sequence); event.setValue(1234); // this could be more complex with multiple fields // make the event available to EventProcessors ringBuffer.publish(sequence);
Зачем нужна такая хитрость? Ну, это обратная сторона решения использовать заранее преаллоцированные объекты событий. Т.е. если рассуждать отвлеченно, для публикации события нужно а) выделить место в памяти куда будем класть содержание б) положить содержание в) сгенерировать номер события г) сделать событие доступным для обработчиков. В случае использования, например, очереди (ABQ), выделение памяти и наполнение обычно производятся где-то в стороне (в клиентском коде), а генерация номера события и публикация объединены -- событие доступно с момента помещения его в очередь, и в этот же момент определяется его порядок относительно других событий. Поэтому в случае с очередью эти 4 этапа как бы не заметны -- первые два вообще не имеют отношения к очереди, а последние 2 объединены.
В случае disruptor ситуация иная: выделение места под данные производится через сам RingBuffer (который, в этом сценарии, выступает в роли пула/кэша/аллокатора объектов сообщений), при этом, в силу дизайна самого disruptor-а номер события однозначно определяет объект события (как buffer[sequence % bufferSize]), мы не можем выделить событие не задав его номер -- поэтому выделение и генерация номера в disruptor-е объединены в фазу 1. С другой стороны, чтобы сделать событие доступным (опубликовать его, фаза "г") нам нужно во-первых, подвинуть курсор RingBuffer-а, во-вторых -- нужен мембар, гарантирующий что все записи в поля сообщения были реально выполнены. Оба эти элемента получаются автоматом за счет volatile write при обновлении курсора.
Более подробно: функциональность выделения объектов событий, и их последующей публикации -- это как раз и есть ответственность ClaimStrategy. Реально, конечно, стратегия с объектами событий не работает -- она работает только с их номерами. Стратегия заявок держит внутри себя свой собственный курсор, указывающий какие номера событий уже выделены. При заявке на очередной номер стратегия делает внутри себя incrementAndGet(), проверяет, что объект с данным номером свободен -- т.е. все обработчики с предыдущего круга его уже обработали (если это не так, мы будем ждать -- ClaimStrategy не использует общую WaitStrategy, здесь, например используется spin->yield->sleep(1)) -- и выдает получившийся номер клиенту.
Коммит (публикация) несколько сложнее -- здесь ответственность за разные фазы поделена между ClaimStrategy и самим RingBuffer. А именно: ClaimStrategy отвечает за порядок публикации сообщений -- они должны публиковаться ровно в том порядке, в котором они пронумерованы. Т.е. если сообщение №3 уже готово к публикации, но сообщение №2 еще нет -- мы будем ждать (by spin-and-yield), пока сообщение №2 таки не опубликуется, и только потом сможем перейти к публикации нашего №3. После того, как ClaimStrategy убедилась, что порядок в порядке -- RingBuffer просто обновляет свой курсор (здесь даже CAS не нужен, достаточно volatile write -- потому что мы можем быть уверены в текущем значении курсора by design). Это обновление, по совместительству, обеспечивает мембар, необходимый для публикации изменений в самом объекте сообщения (чуть более строго, по JMM: видимость изменений в полях объекта обеспечивается ребром HB между VW(cursor) при публикации, и VR(cursor) который делает каждый обработчик в ходе своего spin-wait-а). Это разделение ответственности за коммит на две части мне не очень понятно -- по-моему, было бы логичнее чтобы всю ответственность взяла на себя ClaimStrategy
Разумеется, вся эта машинерия сильно упрощается, если издатель только один -- тогда все запросы идут из одного потока, и ClaimStrategy может быть сильно проще. Разработчики это предусмотрели, и существует SingleThreadedStrategy, которая этим преимуществом пользуется
Чисто технически, логика работы с последовательностями (выделение, публикация) вынесена в базовый класс Sequencer, а RingBuffer наследует Sequencer, и добавляет функциональность хранения самих объектов сообщений, и отображение sequence -> event. Сделано это для того, чтобы можно было легко реализовать свою собственную логику хранения сообщений. Например, если сообщения у нас это просто некий ID команды -- мы можем использовать для его хранения long[] без необходимости boxing/unboxing в/из Long
Обработчики
Ок, сообщение мы опубликовали. Что насчет обработчиков?За пересылку сообщений из RingBuffer к соответствующим EventHandler-ам отвечает реализация интерфейса EventProcessor. Основная рабочая реализация BatchEventProcessor.
BatchEventProcessor держит внутри себя курсор (Sequence) на последнее обработанное сообщение и SequenceBarrier, задающий список курсоров, от которых зависит наш курсор (==список производителей/обработчиков, которых мы должны пропустить вперед нас, таким образом задаются взаимозависимостями между разными обработчиками).
Основной цикл работы BatchEventProcessor-а состоит в том, что он ждет на SequenceBarrier доступности события, следующего за последним обработанным (как я уже писал, SequenceBarrier как раз и позволяет дожидаться, пока событие с таким-то номером будет обработано всеми впереди идущими == SequenceBarrier.waitFor(cursor+1)). Дождавшись этого, он проверяет, какие события нам на самом деле доступны (гранулярность ожидания может оказаться слишком большой, и реально окажется доступен не только cursor+1, но и, скажем, вплоть до cursor+142), и в цикле все их пересылает в EventHandler. После чего -- один раз -- обновляет курсор, сдвигая его на количество обработанных сообщений (это и есть местный batching). Ну и плюс здесь еще всякая вспомогательная машинерия, типа обработки исключений и остановки обработчика.
Эпилог
Это, собственно все. Несмотря на кажущуюся сложность, идейно все довольно-таки просто. По всему коду повторяется один и тот же шаблон: есть курсор на последнее обработанное (в том или ином смысле) сообщение, есть список курсоров, которые ни в коем случае нельзя опережать. Мы сидим в spin-wait-е на списке "необгоняемых курсоров" (возможно, делая back off через yield/sleep(1)/park), и ждем пока минимальный из них не обгонит нас. Как только он это делает -- мы проверяем насколько именно он нас обогнал, и все эти сообщения -- в нашем распоряжении. Благодаря тому, что курсор везде volatile, пара операций "обновление значения курсора -- чтение нового значения курсора в ходе spin-wait" создает ребро HB, а значит все изменения, сделанные потоком, обновившим курсор до этого обновления будут гарантированно видны всем, прочитавшим новое значение курсора после этой самой операции чтения.Единственное место, где требуется здесь CAS -- это выделение сообщений из пула для случая многих потоков-поставщиков -- но здесь и нельзя ничего сделать, потому что это в чистом виде consensus problem, которая (для потенциально неограниченного числа участников) обычными volatile write/read не решается.
Все остальное -- это оптимизационные тонкости, где какую стратегию ожидания с каким backoff использовать, где какой padding создать.
Интересно, что в disruptor производитель ничем принципиально от обработчика не отличается. В самом деле, обработчики же вполне свободны модифицировать сообщения, так что можно сделать "производителя" в виде заглушки
while(!Thread.currentThread().interrupted()){ // Publisher claim events in sequence final long sequence = ringBuffer.next(); // make the event available to EventProcessors ringBuffer.publish(sequence); }Такой производитель (он будет однопоточным, кстати) большую часть времени будет проводить в ожидании, чтобы не "съесть свой собственный хвост" -- зато все сообщения будут доступны для обработчиков, следующих за ним. Почему так не сделано? Потому что это не позволяет эффективно использовать batching для производителей:
while(!Thread.currentThread().interrupted()){ final byte[] message = network.receiveEvents(...); final int eventsCount = findEventsCountIn(message); // Publisher claim events in batch final SequenceBatch sequenceBatch = new SequenceBatch(eventsCount); ringBuffer.next(sequenceBatch); //parse and copy event content from message to for( int i=sequenceBatch.getStart();i<=sequenceBatch.getEnd();i++){ final Event event = ringBuffer.get(i); //copy i-th event content from message to event object ... } // make the events available to EventProcessors ringBuffer.publish(sequenceBatch); }То есть размер "пачки" для публикации зависит от текущей ситуации
Комментариев нет:
Отправить комментарий