Агрегировать непрерывный поток чисел из файла, используя Hazelcast Jet - PullRequest
0 голосов
/ 22 января 2019

Я пытаюсь суммировать непрерывный поток чисел из файла, используя hazelcast jet

pipe
    .drawFrom(Sources.fileWatcher)<dir>))
    .map(s->Integer.parseInt(s))
    .addTimestamps()
    .window(WindowDefinition.sliding(10000,1000))
    .aggregate(AggregateOperations.summingDouble(x->x))
    .drainTo(Sinks.logger());

Несколько вопросов

  1. Это не дает ожидаемого результата, мое ожиданиекак только в файле появится новое число, оно должно просто добавить его к существующей сумме
  2. . Для этого мне нужно указать окно и метод addTimestamp, мне просто нужно сделать сумму бесконечного потока
  3. Как мы можем достичь отказоустойчивости, то есть, если перезапуск сервера сохранит агрегированный результат, а когда он появится, он будет агрегировать из последней вычисленной суммы?
  4. , если сервер не работает и мало чиселвходить в файл сейчас, когда сервер запускается, будет ли он читать с последней точки с момента, когда сервер вышел из строя, или он пропустит числа, когда он был выключен, и будет считывать только число, полученное после того, как сервер работал.

1 Ответ

0 голосов
/ 22 января 2019

Ответ на вопросы Q1 и Q2: Вы ищете rollingAggregate, вам не нужны метки времени или окна.

pipe
    .drawFrom(Sources.fileWatcher(<dir>))
    .rollingAggregate(AggregateOperations.summingDouble(Double::parseDouble))
    .drainTo(Sinks.logger());

Ответ на вопросы Q3 и Q4: источник fileWatcher не является отказоустойчивым. Причина в том, что он читает локальные файлы, и когда член умирает, локальные файлы не будут доступны в любом случае. Когда задание возобновится, оно начнет читать с текущей позиции и пропустит числа, добавленные, когда задание было закрыто.

Кроме того, поскольку вы используете глобальное агрегирование, данные из всех файлов будут перенаправлены на один элемент кластера, а другие участники будут простаивать.

...