30 декабря 2011 г.

Thread Affinity

Peter Lawrey, которого я уже как-то упоминал, решил написать свою собственную библиотеку для привязки потоков к ядрам из джавы. Ну, понятно, с шахматами и поэтессами. Его реализация (сам код можно посмотреть здесь) изначально основывалась на JNI-обертке вокруг sched_setaffinity(3)/sched_getaffinity(3). Я не смог удержаться, и вписался -- благодаря чему теперь у него есть и "моя" реализация, использующая JNA. Буду стараться уговорить его забить на поддержку JNI -- гемморой с поддержанием portable build для сборки JNI-обертки по-моему не стоит свеч.

Что меня заинтересовало в его реализации -- так это дизайн. Вместо тривиального setAffinityMask/getAffinityMask он ввел абстракцию "блок кода, выполнение которого привязано к ядру". Выглядит это так:
public void run() {
            final AffinityLock al = AffinityLock.acquireLock();
            try {
                //performance-critical block of code
            } finally {
                al.release();
            }
        }
С текущей реализацией есть несколько проблем.

Во-первых, реализация-то есть только для unix/linux -- для платформ, поддерживающих sched_setaffinity. Моя попытка портировать на винду сходу не удалась -- SetThreadAffinity я еще нашел, а вот где взять GetThreadAffinity -- так и не разобрался. Поскольку у меня еще и нет винды для тестов -- эту часть я решил отложить.
Попытка портировать на Мак (что для меня очень актуальна) обломалась еще более серьезно -- на Маке вообще нет возможности назначать привязку. Интерфейс взаимодействия с планировщиком потоков, который представляет MacOS X начиная с 10.5+ позволяет только давать рекомендации планировщику, относительно того, что данные потоки неплохо бы поместить "как можно ближе" друг к другу -- в смысле разделяемых кэшей. Это практически обратное тому, что хочется.

Во-вторых, сама по себе идея запрещать relocation для preformance-critical (скорее, latency-critical, если уж быть точным) потоков -- она, конечно, хороша. Но это далеко не все, чего бы хотелось от интерфейса управления привязками. Навскидку, я могу придумать такие пункты:
  1. Запрещать перемещение потока во время выполнения определенного кода. Это то, что делает библиотека Питера. Т.е. мне пофигу, на каком ядре будет выполняться код, но я хочу, чтобы это ядро было фиксированным все время выполнения данного участка кода. Что я пытаюсь этим выиграть? -- я хочу не тратить время на смену контекста и прогревание кэша при перемещении потока.
  2. Закрепить ядро за конкретным потоком эксклюзивно. Расширение предыдущего пункта -- мало того, что я хочу запретить перемещение моего потока с текущего ядра, так я еще и хочу запретить перемещение любого другого потока на это ядро (и, разумеется, выпинать с этого ядра те потоки, которые уже на нем сидят сейчас). Что я хочу выиграть здесь? -- в основном, я хочу получить весь кэш ядра только для своего потока.
    Сложность здесь в том, что внутри работающей JVM есть хуева туча потоков, выполняющих всякие системные надобности. Начиная от совсем внутренних потоков JVM, типа GC/Finalizer и JIT-compiler, и кончая потоками, запускаемыми внутри стандартной библиотеки -- типа всяких таймеров, чистильщиков очередей слабых ссылок, и прочего мусора. Получить доступ к этим потокам, чтобы можно было явно запретить им использовать определенные ядра -- довольно непросто.
    Немного усиленный вариант -- запретить использование не только конкретного логического ядра, а именно физического -- т.е. в случае hyper-threading запретить использование парного аппаратного потока. Смысл такой: "а ну отъебитесь все от моего кэша".
    Дополнительная, хоть и сомнительная, опция -- распространить эту эксклюзивную привязку на всю систему. То есть не только в рамках текущего процесса JVM все потоки кроме выделенного обходят данное ядро стороной -- но и глобально во всей системе никто более на это ядро не претендует. Сомнительна эта опция потому, что кажется не очень хорошей идеей позволять конкретному приложению вмешиваться в работу планировщика потоков на уровне всей системы -- не по чину ему.
  3. Связать группу потоков (идея из MacOS X API). Т.е. я хочу, чтобы определенная группа потоков была расположена на таких ядрах, коммуникация между которыми будет максимально дешева. При этом мне все равно, какие именно это ядра.

Вопрос состоит в том, как именно максимально абстрактно соединить в одном API все эти опции...

P.S. Как мне подсказывают, это мой сотый пост за время ведения блога. Ура мне!

16 декабря 2011 г.

final-поля для безопасной публикации

В догонку к обсуждениям семантики final-полей. Обычный класс, с изменямыми полями, сложно безопасно опубликовать через data race (т.е. без синхронизации):
sharedRef = new Mutable();
Но можно опубликовать через "обертку" -- класс с final-ссылкой:
sharedRef = new ImmutableReference(new Mutable());
И у меня давно уже бродит идея -- почему бы не сделать сам класс для себя "публикующей оберткой"?
public class SelfPublisher {

    private int value;

    private final SelfPublisher safeThis;

    public SelfPublisher( final int value ) {
        this.value = value;
        this.safeThis = this;
    }

    public int getValue(){
        return safeThis.value;
    }
}
Насколько я вижу -- это позволит совершенно безопасно публиковать его через даже data race. Ценой -- увы -- дополнительных затрат памяти на избыточное поле. Ну и дополнительной операции присваивания в конструкторе.

P.S. Код -- уродлив, по моим критериям изящества. Это, скорее, трюк, чем пример для подражания :) И use case для него тоже довольно туманен. Но сама идея мне показалась любопытной.

9 декабря 2011 г.

Гарантии для final-полей

На днях была интересная переписка с Глебом Смирновым, автором статьи про модель памяти на Хабре. Мы обсуждали тонкости спецификации final-полей. Камнем преткновения был такой пример:
public class A {
    public int value;
    public final int finalValue;

    public A(final int value, final int finalValue) {
        this.value = value;
        this.finalValue = finalValue;
    }
    ....
}

//где-то в коде Thread1:
public A sharedReference = new A();

//где-то в коде Thread2:
if( sharedReference != null ){
    //
    final int finalValue = sharedReference.finalValue;
    final int value = sharedReference.value;
}
Довольно общеизвестно, что спецификация final-полей в JMM гарантирует, что доступ к .finalValue корректный (== запись значения в .finalValue внутри конструктора происходит до чтения .finalValue через общедоступную ссылку, присвоенную после завершения конструктора). Вопрос в том, является ли корректным в том же смысле чтение поля .value? Т.е. можно ли сесть на хвост (piggyback) той магии, которая приводит к корректной передаче значений final-полей между потоками?

На первый взгляд кажется, что можно -- ведь обычные ребра happens-before транзитивны: A hb B, B hb C => A hb C. При этом дано, что действия, идущие до А в program order идут до А и в happens-before order -- т.е. в рамках одного потока частичный порядок HB совпадает с порядком инструкций в коде программы. Значит, поскольку присвоение значения полю .value в конструкторе происходит до присвоения .finalValue, а присвоение .finalValue происходит до чтения в потоке 2, а оно, в свою очередь, происходит до чтения .value в потоке 2 -- то по транзитивности получается, что присвоение .value в конструкторе happens-before чтения .value в потоке 2.

Однако©, на самом деле© -- это неправда. Ну, мне так кажется :)



Во-первых, определение отношения порядка в случае операций с final-полями содержит такое уточнение (Евангелие от ИоанаJMM, 17.5.1): Given a write w, a freeze f, action a (that is not a read of a final field), a read r1 of the final field frozen by f and a read r2 such that hb(w, f), hb(f, a), mc(a, r1) and dereferences(r1 , r2), then when determining which values can be seen by r2, we consider hb(w, r2) (but these orderings do not transitively close with other happens-before orderings). (выделение мое)

То есть то отношение порядка, порождаемое семантикой final-полей -- это такое особое happens-before. Оно почти как обычное happens-before, но не транзитивно с ним.

Во-вторых, чтобы быть совсем строгим, я попытаюсь продраться через формальное определение семантики в 17.5.1. Читать такое на русском я когда-то очень неплохо умел, но буржуйский -- другое дело, так что прошу ногами пианиста не бить, он играет как умеет. Лучше в комментариях предлагайте свои варианты трактовки. Итак, поехали:



Чтобы определить семантику final-полей нам понадобится несколько новых терминов. А именно:

Заморозка (freeze) -- это специальное действие, которое происходит в момент завершения (нормального, или с выбросом исключения) конструктора объекта, содержащего final-поле.

Помимо заморозки нам понадобятся еще два специальных частичных порядка (кроме уже всем знакомого happens-before):
  • порядок разыменования (dereference chain, обозначается далее как dereferences(a,b))
  • порядок доступа к памяти (memory chain, обозначается далее как mc(a,b))
Оба этих порядка считаются частью сценария выполнения (трассы над кодом, execution), и поэтому, для конкретного сценария, считаются фиксированными. Эти два частичных порядка должны удовлетворять определенным ограничениям (но решение, удовлетворяющее этим ограничениям, не обязано быть единственным)


То, что идет дальше, лично мне кажется смесью определений самих порядков ("что такое MC/DC"), и условий (которые формулируются, в том числе, в терминах только что введенных MC/DC), которым должны удовлетворять допустимые по JMM сценарии исполнения. Мне это кажется очень неудобным для восприятия, но я оставляю эту часть как она есть

Порядок разыменования, dereferences: Если A -- чтение/запись поля объекта О, причем О инициализирован не текущим потоком, тогда в текущем потоке должна существовать операция чтения R, которая видит адрес объекта О, и такая, что dereferences(R, A). Другими словами: операция чтения адреса объекта должна происходить до (в смысле порядка разыменования) любой операции чтения/записи полей объекта.

Порядок доступа к памяти, mc:
  1. Если чтение R видит результат записи W, то mc(W,R) (запись происходит до чтения в смысле частичного порядка доступа к памяти)
  2. Если dereferences(А,Б) (А происходит до Б в смысле порядка разыменования), то и mc(А, Б) (А происходит до Б и в смысле порядка доступа к памяти) Т.е dereferences является "подмножеством" mc.
  3. Если W -- запись адреса объекта О, производимая не тем потоком, который О инициализировал, то, в этом же потоке должно существовать некоторое чтение R, которое видит адрес объекта О, и такое, что mc(R,W) (R происходит до W в смысле порядка доступа к памяти)


Теперь само определение семантики final-полей:
Дано:
  1. Некоторая запись W
  2. Заморозка F
  3. Произвольное действие с памятью (кроме чтения final-поля) A
  4. Чтение R1 финального поля, замороженного F
  5. Чтение R2
Пусть между собой эти действия связаны такими соотношениями: hb(W,F), hb(F, A), mc(A, R1), dereferences(R1, R2).

Тогда: определяя, какие значения могут быть прочитаны R2, мы можем полагать, что W и R2 связаны порядком happens-before: hb(W, R2). Но: это отношение порядка не транзитивно с другими отношениями порядка HB.

Отдельно заметим, что отношение порядка разыменования (dereferences) рефлексивно -- т.е. dereferences(a,a) всегда верно. Поэтому в определении выше R2 может совпадать с R1.

Только те записи, что подходят под определение семантики final-полей -- гарантированно упорядочены до чтения final-поля. (Я понимаю этот пункт просто как еще одно напоминание, что гарантии, даваемые для final-полей распространяются ровно настолько, насколько указывает данное определение, и не дальше)



