Как уведомить о завершении работы DataFlow - PullRequest
0 голосов
/ 28 июня 2018

Я хочу знать о GAE, когда задание потока данных будет завершено.

пытаюсь сделать следующий оба конвейера

1

 | 'write to bigquery' >> beam.io.WriteToBigQuery(...)
 | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')

2.

 | 'write to bigquery' >> beam.io.WriteToBigQuery(...)
 | 'DoPubSub' >> beam.ParDo(DoPubSub())   # do Publish using google.cloud.pubsub

Но оба приведенных выше кода выдают следующую ошибку:

AttributeError: у объекта 'PDone' нет атрибута 'windowing'

Как сделать процедуру после WriteToBigquery?

примечание: Я выполняю поток данных, используя шаблон через REST. Таким образом, нельзя использовать pipeline_result.wait_until_finish().

Edit.

Полный стек здесь.

File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 327, in <module>
   vital_data_export()
 File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 323, in vital_data_export
   result = p.run()
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 382, in run
   return self.runner.run_pipeline(self)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 285, in run_pipeline
   return_context=True)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 580, in to_runner_api
   root_transform_id = context.transforms.get_id(self._root_transform())
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 810, in to_runner_api
   for part in self.parts],
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in to_runner_api
   for tag, out in self.named_outputs().items()},
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in <dictcomp>
   for tag, out in self.named_outputs().items()},
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
   self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 144, in to_runner_api
   self.windowing))
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 128, in windowing
   self.producer.inputs)
 File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\transforms\ptransform.py", line 443, in get_windowing
   return inputs[0].windowing
AttributeError: 'PDone' object has no attribute 'windowing'

Ответы [ 2 ]

0 голосов
/ 01 октября 2018

В java это то, что я сделал, чтобы опубликовать событие «done» в PubSub в конце конвейера потока данных, где выходные данные конвейера записывают в BigQuery. Надеюсь, в Python есть аналог.

PCollection<TableRow> rows = data.apply("ConvertToTableRow", ParDo.of(new ConvertToRow()));
// Normally this would be the end of the pipeline..
WriteResult writeResult = rows.apply("WriteToBQ", BigQueryIO.writeTableRows().to(...);
// Transformations after this will be done AFTER all rows have been written to BQ
rows.apply(Wait.on(writeResult.getFailedInserts()))
    // Transforms each row inserted to an Integer of value 1
    .apply("OnePerInsertedRow", ParDo.of(new DoFn<TableRow, Integer>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(Integer.valueOf(1));
        }
    }))
    // https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java#L51
    // Combines a PCollection of Integers (all 1's) by summing them. 
    // Outputs a PCollection of one integer element with the sum
    .apply("SumInsertedCounts", Sum.integersGlobally())
    .apply("CountsMessage", ParDo.of(new DoFn<Integer, PubsubMessage>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String messagePayload = "pipeline_completed";
            Map<String, String> attributes = new HashMap<>();
            attributes.put("rows_written", c.element().toString());
            PubsubMessage message = new PubsubMessage(messagePayload.getBytes(), attributes);
            c.output(message);
        }
    }))
    .apply("PublishCompletionMessage", PubsubIO.writeMessages().to(/* output topic */));
0 голосов
/ 07 сентября 2018

ВЫ НЕ МОЖЕТЕ

Очевидно, что PDone - это последняя стадия вашего конвейера, и применять ожидание готовности для этого не нужно.

PInput и PDone - это классы, поддерживаемые Apache Beam, которые указывают источник и приемник соответственно. Если вы пытаетесь выполнить что-то после записи BigQuery, это невозможно, если вы последовательно не запускаете два разных задания потока данных.

Если вы хотите использовать их последовательно, обратитесь к Apache Airflow.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...