У меня есть корзина, в которую я ежедневно вставляю огромные текстовые файлы (около 300 миллионов строк) в формате GZIP.Каждый раз, когда в этот блок добавляется новый файл, мне нужно прочитать каждую строку и обработать его в конвейере потока данных.Для этого я реализовал потоковый конвейер в Java со следующим фрагментом кода
p.apply("Read new file", TextIO.read()
.from("gs://my_bucket/*")
.withCompression(Compression.GZIP)
.watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never()))
Мои текстовые файлы хранятся в облачном хранилище Google со следующими метаданными:
- Content-Type: text / plain
- ContentEncoding: gzip
Проблема заключается в следующем: при добавлении нового документа TextIO читает бесконечное количество строк из этого файла, как мы можем видеть на следующем рисунке (я остановил процесс после отправки 500 миллионов строк TextIO):
Мои вопросыявляются: почему это происходит?Где ошибка в моей части кода?