Потребление файлов в многоузловой настройке hazelcast - PullRequest
1 голос
/ 11 марта 2020

Я вижу примеры, где CSV-файлы могут использоваться с использованием jet, например.

BatchSource<SalesRecordLine> source = Sources.filesBuilder(sourceDir)
             .glob("*.csv")
             .build(path -> Files.lines(path).skip(1).map(SalesRecordLine::parse));

В многоузловой установке все узлы начнут брать файл (скажем, совместно используемый NFS) или он использует некоторая умная блокировка (например, Apache Идемпотентный метод потребителя файлов Camel?). Как Jet узнает, что файл был полностью записан на диск перед чтением?

спасибо

Ответы [ 2 ]

0 голосов
/ 12 марта 2020

Вы можете разместить файл только на одном узле и заставить Jet распространять данные среди всех участников. В настоящее время в Jet отсутствует первоклассная поддержка перебалансировки потоков, но вы можете добиться этого в несколько обходном порядке:

pipeline.readFrom(source)
        .groupingKey(x -> x)
        .mapStateful(() -> null, (state, key, item) -> item)
        .restOfYourPipeline();

groupingKey(x -> x) определяет функцию разделения. Я использовал простую функцию идентификации, но вы можете поместить что-нибудь еще, что имеет смысл для ваших данных.

0 голосов
/ 11 марта 2020

Если вы используете NFS, установите для свойства sharedFileSystem значение true:

BatchSource<SalesRecordLine> source = Sources.filesBuilder(sourceDir)
    .glob("*.csv")
    .sharedFileSystem(true)
    .build(path -> Files.lines(path).skip(1).map(SalesRecordLine::parse));

Из метода javado c:

Устанавливает, если файлы находятся в общем хранилище, видимом для всех участников. Значением по умолчанию является false. Если sharedFileSystem имеет значение true, Jet будет считать, что все участники видят одинаковые файлы. Они разделят работу так, чтобы каждый участник прочитал часть файлов. Если sharedFileSystem имеет значение false, каждый участник будет читать все файлы в каталоге, предполагая, что они являются локальными.

Для источника пакета Jet предполагает, что файлы не изменяются во время чтения. Если это так, результат не определен.

Если вы хотите отслеживать файлы по мере их записи, используйте FileSourceBuilder.buildWatcher() вместо build() - это создаст потоковое задание. Но наблюдатель обрабатывает только строки, добавленные с момента начала работы. Опять же, если файлы изменены каким-либо другим способом, кроме добавления в конце, результат не определен. Например, многие текстовые редакторы удаляют и записывают весь файл, даже если вы просто добавили строку в конце - для тестирования проще всего использовать

echo "text" >> your_file"
...