Как агрегировать данные, используя apache beam api с несколькими ключами - PullRequest
0 голосов
/ 12 ноября 2018

Я новичок в облачной платформе данных Google, а также в Apache beam api. Я хотел бы агрегировать данные на основе нескольких ключей. В моем требовании я получу фид транзакции, имеющий такие поля, как идентификатор клиента, имя клиента, сумма транзакции и тип транзакции. Я хотел бы объединить данные на основе идентификатора клиента и типа транзакции. Вот пример.

customer id,customer name,transction amount,transaction type
cust123,ravi,100,D
cust123,ravi,200,D
cust234,Srini,200,C
cust444,shaker,500,D
cust123,ravi,100,C
cust123,ravi,300,C

O/p should be

cust123,ravi,300,D
cust123,ravi,400,C
cust234,Srini,200,C
cust444,shaker,500,D

В Google большинство примеров основаны на одной клавише, например, группа по одной клавише. Может ли кто-нибудь помочь мне, как мой PTransform выглядит в моем требовании и как производить агрегированные данные вместе с остальными полями.

С уважением, Рави.

1 Ответ

0 голосов
/ 16 ноября 2018

Вот простой способ.Я соединил все ключи вместе, чтобы сформировать один ключ, а затем выполнил подпрограмму, а затем разделил ключ, чтобы организовать вывод так, как вы хотели.Пожалуйста, дайте мне знать, если возникнут вопросы.

Код не ожидает заголовок в файле CSV.Я просто коротко изложил главное, о чем вы спрашиваете.

import apache_beam as beam
import sys

class Split(beam.DoFn):

    def process(self, element):
        """
        Splits each row on commas and returns a tuple representing the row to process
        """
        customer_id, customer_name, transction_amount, transaction_type = element.split(",")
        return [
            (customer_id +","+customer_name+","+transaction_type, float(transction_amount))
        ]

if __name__ == '__main__':
   p = beam.Pipeline(argv=sys.argv)
   input = 'aggregate.csv'
   output_prefix = 'C:\\pythonVirtual\\Mycodes\\output'

   (p
      | 'ReadFile' >> beam.io.ReadFromText(input)
      | 'parse' >> beam.ParDo(Split())
      | 'sum' >> beam.CombinePerKey(sum)
      | 'convertToString' >>beam.Map(lambda (combined_key, total_balance): '%s,%s,%s,%s' % (combined_key.split(",")[0], combined_key.split(",")[1],total_balance,combined_key.split(",")[2]))
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run().wait_until_finish()

будет выводиться, как показано ниже:

cust234,Srini,200.0,C
cust444,shaker,500.0,D
cust123,ravi,300.0,D
cust123,ravi,400.0,C
...