I Реализован конвейер, который читает JSON, который содержит AWS S3 Key, из Pub / Sub; и загрузите объект S3, состоящий из новой строки JSON (ND Json); затем запишите ND Json в файлы в GCS с помощью управления окнами и GroupByKey.
Очевидно, что это потоковая передача с обработкой окна 10 секунд и AfterPane.elementCountAtLeast(1000).
Однако, управление окнами не было разделено и нет файлы в GCS . Кроме того, произошло исключение Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = 4295/4946/4946 MB, GC last/max = 98.54/98.54 %, #pushbacks=1, gc thrashing=true. Heap dump not written.
.
На приведенном ниже рисунке представлен график работы моего потока данных. GroupIntoShards
не выводит данные. Я не могу понять, почему это так. Кроме того, это произошло несвязанным множеством. Другими словами, результат будет хорошим, если число сообщений Pub / Sub несколько (возможно, этого достаточно, чтобы G C не возникало). Причиной может быть закрытие окна, потому что подписка Pub / Sub не содержит никаких непрочитанных сообщений. Но, если сообщение было помещено непрерывно, GroupIntoShards
не работает и в GCS нет файлов. введите описание изображения здесь
Следующее является частью моего источника:
В основном классе,
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<String> inputed =
.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
.apply("NDJson Divider", ParDo.of(new NDJsonDivider(options.getSecretId())));
inputed
.apply(options.getWindowDuration() + " Window",
Window.<String>into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterWatermark.pastEndOfWindow(),
AfterPane.elementCountAtLeast(1000),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DurationUtils.parseDuration(options.getWindowDuration())))))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.standardDays(2))
)
.apply(FileIO.<String, String>writeDynamic()
.by(new TableRowPartitionContextFn())
.via(TextIO.sink())
.to(options.getOutputDirectory())
.withNaming(PartitionedFileNaming::new)
.withNumShards(options.getNumShards())
.withDestinationCoder(StringUtf8Coder.of())
.withCompression(Compression.GZIP)
);
return pipeline.run();
}
В NDJsonDivider
классе ,
@ProcessElement
public void processElement(ProcessContext c) {
JsonObject msg;
try {
msg = new JsonParser().parse(new String(c.element().getPayload())).getAsJsonObject();
} catch (JsonSyntaxException | IllegalStateException e) {
LOG.error(e);
return;
}
if (msg.has(RECORDS) && msg.get(RECORDS).isJsonArray()) {
JsonArray records = msg.get(RECORDS).getAsJsonArray();
try (BufferedReader reader = processPutEventMessage(records)) {
if (reader == null) {
return;
}
String line;
while (Objects.nonNull(line = reader.readLine())) {
c.outputWithTimestamp(line, Instant.now());
}
} catch (IOException e) {
LOG.error("Failed to process Put Event Message: ", e);
}
}
}
private BufferedReader processPutEventMessage(JsonArray records) {
try {
putEventMessage s3Obj = extractS3ObjectInfo(records);
GCPBridge.getInstance().setSecretId(this.secretId);
return S3.getReader(s3Obj.region, s3Obj.bucket, s3Obj.key, "UTF-8");
} catch (IllegalArgumentException | AWSS3Exception | NoSuchKeyException e) {
LOG.error("Failed to access S3:", e);
}
return null;
}
В TableRowPartitionContextFn
классе,
class TableRowPartitionContextFn implements SerializableFunction<String, String> {
@Override
public String apply(String e) {
JsonObject data = new JsonParser().parse(e).getAsJsonObject();
String Id = "", Type = "";
if (data.has("id")) {
Id = data.get("id").getAsString();
}
if (data.has("type")) {
Type = data.get("type").getAsString();
}
return Id + "/" + Type;
}
}
mvn -Pdataflow-runner compile exec:java -Dexec.mainClass=com.dev.playground -Dexec.args="--project=PROJECTID --inputTopic=projects/PROJECTID/topics/dev --outputDirectory=gs://dev/playground/output/ --secretId=dev --tempLocation=gs://dev/playground/tmp/ --runner=DataflowRunner
Длительность устанавливается по умолчанию @ Default.String ("10s") String getWindowDuration ();