воскресенье, 6 марта 2011 г.

О поиске простого решения или как я писал JobStore для Quartz

Java очень хорошо известна своим разнообразием различных Open Source frameworks, одни ребята из Apache наверно несколько десятков сгенерировали. Причём всегда есть много альтернатив (к сожалению, зачастую все равно выбрать нечего :))). Но есть планировщик задач Quartz, которому нет альтернативы, почти стандарт, так сказать. Причём с архитектурной точки зрения он довольно прост и хорош. У нас есть Scheduler в котором мы регистрируем Trigger'ы и Job'ы и связи между ними, таким образом каждый триггер может активировать одну или несколько работ.

Данная библиотека одна из ключевых в одном из наших подпроектов. Важный вопрос как всегда это persistence. Т.е. возможны следующие случаи к примеру:

  • Если триггер должен сработать только несколько раз. То после рестарта приложения Scheduler должен знать об этом и не активировать его.
  • Если у нас есть триггер, который должен срабатывать каждый час, но спустя, например, 59 минут наше приложение упало. То когда его перезапустили, этот триггер должен сработать через минуту, а не через час.

Это вопрос в Quartz так же решён. Есть интерфейс JobStore, к которому обращается Scheduler за всеми нужными данными и он должен обеспечивать сохранность данных. По умолчанию существуют две реализации RAM и JDBC. Как нетрудно догадаться, RAM никакой сохранности не обеспечивает, но JDBC по сути должно хватить всем, так как почти любая СУБД к вашим услугам. К сожалению для нас это не так, мы не используем СУБД. Поэтому пришлось реализовывать JobStore самостоятельно. И вот тут, тот, кто также успел сходить по ссылке на api JobStore, мог ужаснуться, так как JobStore - это интерфейс наверно на полсотни методов. Поэтому под катом, моя история, как я пытался малой кровью реализовать этого бегемота для наших нужд :)

Очевидно, реализовывать это всё с нуля, крайне глупо, поэтому в качестве основы я выбрал RAMJobStore, просто надо было обеспечить сохранность его данных. Немного изучив исходные коды, а также посмотрев на структуру таблиц JDBC реализации, я понял, что сохранять мне необходимо информацию только о триггерах, так как job'ы у нас stateless, поэтому их мы можем переинициализировать при перезапуске заново. И тут мы воспользуемся грязным императивным трюком =D В JobStore есть метод storeTrigger, в который и передаётся сам триггер. RAMJobStore, Scheduler и другие классы Quartz, уже работают только с этим объектом, меняют его состояние и т.п., поэтому достаточно перехватить на методе storeTrigger этот объект и сохранить к себе в коллекцию и на остальных методах лишь сериализовать эту коллекцию и у нас будет актуальное состояние триггеров. Ах, да, забыл упомянуть, persistence мы будем обеспечивать обычной xml сериализацией. Так же надо ещё отметить тот факт, что конфигурация работ и их связей с триггерами у нас задаётся отдельно, а Quartz удаляет триггер, который отработал нужное количество раз, поэтому будет ещё одна коллекция с удалёнными триггерами. Т.о. стейт выглядит очень просто:


@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
private class SchedulerState {

    private Map<String, Trigger> activeTriggers = new HashMap<String, Trigger>();

    private Map<String, Trigger> removedTriggers = new HashMap<String, Trigger>();

}

Ну и теперь надо реализовать ряд простых методов для манипуляции со стейтом тут всё просто, только не надо забывать, что удалённый триггер мы больше в активные не добавляем.


public synchronized boolean containsTrigger(String triggerName) {
    return activeTriggers.containsKey(triggerName);
}

public synchronized boolean isRemoved(String triggerName) {
    return removedTriggers.containsKey(triggerName);
}

public synchronized Trigger getTrigger(String triggerName) {
    return activeTriggers.get(triggerName);
}

public synchronized void updateTrigger(Trigger trigger) {
   if (trigger == null) {
        return;
   }
   String triggerName = trigger.getName();
   if (removedTriggers.containsKey(triggerName)) {
       return;
   }
   activeTriggers.put(triggerName, trigger);
   serialize();
}

public synchronized void updateTriggers() {
    serialize();
}

public synchronized void removeTrigger(String triggerName) {
    if (!activeTriggers.containsKey(triggerName)) {
         return;
    }
    Trigger trigger = activeTriggers.get(triggerName);
    removedTriggers.put(triggerName, trigger);
    activeTriggers.remove(triggerName);
    serialize();
}

Касательно самого нашего JobStore, тут всё просто при сохранении добавляем триггер в наш стейт, но если он уже был удалён, нам не надо его оправлять в RAMJobStore, остальные методы ещё тривиальнее, либо обновляем триггер(-ы) или просто удаляем


