29 апреля 2011 г.

Without locks

Последние несколько недель в голове вертится идея об организации межпоточного взаимодействия без блокировок. Идея эта звучит примерно так "иногда проще сделать что-то самому, чем организовывать протокол для передачи результата от коллеги". На практике это про то, что иногда лучшая синхронизация -- отсутствие таковой. Вместо того, чтобы строго гарантировать, что результат выполнения какой-то задачи будет получен только однажды, только одним потоком, а остальные будут его использовать, мы можем построить систему, где будет более мягкое утверждение -- "если результат уже посчитан одним потоком, и успел стать видимым другому -- он будет использован, если не посчитан, или не успел стать видимым -- значит посчитаем его второй раз, не надорвемся"

Самый наглядный пример реализации -- ленивый кэш. Вот, например, простейший вариант -- кэшируем только результат последнего вызова:
public class SingleLRUWaitFreeCache<In, Out> implements ICache<In, Out> {
    private final Function<In, Out> function;

    private transient Entry<In, Out> recentUsed = null;

    public static <In, Out> SingleLRUWaitFreeCache<In, Out> create( final Function<In, Out> f ) {
        return new SingleLRUWaitFreeCache<In, Out>( f );
    }

    public SingleLRUWaitFreeCache( final Function<In, Out> function ) {
        checkArgument( function != null, "function can't be null" );
        this.function = function;
    }

    @Override
    public Out get( final In key ) {
        if( key == null ){
            throw new IllegalArgumentException("null keys are not supported");
        }
        final Entry<In, Out> lru = recentUsed;
        if ( lru != null && lru.key.equals( key ) ) {
            //cache hit
            return lru.value;
        } else {
            //cache miss
            final Out value = function.apply( key );
            recentUsed = new Entry<In, Out>( key, value );
            return value;
        }
    }

    @Override
    public void purge() {
        //not strictly correct since recentUsed is not volatile
        recentUsed = null;
    }
}

class Entry<Key, Value> {
    public final Key key;
    public final Value value;

    Entry( final Key key,
               final Value value ) {
        this.key = key;
        this.value = value;
    }
}

Дополнительные условия, при которых это работает:
  1. Функция function -- идемпотентна, т.е. ее повторный вызов с теми же аргументами не производит никаких дополнительных изменений в системе, по сравнению с первым вызовом. Лучше всего, чтобы она вообще была "чистой" (pure)
  2. function должно быть безопасно вызывать из нескольких потоков в произвольном порядке
  3. In/Out должны быть immutable объектами

Что мы получаем в итоге?
  1. Никаких блокировок
  2. Задержка с видимостью работает на нас -- если у разных потоков будут свои версии записанного значения, значит у каждого будет своя, самая свежая для него, версия

Так же можно реализовывать кэши произвольного размера -- только размер должен быть известен заранее. Сейчас у меня в голове бродит реализация Scatter/Gatherer на этой же основе.