Теперь попробую изложить то же самое на простом языке и более развернуто.

hb(W,F): Наша "некоторая запись" происходит до завершения конструктора, или до "заморозки", что то же самое в данном случае.

hb(F, A): "некоторое действие с памятью" происходит после завершения конструктора ("заморозки").

mc(A, R1): Чтение final-поля видит результат "некоторого действия с памятью"

dereferences(R1, R2): R2 это либо само чтение значения поля (т.е. R2==R1), либо это чтение поля/элемента какого-то объекта, доступного по цепочке ссылок, начинающейся в final-поле.

Что это за загадочное "некоторое действие А"? Насколько я понимаю смысл -- это должна быть публикация ссылки на объект. Но точнее описать не могу -- для меня пока загадка, почему же это действие описано настолько абстрактно -- может ли А быть чем-то кроме публикации ссылки?



Уф. Выдыхаем...

Что можно видеть из этого определения? Во-первых, возможности "сесть на хвост" -- по крайней мере, в том смысле, в котором был пример в начале статьи -- нельзя. Семантика final-полей гарантирует видимость записей, идущих до freeze только для тех чтений, что идут через цепь разыменований, начиная с самого final-поля. Поскольку увидеть .value через эту цепь нельзя, то видимость значения, записанного в это поле в конструкторе не гарантируется.

В этом определении мне интересны еще несколько вещей (помимо его зубодробительности, конечно).

Во-первых, оно идет в некотором смысле в обратном направлении по отношению к обычным определениям JMM. В "обычном" определении логика такая: мы сначала определяем упорядоченность действий (через набор правил порождения ребер happens-before + транзитивность), а потом говорим, что если hb(А,Б) => то Б гарантированно видит результат действия А. Здесь же ситуация обратная: мы говорим, что если Б видит результат А => то определенные действия происходящие до А происходят до определенных действий, происходящих после Б.

Во-вторых здесь необычная ситуация с определением того, для каких операций чтения/записи пригодно это определение. Формально все начинается с любой операции записи, идущей до заморозки. Но в конце оказывается, что ребро HB можно установить по этому определению начиная с любой записи -- да не к любому чтению. А поскольку ребро это еще и не транзитивно -- то получается, что ограничивая спектр операций чтения, на которых ребро может заканчиваться -- мы тем самым ограничиваем и спектр операций записи, на которых оно может начинаться. А именно: ребро может начинаться только на таких операциях записи, результаты которых могут увидеть чтения из того самого ограниченного списка. Для других операций записи эффект ребра HB просто не наблюдаем :) Причем если бы нам оставили транзитивность -- мы бы могли по транзитивности продлить ребро HB с "разрешенного" чтения до следующей в program order операции, которая уже могла бы быть любым (в том числе и "не разрешенным") чтением. Но у нас транзитивность отобрали. И поэтому вместо любой операции записи это определение позволяет проводить ребра HB только от таких записей, которые а) происходят до завершения конструктора б) результаты которых видны по цепочке ссылок начиная с final-поля, инициализированного в этом конструкторе.

В третьих, обратите внимание, что от записей W не требуется, чтобы они происходили внутри конструктора. Они могут происходить где угодно, только бы была возможность показать, что они происходят до завершения конструктора (до заморозки). Это как раз и означает, что мы можем передать в конструктор объекта сколь угодно сложный объектный граф, заполненный где-то на стороне (но гарантированно до окончания конструктора), присвоить в конструкторе ссылку на него final-полю, и далее мы можем быть уверены, что всё его содержимое будет видимо любому потоку, читающему граф через разыменование final-поля. Это тоже своеобразный piggybacking, и вот такой piggybacking моделью памяти джавы разрешен.

6 декабря 2011 г.

Java puzzle System.exit and locks.

Встретил сегодня у Peter Lawrey в его блоге Vanilla Java статью с головоломкой. Известно, что при вызове System.exit() секции finally не выполняются. А что будет с блокировками?
private static final Object lock = new Object();

public static void main( String... args ) {
    Runtime.getRuntime().addShutdownHook( new Thread( new Runnable() {
        @Override
        public void run() {
            System.out.println( "Locking" );
            synchronized ( lock ) {
                System.out.println( "Locked" );
            }
        }
    }));
    synchronized ( lock ) {
        System.exit(0);
    }
}
Что напечатает этот код?

Собственно, уже по тому, как это подано -- можно догадаться об ответе :) Да, код напишет Locking, после чего зависнет -- JVM не будет завершаться.

Лишнее напоминание о том, что это только на уровне языка java конструкция synchronized( lock ){..} выглядит чем-то неразделимым. На уровне байт-кода инструкции monitorenter/monitorexit -- вполне себе отдельные инструкции, и вовсе не обязаны идти парами в рамках одного метода. И блок synchronized на уровне байт-кода выглядит примерно как
monitorenter lock
try{
   ...
}finally{
    monitorexit lock
}
И то, что System.exit() может эту парность разорвать -- выглядит здесь уже довольно очевидно

UPD: Как мне правильно указали в комментариях, парность/непарность здесь ни при чем. Согласно спецификации Runtime.exit() (она более подробная, чем у System.exit) выполнение кода приостанавливается в точке вызова exit(), и выполняется shutdown sequence. Первая фаза shutdown sequence -- выполнение, каждый в своем, отдельном потоке, shutdown hooks. Поскольку основной поток остановлен в точке вызова exit() -- монитор захвачен. Поскольку shutdown hook выполняется в отдельном потоке -- он подвисает на попытке захватить монитор. И, в полном соответствии со спекой, подвисший shutdown hook подвешивает и весь процесс завершения JVM.

1 декабря 2011 г.

StackOverflow: AtomicReferenceFieldUpdater semantic

Очередной интересный вопрос на stackoverflow -- неочевидное место в документации к AtomicReferenceFieldUpdater:

Note that the guarantees of the compareAndSet method in this class are weaker than in other atomic classes. Because this class cannot ensure that all uses of the field are appropriate for purposes of atomic access, it can guarantee atomicity and volatile semantics only with respect to other invocations of compareAndSet and set.

Автору вопроса (да и мне теперь тоже) не очень понятен смысл фразы "weaker guarantees" -- в чем именно-то они "слабее"?. Что за загадочная фраза "all uses of the field are appropriate for purposes of atomic access" -- какие использования поля здесь имеются в виду "подходящими" для атомарных операций, а какие -- нет?

Меня сильно смущает приводимый пример с тем, что атомарность и волатильность поддерживаются только между вызовами compareAndSet() и set(). А что с обычным присвоением (this.value = ...)? Я-то наивно считал, что set() полностью эквивалентен по семантике обычному присвоению значения полю (поля, с которыми работает *Updater, обязаны быть volatile -- это проверяется при создании *Updater-а), просто set() это то же самое, только через reflection, с соответствующими дополнительными проверками и накладными расходами. Получается -- это не так?

