elementCountAtLeast является нижней границей, поэтому для бегуна допустимо использовать только одну панель.
При выполнении этого для пакетного конвейера у вас есть несколько вариантов:
- Позвольте бегуну решать, какой размер файла и сколько осколков записано:
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
.applyTransform(ParDo.of(new Translator()))
.map(row => row.mkString("|"))
val out = TextIO
.write()
.to("gs://test-bucket/staging")
.withSuffix(".txt")
pipe.saveAsCustomOutput("writer",out)
Обычно это самый быстрый вариант, когда TextIO имеет GroupByKey или источник, который поддерживает расщепление, которое предшествует ему. Насколько мне известно, JDB C не поддерживает разбиение, поэтому лучше всего добавить Reshuffle после jdbcSelect , что позволит распараллеливать обработку после чтения данных из базы данных.
Вручную сгруппировать в пакеты, используя преобразование
GroupIntoBatches .
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
.applyTransform(ParDo.of(new Translator()))
.map(row => row.mkString("|"))
.apply(GroupIntoBatches.ofSize(500))
val out = TextIO
.write()
.to("gs://test-bucket/staging")
.withSuffix(".txt")
.withNumShards(1)
pipe.saveAsCustomOutput("writer",out)
В общем, это будет медленнее, чем опция # 1, но она позволит вам выбрать, сколько записи записываются для каждого файла.
Есть несколько других способов сделать это со своими плюсами и минусами, но вышеупомянутые два, вероятно, наиболее близки к тому, что вы хотите. Если вы добавите более подробную информацию к своему вопросу, я могу еще раз пересмотреть этот вопрос.