17 июня 2012 г.

Simple concurrent caches

Почти год назад я уже писал про любопытную идею организации data race based конкурентного кэша без какой-либо синхронизации. Тогда мне казалось, что идея хоть и любопытная, но наверняка очень узко применимая, и наверняка кому надо -- те уже ее сами изобрели. Однако за прошедший год я так нигде и не встретил ничего подобного, зато уже несколько раз в своей работе столкнулся с ситуациям, где эта идея очень удачно ложится. Поэтому я опишу эту штуку еще раз, уже более конкретно.

Есть у нас доменный объект CurrencyEntity:
public final class Entity {
 private final String isoCode;

 public Entity( final String isoCode ) {
  this.isoCode = isoCode;
 }

 @Override
 public boolean equals( final Object o ) {
  if( this == o ) {
   return true;
  }
  if( !(o instanceof Entity )) {
   return false;
  }

  final Entity entity = ( Entity ) o;

  if( !isoCode.equals( entity.isoCode ) ) {
   return false;
  }

  return true;
 }

 @Override
 public int hashCode() {
  return isoCode.hashCode();
 }
}
(важно, что объект immutable). На вход (из сети, например) программа получает Entity в виде String isoCode, но внутри, понятно, хочется оперировать type-safe представлением. Однако создавать каждый раз новый экземпляр Entity не хочется -- зачем нагружать лишний раз GC? Кроме того, известно, что реально различных isoCode не так уж много -- ну, скажем, штук 100-200. Вот как бы так их кэшировать, чтобы кэш и конкурентным был, и накладные расходы минимальны (иначе кэшировать смысла нет -- создать/удалить такой мелкий объект чего-то стоит, но не так уж дорого)?

Вариант решения:
public class FastConcurrentEntityCache{
 private static final Object FREE = null;

 private final Entity[] entries;

 public FastConcurrentEntityCache( final int size ) {
  checkArgument( size > 1, "size[%s] must be >1", size );

  entries = new Entity[size];
 }

 public Entity get( final String isoCode ) {
  final int hash = isoCode.hashCode() & 0x7fffffff;

  final int length = entries.length;
  final int index = hash % length;

  Entity cur = entries[index];

  if( cur == FREE ) {
   //cache miss
   return evaluateAndStore( isoCode, index );
  } else if( cur.isoCode.equals( isoCode ) ) {
   //cache hit
   return cur;
  } else {// already FULL, must probe

   // compute the double hash
   final int probe = 1 + ( hash % ( length - 2 ) );
   int probedIndex = index;
   for( int i = 0; i < entries.length; i++ ) {
    probedIndex -= probe;
    if( probedIndex < 0 ) {
     probedIndex += length;
    }
    cur = entries[probedIndex];
    if( cur == FREE ) {
     //cache miss
     return evaluateAndStore( isoCode, probedIndex );
    } else if( cur.isoCode.equals( isoCode ) ) {
     //cache hit
     return cur;
    }
   }
   //cache miss, and cache is full 
   throw new IllegalStateException( "Cache[size:"+entries.length+"] is full: ["+isoCode+"]" );
  }
 }

 private Entity evaluateAndStore( final String isoCode,
                                  final int index ) {
  final Entity entity = new Entity( isoCode );
  entries[index] = entity;
  return entity;
 }

 public void purge() {
  Arrays.fill( entries, null );
 }
}

Собственно, весь кэш -- это хэш-таблица фиксированного размера, с открытой адресацией. Сам метод get() по-большей части с любовью украден из gnu.trove.TObjectHash -- логика несколько упрощена, поскольку нет удаления.

Подробности

Вопрос: Почему это работает? Ответ: А почему бы и нет? ;)

Вообще-то стоило бы чуть подробнее остановиться на том, как именно оно работает. Обычно предполагается, что если объект уже есть в кэше, то при запросе именно он и вернется. Но в данном случае, из-за отсутствия синхронизации, состав кэша может разными потоками видиться по-разному, так что понятие "объект есть в кэше" становится не очень определенным. Так что работать это будет так: если объект есть в кэше, и виден текущему потоку -- вернется он. Если не виден (или вообще нет) -- будет создан новый экземпляр. То есть кэш не сохраняет идентичность (preserve identity) -- для одного и того же ключа isoCode в программе могут существовать несколько разных экземпляров Entity.

