пятница, 9 сентября 2011 г.

Akka actors: Первое неоднозначное впечатление

Давно я ничего не писал, хотя в draft лежит много чего, в том числе интересного. Но эта статья носит немного другой, как мне кажется, не технический, а больше эмоциональный характер, поэтому решил её долго не пилить и выложить, тем более в twitter обещал отписаться о моём опыте.

Ситуация такая. Есть у нас маленький, как считалось не слишком важный, кусочек кода, который переносит данные из oracle в mongo с небольшими преобразованиями. Через некоторое время, как обычно, оказалось, что он не такой уж и неважный, поэтому возникла задач быстро его немножко ускорить. Особо оптимизировать в коде нечего было, поэтому ускорение можно было предать параллелизацией некоторых его частей, благо код был разделен на достаточно независимые части. Для этого я решил в порядке эксперимента воспользоваться интересно библиотекой Akka, написанной на Scala, но разработчики поддерживают удобное api и документацию и для Java. Это мой первый опыт с данной библиотекой, так что никаких откровений тут не будет, да и в целом, как я ранее написал, статья больше эмоциональная, нежели техническая.

В начале определимся с исходными данными. У нас есть какой-то однопоточный код.

На диаграмме (простите за фон, по клику можно посмотреть в лучше виде) представлены базовые типы. Главный класс у нас Flow. Он обращается к Reader за документами из oracle (json, фактически HashMap, где в качестве Object может быть другой map). Дальше к документу применяется несколько простых трансформаций и он передается Writer для записи в MongoDb. (Код однопоточного Flow простой, поэтому я его не привожу, но можно глянуть на GitHub. Конкретные же реализации Reader, Transform, Writer даже на GitHub не положил, так как к делу они не относятся).

За идей как это разбить на потоки даже к Кэпу не надо идти. В свое время на .NET мы что-то похоже делали с помощью parallel programming library. Т.е. каждая компонента садиться в отдельную Task, а между ними ставиться ConcurrentQueue. Типичный producer-consumer, только трансформации играют и ту и другую роль. Те, кто уже кто знаком с Akka понимают куда я виду. Фактически тогда на .NET, мы написали очень простые Actor'ы. По этому в этот раз я решил использовать настоящие Actor'ы :) .

Akka предоставляет два типа акторов: типизированные и нет. Тут честно говоря моему негодованию не было предела. Scala и Java статически типизированные языки, поэтому наличие чего-нибудь untyped вызывает у меня бурю эмоций, так как одних уже костыльных недо-generic'ов достаточно.

И так, типизированный актор. Как я это вижу с высока. Мы определяем свой интерфейс, регистрируем его с типом его реализующим и из регистра Akka можем получить объект каждый вызов интерфейсного метода которого превращается в асинхронную посылку сообщения. Немного посмотрев на диаграмму выше я все таки решил, что данные интерфейсы нам немного не подходят, хотя на самом деле можно было обойтись и ими. Я решил сделать более самодостаточный асинхронный pipeline сильно упростив flow. Обратим внимания на большую картинку, надеюсь она вас не испугает, в мой монитор она целиком влазит, поэтому я считаю ок :)

Будем разбираться по порядку. Flow значительно упрощается он работает только с интерфейсом ReaderEvaluator, который предоставляет всего один метод read, который запускает всю процедуру. Несколько неоднозначный на самом деле интерфейс, так как в результате документ будет не только прочитан, но и записан. Больше подходит Flow.run, тем не менее один Flow у нас уже есть...

За этим интерфейсом у нас скрывается ReaderActor, которому сообщение read будет передано асинхронно и он, используя Reader, читает документы и передает их интерфейсу Evaluator. За этим интерфейсам у нас так же прячутся акторы, так что это будут не вызовы методов, а просто передача сообщений и управление мы получим тотчас же. Всего у нас есть два актора реализующих этот интерфейс. TransformActor - выполняет преобразование документа и посылает его на обработку следующему Evaluator. WriterActor -записывает полученный документ. (Так как ReaderActor работает с интерфейсом Evaluator, то мы ему можем подсунуть сразу WriterActor, если преобразований выполнять не надо)

По моему выглядит все крайне просто и тут Akka порадовала, реализуется ещё проще.


public void read() {
    while (reader.hasNext()) {
        next.apply(reader.next());
    }
}

public void apply(final Document document) {
    next.apply(transform.apply(document));
}

