Балка: запись по количеству элементов окна с границами окна - PullRequest
0 голосов
/ 26 ноября 2018

Для простого подтверждения концепции я пытаюсь отобразить данные о кликах в двухминутных окнах.Все, что я хочу сделать, это распечатать счетчик для каждого окна вместе с границами окон в BigQuery.При запуске моего конвейера я получаю следующую ошибку:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"windowend","message":"This field is not a record.","reason":"invalid"}],"index":0}]

Конвейер выглядит так:

// Creating the pipeline
Pipeline p = Pipeline.create(options);

// Window items
PCollection<TableRow> counts = p.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
.apply("AddEventTimestamps", WithTimestamps.of(TotalCountPipeline::ExtractTimeStamp).withAllowedTimestampSkew(Duration.standardDays(10000)))
        .apply("Window", Window.<String>into(
                FixedWindows.of(Duration.standardHours(options.getWindowSize())))
                .triggering(
                        AfterWatermark.pastEndOfWindow()
                                .withLateFirings(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardDays(10000))
                .accumulatingFiredPanes())
        .apply("CalculateSum", Combine.globally(Count.<String>combineFn()).withoutDefaults())
        .apply("BigQueryFormat", ParDo.of(new FormatCountsFn()));

// Writing to BigQuery
counts.apply("WriteToBigQuery",BigQueryIO.writeTableRows()
                .to(options.getOutputTable())
                .withSchema(getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

// Execute pipeline
p.run().waitUntilFinish();

Я предполагаю, что это как-то связано с функцией форматирования BigQuery,который реализован следующим образом:

static class FormatCountsFn extends DoFn<Long, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
        TableRow row =
                new TableRow()
                        .set("windowStart", window.maxTimestamp().toDateTime())
                        .set("count", c.element().intValue());
        c.output(row);
    }
}

Как вдохновлено этим постом .Может кто-нибудь пролить некоторый свет на это?Кажется, я не могу обойти это.

1 Ответ

0 голосов
/ 26 ноября 2018

Видимо, ответ на этот вопрос не имел ничего общего с оконным управлением лучом и был связан исключительно с BigQuery.Для записи объекта DateTime в строку BigQuery требуется строка в надлежащем формате гггг-мм-дд ЧЧ: мм: сс, в отличие от объекта DateTime, который я предоставлял.

...