Объедините BigQuery и Pub / Sub Apache Beam - PullRequest
0 голосов
/ 05 сентября 2018

Я пытаюсь сделать следующее, используя DataFlowRunner:

  1. Чтение данных из многораздельной таблицы BigQuery (много данных, но только за последние два дня)
  2. Чтение JSON из подписки на паб / подписку
  3. Объедините обе коллекции на общем ключе
  4. Вставить объединенные коллекции в другую таблицу BigQuery

Я довольно плохо знаком с Apache Beam, поэтому не уверен на 100%, что то, что я хочу сделать, возможно.

Моя проблема возникает, когда я пытаюсь объединить обе строки, после использования преобразования CoGroupByKey кажется, что данные никогда не поступают в одно и то же время, хотя стратегия управления окнами одинакова (фиксированное окно 30 секунд, триггер конца окна и отказ от запуска стекла).

Некоторые соответствующие фрагменты моего кода:

    /* Getting the data and windowing */
    PCollection<PubsubMessage> pubsub = p.apply("ReadPubSub sub",PubsubIO.readMessages().fromSubscription(SUB_ALIM_REC));

    String query = /* The query */
    PCollection<TableRow> bqData = p.apply("Reading BQ",BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
            .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardSeconds(30)))
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .withAllowedLateness(Duration.standardSeconds(0)).accumulatingFiredPanes());        

    PCollection<TableRow> tableRow = pubsub.apply(Window.<PubsubMessage>into(FixedWindows.of(Duration.standardSeconds(120)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.standardSeconds(0)).accumulatingFiredPanes())
            .apply("JSON to TableRow",ParDo.of(new ToTableRow()));



    /*  Join code   */  
    PCollection<TableRow> finalResultCollection =
                kvpCollection.apply("Join TableRows", ParDo.of(
                        new DoFn<KV<Long, CoGbkResult>,  TableRow>() {
                            private static final long serialVersionUID = 6627878974147676533L;

                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        KV<Long, CoGbkResult> e = c.element();
                        Long idPaquete = e.getKey();
                        Iterable<TableRow> it1 = e.getValue().getAll(packTag);
                        Iterable<TableRow> it2 = e.getValue().getAll(alimTag);
                        for(TableRow t1 : itPaq) {
                            for (TableRow t2 : itAlimRec) {
                                TableRow joinedRow = new TableRow();
                                /* set the required fields from each collection */
                                c.output(joinedRow);
                            }

                        }
                    }
                }));

Также в последние два дня я получаю эту ошибку:

java.io.IOException: Failed to start reading from source: org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2808d228
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:783)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: BigQuery source must be split before being read
        org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.createReader(BigQuerySourceBase.java:153)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:463)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:442)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:293)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:286)
        com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
        com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

Буду очень признателен за ваше руководство, чтобы узнать, возможно ли то, что я пытаюсь сделать, или есть альтернатива для решения этого сценария.

...