private class RAMJobStoreWrapper extends RAMJobStore {
    @Override
    public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) {
        super.initialize(loadHelper, signaler);

        state = new SchedulerState(getStatePath());
    }

    @Override
    public void storeTrigger(SchedulingContext ctxt, Trigger newTrigger, boolean replaceExisting)
            throws JobPersistenceException {
        if (state.isRemoved(newTrigger.getName())) {
            return;
        }
        super.storeTrigger(ctxt, newTrigger, replaceExisting);
        state.updateTrigger(newTrigger);
    }

    @Override
    public boolean removeTrigger(SchedulingContext ctxt, String triggerName, String groupName) {
        boolean result = super.removeTrigger(ctxt, triggerName, groupName);
        state.removeTrigger(triggerName);
        return result;
    }

    @Override
    public boolean replaceTrigger(SchedulingContext ctxt, String triggerName, String groupName, Trigger newTrigger)
            throws JobPersistenceException {
        if (state.isRemoved(newTrigger.getName())) {
            return false;
        }
        boolean result = super.replaceTrigger(ctxt, triggerName, triggerName, newTrigger);
        state.updateTrigger(newTrigger);
        return result;
    }

    @Override
    public void releaseAcquiredTrigger(SchedulingContext ctxt, Trigger trigger) {
        super.releaseAcquiredTrigger(ctxt, trigger);
        state.updateTrigger(trigger);
    }

....

}

Теперь поговорим о Scheduler'e. Тут всё очень просто. Как я раньше написал, Job'ы e нас stateless, поэтому единственные объекты, которые нам надо достать из стейта, если он есть, это триггеры, для всего остального без различно, мы поднимаемся со стейта или напускаемся в первый раз.

Ах, да, в стейт классе фигурировал метод serialize. Для этого мы используем Marshaller, но с некоторыми специфическими наворотами, типа архивации стейта, подсчёта чексуммы и пр. По мере разработки он меня попеременно то радовал, то сильно разочаровывал. Но по порядку.

Заметим, что в списках мы храним абстрактный класс Trigger, на самом же деле там окажутся его реализации (например SimpleTrigger или CronTrigger). А marshaller генерирует чистый xml, который не содержит мусорной информации о классах, поэтому данный код упал сразу же при десериализации. И тут я был рад, проблема разрешилась легко, просто в контекст надо было передать кроме класса стейта, еще классы триггеров, и специально для Cron'а ещё и TimeZone с его реализацией.


JAXBContext context = JAXBContext.newInstance(SchedulerState.class, Trigger.class, SimpleTrigger.class,
                CronTrigger.class, TimeZone.class, ZoneInfo.class);
Marshaller marshaller = context.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
marshaller.marshal(state, outputStream);

Тут нас ждал очередной сюрприз. Очень важные поля previousFireTime и nextFireTime на отрез не сериализовались. Соответственно данная версия решала только первую описанную проблему: отработавшие триггеры больше не запускались. Но как быть с правильным временем старта? Пришлось написать обёртку для Trigger класса с отдельным сохранением времени и использовать её в списках в стейте.


@XmlAccessorType(XmlAccessType.PROPERTY)
private static class TriggerWrapper {

    private Trigger trigger;
    private Date previousFireTime;
    private Date nextFireTime;

    public TriggerWrapper() {
        super();
    }

    public TriggerWrapper(Trigger trigger) {
        this.trigger = trigger;
        previousFireTime = trigger.getPreviousFireTime();
        nextFireTime = trigger.getNextFireTime();
    }

    public Trigger getTrigger() {
        correctTime(trigger, nextFireTime, previousFireTime);
        return trigger;
    }

    public void setTrigger(Trigger trigger) {
        this.trigger = trigger;
    }

    public Date getPreviousFireTime() {
        if (trigger.getPreviousFireTime() != null) {
            previousFireTime = trigger.getPreviousFireTime();
        }
        return previousFireTime;
    }

    public void setPreviousFireTime(Date previousFireTime) {
        this.previousFireTime = previousFireTime;
    }

    public Date getNextFireTime() {
        if (trigger.getNextFireTime() != null) {
            nextFireTime = trigger.getNextFireTime();
        }
        return nextFireTime;
    }

    public void setNextFireTime(Date nextFireTime) {
        this.nextFireTime = nextFireTime;
    }
        

    public static void correctTime(Trigger trigger, Date nextFireTime, Date previousFireTime) {
       if (trigger == null) {
           return;
       }
       if (trigger.getNextFireTime() != null
               || trigger.getPreviousFireTime() != null
               || trigger.getStartTime() != null) {
           return;
       }
       Date currentTime = new Date();
       if (nextFireTime != null) {
           if (trigger instanceof SimpleTrigger) {
               ((SimpleTrigger) trigger).setNextFireTime(nextFireTime);
           } else if (trigger instanceof CronTrigger) {
               ((CronTrigger) trigger).setNextFireTime(nextFireTime);
           }
           // Correct start time, because scheduler recalculate nextFireTime
           if (nextFireTime.getTime() <= currentTime.getTime()) {
               trigger.setStartTime(currentTime);
           } else if (previousFireTime != null) {
               trigger.setStartTime(previousFireTime);
           } else {
               trigger.setStartTime(currentTime);
           }
       }
       if (previousFireTime != null) {
           if (trigger instanceof SimpleTrigger) {
               ((SimpleTrigger) trigger).setPreviousFireTime(previousFireTime);
           } else if (trigger instanceof CronTrigger) {
               ((CronTrigger) trigger).setPreviousFireTime(previousFireTime);
           }
       }
    }
}

