Я сейчас борюсь с шаблоном конвейера. У меня нет такого поведения, когда входной файл gzip или нет.
Вот рабочие журналы для файла CSV:
Splitting filepattern gs://bucket/file.csv into bundles of size 22369621 took 125 ms and produced 1 files and 128 bundles
Splitting source gs://bucket/file.csv into bundles of estimated size 22369621 bytes produced 128 bundles. Rebundling into 100 bundles.
Splitting source gs://bucket/file.csv produced 100 bundles with total serialized response size 187328
Тогда источник правильно разделен и задание автоматически масштабируется до нужного количества рабочих для этой задачи.
Теперь вот журнал для того же файла, который теперь распакован:
Splitting filepattern gs://bucket/file.csv.gz into bundles of size 22369621 took 137 ms and produced 1 files and 1 bundles
Splitting source gs://bucket/file.csv.gz produced 1 bundles with total serialized response size 1925
А потом они появляются Через несколько секунд / минут после того, как данные начнут проходить:
Proposing dynamic split of work unit dev01-processing;2020-02-21_03_30_25-2364220079620552332;7684124696241385390 at {"fractionConsumed":0.5126594305038452}
Rejecting split request because custom reader returned null residual source.
Кажется, он не может масштабироваться, возможно, потому что размер файла не соответствует ожидаемому.
Вот код:
ValueProvider<String> input = options.getInput();
Pipeline p = Pipeline.create(options);
PCollection<String> csvLines = p.apply(TextIO.read().from(input)/*.withCompression(Compression.GZIP)*/);
...
Мне просто нужно удалить комментарии, чтобы создать шаблон с версией "gzip input".
Есть ли что-то, что я делаю неправильно, что мешает работе автомасштабирование с помощью ввода gzip?