Ответ на вопросы Q1 и Q2:
Вы ищете rollingAggregate
, вам не нужны метки времени или окна.
pipe
.drawFrom(Sources.fileWatcher(<dir>))
.rollingAggregate(AggregateOperations.summingDouble(Double::parseDouble))
.drainTo(Sinks.logger());
Ответ на вопросы Q3 и Q4: источник fileWatcher
не является отказоустойчивым. Причина в том, что он читает локальные файлы, и когда член умирает, локальные файлы не будут доступны в любом случае. Когда задание возобновится, оно начнет читать с текущей позиции и пропустит числа, добавленные, когда задание было закрыто.
Кроме того, поскольку вы используете глобальное агрегирование, данные из всех файлов будут перенаправлены на один элемент кластера, а другие участники будут простаивать.