Я кстати опускаю автоматически сгенерированные equals/hashCode/toString. Тут в первый раз в жизни, наверное, пришлось в getter'е написать что-то больше чем возвращения значения, но тут действительно надо подкорректировать время, так как оно не сериализовалось, можно было сделать в setter'е, но пришлось бы это делать в трёх и с костылями, потому как не понятно в какой очерёдности они будут вызваны, а getTrigger вызывается только в моём коде. Кроме того есть ещё несколько тонких моментов по методу correctTime.

  • Он использует конкретные реализации триггеров, потому что в классе Trigger определён только getNextFireTime, а setNextFireTime это методы реализаций. Это кстати одна из возможны причин не работающей сериализации.
  • Вызов setStartTime должен вызывать удивление. Это поле сериализуется нормально и почему мы устанавливаем ему другое значение. Это необходимо сделать из за нашего неполного persistence'а. О том, что мы триггеры восстанавливаем из state'а Quartz не знает, поэтому считает, что мы запускались с нуля, поэтому пересчитывает время запуска триггера с помощью метода computeFirstFireTime, который использует startTime, как точку отсчёта для nextFireTime.

Вот пример стейта:


<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<state>
    <activeTriggers>
        <entry>
            <key>trigger</key>
            <value>
                <nextFireTime>2011-03-06T13:44:00+02:00</nextFireTime>
                <trigger xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="cronTrigger">
                    <group>DEFAULT</group>
                    <jobDataMap/>
                    <jobGroup>DEFAULT</jobGroup>
                    <jobName>job</jobName>
                    <misfireInstruction>0</misfireInstruction>
                    <name>maintenance</name>
                    <priority>5</priority>
                    <startTime>2011-03-06T13:42:40+02:00</startTime>
                    <cronExpression>0 0/2 * ? * * *</cronExpression>
                    <timeZone xsi:type="zoneInfo">
                        <ID>Europe/Athens</ID>
                        <rawOffset>7200000</rawOffset>
                    </timeZone>
                </trigger>
            </value>
        </entry>        
    </activeTriggers>
    <removedTriggers>        
    </removedTriggers>
</state>

И напоследок ещё пару замечаний. Работа с исключениями в java, это всегда головная боль. Например, обратим внимания на метод JobStore.initialize. Согласно контракту интерфейса он может выбросить SchedulerConfigException, но класс RAMJobStore, от которого мы наследуемся решил, что он всегда успешно проинициализируется, поэтому его сигнатура уже не содержит декларации этого исключения. Но мы в методе инициализации десериализуем стейт и тут могут возникнуть куча проблем, все что мы ловим, я заворачиваю в наш кастомный exception, который наследуется от RuntimeEsception, поэтому ловите где хотите, так сказать. И вот я бы хотел словить в методе initialize, чтобы наверх выкинуть SchedulerConfigException, чтобы Scheduler корректно остановился (чего не будет в случае runtime исключения). Но если наследоваться от RAMJobStore, такое нельзя сделать, я не могу поменять сигнатуру добавив SchedulerConfigException, а выкинуть его без этого нельзя, так как это checked exception. Поэтому пришлось заменить наследование на включение (ещё одна отсылка к не состоятельности ООП и его реализаций :))).

Так же полезным может оказаться в JobStore передать какие-нибудь конфигурационные параметры. В Quartz это сделано не плохо. В JobStore определим поле с getter/setter по java конвенции и тогда, при инициализации Scheduler'а с прочими параметрами можно передать org.quartz.jobStore.<имя поля> и это значение будет установлено в это поле перед вызовом initialize, но к сожалению это работает только с простыми типами, так как ориентированно на использование в properties файле, имя которого можно передать фабрике, конструирующей планировщики. Кстати, своя реализация JobStore устанавливается полем org.quartz.jobStore.class.

Раз я тут всех ругал, можно и себя поругать, для тех, кто не читает мой твиттер. Мне официально принадлежит изобретение анти-паттерна "Immutable class". Это когда настолько сложно добавить что-то непосредственно в класс, что возникают мысли лучше это сделать где-то в другом месте. Частный случай реализации этого паттерна был получен в попытках реализовать паттерн immutable object в mutable java'е.

Комментариев нет:

Отправить комментарий