Исследования кода показывают, что, например, в обычных AtomicXXX классах set() -- это просто присвоение:
public class AtomicReference<V>  implements java.io.Serializable {
    //...
    private volatile V value;
    //...
    public final void set(V newValue) {
        value = newValue;
    }
В AtomicReferenceFieldUpdaterImpl это несколько иначе:
public void set(T obj, V newValue) {
        if (obj == null || obj.getClass() != tclass || cclass != null ||
            (newValue != null && vclass != null &&
             vclass != newValue.getClass()))
           updateCheck(obj, newValue);

        unsafe.putObjectVolatile(obj, offset, newValue);
    } 
Но название putObjectVolatile(..) как бы намекает, что это то же самое, что и обычное присвоение значения волатильному полю.

Будем ждать прояснения ситуации.

UPD: В дискуссии с ryakh прояснилось, что же имеется в виду. Судя по всему, этот любопытный пункт в документации оставлен для платформ, на которых нет аппаратной поддержки для атомарных CAS-ов (или чего-то, им равномощного). В этом случае реализация RMW-операций потребует использования програмных блокировок. Но блокировки могут обеспечить атомарность только по отношению к другим операциям, защищенным той же блокировкой. Таким образом возникает выбор: либо на таких платформах надо все операции volatile store защищать программными блокировками -- что будет дико неэффективно. Либо оставлять обычные операции чтения/записи volatile-переменных как есть, а блокировками защищать только операции классов AtomicXXX и XXXUpdater. Но во втором случае мы получаем потенциальную возможность для обычной записи volatile a = 3 пересечься с атомарным изменением
synchronized(internalLock){ 
    if(a == 1) {
        /*a=3 может вклиниться сюда*/ 
        a = 2; 
    }
}.
Отсюда и получается ограничение, описанное в документации: если вы используете XXXUpdater для выполнения CAS-ов над каким-то полем, то вы должны забыть про обычную запись в это поле, и использовать для этого только метод XXXUpdater.set() -- только тогда рантайм может гарантировать, что атомарные изменения действительно будут атомарными на всех доступных платформах.

Хочу заметить, что чтение можно спокойно делать напрямую, без XXXUpdater.get() -- чтение, очевидно, никак не интерферирует с атомарными изменениями.

Мне кажется, это еще один пример того, что write once, run anywhere -- слоган довольно условный. Есть масса способов написать на джаве вполне рабочую программу, которая будет годами отлично выполнять свои функции на целом спектре аппаратных платформ -- но не будет полноценно кроссплатформенной из-за какой-нибудь такого рода пропущенной "мелочи".

На этом примере так же легко можно видеть, как желание дать разработчикам более низкоуровневые инструменты вступает в противоречие с кроссплатформенными абстракциями. В самом деле, если бы у нас были только классы AtomicXXX -- полностью инкапсулирующие все манипуляции со своим значением -- никаких уточнений в документацию и не потребовалось бы. Но нам дали XXXUpdater, с помощью которого можно несколько улучшить компоновку объектов в памяти, и уменьшить затраты памяти -- и в нагрузку мы получили еще и тонкости взаимодействия между атомарными изменениями через Updater, и обычными записями...

29 ноября 2011 г.

Puzzler из concurrency-interest

Замечательную штуку вчера продемонстрировал Hans Boehm в листе рассылки concurrency-interest (любопытно, что тема началась с запроса на легализацию разных трюков с s.m.Unsafe -- дискуссии concurrency-interest частенько очень сильно плутают). Ханс писал про опасность методов типа lock.tryLock() -- в смысле сложности их правильного понимания. Вот, например, код:

//Thread 1:
x = 17;
l.lock();

//Thread 2:
while ( l.tryLock() ) {
    l.unlock();
}
... x ...  // какое значение у x в этой точке?

Чтобы ответить на вопрос, обязан ли x быть равным 17 в конце второго потока надо ответить на вопрос "есть ли здесь data race?"

С одной стороны, с точки зрения "универсального" определения программа считается data race free если для всех sequentally consistent трасс над ней отсутствует конкурентный доступ к данным. В данном случае, кажется, это выполняется -- во всех SC трассах сначала в x пишется 17, и только потом x читается -- поэтому кажется, что код data race free, а значит после цикла во втором потоке x должен быть равен 17.

С другой стороны, JMM определяет data race free несколько иначе -- все конкурирующие доступы к данным должны быть упорядочены через happens-before. Поскольку между потоком 1 и 2 нет никаких ребер HB, то доступы к x на запись и на чтение в разных потоках не упорядочены через HB, и, следовательно, они образуют гонку -- а значит, значение x после цикла во втором потоке не определено.



Найти правильную точку зрения сравнительно просто -- достаточно вспомнить, что критические секции (и вообще, и в java в частности) имеют семантику "можно войти, нельзя выйти" (roach motel semantics) -- код может вноситься внутрь секции, но не может выноситься из нее. Это значит, что x=17 и l.lock() в потоке 1 могут быть переупорядочены -- присвоение может быть внесено в критическую секцию. Это сразу дает право сказать, что здесь таки есть гонка, и значение x во втором потоке действительно не определено.

Но меня это объяснение не полностью удовлетворяет -- мы здесь просто предъявили пример преобразования кода, допустимого в соответствии с JMM, и дающего неопределенный результат всего примера. А хочется же понять, где в рассуждениях ошибка.

А ошибка в рассуждениях в том, что, когда мы придумываем SC трассы над кодом, мы делаем два неявных предположения о том, что такое лок. Первое из них -- что лок неявно содержит некое состояние -- каким потоком он сейчас захвачен. Это состояние меняется вызовами lock()/unlock(), а так же запрашивается, и, возможно, меняется вызовом tryLock(). И это предположение верно -- у лока действительно есть это внутреннее состояние.

Но есть еще и второе предположение -- что изменения этого состояния методами lock()/unlock()/tryLock() -- имеют семантику volatile store/load. И вот это уже не верно. В соответствии со спецификацией, lock()/tryLock() имеют (==обязаны иметь) только семантику acquire, а unlock() -- только семантику release. А это значит, что мы не все трассы рассмотрели.

Путаницы добавляет тот факт, что на реализация блокировок обычно делается через CAS, а CAS на x86 имеет семантику full fence. То есть на x86 для любой реализации монитора (из мне известных) lock() будет иметь семантику full fence просто в силу того, что аппаратная модель памяти соответствующего процессора более слабых барьеров не предусматривает. И код Ханса будет скорее всего работать корректно (==не будет содержать гонок). Но это особенности именно аппаратной модели памяти конкретного процессора. Скажем, на пресловутых Azul-ах есть CAS без семантики барьера, и есть односторонние барьеры. И там вполне можно реализовать (правда, не на уровне java, а разве что на уровне интринсиков JIT-а) монитор, у которого lock() будет иметь честную семантику только acqiure.

Собственно, если я правильно понял Ханса, он и обращает внимание на то, что с введением методов типа tryLock() монитор становится легко принять за этакую неявную volatile переменную -- что не соответствует реальной семантике операций с мониторами.

28 ноября 2011 г.

В продолжение темы: глубже в synchronized vs ReentrantLock

После провокационной статьи Мартина у нас с Алексеем Шипилевым и Артемом состоялась любопытная переписка. Результаты ее более полно описал Артем, в своей статье.

Для меня там была одна важная, но неожиданная деталь. С предыдущей своей попытки разобраться в различиях synchronized и RL блокировок у меня создалось ощущение, что они во многом схожи. То есть synchronized имеет иерархию biased->thin(spin)->fat (строго говоря, скорее thin->(biased|fat) ), а RL имеет иерархию spin->park (с опциональным "выхватыванием" (barging). Это значит (как мне казалось), что оба типа блокировок почти идентичны на уровнях thin-fat. Я видел synchronized lock как (thin+fat) реализованный на уровне JVM + biasing, а ReentrantLock -- как (thin+fat) реализованный на уровне java + barging. При этом (thin+fat) часть у них очень близка (так мне казалось).

Оказывается, что это неправда. Действительно, и у synchronized и у RL есть части thin (CAS-based), и fat (OS-level parking). Но они сильно отличаются по политикам перехода между этими двумя режимами. А именно:

ReentrantLock на каждую попытку захвата монитора сначала пытается захватить монитор быстрым CAS-ом, и, в случае неуспеха этого благого начинания -- паркуется. То есть, по-сути, у RL нет "состояния" thin/fat -- он всегда является их суперпозицией.

synchronized же имеет явное поле (часть markword, вроде бы) "текущий тип блокировки" -- со значениями biased/thin/fat. И у него есть определенные правила перехода между этими типами. В частности, лок стартует в состоянии thin. Если привязка блокировок включена, то, по истечении определенного времени (biasing delay) если лок все время захватывается одним потоком -- он становится biased. Если лок захватывается разными потоками, но остается при этом uncontended (т.е. в каждый момент времени им владеет только один поток) - лок остается в состоянии thin. Если же случается столкновение потоков на блокировке, то лок "надувается" (inflate) -- переходит в состояние fat.

Так вот, тонкость здесь в том, что "сдувается" однажды надутый лок очень редко. Здесь (на данный момент) очень консервативная реализация, которая предполагает, что уж если лок однажды оказался contended -- то он таким и будет дальше, и метаться туда-сюда тут не стоит -- дороже обойдется (изменение типа лока требуют во-первых CAS, во-вторых, частенько, довольно непростой координации всех потоков, сейчас этот лок задействовавших -- это все непросто, и недешево).

Вторая тонкость состоит в том, что "надувшийся" (fat) монитор -- это еще не kernel-space lock. Это свой собственный, JVM-ный, библиотечный лок, который сначала немного спиннится в user-space, и только потом, если спиннингом разрулить contention не удалось -- все-таки идет на поклон к батькеOS thread scheduler-у, прося разрулить ситуевину.

И когда говорят об адаптивном спиннинге в приложении к synchronized мониторам -- имеют в виду именно этот спиннинг внутри "надувшегося" монитора -- последний шанс разрулиться перед прыжком в омут с головойkernel space.

Если посмотреть на эту картину, то получается, что с точки зрения алгоритмов, у RL перед synchronized нет ни одного преимущества -- все, что умеет делать RL умеет и synchronized, но synchronized умеет делать и больше. Но при этом RL проще -- что может оказаться критичным, например, если наш конкретный use case этих сложностей не требует, а попадает на область применимости простейшего uncontended CAS lock-а -- RL может выиграть, например, за счет более простого memory layout-а.

То есть если мы пишем некий компонент, который хотим сделать просто потоковобезопасным (thread-safe) -- то есть мы допускаем его использование в многопоточном окружении, но не рассчитываем на особо большую нагрузку -- то пользуем synchronized и не парим себе мозг. synchronized наиболее адаптивен -- может сам приспособиться к различным профилям нагрузки. Плюс к тому, biased locking + поддержка со стороны JIT-а (выбрасывание блокировок по результатам escape analysis, эскалация блокировок, и прочая белая магия) делает synchronized исключительно эффективным в тех случаях, когда конкретный объект используется в однопоточном сценарии.

Если же мы пишем конкурентный компонент -- то есть заранее предполагаем его использование именно в многопоточном окружении, и под высокой нагрузкой -- то тут уже можно смотреть в сторону ReentrantLock -- весьма возможно, что при стабильно высоком профиле нагрузки он выиграет за счет более простой внутренней логики. Но а) все равно придется тестировать б) если нам нужен совсем-совсем максимум -- то, скорее всего, все равно придется смотреть в сторону самодельных решений на базе атомиков.

Вывод я для себя делаю такой -- что теоретические рассуждения о том, в каких ситуациях RL лучше synchronized -- это очень условная вещь. Теоретически ни один априори не лучше. Только эксперименты спасут мировую революцию.

P.S. Алексей -- спасибо за терпеливые разъяснения :)

Снова о biased locking

Очередной виток истории начался с провокационного поста Мартина Томпсона (это автор Disruptor-а, если чё) -- и его продолжения, написанного после того, как Мартину указали на некоторые мелкие недостатки его бенчмарков :)

Во-первых, поднятый Мартином вопрос "есть ли сейчас смысл в привязанных блокировках" породил переписку Дэйва Дайса с Клиффом Кликом. В этой переписке довольно отчетливо высказана интересная мысль -- что привязанные блокировки скорее всего скоро отойдут в прошлое. Почему? Потому что это, по меткому выражению Клиффа "software solution to a hardware problem (expensive CAS)".

То есть привязка блокировок была введена потому, что ранние версии CAS-ов на мультипроцессорах были гораздо менее эффективны, чем теоретически могли бы -- просто инженерам Интела было не до их оптимизации. Теоретически ничего не мешает реализовать CAS, который, в uncontended случае, будет выполняться со скоростью обычного store. Но пока суд да дело, пока инженеры учились делать процессоры с быстрым CAS-ом -- нужно было сделать какую-то затычку, на программном уровне. И этой затычкой и стали привязанные блокировки -- которые программным трюком сводят формально необходимый для блокировки CAS к обычному store.

Сейчас же Клифф хвастается тем, что на Azul-ах uncontended {CAS + full_fence} (обычный CAS у них не имеет семантики барьера) выполняется за 5-6 тиков. На текущих Nehalem-ах локальный CAS требует 10-15 тиков (против мифических >100, с которых все когда-то начиналось) -- и есть шанс, что будет оптимизироваться дальше. В таких условиях Клифф пишет, что в версиях jdk для Azul они уже отказались вовсе от biased locking -- нет смысла, обычный CAS-lock выигрывает у любой реализации biasing. Для версии своего jdk под x86 они все еще используют biased locks (правда, какую-то свою реализацию -- не такую, как у Sun/Oracle JDK) -- но видимо, это вопрос времени.

