Конвейер потока данных, запускающий NotImplementedError [при выполнении «Filter Status 1»] - PullRequest
1 голос
/ 29 сентября 2019

Мой конвейер имеет следующий простой ввод JSON

{"mac": "KC:FC:48:AE:F6:94", "status": 8, "datetime": "2015-07-13T21:15:02Z"}

Вывод в основном должен идти в таблицу BigQuery с 3 столбцами (mac, status и datetime) с соответствующими значениями

MyКонвейер выглядит следующим образом:

# -*- coding: utf-8 -*-
import os, json, logging, argparse, datetime, apache_beam as beam
from google.cloud import error_reporting
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

GOOGLE_PUBSUB_CHANNEL = 'projects/project-name/topics/topic-name'
GOOGLE_BIGQUERY_TABLE = 'bq-table'
GOOGLE_DATASET_ID = 'bq-dataset'
GOOGLE_PROJECT_ID = 'project-name'

class GoogleBigQuery():

    client_error = error_reporting.Client()


    @staticmethod
    def get_schema_table(schema):
        bigquery_schema = []
        for key in range(len(schema)):
            bigquery_schema.append('{}:{}'.format(schema[key].get('bigquery_field_name'), schema[key].get('bigquery_field_type')))

        return ','.join(bigquery_schema)

fields_contract = (
      { 'bigquery_field_name': 'datetime', 'bigquery_field_type': 'STRING' },
      { 'bigquery_field_name': 'mac', 'bigquery_field_type': 'STRING' },
      { 'bigquery_field_name': 'status', 'bigquery_field_type': 'INTEGER' }
)

def parse_pubsub(line):
    record = json.loads(line)
    logging.info(record)
    return record

class FilterStatus1(beam.DoFn):
  def status_filter_1(self, data):
    for r in data:
      print(r)
      logging.info(r)
      if r["status"] == 1:
        print(r)
        logging.info(r)
        yield r

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_parameters = [
        '--runner', 'DirectRunner'
        , '--staging_location', 'gs://bucket/staging'
        , '--temp_location', 'gs://bucket/temp'
        , '--autoscaling_algorithm', 'THROUGHPUT_BASED' #'NONE' to disable autoscaling
        , '--num_workers', '1'
        , '--max_num_workers', '2'
        , '--disk_size_gb', '30'
        , '--worker_machine_type', 'n1-standard-1'
    ]

    pipeline_options = PipelineOptions(pipeline_parameters)
    pipeline_options.view_as(StandardOptions).streaming = True
    pipeline_options.view_as(GoogleCloudOptions).job_name = os.path.basename(__file__).split('.')[0].replace('_', '-')
    pipeline_options.view_as(GoogleCloudOptions).project = GOOGLE_PROJECT_ID

    with beam.Pipeline(options=pipeline_options, argv=pipeline_parameters) as p:
        # Read the pubsub topic into a PCollection.
        lines = (
                    p
                    | 'ReadPubSubMessage' >> beam.io.ReadFromPubSub(GOOGLE_PUBSUB_CHANNEL).with_output_types(bytes)
                    | 'Decode UTF-8' >> beam.Map(lambda x: x.decode('utf-8'))
                    | 'ParsePubSub' >> beam.Map(parse_pubsub)
        )

        (
                    lines | 'Filter Status 1' >> beam.ParDo(FilterStatus1())
                    | 'WriteToBigQueryStatus1' >> beam.io.WriteToBigQuery(
                                                 GOOGLE_BIGQUERY_TABLE
                                                 , project=GOOGLE_PROJECT_ID
                                                 , dataset=GOOGLE_DATASET_ID
                                                 , schema=GoogleBigQuery.get_schema_table(fields_contract)
                                                 , create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                                                 , write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                 #, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
                                             )
          )
        logging.info('Pipeline finished')

        result = p.run()
        result.wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Я получаю следующую ошибку:

RuntimeError: NotImplementedError [while running 'Filter Status 1']

Моя цель здесь состоит в том, чтобы отфильтровать столбец состояния и, когда значение равно 1, передать его вBQ.

Заранее спасибо за помощь.

Ответы [ 2 ]

1 голос
/ 29 сентября 2019

Вы можете попробовать подход фильтрации, используя FlatMap, чтобы сделать такие вещи.

Сначала определите метод фильтрации:

def FilterStatus1(row):
   if row["status"] == 1:
      yield row

Затем вы можете применить как:

lines = lines | beam.FlatMap(FilterStatus1) | 'WriteToBigQueryStatus1' ...

Также попробуйте разбить код на куски или явно назначенные шаги.Это гигантское преобразование, отображения и фильтрации, происходящие в одной строке, обычно превращают ваш код в черный ящик.

Надеюсь, это поможет.Спасибо.

0 голосов
/ 29 сентября 2019

Я исправил свой код таким образом

class FilterStatus1(beam.DoFn):
  def process(self, data):
    if data["status"] == 1:
      result = [{"datetime":data["datetime"], "mac":data["mac"], "status":data["status"]}]
      logging.info(result)
      return result
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...