public void apply(final Document document) {
    writer.write(document);
}

Flow действительно стал необычайно простым - в этом случае, просто вызов метода read. Вся проблема в том, что необходимо не просто сохранить интерфейс Flow (это как раз просто). Но необходимо сохранить и поведение. Предыдущая версия Flow была синхронной, поэтому если бы мы ReaderActor совместили бы с Flow, что выглядит на первый взгляд логичным, метод run выполнялся бы асинхронно и это сломало бы другие вещи. Хотя сейчас он все равно выполняет асинхронно так как Flow не дожидается завершения всех акторов, но к этому мы вернемся позже. Сейчас хотел добавить одну техническую деталь, а именно как создаются Flow. Для удобства был сделан абстрактный builder.

Абстрактный тут только метод build, который создает конкретный flow. Именно в этом методе мы спрячем детали создания акторов и связей между ними. (Хотя на самом деле Akka умеет работать с Spring и Guice, но в нашем примере, который я вынес на GitHub это избыточно).


protected Flow flow(final Reader reader, final Transform[] transforms, final Writer writer) {
    final Evaluator writerEvaluator = TypedActor.newInstance(
             Evaluator.class, 
             WriterActor.factory(writer));
    Evaluator previous = writerEvaluator;
    for (int i = transforms.length - 1; i > 0; i--) {
        final Evaluator transformEvaluator = TypedActor.newInstance(
                  Evaluator.class, 
                  TransformActor.factory(transforms[i], previous));
        previous = transformEvaluator;
   }
   final ReaderEvaluator readerEvaluator = TypedActor.newInstance(
          ReaderEvaluator.class, 
          ReaderActor.factory(reader, previous));
   return new Flow(readerEvaluator);
}

Самим создавать через конструктор конечно же нельзя. Надо просто вежливо попросить Akka. Тут есть одна особенность. Попросить можно по разному. Например указать тип интерфейса и тип его реализации. Но мне такой способ не подходит, так как я предпочитаю инициализировать объекты через конструктор, а не getter/setter. Для этого нужно передать вместо типа реализации, фабрику, которая его создает.

Теперь вернемся к нашему поведению Flow. Есть такой хороший принцип Liskov substitution principle. Моя идея в том, чтобы старый Flow и новый имели то же поведение. Причем это даже очень легко можно проверить. Я создал вот такой абстрактный тест класс.


public abstract class AbstractFlowTest {

    @Test
    public void testFullRun() {
        Document doc1 = new Document();
        doc1.put("id", 1);
        Document doc2 = new Document();
        doc2.put("id", 2);

        AbstractFlowBuilder builder = builder();

        final Reader reader = reader(doc1, doc2);
        builder.reader(reader);

        final Transform transform1 = transform(doc1, doc2);
        builder.transform(transform1);
        final Transform transform2 = transform(doc1, doc2);
        builder.transform(transform2);

        final Writer writer = writer(doc1, doc2);
        builder.writer(writer);
      
        final Flow flow = builder.build();
        flow.run();

        verify(writer).write(doc1);
        verify(writer).write(doc2);
        verify(writer, times(0)).write(null);
    }

