Я все еще очень плохо знаком с концепциями Apache Beam и пытаюсь запустить задание в потоке данных Google со следующим потоком процессов:
По сути, используется один источник данных, выполняется фильтрация на основе определенных значений в словаре и создаются отдельные выходные данные для каждого критерия фильтрации.
Я написал следующий код:
# List of values to filter by
x_list = [1, 2, 3]
with beam.Pipeline(options=PipelineOptions().from_dictionary(pipeline_params)) as p:
# Read in newline JSON data - each line is a dictionary
log_data = (
p
| "Create " + input_file >> beam.io.textio.ReadFromText(input_file)
| "Load " + input_file >> beam.FlatMap(lambda x: json.loads(x))
)
# For each value in x_list, filter log_data for dictionaries containing the value & write out to separate file
for i in x_list:
# Return dictionary if given key = value in filter
filtered_log = log_data | "Filter_"+i >> beam.Filter(lambda x: x['key'] == i)
# Do additional processing
processed_log = process_pcoll(filtered_log, event)
# Write final file
output = (
processed_log
| 'Dump_json_'+filename >> beam.Map(json.dumps)
| "Save_"+filename >> beam.io.WriteToText(output_fp+filename,num_shards=0,shard_name_template="")
)
В настоящее время он обрабатывает только первое значение в списке. Я знаю, что мне, вероятно, придется использовать ParDo, но я не очень уверен, как учесть это в моем процессе.
Ценю любую помощь!