9 комментариев:

  1. Строго говоря, неявные scalability bottleneck'и будут, ибо поле recentUsed будет пользоваться несколькими тредами, а значит, хардварные кеши всё равно будут бешено пытаться это значение синхронизировать. А поскольку в плохом случае запись будет производиться на каждый get(), то в патологическом случае с производительностью можно попрощаться.

    Я бы сделал либо вероятностную запись (т.е. писать не каждый раз, а раз в N операций), либо хранил recentUsed в ThreadLocal.

    ОтветитьУдалить
  2. А, и кстати, в коде -- гарантированный NPE на втором же вызове get(null).

    ОтветитьУдалить
  3. >Строго говоря, неявные scalability bottleneck'и будут

    Да, это конечно правда. Но это очень упрощенный пример. Он хорошо будет работать, если априори известно, что вообще-то функция 99.9 вызывается с одним и тем же аргументом (я в таких случаях эту штуку и использую). Если расширить реализацию для хранения массива значений (в духе fixed-size hash map) -- опять же, вероятность конкуренции за кэш-линии будет падать обратно пропорционально размеру массива.

    В общем случае, это решение хорошо работает тогда, когда оно имеет шансы выйти со временем на "асимптотический" режим, когда (почти) все используемые значения закэшированы. Тогда никакой конкуренции за строки кэша не будет, и счастье будет почти достигнуто в этой жизни.

    >либо хранил recentUsed в ThreadLocal.

    Это будет не совсем то, что я хотел -- в этом варианте каждое значение будет размножено в (число потоков, его использовавших) раз в памяти. Т.е. тут почти никакого оверхеда по синхронизации (учитывая нынешнюю реализацию ThreadLocal в jdk), зато потенциально большой оверхед по памяти. В моем же варианте никакого оверхеда по памяти, но потенциальный оверхед по CPU в плохом случае.

    >А, и кстати, в коде -- гарантированный NPE на втором же вызове get(null).

    Да, правда :) Нулевые ключи не использую, поэтому и не заметил :)

    ОтветитьУдалить
  4. ...зато мои последние микробенчмарки дают для стационарного режима (когда все актуальные ключи уже вычислены и закэшированы) примерно 70-100% прироста скорости чтения из кэша по сравнению с кэшем на базе ConcurrentHashMap (я использовал реализацию из google-guava). Я пока в фоновом режиме играюсь с различными параметрами бенчмарка -- но если эти цифры подтвердятся, то игра стоит свеч :)

    http://tools.assembla.com/svn/Behemoth/Tests/JAVA/test/src/main/java/test/threads/cache/ -- тут можно посмотреть реализацию и бенчмарки. В частности, MultiEntryCacheBenchmark.java

    ОтветитьУдалить
  5. Ну, я прямо вами горжусь. :) Наконец-то кто-то пишет микробенчмарки, от которых не тошнит.

    Добавить бы в микробенчмарк калибровку количества итераций, а то получается, что вы делаете loop{key = generator.nextValue -> lru.key.equals(key) -> return lru -> afterEachIteration()}, а потому можете случайно мерить не столько скорость самого быстрого случая в кеше, сколько скорость ваших switch'ей в afterEachIteration().

    Да, и BOGUS-итерации в конце бенчмарка тоже не помешают. По-моему, сейчас BOGUS только на старте бенчмарка.

    ОтветитьУдалить
  6. Спасибо за комплимент :) Я старался.

    >Добавить бы в микробенчмарк калибровку количества итераций

    Там же есть опция "empty" когда вместо кэша подставляется NoCache(identity()), что, по моему разумению, должно просто инлайниться в пустое место. Я ее и использую для калибровки -- с ней "производительность" примерно в 2-3 раза выше, чем лучшие результаты моих кэшей. Т.е. да, накладные расходы довольно существенны, но измеряемую величину из них все же можно выделить.

    >Да, и BOGUS-итерации в конце бенчмарка тоже не помешают.

    Да, они только на старте -- но я не понимаю, зачем их делать в конце? Насколько я понимаю, на старте они нужны а) для прогрева б) чтобы максимально одновременно дать отмашку на начало измерений всем потокам, без ожидания на каком-нибудь барьере, где будет непредсказуемая пауза между открытием барьера и реальным запуском потока планировщиком.

    Но в конце-то -- зачем?

    ОтветитьУдалить
  7. А, ну если вы померили baseline с пустым кешем, и отнимаете значение этого baseline'а из всех остальных измерений, и только потом считаете проценты -- то нормально, да :) Мы обычно поступаем проще: минимизируем расходы на инфраструктуру сразу.

    В конце BOGUS-итерации надо считать потому, что если некоторые потоки завершатся раньше, всем остальным будет проще считать: и хардварных кешей больше, и процов больше (важно для HT-машин), и т.п. Там легко можно получить мифический прирост "ниоткуда". В этом, собственно, одна из основных целей BOGUS-итераций -- сделать так, чтобы измерения в каждом из тредов проводились в условиях, когда все остальные треды тоже заняты работой.

    http://people.apache.org/~shade/talks/j1-April2011-benchmarking.pdf. слайды 50-51, посмотрите на разницу ;)

    ОтветитьУдалить
  8. >Мы обычно поступаем проще: минимизируем расходы на инфраструктуру сразу.

    На самом деле я пока не придумал, как можно еще больше минимизировать расходы на инфраструктуру. Там, вроде бы, для "измерительных" итераций, всего одно волатильное чтение на итерацию. Мне показалось это достаточным -- не могу придумать, как можно подать одновременный сигнал всем потокам на завершение измерений, не сделав хотя бы одного волатильного чтения.

    >В конце BOGUS-итерации надо считать потому, что если некоторые потоки завершатся раньше, всем остальным будет проще считать

    А, да, об этом я думал. Но я пошел более сложным путем: во-первых, флаг на завершение измерений проверяется на каждой итерации -- т.е. гэп здесь должен быть минимален. Во-вторых, там считается три числа -- максимальное/минимальное/среднее время. Если все потоки были в равных условиях -- они все должны быть равны (с учетом того, что они все округляются до ближайшего кванта системного таймера -- так точно равны). Так и выходит если брать число потоков не больше реального числа ядер (даже с учетом HT) и все ядра свободны. Если же взять больше потоков (или кто-то влезет на сервер замежду делом :) -- сразу появляются различия. Так я фильтровал некорректные эксперименты

    ОтветитьУдалить
  9. Быстрее volatile-флажка не придумаешь, по-моему.

    Хорошо, если gap между окончаниями потоков маленький. В общем случае этого гарантировать нельзя, конечно, поэтому проще озаботиться правильной схемой остановки с BOGUS в конце. Например, следующим заходом сделаете в каждой итерации тысячупятьсот get()'ов из кеша, время одной итерации подскочит до нескольких секунд, и тут-то ваша схема и лязгнет. Хорошо если, вы об этом вспомните сразу, а не после того, как получите результаты, которые не сойдутся по min/avg/max-статистикам.

    Поверьте, часто проще убить час на построение хорошей инфраструктуры, чем потерять месяц процессорного времени :)

    ОтветитьУдалить