Второй важный момент это то, что Entity -- immutable. В многопоточном окружении запись entries[index] = entity -- это публикация ссылки на объект. Без дополнительной синхронизации (а где она? а нет ее!) это будет публикация через data race. И только особые гарантии JMM для immutable объектов дают нам возможность утверждать, что если другой поток увидел ссылку на Entity, то он увидит через нее полностью инициализированный объект.

Собственно, здесь две ключевых идеи. Во-первых, ослабление семантики -- отказ от принципа один тюбик в одни зубы "для каждого ключа значение создается строго однажды". Во-вторых, использование immutable объектов для безопасной публикации через data race. Эти две идеи в сумме и позволяют обойтись без синхронизации.

Что еще можно с этим сделать?

Приведенная реализация подразумевает работу в стационарном режиме: через какое-то время все isoCode будут программе знакомы, и соответствующие Entity закэшированы, и видимы всем потокам единообразно. Т.е. через какое-то время новые экземпляры Entity создаваться перестанут. При этом важно, чтобы мы изначально угадали с размером кэша.

Но это не единственный вариант использования. Бывает другая задача: множество объектов в принципе не ограничено, но известно, что чаще всего используются недавние объекты. Т.е. хочется LRU кэш. И такое тоже можно:

public class FastLRUEntityCache{
 private static final Object FREE = null;

 private final Entity[] entries;

 public FastLRUEntityCache( final int size ) {
  checkArgument( size > 1, "size[%s] must be >1", size );

  entries = new Entity[size];
 }

 public Entity get( final String isoCode ) {
  final int hash = isoCode.hashCode() & 0x7fffffff;

  final int length = entries.length;
  final int index = hash % length;

  Entity cur = entries[index];

  if( cur == FREE ) {
   //cache miss
   return evaluateAndStore( isoCode, index );
  } else if( cur.isoCode.equals( isoCode ) ) {
   //cache hit
   return cur;
  } else {// already FULL, replace
   return evaluateAndStore( isoCode, index );
  }
 }

 private Entity evaluateAndStore( final String isoCode,
                                  final int index ) {
  final Entity entity = new Entity( isoCode );
  entries[index] = entity;
  return entity;
 }

 public void purge() {
  Arrays.fill( entries, null );
 }
}

Я убрал разрешение коллизий (probing). Теперь, если место для объекта уже занято -- мы просто затираем предыдущего жильца. Этот вариант, в отличие от предыдущего не страдает от запаздывающей синхронизации состояния кэша между потоками -- он ей наслаждается. В самом деле, разные потоки будут просто видеть свои LRU объекты!

Вопрос: А если у меня объект не immutable? Ответ: Если он не immutable, но фактически не меняется -- можно просто создать обертку

public class Freezer<T>{
   public final T item;
   public Freezer(final T item){
    this.item = item;
   }
}

Разумеется, это добавит немного накладных расходов (особенно в случае LRU-кэша), так что нужно тщательно тестировать. Идею можно довольно сильно обобщить: вот тут у меня годичной давности примеры кода и бенчмарки к ним, можно поглядеть, что еще с этим можно делать.

Вопрос: А можно сделать кэш resizeable? Ответ: Можно, конечно. Но придется делать поле entities volatile -- что несколько увеличит стоимость обращения к кэшу. Я не стал с этим возиться.

Для пуристов, поясняю: да, сам по себе volatile load на широко используемых архитектурах ничего дополнительного не стоит, в сравнении с обычным load. Но он запрещает некоторые оптимизации. Например:

for( over 9000 ){
  final String isoCode = ...;
  final Entity entity = cache.get(isoCode);
  ...
}
Если entries является final (да и просто non-volatile), то компилятор может вынести его чтение за цикл. Если же entries volatile -- оно будет считываться на каждой итерации. Т.е. мы можем сэкономить что-то вроде (L1 load - register load) * (over 9000) тактов :)

Философское заключение про data race