С другой же стороны, обновленные бенчи Мартина показывают, что пока что, на Nehalem-ах, с использованием стандартного Oracle JDK, biased locks обходят RL/synchronized где-то в 10 раз. Хотя к бенчам Мартина я теперь отношусь как-то с осторожностью :(

24 ноября 2011 г.

Хорошая архитектура...

...позволяет откладывать принятие ключевых решений. Хорошая архитектура максимизирует количество непринятых решений. (с) Боб Мартин

Прочитал сегодня на хабрахабре, и просто поразился -- насколько это соответствует моему собственному ощущению. Огромное спасибо Бобу, и автору статьи за вербализацию.

Вообще, мне кажется, это применимо не только в архитектуре программных систем, но и в проектировании любых систем вообще -- нужно проектировать системы так, чтобы принятие ключевых решений было отложено до момента, когда для этих решений будет достаточно информации (а на начальной стадии проекта, когда закладывается большая часть дизайна, этой информации, как правило, еще очень мало).

9 ноября 2011 г.

Задачка с stack overflow

Ways to improve performance consistency -- некий Peter Lawrey -- довольно продвинутый чувак, если судить по его блогу -- написал message-passing штукакенцию, отдаленно напоминающую дизруптор -- в том смысле, что используется что-то вроде массива как носитель, и потоки передают друг другу сообщения, извещая друг друга мембарами. Правда, в качестве массива он использует ByteBuffer.allocateDirect(). Смысл этого для меня пока не очень понятен, но такова уж его идея.

Проблема у мужика в том, что результаты бенчмарков у него скачут примерно вдвое-втрое. И он спрашивает сообщество "WTF, gays?".

Причем его более тщательные исследования намекают, что скачки производительности происходят после срабатывания GC. Это как бы намекает, что дело тут в memory layout-е, который может меняться после дефрагментации памяти в ходе сборки мусора.

С одной стороны, ситуация похожа на ту, что я наблюдал в своих экспериментах. С другой -- в его коде пишутся не ссылки, а просто числа (ByteBuffer только их и принимает) -- а для long[] в качестве носителя у меня результаты вполне себе повторяемые, скачков в разы там нет. Это несколько удивляет.

Моя текущая версия происходящего состоит в том, что Питер использует для привязки к ядрам taskset. Но taskset привязывает процесс целиком. Это значит, что никак не фиксируется, как раскиданы по ядрам потоки внутри процесса. GC запускается, скорее всего, в отдельном потоке -- и создает нифиговую пиковую нагрузку -- при этом в момент его запуска собственно бенчмаркирующие потоки приостановлены (stop-the-world). Очень логично, что в этот момент ОС решедулит потоки, так, что ничего не делающие потоки скидываются на одно ядро, а потоку GC, аццки вкалывающему, выделяется ядро в безраздельное пользование. Соответственно, после того, как сборщик мусора успокоился, ОС почему-то не возвращает потоки назад, на отдельные ядра. Тут возникает вопрос "почему", но мне кажется, что это сценарий реалистичный -- обычно такие важные алгоритмы пишутся довольно консервативно, в духе "нет веских причин -- ничего не трогай". Т.е. если нет веских причин, перебрасывать потоки между ядрами шедулер не будет -- противное может привести к тому, что в каких-то ситуациях ему придется постоянно перекидывать потоки, что, понятно, хреново.

Жду от мужика отзыва -- мои сервера сейчас слишком заняты, чтобы я себе мог позволить погонять бенчмарки самому.

UPD: Питер отписался по результатам. Мое предположение оказалось неверным -- потоки не решедулятся (UPD2: на самом деле, они как раз решедулятся -- но это не оказывает существенного влияния на производительность). Причина оказалась (по его словам) в том, что буфер и счетчики в определенных случаях оказывались лежащими последовательно в памяти. Поскольку паддинг у его счетчиков сделан только после используемого поля value, то буфер, идущий в памяти перед счетчиками может легко заползти на их строку (я писал про такой сценарий в статье про false sharing). Вот и получается фигня всякая...

3 ноября 2011 г.

Disruptor #3.1.4.1: Эксперименты с производительностью -- анализ

Что можно сказать по поводу результатов экспериментов? Картина получается довольно непростая. Первый вопрос: какой из результатов Disruptor брать за основной -- при разнице в 3 раза между производительностью на малых и больших размерах буфера. Мое мнение, что за основу стоит брать именно 30 миллионов ops/sec, которые он выдает на большом буфере. 75 миллионов на малом буфере с батчингом кажутся мне артефактом "холостого хода". Другими словами, я почти уверен, что если начать использовать более-менее тяжелые объекты событий, и более-менее тяжелые обработчики, то этот результат радикально изменится. На это намекает и гиганский разброс результатов различных экспериментов, и то, что LMAX в реальном своем продакшене сообщает о размере очереди в 2М, и то, что при увеличении размера буфера результаты с батчингом и без него сходятся к одному значению пропускной способности в "жалкие" 30 миллионов операций.

Но если принять за рабочую гипотезу, что реперная точка Disruptor -- 30 миллионов оп/сек, то получается, что потестировать эффект от батчинга у меня толком не получилось -- разница между Д с батчингом и без него становится артефактом эксперимента на холостом ходу. Это не очень-то и удивительно -- батчинг, предположительно, должен сглаживать неравномерности во входном потоке данных, но ведь у нас-то входной поток искусственный, и никаких заметных неравномерностей не содержит, негде батчингу развернуться. Но дело в том, что батчинг -- это принципиальное отличие Disruptor-а от очередей -- отличие в интерфейсе/контракте, а не в реализации (для того, чтобы допустить пакетную обработку в очереди потребуется изменить ее интерфейс). И именно это принципиальное отличие надежно оценить и не удалось. Пичалька...

Тем не менее, если мы принимаем производительность Д за 30 млн. оп/сек, то самая базовая, "тупая" реализация single enq single deq queue может дать нам примерно треть от этой производительности. Применяя к этой тупой реализации последовательно: 1) пул для уменьшения аллокации объектов 2) оптимизацию spin-loop-ов 3) volatile write -> lazySet 4) остаток от деления -> битовая маска -- мы получаем ускорение чуть больше чем вдвое -- т.е. 2/3 от производительности Д, причем вклад от этих трех оптимизаций сравним.

Строго говоря, эти оптимизации не вполне независимы -- то есть, применив их в другой последовательности мы скорее всего получим другие цифры для эффекта каждой из них. Поэтому оценивать их вклад точнее, чем по порядку величины смысла я не вижу.

Ближе подойти к Д мне не удалось (я не готов ставить пока не особо стабильный jdk1.7 с его -XX:+UseCondCardMark на рабочий сервер), но косвенные эксперименты с заменой ссылок на long показывают, что очень вероятно, что еще 1/3 производительности теряется на false sharing by card marking.

И если это так, то самым "доходным" решением в дизайне Д признается идея применить заранее заполненный буфер. Поскольку это сразу позволило им и избежать аллокации объектов-событий, и нагрузки на GC для их сборки, и избежать false sharing при модификации ссылок. В сумме это дает почти половину разницы в производительности между Д и самой тупой реализацией SESDQueue. Остальные тонкости -- в том числе padding (на который любит напирать Мартин в своем блоге), и кэширование условий в spin-loop-ах, и lazySet (о которых он не упоминает:) ) -- на фоне этого выглядят довольно бледно.

Очень впечатляет рывок, который получается заменой Object[] -> long[] -- более чем втрое подскакивает производительность. Выглядит это так, что запись ссылочных полей стоит весьма недешево, по крайней мере в многопоточном окружении.

Вот такая получилась загогулина. Напоминаю только, что все это про холостой ход -- под нагрузкой результаты могут сильно отличаться, и ключевые факторы в производительности могут стать другими.

2 ноября 2011 г.

Disruptor #3.1.4: Эксперименты с производительностью (продолжение)

Я расчитывал написать продолжение предыдущей статьи уже через пару дней, но эксперименты затянулись...

В предыдущий заход я остановился на том, что оптимизированная single enqueuer single dequeuer очередь на холостом ходу обходит j.u.c.ABQ примерно в 5 раз -- и во столько же раз ее обходит disruptor. Поскольку я не верю в черную магию (тем более -- чужую), приходится ковырять дальше.

Во-первых, по совету мастеров я начал использовать taskset для привязки потоков к ядрам. Потом я еще немного подумал, и решил напрямую звать sched_setaffinity() через JNA для конкретных потоков. Плюс nice -20 чтобы уменьшить шанс вмешательства всяких фоновых процессов -- это несколько уменьшило дрожание и чуть улучшило результаты. Во-вторых я сделал несколько экспериментов на нашем рабочем сервере -- который (Xeon E5620, 2.40GHz)x8, Java: 1.6.0_18, Sun Microsystems Inc, Runtime: 1.6.0_18-b18, VM: 16.0-b13, OS: Linux 2.6.32

Обновленные результаты предыдущих серий такие (указан диапазон для различных значений размера буфера: 1024б-2Мб, проценты -- среднее значение стандартного отклонения):
ABQ: (2.6-2.8 ± 7%)
Optimized SESDQueue: (17-19 ± 20%)
Disruptor:(30-75 ± 30%)
*106ops/sec

Заходим с хвоста

После долгих раздумий я решил зайти с обратной стороны. Основные конкурентные операции в реализации очереди это volatile read/write указателей на голову/хвост очереди. Можно попробовать убрать вообще всю работу с массивом -- размещение/удаление оттуда элементов -- и оставить только механику передвижения курсоров:
public class SESDSequencer {
    //======================================================
    private static final AtomicLongFieldUpdater<SESDSequencer> tailUpdater = AtomicLongFieldUpdater.newUpdater( SESDSequencer.class, "tailCursor" );
    private static final AtomicLongFieldUpdater<SESDSequencer> headUpdater = AtomicLongFieldUpdater.newUpdater( SESDSequencer.class, "headCursor" );
    //======================================================
    private final int length;
    private volatile long headCursor = 0;
    private volatile long p11, p12, p13, p14, p15, p16, p17, p18 = 7;
    private volatile long tailCursor = 0;
    private volatile long p21, p22, p23, p24, p25, p26, p27, p28 = 8;

    private long lastHeadObserved = 0;
    private volatile long p31, p32, p33, p34, p35, p36, p37, p38 = 9;
    private long lastTailObserved = 0;

    public long sumPaddingToPreventOptimisation() {
        return p11 + p12 + p13 + p14 + p15 + p16 + p17 + p18
                + p21 + p22 + p23 + p24 + p25 + p26 + p27 + p28
                + p31 + p32 + p33 + p34 + p35 + p36 + p37 + p38;
    }

    public SESDSequencer( final int length ) {
        checkArgument( length > 0, "length(%s) must be >0", length );
        this.length = length;
        sumPaddingToPreventOptimisation();
    }

    private long nextTail() {
        final long tail = tailCursor;
        waitWhileNotFull( tail );
        return tail + 1;
    }

    private long nextHead() {
        final long head = headCursor;
        waitWhileNotEmpty( head );
        return head + 1;
    }

    private void publishTail( final long newTail ) {
        tailUpdater.lazySet( this, newTail );
    }

    private void publishHead( final long newHead ) {
        headUpdater.lazySet( this, newHead );
    }

    private void waitWhileNotFull( final long tail ) {
        //spin-wait: "while not full"
        final long target = tail - length;
        while ( target == lastHeadObserved ) {
            lastHeadObserved = headCursor;
        }
    }

    private void waitWhileNotEmpty( final long head ) {
        //spin-wait: "while not empty"
        while ( head == lastTailObserved ) {
            lastTailObserved = tailCursor;
        }
    }

    public void moveHead() {
        final long newTail = nextTail();
        publishTail( newTail );
    }

    public void moveTail() {
        final long newHead = nextHead();
        publishHead( newHead );
    }    
}

Эта штука работает в разы быстрее: (142-157± 10%)*106 -- более того, она примерно в два раза быстрее disruptor-а. Ну наконец-то, чудес, похоже, и правда не случается -- мы теперь можем сказать, где собака порыласьчто разница в производительности где-то здесь:
public void enqueue( final T item ) {
        final long newTail = nextTail();
        
        //это те 2 строчки, что отличают 
        //SESDSequencer.moveTail() от SESDQueue.enqueue():
        final int index = index( newTail - 1 );
        elements[index] = item;

        publishTail( newTail );
    }

    public T dequeue() {
        final long newHead = nextHead();

        //а это -- те 2 строчки, что отличают 
        //SESDSequencer.moveHead() от SESDQueue.dequeue():
        final int index = index( newHead - 1 );
        final T item = elements[index];

        publishHead( newHead );
        return item;
    }

Битовые маски

Дальнейшие поиски внутри указанных 4-х строчек приводят к неожиданному результату: замена вычисления индекса через остаток от деления ( sequence % length) на использование битовой маски ( sequence & (length-1) -- соответственно, ограничивая размер буфера степенью двойки) дает около 25% прироста производительности: (19-27± 20%)*106ops/sec.

Кто бы мог подумать? -- а я еще сомневался, стоит ли экономить на такой мелочи как деление. Посыпаю голову пеплом.

GC write barrier

Следующая идея состоит в том, чтобы проверить гипотезу о влиянии GC write barrier-а при записи ссылки. С этой целью я сделал реализацию очереди, которая хранит не ссылки на объекты (LongValueEntry) а просто long -- соответственно, внутри у нее не Object[], а long[]. Результат: (107-114± 10%)*106ops/sec.

Похоже, мы зажали disruptor в вилку, и очень похоже, что предположение о GC write barrier действительно оправдывается.

Деоптимизируем Disruptor

Еще одна идея возникла уже со стороны Disruptor-а -- почему бы не отключить в нем batching? Это несложно: нужно скопировать com.lmax.disruptor.BatchEventProcessor и заменить буквально одну строчку в методе run():
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
    while (nextSequence <= availableSequence)
    {
        event = ringBuffer.get(nextSequence);
        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
        nextSequence++;
    }
    //вот эту строку надо внести в цикл выше ^^
    sequence.set(nextSequence - 1L);
