Обновление конвейера потока данных GCP (Apache Beam) с непрерывным шаблоном файла - PullRequest
0 голосов
/ 20 апреля 2020

У меня есть конвейер, подобный этому:

Постоянное чтение CSV-файлов -> изменение их -> сохранение измененных

Шаблон, используемый для непрерывного чтения файлов, взят из моих пользовательских параметров.

Что я хочу сделать, так это уметь изменять шаблон путем обновления конвейера. Но после обновления arg обновляется, но весь конвейер работает со старым arg.

Modified pattern

Unmodified pipeline used pattern

Фрагменты кода:

  static class BigQueryDataPreparatorFn extends DoFn<KV<String, String>, KV<String, String>> {
@ProcessElement
public void processElement(final ProcessContext context) {
  final KV<String, String> element = context.element();
  String beeswaxWinData = element.getValue();
  beeswaxWinData = beeswaxWinData.replace("\\\"", "\"\"");

  final BeeswaxDataflowOptions options =
      context.getPipelineOptions().as(BeeswaxDataflowOptions.class);
  final String key = element.getKey() + "##" + options.getSourcePath();
  context.output(KV.of(key, beeswaxWinData));
}
  }

  static void run(final BeeswaxDataflowOptions options) {
final Pipeline pipeline = Pipeline.create(options);
final PCollection<MatchResult.Metadata> matches =
    pipeline.apply(
        "Read",
        FileIO.match()
            .filepattern(options.getSourcePath() + options.getSourceFilesPattern())
            .continuously(
                Duration.standardSeconds(options.getInterval()), Watch.Growth.<String>never()));

matches
    .apply(FileIO.readMatches().withCompression(GZIP))
    .apply(
        Window.<FileIO.ReadableFile>into(
                FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.ZERO)
            .triggering(
                Repeatedly.forever(AfterPane.elementCountAtLeast(1).getContinuationTrigger())))
    .apply(
        "Uncompress",
        MapElements.into(
                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via(
                file -> {
                  final String filePath = file.getMetadata().resourceId().toString();
                  try {
                    return KV.of(filePath, file.readFullyAsUTF8String());
                  } catch (final IOException e) {
                    return KV.of(filePath, "");
                  }
                }))
    .apply("Prepare for BigQuery import", ParDo.of(new BigQueryDataPreparatorFn()))
    .apply(
        "Save results",
        FileIO.<String, KV<String, String>>writeDynamic()
            .withCompression(GZIP)
            .by(KV::getKey)
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(KV::getValue), TextIO.sink())
            .withNumShards(options.getShards())
            .to(options.getOutputPath())
            .withTempDirectory(options.getTempLocation())
            .withNaming(AbsoluteNaming::new));

pipeline.run().waitUntilFinish();
  }

Как запустить конвейер:

 ./gradlew clean run -Pargs="--update --runner=DataflowRunner --jobName=beeswax-wins-fixer --appName=beeswax-wins-fixer --workerRegion=europe-west1 --project=ozone-analytics-dev --gcpTempLocation=gs://ozone-dataflows/beeswax-wins-fixer/temp --tempLocation=gs://ozone-dataflows/beeswax-wins-fixer/temp --stagingLocation=gs://ozone-dataflows/beeswax-wins-fixer/staging --shards=5 --outputPath=gs://ozone-beeswax-new/logs/ --sourcePath=gs://ozone-beeswax/logs/ --sourceFilesPattern=wins/YYYY=*/MM=*/dd=*/HH=*/mm=*/*.gz --streaming=true --interval=120 --windowInterval=30 --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=5 --numWorkers=2 --region=europe-west2"
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...