Как мы могли увидеть, существование в програме data race сам по себе не является приговором "программа некорректная". Но существование гонки, как правило, резко увеличивает размер графа состояний программы, количество возможных сценариев выполнения. Если это не было предусмотрено -- есть близкая к 100% вероятность упустить какие-то сценарии, нарушающие заложенные в код инварианты. Но если распоряжаться гонками аккуратно, то можно осмысленно создать ситуацию, где граф состояний даже с гонкой остается обозримым. И тогда можно увидеть возможности для вот таких вот простых и эффективных решений.

Это я так, типа, хвастаюсь :)

22 комментария:

  1. А где циферки? Интересно же, как в конкретном приложении это помогло.

    Кстати, в случае фондовых бирж большинство адекватных торговых площадок позволяют получить список торгующихся в данной сессии инструментов. Думаю, для валютных бирж ситуация аналогична. Можно их заранее получить, и запопулировать кеш предварительно, сэкономив на первом получении во время торговой сессии.

    Это если я правильно понял, в каком контексте ты это делаешь :)

    ОтветитьУдалить
  2. Все стало быстрее over 9000 раз -- разве не очевидно?

    Как ты будешь измерять эффект в большом приложении? Там слишком много факторов. Мне кажется, что быстрее, чем здесь -- просто сделать нельзя. Конкурентный кэш по цене однопоточного -- чего тебе еще ;)

    Микробенчмарки годичной давности показывали стоимость .get() в 2-2.5 раза меньше, чем ComputableConcurrentHashMap из guava. Но там, кажется, не смотрелся случай пакетного обращения, как в примере с циклом в статье.

    ОтветитьУдалить
  3. @gvsmirnov

    разогрев lazy cache происходит задолго до торговли, так, что замарачиваться не стоит)

    ОтветитьУдалить
  4. После "currency" внезапно захотелось проверить, где вы работаете :)

    ОтветитьУдалить
  5. При таком алгоритме двойного хеширования надо увеличивать индекс на второй хеш, а не уменьшать. Это важно. Догадайтесь сами почему.

    ОтветитьУдалить
  6. @elizarov

    как бы не очевидно, почему надо инкрементировать, а не декрементировать. собственно cache калька с trove, который как показывают ваши же тесты в статье Пишем самый быстрый хеш для кэширования данных: Часть 1 самый быстрый.

    Напришивается вопрос - где нестыковка ?

    ОтветитьУдалить
  7. Руслан, ну как обычно. При прочих равных дали тестовую нагрузку, померили целевых попугаев. В данном случае, полагаю, пытались уменьшить latency для транзакций. Прошлую статью проглядел. Бенчмарки посмотрел, спасибо :)

    Кстати, имо, нужно отдельно акцентировать внимание на контракте purge: несколько контринтуитивно, что после его вызова другие потоки ещё могут увидеть старые значения.

    P.S. конструктор в LRU переименовать забыл :)

    Владимир, ну да, я это и имел в виду.

    ОтветитьУдалить
  8. @gvsmirnov Не, отдельно на purge акцентировать не нужно -- нужно акцентировать на том, что весь кэш evantually consistent.

    ОтветитьУдалить
  9. @cheremin Прямо так уже eventually consistent? Я понимаю, что на практике на уровне железа кеши eventually consistent, но на уровне Java же нет? Или из JMM следует? Если да, то я не понимаю, как.

    ОтветитьУдалить
  10. @gvsmirnov Когда-нибудь -- понятие растяжимое :) Можит, и никогда.

    Нет, из JMM гарантии прогресса (в смысле -- сходимости всех видимых образов кэша к одному) не выводятся. Есть несложные примеры кода, которые почти наверняка и в реальности дадут несходимость. Поток с бесконечным циклом с коротким телом, где идет вызов кэша с фиксированным ключом, например.

    Другое дело, что с точки зрения практики это не важно. Ну остался там у кого-то застрявший образ кэша -- да и хрен с ним, пусть подавится. Общий объем неконсистентности на системах с hcc будет не больше, чем (размер регистров)*(число потоков), а в реальности гораздо меньше, потому что сложно иметь осмысленный код, который сколько-либо заметное время держит какие-то данные в регистрах.

    ОтветитьУдалить
  11. А разве probe не должен оказаться взаимно-простым с length? Кажется, код этого не гарантирует.

    ОтветитьУдалить
  12. Да, должен. Нет, код этого не гарантирует. Размер лучше выбирать простым числом, тогда все будет круто. Но и для обычных чисел все скорее всего будет неплохо -- потому что число проб больше 3-4 это уже признак плохо выбранного размера, или плохой хэш-функции.

    ОтветитьУдалить
  13. @elizarov
    >При таком алгоритме двойного хеширования надо увеличивать индекс на второй хеш, а не уменьшать. Это важно. Догадайтесь сами почему.

    Не догадался, расскажите. Есть пара гипотез, но на "важно" они никак не тянут. Бенчмарки тоже не показывают сколь-либо заметного эффекта

    ОтветитьУдалить
  14. >> "data race based конкурентного кэша без какой-либо синхронизации. ... за прошедший год я так нигде и не встретил ничего подобного".
    Кэша я тоже не видел, но в целом, эта идея использована авторами Sun JDK в String.hashCode(). Значение кэшируется в non-volatile переменной. Так как значение int, то их пример работает и в старой JMM (им не требуется новая семантика для final, используемая Вами).

    ОтветитьУдалить
  15. >> "Когда-нибудь -- понятие растяжимое :) Можит, и никогда."
    Ну ... Если быть уж совсем "буквоедом", то
    - Wiki [http://en.wikipedia.org/wiki/Eventual_consistency]: "It means that given a sufficiently long period of time over which no changes are sent, all updates can be expected to propagate eventually through the system and all the replicas will be consistent. While some authors use that definition (e.g. Vogels), others prefer a stronger definition that requires good things to happen even in the presence of continuing updates, reconfigurations, or failures. In the Terry et al. work referenced above, eventual consistency means that for a given accepted update and a given replica, eventually, either the update reaches the replica, or the replica retires from service."
    - Vogel [http://www.allthingsdistributed.com/2007/10/amazons_dynamo.html]: "...to be an eventually consistent data store; that is all updates reach all replicas eventually."
    Т.е. в Вашем случае, Вы полагаете еще "более слабый" уровень консистенции. Математически, я полагаю, eventually == для каждого запуска программы можно указать такой промежуток времени, что после него все "устаканится". Т.е. время "устаканивания" неограниченно сверху, но всегда конечно.

    ОтветитьУдалить
  16. @ivan
    >String.hashCode().

    Да, разумеется, кажется даже, я именно им и вдохновлялся. И меня удивляет-то именно то, что дальше hashCode почему-то никто не идет. Собственно, immutable объекты в джаве имеют в некотором смысле семантику примитивов -- как int не может быть записан "наполовину", так и ссылка на immutable объект не может быть передана на "наполовину" инициализированный объект

    Насчет EC -- нет, я думаю конечность времени достижения стационарного режима как раз гарантировать будет нельзя. Мой контрпример -- поток, вертящийся в бесконечном цикле, и тупо дергающий cache.get(123). Я не вижу, что мешает JVM закэшировать _все_ в регистрах навечно.

    Другое дело, что а) ну и хрен бы с ним -- от этого ничего не ломается б) в реально рабочем коде такое представить сложно -- если код делает хоть что-то полезное, он время от времени будет "натыкаться" на synchronized actions.

    ОтветитьУдалить
  17. Продолжая "буквоедство":
    На счет eventually consistency есть более интересный момент. EC неявно предполагает "однонаправленность времени": скажем обновления с одной ноды/потока/агента/актора сохраняют порядок (PRAM/FIFO consistency/Processor consistency: http://en.wikipedia.org/wiki/PRAM_consistency).
    С hb-ребрами я разобрался, но на commitment protocol от Джереми и Пуха я "заглох". Соответственно вопрос, есть ли PRAM при полной отсутствии синхронизации?

    Пример:
    Изначально x==0

    Поток #0:
    x = 1;
    x = 2;

    Поток #1:
    System.out.println(x);
    System.out.println(x);
    System.out.println(x);
    System.out.println(x);
    System.out.println(x);

    Вопрос: что может вывести поток #1?
    Например, может ли вывести
    0
    1
    0
    2
    1
    ?

    P.S. Имеется в виду только спецификация, отбрасываем то, как реализованы текущие процессоры/компиляторы/JVM. Скажем, мы реализуем распределенную JVM, или у нас новейший оптико-квантовый-волновой процессор:)

    ОтветитьУдалить
  18. На протоколе подтверждения я тоже пока завис :)

    Насколько я сейчас думаю -- нет, то, что вы описали невозможно по JMM. Потому что какая-то запись либо подтверждена, либо нет -- и если уж чтение увидело результат какой-то записи, то она-таки уже не может быть "отменена", и перепременена позже. Но это лишь мое мнение

    ОтветитьУдалить
  19. Я спрашивал Джереми на счет его "эмулятора JMM" - он ответил, что никому не дает, так как он неверно работает:). Раз один из авторов допускает ошибки при попытке реализации эмулятора, значит commitment protocol, фактически, полное Г.
    Вот второй пример на котором я застрял:

    Изначально x==0;

    Поток #0:
    x = 1;
    println("#0: " + x);

    Поток #1:
    x = 2;
    println("#1: " + x);

    Может ли вывести
    #0: 2
    #1: 1
    ?

    ОтветитьУдалить
  20. А что ты думаешь на счет AtomicReferenceArray.weakCompareAndSet(...)[http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/atomic/AtomicReferenceArray.html#weakCompareAndSet(int, E, E)]?
    - есть гарантии, что обновление пройдет
    - нет платы за синхронизацию = "weakCompareAndSet atomically reads and conditionally writes a variable, is ordered with respect to other memory operations on that variable, but otherwise acts as an ordinary non-volatile memory operation."[http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/atomic/package-summary.html]
    ?

    P.S. Не представляю как оно реализовано, т.е. есть ли у "обычных" процессоров инструкции, гарантирующие запись, но не делающих синхронизации. Думаю стоит спросить Алексея Шепелева [https://plus.google.com/101775216477661225977/posts] - он у тебя в друзьях есть и это, вроде как, его парафия.

    ОтветитьУдалить
  21. >> "Общий объем неконсистентности на системах с hcc будет не больше, чем (размер регистров)*(число потоков), а в реальности гораздо меньше, потому что сложно иметь осмысленный код, который сколько-либо заметное время держит какие-то данные в регистрах."

    Если бы мы писали на ассемблере, то однозначно бы с тобой согласился. Но в JVM Spec 3(java 5) были понятия "private working copies" + "master copies" + "shared main memory": "The Java programming language allows threads that access shared variables to keep private working copies of the variables; this allows a more efficient implementation of multiple threads (§2.19). These working copies need to be reconciled with the master copies in the shared main memory only at prescribed synchronization points, namely, when objects are locked or unlocked (§2.19)". (Надо сказать что в JVM Spec 4(java 7)) эту часть просто стерли - теперь вообще туман. Эта фраза достаточно явно указывает на то, что авторы JVM для борьбы с неявной платой за когерентность (семейство MESI - шум на шине) имеют возможность каждому потоку заводить свою копию shared variable. Из чего следует, что объем неконсистентности неограничен ни размерами регистров, ни размерами кэша. Опять же, это по спеке. Как оно на самом деле - к Алексею Шепелеву.

    [http://docs.oracle.com/javase/specs/jvms/se5.0/html/Concepts.doc.html#29882]

    ОтветитьУдалить
  22. @ivan
    Про weakCAS я уже писал несколько раз. Как и про lazySet, из той же оперы -- совсем недавно. На x86 weakCAS == CAS, там нет атомиков без барьера. Такие штуки есть на ARM (вроде бы) и на Azul VEGA (точно), для них это актуально.

    Про объем неконсистентности: надо все-таки отделять мух от котлет. JMM это инструмент доказательства формальной корректности. Это не инструмент доказательства эффективности. Корректность в данном примере это "кэш всегда возвращает именно те объекты, которые запрошены (с нужным содержимым), полностью инициализированными". Это свойство можно доказать.

    Эффективность же -- это чего будет стоить такой кэш. Здесь без деталей реализации JVM/CPU не обойтись. Детали реализации -- это то, что когда ОС меняет активный поток на ядре, она заботится о сохранении/восстановлении только состояния CPU (регистров). Память у потоков общая, разделяемая, и аппаратно синхронизируемая.

    ОтветитьУдалить