Результат интересный -- на малых размерах очереди (1024-32K) Disruptor без batching сильно тормозится: 28 вместо 70 миллионов ops/sec. На больших же размерах очереди (256К-2М) разница практически отсутствует (особенно учитывая немалую статистическую погрешность примерно в 20%). Можно сформулировать иначе -- Д без батчинга почти не зависит от размера буфера, в то время как с батчингом он сильно (почти в 3 раза) ускоряется на коротких буферах. Пока мне не приходит в голову внятное объяснение этого феномена. Можно заметить, что у Xeon E5620 кэш L1=64K, то есть малые буфера в него полностью влезают, а большие -- полностью не влезают. Этот факт явно что-то символизирует, но вот что именно -- пока не соображу

Выводы

Что у меня пока получается в итоге: 
ABQ: (2.6-2.8 ± 7%)
SESDQueue(+lazySet, +spinning): (17-19 ± 20%)
.............+bitmask: (19-27 ± 20%)
Disruptor (+batching):(30-75 ± 30%)
Disruptor (-batching): (26-30 ± 20%)
LongQueue: (107-114 ± 25%)
Sequencer: (143-157± 15%)
*106ops/sec

Интересно отметить как зависит производительность от размера буфера. Disruptor (+batching) очень сильно замедляется при увеличении буфера -- примерно в 3 раза при переходе от 1024 до 2Мб. Очереди же замедляются существенно меньше -- на 15-20% всего.

Так же меня все еще не удовлетворяет стабильность результатов -- между различными запусками результаты зачастую отличаются в разы. Привязка к ядрам и повышение приоритета процесса где-то раза в полтора уменьшили этот разброс, но он все равно остается очень большим -- стандартное отклонение для некоторых экспериментов (тут особо выделяется как раз Д) достигает 35%, а амплитуда разброса значений -- 45%. Причем заметно, что повторы внутри одного запуска (одной JVM) имеют значительно меньший разброс. Напрашивается вывод, что это не случайная, а систематическая погрешность, каким-то образом зависящая от момента запуска JVM. Но у меня пока кончились идеи "что еще можно сделать, чтобы уменьшить влияние внешних факторов". Мартин что-то еще писал про перенаправление прерываний -- чтобы они не посылались тем ядрам, на которых сидят бенчмарки...


Продолжение (окончание, надеюсь) -- в следующих сериях


P.S. Нужно сказать, что Алексей Шипилев проводил эти тесты на своем оборудовании, и (на JDK1.7, -XX:+UseCondCardMark ) получил у самой оптимизированной из реализаций очередей (с маской вместо остатка от деления, с lazySet, с большим padding-ом в 128 байт, но без оптимизации spin-wait) результаты практически не отличимые от Disruptor. Я у себя не могу повторить его результаты -- на рабочий сервер ставить экспериментальный пакет OpenJDK1.7 я не готов, а порт для мака на моем ноутбуке дает результаты заметно хуже (но надо помнить, что у меня на маке нет taskset, и разброс в этих результатах может быть гораздо выше -- кроме того на персональной машине гораздо сложнее обеспечить должную изоляцию).

24 октября 2011 г.

taskset для бенчмарков

Экспериментировал с бенчмарками с привязыванием потоков к ядрам через taskset. И вот не дает мне покоя мысль: ведь если я привяжу процесс, тестирующий какой-то двухпоточный код к двум конкретным ядрам (taskset -c 1,2 mvn exec:java ...) то с одной стороны мои потоки не будут кочевать по другим ядрам. А с другой -- у JVM кроме моих 2 потоков еще дюжина фоновых запущена, не считая GC -- и они ведь все тоже будут за эти два ядра конкурировать. Хотя у меня еще 16 ядер свободны... Это не считая того, что никто не помешает ведь моим двум потокам периодически просто меняться ядрами :)

Похоже, все-таки надо искать способ назначать thread affinity mask изнутри, для конкретных потоков. Где-то в сети я видел код, делающий это через JNA довольно компактно.

UPD:Бросил все, и написал свою собственную утилиту, с преферансом и куртизанками: ThreadAffinity.java. Утилита использует JNA, работает только на linux -- винды у меня сейчас банально нет под рукой, а на MacOS найти нужный системный вызов мне пока не удалось.

Open Source: Retryer

У меня наконец-то дошли руки выложить что-то из своих работ в ОС. Библиотечка, созданная изначально еще во времена работы над DataGuard-ом, потом допиленная уже на нынешней работе -- Retryer.

Задача, которую она решает: часто бывает так, что какой-то код может не выполниться успешно "без причины", и если такое происходит, нужно просто попытаться выполнить его еще несколько раз -- возможно, с различными паузами между попытками. Чаще всего такое бывает с кодом, который дергает какие-то внешние сервисы -- тут может быть куча временных/случайных причин для сбоев: временная перегрузка сети, временная неработоспособность сети, временная перегрузка сервиса, перезапуск сервиса... Какую-то степень устойчивости к этим факторам предоставляют сами протоколы (например, тот же TCP какие-то сетевые сбои обрабатывает незаметно для пользователя) но этой степени часто недостаточно для целей конкретного приложения.

Вот, собственно, задачу "стучаться пока не откроют" библиотека и выполняет. Что с ней можно делать:
  1. Задать код, который нужно выполнять "пока не получится" (==он завершится без исключений)
  2. Задать стратегию задержки (back off) между отдельными попытками. Есть библиотека простых стратегий (без задержки, фиксированная пауза, линейно растущая пауза, экспоненциально растущая пауза...) + билдер/dsl для их комбинации. Например:
    Backoff
                .withExponentialGrowingDelay()
                .startingWithDelay( 1, TimeUnit.SECONDS )
                .maxTryes( 5 )
                .maxDelay( 10, TimeUnit.SECONDS )
                .build()
    
  3. Задать "фатальные ошибки" -- т.е. при некотором классе ошибок дальнейшее продолжение попыток смысла не имеет. Например, если сервис выбросил исключение ConnectionTimeout -- имеет смысл попробовать еще разок позже, авось сеть заработает. Но если сервис выкинул IncorrectProtocolVersionException, то дальше пробовать смысла нет -- мы явно ломимся куда-то не туда

Пример использования (запрос по URL-у):

private static final String URL_TO_QUERY = "http://google.com/?q=Retryer";

public static String simpleQuery( final String urlString ) throws Exception {
    final URL url = new URL( urlString );
    final InputStream is = url.openStream();
    try {
        final InputStreamReader r = new InputStreamReader( is, "ISO-8859-1" );
        try {
            return CharStreams.toString( r );
        } finally {
            r.close();
        }
    } finally {
        is.close();
    }
}

....

public static String queryRetryableComplex( final String urlQuery ) throws Exception {
    return new Retryer().doRetryable(
            new IRetryableTask<String, Exception>() {
                public String execute( final int tryNo ) throws Exception {
                    return simpleQuery( urlQuery );
                }

                public boolean isFatalReason( final int tryNo,
                                                                     final Throwable reason ) {
                    return (reason instanceof MalformedURLException);
                }
            },
            Backoff
                        .withExponentialGrowingDelay()
                        .startingWithDelay( 1, TimeUnit.SECONDS )
                        .maxTryes( 5 )
                        .maxDelay( 10, TimeUnit.SECONDS )
                        .build()
    );
}

здесь мы будем пытаться запросить гугл с экспоненциально растущей задержкой, начиная с 1 секунды, но не более 10 секунд, и не более 5 раз подряд. При этом мы не будем повторять запрос если он выбросит MalformedURLException (какой смысл-то?).

С удовольствием послушаю всяческую ругань. Обещаю активно и интересно ругаться в ответ :)

11 октября 2011 г.

LMAX Disruptor #3.1: эксперименты с производительностью

Разбирательства с lazySet и прочими тонкостями моделей памяти несколько отвлекли меня от продолжения вивисекции disruptor-а. А ведь сам пристальный интерес к lazySet у меня возник как раз после того, как бенчмарки показали, что если заменить его на обычный .set() -- пропускная способность падает почти вдвое. Так что пока суд да дело -- я напишу об этих самых бенчмарках.

Сначала про задумку. Задумка возникла еще во время первого знакомства с disruptor-ом, и укрепилась в ходе дискуссии с Алексеем о его мнимых и реальных достоинствах. Дело в том, что бенчмарки disruptor-а, которые приводят разработчики, сравнивают его с j.u.c.ArrayBlockingQueue. Это достаточно справедливо в том смысле, что ABQ наиболее подходящий готовый компонент из стандартной библиотеки, которым воспользуется обычный разработчик, буде ему понадобится сконструировать конвейерную многопоточную обработку сообщений. Но ABQ это вовсе не самая производительная (ни в плане пропускной способности, ни в плане задержек) реализация конкурентной очереди. Это достаточно простая и надежная реализация, с приемлемой производительностью -- именно поэтому (как я полагаю) она и является частью стандартной библиотеки. И если уж ставить вопрос "disruptor vs queues" -- как его пытаются поставить разработчики, то надо сравнивать disruptor c сопоставимыми по степени оптимизированности реализациями очередей. Которых -- увы -- в стандартной поставке JDK нет. Но это всего лишь означает, что их надо написать :)

Таким образом и возникла идея этого теста. Вместо тестирования "disruptor vs queues" в благоприятных для него условиях попробуем тестировать в неблагоприятных. Возьмем disruptor в простейшей топологии A->B (один поток -- поставщик, один поток -- обработчик-потребитель). Возьмем простейшую реализацию single enqueuer single dequeuer очереди на основе массива (собственно, это будет очень близко к реализации самого disruptor-а -- по-сути, он и есть такая очередь, только на стероидах). Будем постепенно вносить в эту очередь различные оптимизации, или/и будем постепенно "убирать" различные оптимизации из disruptor-а. Поскольку эти две структуры данных весьма близки изначально, то на каком-то этапе они станут почти неразличимы, и производительность должна оказаться равной. То есть на выходе у нас получится последовательность шагов, каждый из которых на сколько-то улучшает очередь/ухудшает disruptor -- так, что они в конце-концов становятся неразличимы. И можно будет смотреть, какие факторы собственно являются в дизайне disruptor-а ключевыми для производительности, а какие -- второстепенными.

Дисклаймер: я провожу тесты без нагрузки. То есть измеряю пропускную способность в сферическом случае в вакууме -- она может коррелировать с производительностью в боевых условиях, а может и не коррелировать. Меня здесь интересуют именно накладные расходы на холостом ходу -- "стоимость" работы внутренней механики. Просто хотя бы потому, что рабочий ход мне не на чем исследовать -- нет конкретного примера.

Single Enqueuer, Single Dequeuer Queue

Итак, откуда начинаем плясать: вот так выглядит single enqueuer single dequeuer array-based queue:
public class SESDQueue<T> {
    private final int length;
    private volatile long headCursor = 0;
    private volatile long tailCursor = 0;

    private final T[] elements;
    /* Elements range: [headCursor, tailCursor)
    *
    * (tailCursor - headCursor) == elements count
    *
    * 0 <= (tailCursor - headCursor) <= length  => state invariant
    *
    * tailCursor - headCursor == length         => queue is full
    * tailCursor - headCursor == 0              => queue is empty
    *
    * (headCursor % length ) is the index of first item in queue
    * (tailCursor % length ) is the index of _cell_ for _next last item_
    */

