Расписание времени настенных часов для генерации сообщения с фиктивной темой ввода - PullRequest
0 голосов
/ 23 октября 2018

У меня есть вопрос о шаблоне, который мы сейчас используем для планирования генерации сообщений на основе хранилища состояний:
Мы пишем в хранилище состояний на основе состояния обработки в наших «обычных» потоках обработки, построенных с помощьюDSL.Мы подключили transform() к расписанию настенных часов, чтобы просматривать хранилище состояний каждые n секунд.
В зависимости от состояния мы forward() обрабатываем новое сообщение в нисходящем направлении (наш способ выполнения отложенных повторных попыток).transform() нужен входной поток, в качестве входного которого используется фиктивная тема, которая никогда не будет видеть данные.

Может ли это быть сделано без фиктивной темы ввода?
Как другие люди делают это?

Дополнительная информация после ответа Матиаса: Topo

schedule() используется в качестве генератора данных на основе синхронизированных поисков данных в хранилище состояний.Это используется для повторных попыток побочного эффекта.

1 Ответ

0 голосов
/ 24 октября 2018

Похоже, вы хотите поделиться некоторым кодом для обычной обработки с побочным эффектом и обработки на основе пунктуации без побочного эффекта.В противном случае вы не будете использовать MyTransformer для обоих случаев?

Таким образом, мне интересно, могли бы вы сделать все за один Transformer вместо трех.

MyTransformer<K,V,R> implement Transformer<K,V,R> {

    public void init(ProcessorContext context) {
      context.schedule(..., new MyPunctuator());
    }

    public R transform(K key, V value) {
      // for every record from the source topic do everything
      doSharedStuff();
      doStuffWithSideEffect();
    }

    private doSharedStuff() {...}
    private doStuffWithSideEffect() {...}

    private class MyPunctuator implements Punctuator {
        public void punctuate(long timestamp) {
          for(KeyValue kv : ...) { // whatever k/v-pair to want to "forward"
            // for every record you want to emit delayed, do only some part
            doSharedStuff();
          }
        }
    }
}
...