Невозможно передать FileIO.Write / WriteFiles / WriteShardedBundlesToTempFiles / GroupIntoShards - PullRequest
0 голосов
/ 23 марта 2020

I Реализован конвейер, который читает JSON, который содержит AWS S3 Key, из Pub / Sub; и загрузите объект S3, состоящий из новой строки JSON (ND Json); затем запишите ND Json в файлы в GCS с помощью управления окнами и GroupByKey.

Очевидно, что это потоковая передача с обработкой окна 10 секунд и AfterPane.elementCountAtLeast(1000). Однако, управление окнами не было разделено и нет файлы в GCS . Кроме того, произошло исключение Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = 4295/4946/4946 MB, GC last/max = 98.54/98.54 %, #pushbacks=1, gc thrashing=true. Heap dump not written..

На приведенном ниже рисунке представлен график работы моего потока данных. GroupIntoShards не выводит данные. Я не могу понять, почему это так. Кроме того, это произошло несвязанным множеством. Другими словами, результат будет хорошим, если число сообщений Pub / Sub несколько (возможно, этого достаточно, чтобы G C не возникало). Причиной может быть закрытие окна, потому что подписка Pub / Sub не содержит никаких непрочитанных сообщений. Но, если сообщение было помещено непрерывно, GroupIntoShards не работает и в GCS нет файлов. введите описание изображения здесь

Следующее является частью моего источника:

В основном классе,

public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> inputed =
        .apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
        .apply("NDJson Divider", ParDo.of(new NDJsonDivider(options.getSecretId())));

    inputed
        .apply(options.getWindowDuration() + " Window",
            Window.<String>into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))
                .triggering(
                    Repeatedly.forever(
                        AfterFirst.of(
                            AfterWatermark.pastEndOfWindow(),
                            AfterPane.elementCountAtLeast(1000),
                            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DurationUtils.parseDuration(options.getWindowDuration())))))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardDays(2))
        )
        .apply(FileIO.<String, String>writeDynamic()
            .by(new TableRowPartitionContextFn())
            .via(TextIO.sink())
            .to(options.getOutputDirectory())
            .withNaming(PartitionedFileNaming::new)
            .withNumShards(options.getNumShards())
            .withDestinationCoder(StringUtf8Coder.of())
            .withCompression(Compression.GZIP)
        );

    return pipeline.run();
  }

В NDJsonDivider классе ,


  @ProcessElement
  public void processElement(ProcessContext c) {
    JsonObject msg;
    try {
      msg = new JsonParser().parse(new String(c.element().getPayload())).getAsJsonObject();
    } catch (JsonSyntaxException | IllegalStateException e) {
      LOG.error(e);
      return;
    }

    if (msg.has(RECORDS) && msg.get(RECORDS).isJsonArray()) {
      JsonArray records = msg.get(RECORDS).getAsJsonArray();
      try (BufferedReader reader = processPutEventMessage(records)) {
        if (reader == null) {
          return;
        }
        String line;
        while (Objects.nonNull(line = reader.readLine())) {
          c.outputWithTimestamp(line, Instant.now());
        }
      } catch (IOException e) {
        LOG.error("Failed to process Put Event Message: ", e);
      }
    }
  }

  private BufferedReader processPutEventMessage(JsonArray records) {
    try {
      putEventMessage s3Obj = extractS3ObjectInfo(records);
      GCPBridge.getInstance().setSecretId(this.secretId);
      return S3.getReader(s3Obj.region, s3Obj.bucket, s3Obj.key, "UTF-8");
    } catch (IllegalArgumentException | AWSS3Exception | NoSuchKeyException e) {
      LOG.error("Failed to access S3:", e);
    }
    return null;
  }

В TableRowPartitionContextFn классе,

class TableRowPartitionContextFn implements SerializableFunction<String, String> {
  @Override
  public String apply(String e) {
    JsonObject data = new JsonParser().parse(e).getAsJsonObject();
    String Id = "", Type = "";
    if (data.has("id")) {
      Id = data.get("id").getAsString();
    }
    if (data.has("type")) {
      Type = data.get("type").getAsString();
    }
    return Id + "/" + Type;
  }
}
mvn -Pdataflow-runner compile exec:java -Dexec.mainClass=com.dev.playground -Dexec.args="--project=PROJECTID --inputTopic=projects/PROJECTID/topics/dev --outputDirectory=gs://dev/playground/output/ --secretId=dev --tempLocation=gs://dev/playground/tmp/ --runner=DataflowRunner

Длительность устанавливается по умолчанию @ Default.String ("10s") String getWindowDuration ();

1 Ответ

1 голос
/ 24 марта 2020

Я подозреваю, что комбинация Repeatedly.forever() с .accumulatingFiredPanes() сохранит буфер данных навсегда, что в конечном итоге приведет к ошибке нехватки памяти.

Пожалуйста, попробуйте использовать discardingFiredPanes () , который не будет удерживать элементы от предыдущих срабатываний панели.

Это будет означать, что только новые элементы, которые поступают, будут излучаться в панели при запуске. Не похоже, что вы выполняете агрегирование / комбинирование результатов со всеми значениями на панели. Похоже, что вы просто пишете их для вывода напрямую, поэтому я не думаю, что вам нужно накапливать FiredPanes.

Если вы хотите вычислить какую-то агрегированную статистику c или значение на основе всех всех элементов в Окно (например, среднее значение, сумма и т. Д. c.). Тогда я бы посоветовал использовать Combiner после окна, чтобы накапливать результат.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...