    public SESDQueue( final int length ) {
        checkArgument( length > 0, "length(%s) must be >0", length );
        this.length = length;
        elements = ( T[] ) new Object[length];
    }

    private long nextTail() {
        final long tail = tailCursor;
        waitWhileNotFull( tail );

        assert ( tail == tailCursor ) : "tail=" + tail + " <> tailCursor=" + tailCursor + " -- seems like multi-threaded enqueue() detected!";
        return tail + 1;
    }

    private long nextHead() {
        final long head = headCursor;
        waitWhileNotEmpty( head );
        assert ( head == headCursor ) : "head=" + head + " <> headCursor=" + headCursor + " -- seems like multi-threaded dequeue() detected!";
        return head + 1;
    }


    private void publishTail( final long newTail ) {
        assert ( newTail >= headCursor ) : "newTail(" + newTail + ") < headCursor(" + headCursor + ")";
        tailCursor = newTail;
    }

    private void publishHead( final long newHead ) {
        assert ( newHead <= tailCursor ) : "newHead(" + newHead + ") > tailCursor(" + tailCursor + ")";
        headCursor = newHead;
    }


    private void waitWhileNotFull( final long tail ) {
        //spin-wait: "while not full"
        while ( tail == headCursor + length ) {
        }
    }

    private void waitWhileNotEmpty( final long head ) {
        //spin-wait: "while not empty"
        while ( head == tailCursor ) {
        }
    }

    public void enqueue( final T item ) {
        final long newTail = nextTail();

        final int index = ( int ) ( ( newTail - 1 ) % length );
        elements[index] = item;

        publishTail( newTail );
    }

    public T dequeue() {
        final long newHead = nextHead();

        final int index = ( int ) ( ( newHead - 1 ) % length );
        final T item = elements[index];
        elements[index] = null;//for GC

        publishHead( newHead );
        return item;
    }

    public int size() {
        return ( int ) ( tailCursor - headCursor );
    }
}
В ней нет ни одного CAS, только volatile load/stores - это ее плюс. Писать/читать ее может только по одному потоку -- это ее минус, в данном случае несущественный, так как disruptor тоже в своем наилучшем варианте single publishing, и принципиально single subsriber.

Схема бенчей


Один поток пишет в "очередь" 300 миллионов записей (для начала -- LongValueEntry), другой поток их из "очереди" вытаскивает. Время выполнения определяется по моменту вытаскивания последней записи из "очереди".

Цикл выполняется 5 раз подряд в одном запуске -- 1-2 раз обычно выбрасывается, чтобы не учитывать разогрев JVM. Запусков обычно 2-3 -- сколько достаточно для более-менее уверенной статистики.

В бенчмарках присутствует так же ArrayBlockingQueue -- в качестве реперной точки.

Код здесь.

Поехали...


Система: MBP, i7 2GHz, MacOS X 10.6.8,
JDK 1.6.0_26-b03-377, "-da -server -Xmx512m -Xms512"

Емкость очереди -- 1024 элемента.

Базовые результаты (ops/sec == количество сообщений, прошедших "очередь" за секунду).
ABQ SESD Disruptor
(2.65+/-0.05)e6 (9.0+/-0.3)e6 (65+/-3)e6

Нифиговое такое различие, а? Меня, правда, немного напрягает, что разница в скорости между ABQ и Disruptor составляет 25 раз, вместо 5-6 в их собственных тестах...

Ок, начинаем оптимизацию. Шаг №1: первое, что приходит в голову -- аллокация. Disruptor использует однажды преаллоцированные объекты, обе наши очереди -- создают 300 миллионов временных объектов. Проверим какое влияние это оказывает -- сделаем пул объектов. Задача упрощается тем, что пул имеет заранее известную емкость (==емкости очереди), плюс он не должен быть потоковобезопасным, потому что мы знаем, что у нас только один поток-производитель:
class EventPool<T> implements EventFactory<T> {
        private final EventFactory<T> factory;
        private final Object[] items;

        private int counter = 0;

        private EventPool( final int size,
                           final EventFactory<T> factory ) {
            this.factory = factory;
            this.items = new Object[size];
            for ( int i = 0; i < items.length; i++ ) {
                items[i] = factory.newInstance();
            }
        }

        @Override
        @SuppressWarnings( "unchecked" )
        public T newInstance() {
            try {
                return ( T ) items[counter];
            } finally {
                counter = ( counter + 1 ) % items.length;
            }
        }        
}

ABQ SESD Disruptor
(2.70+/-0.04)e6 (10.7+/-0.4)e6 (65+/-3)e6

Различие не то, чтобы особо радикальное, но процентов 15 наша SESD очередь выиграла.

Шаг №2 -- добавляем padding, чтобы избежать false sharing:
private volatile long headCursor = 0;
    
    public volatile long p11, p12, p13, p14, p15, p16, p17, p18 = 7;
    
    private volatile long tailCursor = 0;
    
    public volatile long p21, p22, p23, p24, p25, p26, p27, p28 = 8;

    public long sumPaddingToPreventOptimisation() {
        return p11 + p12 + p13 + p14 + p15 + p16 + p17 + p18
                + p21 + p22 + p23 + p24 + p25 + p26 + p27 + p28;
    }
результат SESDQueue: (10.8+/-0.2)e6 -- какие-то крохи, даже говорить стыдно. Даже как-то странно...


Шаг №3 -- оптимизируем spin-loop-ы. Опять же, подглядывая в код disruptor-а можно заметить такой прием:
private void waitWhileNotEmpty( final long head ) {
        //spin-wait: "while not empty"
        while ( head == tailCursor ) {
        }
    }
Зачем нам здесь каждый раз читать tailCursor (volatile load), когда единожды обнаружив, что он на сколько-то больше head мы можем просто запомнить его текущее значение в локальной для потока переменной, и пересчитывать ее значение только когда head будет снова ему равно:
//тут опять же будет padding...
    private long lastTailObserved = 0;
    //и тут --- тоже...
    private void waitWhileNotEmpty( final long head ) {
        while ( head == lastTailObserved ) {
            lastTailObserved = tailCursor;
        }
    }

Результат: (12.9+/-0.1)e6 (== +20%).

Шаг №3 -- используем lazySet. Опять же подсмотренно в классе Sequence из disruptor -- похоже, что операции обновления курсоров здесь не требуют всей строгости семантики volatile store -- достаточно LoadStore+StoreStore барьеров перед ними -- то есть гарантии, что все, что произошло до обновления курсора в program order будет видимо остальным потокам до того, как станет видимым это обновление. Для этого не нужно volatile store, достаточно lazySet:

Результат: (13.9+/-0.1)e6 (== +10%).

В общем-то, прямолинейные оптимизации закончились -- а мы все еще отстаем от disruptor в 5 раз. Продолжение -- в следующих сериях :)



7 октября 2011 г.

AtomicXXX.lazySet() -- разъяснения

По совету друга я вознес мольбу Господу о ниспослании мудрости, и Господь послал мне этого. Немного. Зато несколько раз -- с первого раза я не понял. Произошло это посредством листа рассылки concurrency-interest -- ну, нечего придираться, пути господни неисповедимы. Начало треда я в архивах не нашел почему-то, так что могу дать ссылку только на продолжение: вот.

Вкратце:

Что касается спецификации:
lazySet был введен после последней редакции JMM, поэтому в JMM нет описания такого отношения порядка, который он порождает. Т.е. по сути javadoc специфицирующий lazySet можно рассматривать как расширение JMM. Аналогичным свойством обладает weakCompareAndSet -- его ordering constraints тоже не формализуются в текущей редакции JMM. Идет обсуждение того, как включить такие вещи в очередную редакцию JMM, но пока в попутчиках согласья нет, и вряд ли очередная редакция JMM выйдет в ближайшее время. Вообще варианты более "тонких" мембаров, частным случаем которых является lazySet (== Fences.orderWrites + store) планируется внедрять в специальный класс Fences, соответственно все обсуждение подобных вещей обычно сосредоточено вокруг него. И черновики будущей спецификации можно посмотреть в его class-javadoc. Я там пока нифига не понял, но попытки буду продолжать.

Что касается текущего состояния:

Да, то, что написано в lazySet javadoc действительно означает, что все записи, произведенные до lazySet() гарантированно станут видимы до того, как станет видимым запись lazySet(). При этом чтения, идущие до lazySet() в program order вовсе не обязаны реально происходить до него -- они могут быть переупорядочены и отложены, и завершиться уже после завершения lazySet() -- в этом, собственно, отличие от обычного volatile store.

При этом момент выполнения записи действительно не определен -- "когда нибудь". Означает это то, что запись может быть отложена процессором или компилятором хоть навечно. На практике это вряд ли произойдет, если поток, выполнивший lazySet продолжает что-то писать -- и у компиляторов и у процессора ограничен размер буфера отложенных операций, так что когда-нибудь запись все-таки будет выполнена.

На самом деле я здесь не вижу существенной разницы с обычным volatile store -- нигде в JMM не прописано, что volatile store должен завершиться немедленно. JMM задает лишь ограничения на то, что все записи и чтения идущие до vstore в program order действительно будут исполнены и их результаты видимы до того, как будет исполнена и видима vstore.

Что касается текущей реализации:

Хотя формально запись может быть отложена на сколько угодно, фактически все существующие JVM реализуют lazySet() консервативно -- выполняют запись немедленно. То есть запись может быть отложена уже только процессором, задержавшись в его store buffer.

Таким образом, использование lazySet в disruptor, похоже, в целом легально, и ускорение, им создаваемое -- вполне объяснимо.

P.S. В процессе разбирательств выяснилось, что некоторые вещи в JMM я до сих пор понимаю больше интуитивно. Кажется, в ближайшее время у меня будут посты на эту тему

28 сентября 2011 г.

AtomicXXX.lazySet() strikes back

Наткнулся тут на любопытную вещь -- я всегда был уверен, что в JMM все поведение всегда можно описать с помощью несложного набора правил построения ребер happens-before. Ну, то есть, я не говорю, что применение этих правил всегда просто -- законы Ньютона тоже не очень сложные, а какой мрак и ужас из них получается, скажем, в небесной механике и движении твердого тела -- но сами правила довольно просты, и применимы к любому java-коду. Но похоже это не совсем так. Этот самый AtomicXXX.lazySet() мутит воду.

Простое объяснение -- неволатильная запись в волатильную переменную -- меня только запутывает. До этого момента в джаве, вообще-то, не было волатильной/не волатильной записи в отрыве от типа поля (ну, если не трогать сатанинский Unsafe). То есть если поле волатильное -- то все операции с ним волатильные, поле не волатильное -- все операции не волатильные.

А теперь получается такая штука, что я могу записать значение в переменную не волатильно, а потом считать его волатильно. И что за отношение порядка будет у меня между этими операциями, если они в разных потоках?

Это не праздный интерес. В коде Disruptor-а встречается такая идиома:

AtomicLong sequence = new AtomicLong();
...
//поток 1: издатель
...write some values into shared objects...
sequence.lazySet(seq);

//поток 2: обработчик
while( sequence.get() != seq ){
   //spin wait
}
...read shared objects fields

По смыслу кода понятно, что автор его уверен -- последовательность sequence.lazySet()/get() обеспечивает завершение всех операций записи, идущих до нее в program order, и видимость результатов этих операций из второго потока. Другими словами -- это практически happens-before edge. Но вот с чего он так в этом уверен -- для меня совершенно не очевидно.

Как минимум, документация lazySet() не дает никаких гарантий относительно того когда запись будет реально выполнена. Eventually -- когда-нибудь. То есть возможно, что и вообще никогда. И непонятно, в какой момент запись -- если она завершится -- будет увидена вторым потоком.

