Агрегирование данных в окне в Apache Beam - PullRequest
1 голос
/ 11 июля 2019

Я получаю поток сложного и вложенного объекта JSON в качестве входных данных для моего конвейера.

Моя цель - создать небольшие пакеты для передачи в другую тему pubsub для последующей обработки.Я борюсь с функцией beam.beam.GroupByKey() - из того, что я прочитал, это правильный метод для объединения.

Упрощенный пример, события ввода:

{ data:['a', 'b', 'c'], url: 'websiteA.com' }
{ data:['a', 'b', 'c'], url: 'websiteA.com' }
{ data:['a'], url: 'websiteB.com' }

Я пытаюсь создать следующее:

{
'websiteA.com': {a:2, b:2, c:2},
'websiteB.com': {a:1},
}

Моя проблема заключается в попытке сгруппировать что-либо ещечто самый простой кортеж выбрасывает ValueError: too many values to unpack.

Я мог бы выполнить это в два этапа, но из моего чтения использование beam.GroupByKey() стоит дорого и поэтому должно быть сведено к минимуму.

РЕДАКТИРОВАТЬ на основе ответаот @ Cubez.

Это моя функция объединения, которая, кажется, наполовину работает: (

class MyCustomCombiner(beam.CombineFn):
  def create_accumulator(self):
    logging.info('accum_created') #Logs OK!
    return {}

  def add_input(self, counts, input):
    counts = {}
    for i in input:
      counts[i] = 1
    logging.info(counts) #Logs OK!
    return counts

  def merge_accumulators(self, accumulators):
    logging.info('accumcalled') #never logs anything
    c = collections.Counter()
    for d in accumulators:
      c.update(d)
    logging.info('accum: %s', accumulators) #never logs anything
    return dict(c)

  def extract_output(self, counts):
    logging.info('Counts2: %s', counts) #never logs anything
    return counts

Кажется, что в прошлом add_input ничего не вызывается?

Добавление конвейеракод:

with beam.Pipeline(argv=pipeline_args) as p:
    raw_loads_dict = (p 
      | 'ReadPubsubLoads' >> ReadFromPubSub(topic=PUBSUB_TOPIC_NAME).with_output_types(bytes)
      | 'JSONParse' >> beam.Map(lambda x: json.loads(x))
    )
    fixed_window_events = (raw_loads_dict
      | 'KeyOnUrl' >> beam.Map(lambda x: (x['client_id'], x['events']))
      | '1MinWindow' >> beam.WindowInto(window.FixedWindows(60))
      | 'CustomCombine' >> beam.CombinePerKey(MyCustomCombiner())
    )
    fixed_window_events | 'LogResults2' >> beam.ParDo(LogResults())

1 Ответ

3 голосов
/ 12 июля 2019

Это прекрасный пример необходимости использования сумматоров . Это преобразования, которые используются для объединения или объединения коллекций между несколькими работниками. Как сказано в документе, CombineFns работает, считывая ваш элемент (beam.CombineFn.add_input), объединяя несколько элементов (beam.CombineFn.merge_accumulators), а затем, наконец, выводит окончательное объединенное значение (beam.CombineFn.extract_output). Смотрите документы Python для родительского класса здесь .

Например, чтобы создать сумматор, который выводит среднее значение для набора чисел, выглядит так:

class AverageFn(beam.CombineFn):
  def create_accumulator(self):
    return (0.0, 0)

  def add_input(self, sum_count, input):
    (sum, count) = sum_count
    return sum + input, count + 1

  def merge_accumulators(self, accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, sum_count):
    (sum, count) = sum_count
    return sum / count if count else float('NaN')

pc = ...
average = pc | beam.CombineGlobally(AverageFn())

Для вашего случая использования я бы предложил что-то вроде этого:

values = [
          {'data':['a', 'b', 'c'], 'url': 'websiteA.com'},
          {'data':['a', 'b', 'c'], 'url': 'websiteA.com'},
          {'data':['a'], 'url': 'websiteB.com'}
]

# This counts the number of elements that are the same.
def combine(counts):
  # A counter is a dictionary from keys to the number of times it has
  # seen that particular key.
  c = collections.Counter()
  for d in counts:
    c.update(d)
  return dict(c)

with beam.Pipeline(options=pipeline_options) as p:
  pc = (p
        # You should replace this step with reading data from your
        # source and transforming it to the proper format for below.
        | 'create' >> beam.Create(values)

        # This step transforms the dictionary to a tuple. For this
        # example it returns:
        # [ ('url': 'websiteA.com', 'data':['a', 'b', 'c']),
        #   ('url': 'websiteA.com', 'data':['a', 'b', 'c']),
        #   ('url': 'websiteB.com', 'data':['a'])]
        | 'url as key' >> beam.Map(lambda x: (x['url'], x['data']))

        # This is the magic that combines all elements with the same
        # URL and outputs a count based on the keys in 'data'.
        # This returns the elements:
        # [ ('url': 'websiteA.com', {'a': 2, 'b': 2, 'c': 2}),
        #   ('url': 'websiteB.com', {'a': 1})]
        | 'combine' >> beam.CombinePerKey(combine))

  # Do something with pc
  new_pc = pc | ...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...