Я создал потоковый конвейер луча Apache, который считывает файлы из папок GCS и вставляет их в BigQuery, он отлично работает, но он перерабатывает все файлы, когда я останавливаю и запускаю задание, поэтому все данные будут реплицированы снова.
Так что моя идея состоит в том, чтобы переместить файлы из отсканированного каталога в другой, но я не знаю, как технически это сделать с помощью Apache Beam.
Спасибо
public static PipelineResult run(Options options) {
// Create the pipeline.
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Read from the text source.
* 2) Write each text record to Pub/Sub
*/
LOG.info("Running pipeline");
LOG.info("Input : " + options.getInputFilePattern());
LOG.info("Output : " + options.getOutputTopic());
PCollection<String> collection = pipeline
.apply("Read Text Data", TextIO.read()
.from(options.getInputFilePattern())
.watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))
.apply("Write logs", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info(c.element());
c.output(c.element());
}
}));
collection.apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));
return pipeline.run();
}