А с другой стороны -- я сейчас гоняю бенчмарки, разбирая по частям disruptor и вытаскивая какие компоненты ключевые для его производительности. И вот оказывается, что если вместо lazySet использовать обычный set() -- скорость падает в 2 раза примерно (это без нагрузки -- на холостом ходу). То есть смысл определенно есть -- но вот надежен ли этот способ оптимизации -- мне как-то неочевидно.

Задал вопрос на stackoverflow -- посмотрим, может там понимают

16 сентября 2011 г.

LMAX Disruptor #2: Реализация

Общую идею, которая стоит за дизайном disruptor я описывал в предыдущей статье, теперь хочу разобрать непосредственно реализацию. Итак, из чего построен disruptor?


Компоненты


Event -- объект-сообщение, который пересылается от производителя ко всем обработчикам через disruptor. Обычно событие создается-пересылается-обрабатывается-выкидывается, а здесь же все события создаются заранее, при инициализации RingBuffer (для этого есть интерфейс фабрики событий EventFactory), и живут, как минимум, столько же, сколько и сам буфер, а все, кто работает с RingBuffer только обновляют поля уже существующих событий (и поэтому с точки зрения реализации было бы точнее называть их Entry, и так оно и было в старых версиях -- но теперь авторы стараются быть ближе к пользователям). Ранее события должны были реализовывать AbstractEvent, но последней версии (2.5) было решено от этого отказаться, и теперь в буфер можно помещать совершенно произвольные объекты -- фреймворк больше ничего конкретного от них не хочет.

Sequence По смыслу это что-то вроде volatile long с некоторыми дополнительными оптимизациями -- например, защитой от false sharing и ослабленной семантикой set(). Объекты этого класса используются для нумерации последовательности событий.

Вообще сама концепция последовательности (Sequence) довольно фундаментальна для disruptor. По-сути, смысл ее в том, что все события в рамках конкретного экземпляра disruptor имеют глобальную логическую нумерацию. Логическую в том смысле, что физически-то экземпляры события переиспользуются -- как я уже писал, их сразу создается ровно столько, сколько нужно чтобы заполнить кольцевой буфер. Т.о. один и тот же физический экземпляр события в разные моменты времени может иметь разный логический номер. Нумерация идет с момента старта конкретного экземпляра disruptor-a, и покуда не закончится long (а он закончится весьма нескоро: даже при обработке 50 млн событий в секунду его хватит на сотни лет -- для практических целей можно считать его бесконечным).

Зная размер буфера этот порядковый номер элементарно преобразовать в индекс в массиве. С другой стороны, покуда речь не заходит об отображении номера на конкретный экземпляр события, можно оперировать только самой последовательностью, и это довольно удобно. Любые два события всегда можно упорядочить -- т.е. поток событий не только сериализуем, но эта сериализация еще и задана в явном виде. Логически мы получаем (почти)бесконечную, однозначно упорядоченную последовательность событий. Внутренняя же логика disruptor-а следит за тем, чтобы физически в каждый момент времени было задействовано не более bufferSize событий.

RingBuffer -- сам кольцевой буфер.

Во-первых, это сам массив записей (сообщений) с методами доступа. Размер массива должен быть степенью двойки, потому что авторы очень хотят иметь возможность вычислять остаток от деления через битовую маску. (Лично мне это стремление не очень понятно -- я где-то со времен P2 был уверен, что экономить на таких мелочах уже бессмысленно).

Во-вторых, здесь сосредоточена та самая внутренняя логика, которая позволяет иметь логически бесконечную последовательность событий, аккуратно отображенную на ограниченный буфер -- логика управления последовательностью. Здесь три части:
  1. Текущий курсор. Это логический номер последнего опубликованного сообщения. Сообщения с номерами (cursor-bufferSize, cursor] доступны для дальнейшей обработки (это не совсем точно, есть еще выделенные но не опубликованные сообщения, но про это далее)
  2. Список курсоров всех обработчиков событий. С его помощью мы следим, чтобы не переиспользовать еще кем-то не обработанную ячейку -- а именно, мы не можем опережать последний (минимальный) из этих курсоров более, чем на bufferSize.
  3. Стратегия ожидания -- WaitStrategy -- каким способом мы будем ожидать, если кончились доступные нам события (либо производитель не успел еще опубликовать новые, либо наоборот, обработчики еще не успели обработать все опубликованное). Доступны BusySpin/Yelding/Sleeping/Blocking реализации.
  4. Стратегия выделения номеров -- ClaimStrategy. Это, пожалуй, самая сложная часть, разберу ее далее.

SequenceBarrier -- это объект, инкапсулирующий в себе набор курсоров (Sequence), которые нам не нужно опрежать. Т.е. если наш обработчик зависим от обработчиков №1 и №2, то у курсора нашего обработчика (последовательности сообщений, им обрабатываемой) будет барьер сверху из курсоров обработчиков №1 и №2. И прежде , чем перейти к обработке следующего события (сдвинуть наш курсор на следующее событие) мы должны дождаться, пока курсоры, от которых мы зависим, уйдут вверх. Вот именно эта операция не лезть поперек батьки "убедиться, что наши предшественники ушли вперед" и является основной функциональностью SequenceBarrier-а. Поскольку ему иногда придется подождать, пока предшественники в самом деле уйдут, внутри него так же лежит стратегия ожидания WaitStrategy. Барьер создается RingBuffer-ом, так что его WaitStrategy та же самая, что использует и сам RingBuffer (вообще, WaitStrategy задается однажды при создании RingBuffer-а, и она используется в большинстве случаях, когда кому-то кого-то приходится ждать).


EventHandler -- это, собственно, интерфейс для обработчика событий, и ничего более.

EventProcessor -- это хелпер, который отвечает за пересылку событий из RingBuffer в EventHandler. Он реализует Runnable, и его нужно засовывать либо в Executor, либо явно в Thread.


Как это работает?



Начнем с производителя

Публикация сообщения в буфер disruptor-а происходит в три этапа: сначала мы резервируем (claim) под себя слот (а еще точнее -- номер события), в который собираемся что-то опубликовать, потом записываем в слот данные, и подтверждаем публикацию (publish). Авторы предпочитают говорить о двух этапах, намекая на то, что запись значений в слот -- внутреннее дело производителя. Пример кода с вики:
// Publishers claim events in sequence
final long sequence = ringBuffer.next();
final ValueEvent event = ringBuffer.get(sequence);

event.setValue(1234); // this could be more complex with multiple fields

// make the event available to EventProcessors
ringBuffer.publish(sequence);   

Зачем нужна такая хитрость? Ну, это обратная сторона решения использовать заранее преаллоцированные объекты событий. Т.е. если рассуждать отвлеченно, для публикации события нужно а) выделить место в памяти куда будем класть содержание б) положить содержание в) сгенерировать номер события г) сделать событие доступным для обработчиков. В случае использования, например, очереди (ABQ), выделение памяти и наполнение обычно производятся где-то в стороне (в клиентском коде), а генерация номера события и публикация объединены -- событие доступно с момента помещения его в очередь, и в этот же момент определяется его порядок относительно других событий. Поэтому в случае с очередью эти 4 этапа как бы не заметны -- первые два вообще не имеют отношения к очереди, а последние 2 объединены.

В случае disruptor ситуация иная: выделение места под данные производится через сам RingBuffer (который, в этом сценарии, выступает в роли пула/кэша/аллокатора объектов сообщений), при этом, в силу дизайна самого disruptor-а номер события однозначно определяет объект события (как buffer[sequence % bufferSize]), мы не можем выделить событие не задав его номер -- поэтому выделение и генерация номера в disruptor-е объединены в фазу 1. С другой стороны, чтобы сделать событие доступным (опубликовать его, фаза "г") нам нужно во-первых, подвинуть курсор RingBuffer-а, во-вторых -- нужен мембар, гарантирующий что все записи в поля сообщения были реально выполнены. Оба эти элемента получаются автоматом за счет volatile write при обновлении курсора.

Более подробно: функциональность выделения объектов событий, и их последующей публикации -- это как раз и есть ответственность ClaimStrategy. Реально, конечно, стратегия с объектами событий не работает -- она работает только с их номерами. Стратегия заявок держит внутри себя свой собственный курсор, указывающий какие номера событий уже выделены. При заявке на очередной номер стратегия делает внутри себя incrementAndGet(), проверяет, что объект с данным номером свободен -- т.е. все обработчики с предыдущего круга его уже обработали (если это не так, мы будем ждать -- ClaimStrategy не использует общую WaitStrategy, здесь, например используется spin->yield->sleep(1)) -- и выдает получившийся номер клиенту.

Коммит (публикация) несколько сложнее -- здесь ответственность за разные фазы поделена между ClaimStrategy и самим RingBuffer. А именно: ClaimStrategy отвечает за порядок публикации сообщений -- они должны публиковаться ровно в том порядке, в котором они пронумерованы. Т.е. если сообщение №3 уже готово к публикации, но сообщение №2 еще нет -- мы будем ждать (by spin-and-yield), пока сообщение №2 таки не опубликуется, и только потом сможем перейти к публикации нашего №3. После того, как ClaimStrategy убедилась, что порядок в порядке -- RingBuffer просто обновляет свой курсор (здесь даже CAS не нужен, достаточно volatile write -- потому что мы можем быть уверены в текущем значении курсора by design). Это обновление, по совместительству, обеспечивает мембар, необходимый для публикации изменений в самом объекте сообщения (чуть более строго, по JMM: видимость изменений в полях объекта обеспечивается ребром HB между VW(cursor) при публикации, и VR(cursor) который делает каждый обработчик в ходе своего spin-wait-а). Это разделение ответственности за коммит на две части мне не очень понятно -- по-моему, было бы логичнее чтобы всю ответственность взяла на себя ClaimStrategy

Разумеется, вся эта машинерия сильно упрощается, если издатель только один -- тогда все запросы идут из одного потока, и ClaimStrategy может быть сильно проще. Разработчики это предусмотрели, и существует SingleThreadedStrategy, которая этим преимуществом пользуется

Чисто технически, логика работы с последовательностями (выделение, публикация) вынесена в базовый класс Sequencer, а RingBuffer наследует Sequencer, и добавляет функциональность хранения самих объектов сообщений, и отображение sequence -> event. Сделано это для того, чтобы можно было легко реализовать свою собственную логику хранения сообщений. Например, если сообщения у нас это просто некий ID команды -- мы можем использовать для его хранения long[] без необходимости boxing/unboxing в/из Long

Обработчики

Ок, сообщение мы опубликовали. Что насчет обработчиков?

За пересылку сообщений из RingBuffer к соответствующим EventHandler-ам отвечает реализация интерфейса EventProcessor. Основная рабочая реализация BatchEventProcessor.
BatchEventProcessor держит внутри себя курсор (Sequence) на последнее обработанное сообщение и SequenceBarrier, задающий список курсоров, от которых зависит наш курсор (==список производителей/обработчиков, которых мы должны пропустить вперед нас, таким образом задаются взаимозависимостями между разными обработчиками).

