Я использую Apache Beam 2.7.0, который я видел в Матрице возможностей (https://beam.apache.org/documentation/runners/capability-matrix/) и Flink Runner поддерживают Splittable DoFn, но это не работает
Я хочу непрерывно обрабатывать CSV-файлы с помощьюTextIO watchForNewFiles У меня есть 16 файлов для проверки конвейера. Я помещаю в каталог Input все файлы и запускаю свою конвейерную линию. Я передаю префикс моих файлов в функции из TextIO.read (). Из
, однако, когдая запускаю код с прямым Runner Работает очень хорошо, но когда я запускаю код с flink runner, обрабатываю только часть файла, он не обрабатывает все файлы
Есть ли обходной путь для этой проблемы?
мой код
p.apply("ReadLines", TextIO.read().from("prefixFileCSV*").watchForNewFiles(Duration.standardSeconds(30),Watch.Growth.<String>never()))
.apply("FixedWindowsTeam",
Window.<InfoMOC>into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration())))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))
.discardingFiredPanes().withAllowedLateness(Duration.ZERO))
.apply("Write File(s)", TextIO.write()
.to(options.getOutputDirectory())
.withWindowedWrites()
.withNumShards(1)