    protected abstract AbstractFlowBuilder builder();


......

Теперь для каждого из Flow мы можем создать по тест классу с правильным builder и будем точно знать (насколько мы уверены в наших тестах), что оба наших flow имеют одинаковое поведение. И в зависимости от расположения звезд мы можем использовать медленный последовательный или быстрый на стероидах акторах flow.

Typed акторы умеют возвращать Future и можно дождаться её завершения. Но проблема в том, что flow считается завершенным только тогда, когда WriterActor запишет последний документ. Так что надо получить Future именно от него. Мне показалось это сложным, поэтому я просто добавил listener, который WriterActor вызывает когда завершил свою работу и мы тогда просыпаемся. Код можно глянуть в финальной версии на GitHub, если есть комментарии/идеи - всегда готов обсудить.

Теперь обсудим то, что смазало мое положительное впечатление о Akka Actors. Есть ещё одно требования в Flow.run. Он не должен кушать исключения, т.е. если какой-то компонент его выбросит, flow должен просто пробросить его выше, там мы разберемся что делать или не делать.Так как теперь компоненты работают в отдельных нитях, у нас проблема, акторы скушают ошибки. На этот случай есть концепция супервизоров. Т.е. наблюдающих акторов. На первый взгляд она выглядит замечательно, можно указать сколько раз перерестартовать актор и с какими интервалами. Можно указать сам актор супервизор... Но Я так и не смог решить свою проблему. Где можно добыть этот exception, который произошел в акторе. Я даже написал в akka-user, но по видимому я так там и не смог правильно выразить свою мысль, чтобы меня тыкнули пальцам, какой именно метод в супер визоре надо определить, чтобы получить это магическое исключение.

Так что я применил уже проверенный опыт с листенером. К моему удивлению, не смотря на наличие концепции супрвизоров. В каждом TypedActor есть hook-методы preRestart и postRestart. В которых я и вызываю этот листенер, так как там есть нужное мне исключение. Но это породило другую проблему. Приходится указывать, чтобы актор один раз рестартовался после ошибки, так как тогда будут вызваны эти методы. Мне то этот рестарт не нужен, поэтому приходится в основных методах актора его отлавливать и игнорировать. Это настолько ужасный солюшел, что я даже не стал его переносить в GitHub пример.

Есть ещё одна маленькая эстетическая проблема. Idea хотя отлично компилирует, тем не менее в редакторе подсвечивает красным TypedActor'ы: Extending TypedActor

На самом деле Akka мне понравилась и в данной статье я не осветил даже 10% фич акторов. Например remote actors и почему hadoop вам больше не нужен. Или о более низкоуровневых деталях работы, в том что стоит настроить размер очереди между акторами, а то по умолчанию она бесконечная. Есть и другие предостережения, что в моем случая сообщения будут совсем не immutable. Как я вначале предупреждал данная статья не учебная, я просто описал мои небольшой опыт работы. (Финальный код в соответствующем проекте на github: pipeline)

А ещё я завтра иду на доклад @remeniuk о второй версии Akka, буду там задавать ему нубские вопросы и попытаюсь понять решить ли она мои проблемы или мне все же надо подравнять руки :) (хотя внимательный читатель понимает зачем я туда иду :D)

11 комментариев:

  1. ага, отлично, тогда все комменты будут завтра, в процессе :-)

    ОтветитьУдалить
  2. Замечательно :) Тогда после встречи обновлю пост :)

    ОтветитьУдалить
  3. Проблема решена, перепишу все на Scala + Untyped actor'ах :)

    ОтветитьУдалить
  4. Не посоветуешь какой-нибудь небольшой проект open source на Akka, чтобы посмотреть как люди правильно готовят?

    ОтветитьУдалить
  5. долго рылся, но ничего достойного внимания так и не нашел :( с чего напрашивается вывод, что 1) все готовят, как умеют (и готовят, в большинстве, неправильно :-) ), 2) идиоматического пути нет (если только не заглядывать в erlang или в исходники самой akka, которая в лучших традициях жанра написана с использованием себя :-) ).

    ОтветитьУдалить
  6. бггг :) а у меня появилась идея сделать API для Akka на clojure :D раз у них там все так скудно с распределенным программированием

    ОтветитьУдалить
  7. > идиоматического пути нет
    Это да, но должны быть же best practies. 
    Ну вот к примеру, я в других акторах (ReaderActor, TransformActor) акторы получаю из flow в конструкторе. Насколько я понял на встрече - правильный способ брать их из репозитория?

    Да исходники akka уже местами смотрел. Конкретно тесте и примеры. Идея эта здравая.

    ОтветитьУдалить
  8. Нет, это как мне кажется плохая идея :) Просто Рич так старался сделать классную интеграцию с Java.. :)

    ОтветитьУдалить
  9. Картинков чей-та вообще нет.

    ОтветитьУдалить
  10. Вот жеж... Спасибо что сказал. Просто я использовал сервис http://yuml.me/, он по специальному url содержащему описание диаграммы выдает картинку, а сейчас он лежит походу =/ Как только поднимется - закачаю картинки на блоггер.

    ОтветитьУдалить
  11. Для диаграмм использовал прикольный сервис http://yuml.me/. Он позволяет прямо в url на их языке описать диаграмму и он отдаст картинку с ней. К сожалению сегодня весь день он лежал, как мне подсказал @2c31fd9e8356119cdf8229bb220fd07c поэтому я скачал картинки и загрузил на блогер. Кстати, заметил ещё один любопытный факт, теперь блогер по щелчку на картинку открывает view как на G+ со всеми картинками из поста.

    ОтветитьУдалить