Основной цикл работы BatchEventProcessor-а состоит в том, что он ждет на SequenceBarrier доступности события, следующего за последним обработанным (как я уже писал, SequenceBarrier как раз и позволяет дожидаться, пока событие с таким-то номером будет обработано всеми впереди идущими == SequenceBarrier.waitFor(cursor+1)). Дождавшись этого, он проверяет, какие события нам на самом деле доступны (гранулярность ожидания может оказаться слишком большой, и реально окажется доступен не только cursor+1, но и, скажем, вплоть до cursor+142), и в цикле все их пересылает в EventHandler. После чего -- один раз -- обновляет курсор, сдвигая его на количество обработанных сообщений (это и есть местный batching). Ну и плюс здесь еще всякая вспомогательная машинерия, типа обработки исключений и остановки обработчика.

Эпилог

Это, собственно все. Несмотря на кажущуюся сложность, идейно все довольно-таки просто. По всему коду повторяется один и тот же шаблон: есть курсор на последнее обработанное (в том или ином смысле) сообщение, есть список курсоров, которые ни в коем случае нельзя опережать. Мы сидим в spin-wait-е на списке "необгоняемых курсоров" (возможно, делая back off через yield/sleep(1)/park), и ждем пока минимальный из них не обгонит нас. Как только он это делает -- мы проверяем насколько именно он нас обогнал, и все эти сообщения -- в нашем распоряжении. Благодаря тому, что курсор везде volatile, пара операций "обновление значения курсора -- чтение нового значения курсора в ходе spin-wait" создает ребро HB, а значит все изменения, сделанные потоком, обновившим курсор до этого обновления будут гарантированно видны всем, прочитавшим новое значение курсора после этой самой операции чтения.

Единственное место, где требуется здесь CAS -- это выделение сообщений из пула для случая многих потоков-поставщиков -- но здесь и нельзя ничего сделать, потому что это в чистом виде consensus problem, которая (для потенциально неограниченного числа участников) обычными volatile write/read не решается.

Все остальное -- это оптимизационные тонкости, где какую стратегию ожидания с каким backoff использовать, где какой padding создать.

Интересно, что в disruptor производитель ничем принципиально от обработчика не отличается. В самом деле, обработчики же вполне свободны модифицировать сообщения, так что можно сделать "производителя" в виде заглушки
while(!Thread.currentThread().interrupted()){
  // Publisher claim events in sequence
  final long sequence = ringBuffer.next();
  // make the event available to EventProcessors
  ringBuffer.publish(sequence);   
}
Такой производитель (он будет однопоточным, кстати) большую часть времени будет проводить в ожидании, чтобы не "съесть свой собственный хвост" -- зато все сообщения будут доступны для обработчиков, следующих за ним. Почему так не сделано? Потому что это не позволяет эффективно использовать batching для производителей:
while(!Thread.currentThread().interrupted()){
  final byte[] message = network.receiveEvents(...);
  final int eventsCount = findEventsCountIn(message);
  // Publisher claim events in batch 
  final SequenceBatch sequenceBatch = new SequenceBatch(eventsCount);
  ringBuffer.next(sequenceBatch);
  //parse and copy event content from message to
  for( int i=sequenceBatch.getStart();i<=sequenceBatch.getEnd();i++){
    final Event event = ringBuffer.get(i);
    //copy i-th event content from message to event object
    ...
  }

  // make the events available to EventProcessors
  ringBuffer.publish(sequenceBatch);   
}
То есть размер "пачки" для публикации зависит от текущей ситуации на рынке в сети -- от текущего уровня нагрузки. Если мы реализуем производителя как обработчика -- мы вынуждены публиковать сообщения по одному -- каждый раз неся накладные расходы на volatile read/write (реально, скорее, на cache coherence traffic вызванный выполнением соответствующих барьеров). Если же мы подбираем размер пачки под текущую нагрузку -- мы амортизируем накладные расходы на передачу данных от ядра к ядру на количество этих сообщений, тем самым мы лучше адаптируемся к изменениям в трафике, сохраняя задержку обработки сообщений на минимальном уровне.

14 сентября 2011 г.

False sharing

Размышляю на досуге про false sharing в его простейшем смысле -- про данные, принадлежащие одной строке кэша. Обычно предлагается с этим бороться введением полей (padding) -- добить объект фиктивными полями типа, скажем, long до обычного размера строки в 64 байта. Проблем с этим методом -- не оберешься. Даже если забыть на минутку про то, насколько такая вещь как padding (тем более -- "в рукопашную") не вписывается в концепцию java как высокоуровневого языка, остается еще масса вопросов:

Переносимость: cache-line имеет разный размер на разных процессорах -- от 32 до 128 байт. Можно, конечно, закладываться на максимальный размер, но тратить 128 байт на каждую конкурентную переменную может оказаться многовато. Дело не в том, что это много само по себе -- память нынче дешевая. Дело в том, что это же априори горячие в кэше данные. Т.е. это не 128 байт из основной памяти, которой десятки гигабайт, это 128 байт из кэша первого уровня -- который все еще весьма невелик. Но даже если и так -- кто гарантирует, что завтра не появится процессор со строкой 256 байт? Если приблизиться к реалиям -- по факту везде, где я встречал padding он сделан из расчета на строку в 64 байта. То есть даже на существующих процессорах со строкой кэша длиннее этой -- производительность резко просядет. А ведь я еще нигде не видел, чтобы размер полей можно было хоть как-то "настраивать" -- да и сделать это довольно непросто.

Надежность: padding нынче делается просто набором неиспользуемых примитивных полей. Еще когда я впервые увидел этот трюк где-то в недрах JDK у меня возник вопрос -- а почему собственно JIT не выкинет эти поля нафиг? Ведь даже моя IDE очевидно мне подчеркивала их как неиспользуемые, уж JIT-то должен был бы это увидеть. И если он на данный момент этого не делает -- что будет, если в очередном обновлении он таки научится? Собственно, похоже это произошло: Martin Thompson, один из авторов disruptor уже жалуется, что используемый ими ранее простой padding в очередном обновлении Java 7 работать перестал. Чтобы таки надурить компилятор им пришлось наследовать PaddedAtomicLong от AtomicLong, и делать padding через 6 штук public volatile long (Кстати, почему их 6 а не 7 -- для меня загадка. Могу только предположить, что Мартин заложился на то, что еще 8 байт уйдет на object header. Надо ли говорить, что закладываться на такие детали внутренней реализации JVM не очень надежно?). Пока что это работает -- непонятно только, насколько долго будет.

Или вот такой вопрос -- ну хорошо, добили мы свой объект полями до размера строки. Но ведь мы не можем (из java) гарантировать, по какому адресу будет расположен наш объект. Нам ведь никто не гарантирует, что в памяти наш объект будет выравнен по границе строк. Реально он может оказаться половиной на одной строке, половиной -- на второй. Да, мы можем быть уверены, что если у нас есть два точно таких же объекта, то их одинаковые поля (в предположении что они всегда идут в одном и том же порядке) точно на одной строке не окажутся. То есть, если взять этот самый PaddedAtomicLong -- покуда мы реально используем только "первый" из составляющих его 7 long-ов , мы можем быть уверены, что false sharing это не про нас. Но если бы мы хотели использовать уже 2 его long-а ("первый" и "второй") -- отсутствие false sharing уже не гарантируется, поскольку "второй" может оказаться уже на следующей строке, и на этой же строке может уместиться начало уже другого PaddedAtomicLong, и наш "второй" будет конфликтовать с его "первым". Поэтому, если мы в самом деле хотим избежать false sharing здесь, нам нужен padding с обеих сторон -- и до "защищаемых" нами полей, и после них, размером заведомо больше половины строки в каждом случае.

Но даже это еще не решает проблемы окончательно, потому что у нас ведь в приложении не только одни PaddedAtomicLong-и существуют. Если мы возьмем, для примера, тот же disruptor -- есть еще и объекты сообщений, которые тоже модифицируются (потенциально) многими потоками. И вполне может так случиться, что на одной строке кэша окажется кусок от MyEvent, и начало от PaddedAtomicLong, как раз где самое важное для нас поле value и лежит. И все усилия Мартина и команды по мегаоптимизации своего dsiruptor-а пойдут прахом моментально и внезапно.

...А самое поганое здесь, на мой взгляд, это то, что сам-то disruptor пишет уже упомянутый Мартин Томпсон, который и знает много, и может себе позволить потратить недели на тонкую оптимизацию и выяснение причин каких-либо тормозов. А вот MyEvent будет создавать обычный прикладной программист, у которого скорее всего ни квалификации такой нет, ни времени на исследования. И вероятность того, что этот программист будет свои MyEvent защищать от false sharing мала.

...Но даже если он и будет это делать -- он ведь еще должен это делать таким же способом, как Мартин. Про что я говорю -- я говорю про то, что если, скажем, наш гипотетический программист Вася будет защищать свои MyEvent от false sharing с помощью двустороннего padding-а размером чуть больше полстроки (как я описывал выше) -- то все равно он не избежит кары небесной, ибо Мартин свои объекты защищает односторонним padding-ом. Легко понять, что в этом случае на одной строке не могут оказаться используемые поля от двух разных MyEvent, точно так же как и от двух разных PaddedAtomicLong, но могут оказаться используемые поля от одного MyEvent и другого PaddedAtomicLong. По-сути получается, что страдает инкапсуляция.

Получается, что либо надо рассчитывать на самый наихудший случай -- т.е. с обеих сторон защищать каждый свой объект padding-ом размером в строку, причем максимальную строку (128 байт). Либо мириться с тем, что отлаженная и заоптимизированная тобой структура данных может легко "лечь" из-за сложнопредсказуемой интерференции с совершенно посторонним кодом.

Чего бы хотелось, чтобы жизнь была безоблачной и радостной? Очевидно, хочется всю эту магию возложить на виртуальную машину -- которая, как минимум, знает размер строки, и имеет возможность управлять расположением объектов в памяти.

Идеальный вариант, разумеется, чтобы JIT сам определял, какие объекты испытывают наибольшее давление false sharing, и прямо в рантайме перемещал их в памяти так, чтобы это давление уменьшить. Мне этот вариант не очень нравится просто потому, что непонятно, за что тогда будут платить деньги мне вряд ли это будет реализовано скоро, и еще более вряд ли скоро и качественно.

Более реальный вариант: хочется стандартную аннотацию типа @PreventFalseSharing (хотелось бы для нее какое-нибудь название, больше отражающее идею, нежели реализацию), особым образом интерпретируемую JIT-ом -- он для таких объектов использует отдельный аллокатор, как минимум размещающий их строго по границам строк, и не позволяющий никому более использовать место, остающееся до целого количества строк (хотя это уже и не обязательно -- первой части уже будет достаточно, чтобы такие объекты гарантированно не конфликтовали хотя бы между собой). Сюда же можно добавить и включение для ссылочных полей таких объектов оптимизацию write barrier-а в виде test-and-mark вместо только mark. Ну и так далее -- думаю, там еще много чего можно добавить.

Причем на первое время такую штуку можно сделать вообще отдельно от Oracle -- сделать byte-code preprocessor, который при загрузке классов ищет соответствующую аннотацию, и модифицирует байт-код соответствующих классов, добавляя фиктивные поля. Можно даже сделать автоматическое определение размера строки кэша -- через нативный вызов, например. Потом можно прикрутить обработку к javac -- опять же под worst-case, но хотя бы исходный код не будет загрязнен всякими непонятными полями. И уж на последнем этапе можно перевести обработку на уровень JVM/JIT.

Вот такие вот эротические фантазии меня последнее время посещают.

UPD: По сообщениям из заслуживающих доверия источников, в казематах оракла суровыми инженерами сана, там заточенными, обсуждается аннотация @Contended, реализующая что-то вроде варианта 2. Так что не пройдет и 10 лет, как, возможно, мы узрим свет в окошке...