Для простого подтверждения концепции я пытаюсь отобразить данные о кликах в двухминутных окнах.Все, что я хочу сделать, это распечатать счетчик для каждого окна вместе с границами окон в BigQuery.При запуске моего конвейера я получаю следующую ошибку:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"windowend","message":"This field is not a record.","reason":"invalid"}],"index":0}]
Конвейер выглядит так:
// Creating the pipeline
Pipeline p = Pipeline.create(options);
// Window items
PCollection<TableRow> counts = p.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
.apply("AddEventTimestamps", WithTimestamps.of(TotalCountPipeline::ExtractTimeStamp).withAllowedTimestampSkew(Duration.standardDays(10000)))
.apply("Window", Window.<String>into(
FixedWindows.of(Duration.standardHours(options.getWindowSize())))
.triggering(
AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.standardDays(10000))
.accumulatingFiredPanes())
.apply("CalculateSum", Combine.globally(Count.<String>combineFn()).withoutDefaults())
.apply("BigQueryFormat", ParDo.of(new FormatCountsFn()));
// Writing to BigQuery
counts.apply("WriteToBigQuery",BigQueryIO.writeTableRows()
.to(options.getOutputTable())
.withSchema(getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
// Execute pipeline
p.run().waitUntilFinish();
Я предполагаю, что это как-то связано с функцией форматирования BigQuery,который реализован следующим образом:
static class FormatCountsFn extends DoFn<Long, TableRow> {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
TableRow row =
new TableRow()
.set("windowStart", window.maxTimestamp().toDateTime())
.set("count", c.element().intValue());
c.output(row);
}
}
Как вдохновлено этим постом .Может кто-нибудь пролить некоторый свет на это?Кажется, я не могу обойти это.