Почему карта работает, а ParDo нет? - PullRequest
0 голосов
/ 08 июня 2019

Я пытаюсь выяснить разницу в производительности между Map и ParDo, но я не могу как-то запустить метод ParDo

Я уже пытался найти некоторые ресурсы, которые пытаются решить проблему, но я не нашел ни одного

Метод ParDo (это не работает):

class ci(beam.DoFn):
  def compute_interest(self,data_item):
    cust_id, cust_data = data_item
    if(cust_data['basic'][0]['acc_opened_date']=='2010-10-10'):
      new_data = {}
      new_data['new_balance'] = (cust_data['account'][0]['cur_bal'] * cust_data['account'][0]['roi']) / 100
      new_data.update(cust_data['account'][0])
      new_data.update(cust_data['basic'][0])
      del new_data['cur_bal']
      return new_data

Метод карты (это работает):

def compute_interest(data_item):
  cust_id, cust_data = data_item
  if(cust_data['basic'][0]['acc_opened_date']=='2010-10-10'):
    new_data = {}
    new_data['new_balance'] = (cust_data['account'][0]['cur_bal'] * cust_data['account'][0]['roi']) / 100
    new_data.update(cust_data['account'][0])
    new_data.update(cust_data['basic'][0])
    del new_data['cur_bal']
    return new_data

ОШИБКА:

повышение NotImplementedError RuntimeError: NotImplementedError [при выполнении «ИМЯ ТРУБОПРОВОДА»]

1 Ответ

3 голосов
/ 08 июня 2019

Beam.DoFn ожидает метод process вместо:

def process(self, element):

Как описано в разделе 4.2.1.2 Руководства по программированию луча :

Внутри вашего подкласса DoFn вы напишите процесс-метод, в котором вы предоставите реальную логику обработки. Вам не нужно вручную извлекать элементы из входной коллекции; Beam SDK справится с этим за вас. Ваш метод процесса должен принимать объект типа element. Это входной элемент, и вывод генерируется с помощью оператора yield или return внутри метода процесса.

В качестве примера мы определим функции Map и ParDo:

def compute_interest_map(data_item):
  return data_item + 1

class compute_interest_pardo(beam.DoFn):
  def process(self, element):
    yield element + 2

Если вы измените process на другое имя метода, вы получите NotImplementedError.

А магистральный трубопровод будет:

events = (p
  | 'Create' >> beam.Create([1, 2, 3]) \
  | 'Add 1' >> beam.Map(lambda x: compute_interest_map(x)) \
  | 'Add 2' >> beam.ParDo(compute_interest_pardo()) \
  | 'Print' >> beam.ParDo(log_results()))

Выход:

INFO:root:>> Interest: 4
INFO:root:>> Interest: 5
INFO:root:>> Interest: 6

код

...