Задание потока данных читает из PubSub (неограниченный источник) для настроенного окна, а затем записывает файлы в облачное хранилище. Я вижу, что несколько файлов пишутся. Как я могу ограничить это меньшее число. Я также пробовал .withNumShards () во время записи, но все же создается несколько файлов. Почему это происходит, и как я могу ограничить запись только настроенного или меньшего количества файлов. Скажем, мой случай использования: я запускаю работу один раз в день, а затем прекращаю ее. Время настройки окна составляет 8 часов. При этом задание по-прежнему состоит из нескольких файлов (может быть 20 или более) для одного ежедневного запуска.
Пример кода:
Pipeline pipeline = Pipeline.create(options);
PCollection<PubsubMessage> events =
pipeline.apply(PubsubIO.readMessages().fromSubscription(options.getInputSubscription()))
// Windowing
.apply(options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration("8h"))));
// Conversion from PubSub message payload to String using a ParDo
PCollection<String> strMsg = events.apply("To String", ParDo.of(new Extractor()));
// Windowed writes
strMsg.apply("Write File(s)",
TextIO.write().withWindowedWrites()
.to(new WindowedFilenamePolicy(options.getOutputDirectory(), options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(), options.getOutputFilenameSuffix()))
.withTempDirectory(NestedValueProvider.of(options.getOutputDirectory(),
(SerializableFunction<String, ResourceId>) input -> FileBasedSink
.convertToFileResourceIfPossible(input)))
.withNumShards(options.getNumShards()));
return pipeline.run();