Вот простой способ.Я соединил все ключи вместе, чтобы сформировать один ключ, а затем выполнил подпрограмму, а затем разделил ключ, чтобы организовать вывод так, как вы хотели.Пожалуйста, дайте мне знать, если возникнут вопросы.
Код не ожидает заголовок в файле CSV.Я просто коротко изложил главное, о чем вы спрашиваете.
import apache_beam as beam
import sys
class Split(beam.DoFn):
def process(self, element):
"""
Splits each row on commas and returns a tuple representing the row to process
"""
customer_id, customer_name, transction_amount, transaction_type = element.split(",")
return [
(customer_id +","+customer_name+","+transaction_type, float(transction_amount))
]
if __name__ == '__main__':
p = beam.Pipeline(argv=sys.argv)
input = 'aggregate.csv'
output_prefix = 'C:\\pythonVirtual\\Mycodes\\output'
(p
| 'ReadFile' >> beam.io.ReadFromText(input)
| 'parse' >> beam.ParDo(Split())
| 'sum' >> beam.CombinePerKey(sum)
| 'convertToString' >>beam.Map(lambda (combined_key, total_balance): '%s,%s,%s,%s' % (combined_key.split(",")[0], combined_key.split(",")[1],total_balance,combined_key.split(",")[2]))
| 'write' >> beam.io.WriteToText(output_prefix)
)
p.run().wait_until_finish()
будет выводиться, как показано ниже:
cust234,Srini,200.0,C
cust444,shaker,500.0,D
cust123,ravi,300.0,D
cust123,ravi,400.0,C