У меня есть две PCollections: одна, которая извлекает информацию из Pub / Sub, и другая, которая извлекает данные из файла CSV.После нескольких различных преобразований в каждом конвейере, я хотел бы объединить их по общему ключу, который они оба используют, "STREET".Я включаю второй PCollection в качестве бокового входа.Тем не менее, я получаю сообщение об ошибке при попытке запуска.
Я пытался использовать CoGroupByKey, но я продолжал получать ошибки, касающиеся различий в типах данных в коллекциях Pcollections.Я попытался изменить рефакторинг выходных данных и установить атрибут PCollection через __setattr__
, чтобы заставить типы быть равными, но он сообщал о «смешанных значениях» независимо от этого.После дальнейших исследований кажется, что лучше использовать побочные входы, особенно когда существует несоответствие в размере данных между элементами.Даже с боковыми входами я все еще не могу обойти текущую ошибку:
from_runner_api raise ValueError('No producer for %s' % id)
ValueError: No producer for ref_PCollection_PCollection_6
Моя логика приложения такова:
def merge_accidents(element, pcoll):
print(element)
print(pcoll)
"some code that will append to existing data"
accident_pl = beam.Pipeline()
accident_data = (accident_pl |
'Read' >> beam.io.ReadFromText('/modified_Excel_Crashes_Chicago.csv')
| 'Map Accidents' >> beam.ParDo(AccidentstoDict())
| 'Count Accidents' >> Count.PerKey())
chi_traf_pl = beam.Pipeline(options=pipeline_options)
chi_traffic = (chi_traf_pl | 'ReadPubSub' >> beam.io.ReadFromPubSub(subscription=subscription_name, with_attributes=True)
| 'GeoEnrich&Trim' >> beam.Map(loc_trim_enhance)
| 'TimeDelayEnrich' >> beam.Map(timedelay)
| 'TrafficRatingEnrich' >> beam.Map(traffic_rating)
| 'MergeAccidents' >> beam.Map(merge_accidents, pcoll=AsDict(accident_data))
| 'Temp Write'>> beam.io.WriteToText('testtime', file_name_suffix='.txt'))
accident_pl.run()
chi_result = chi_traf_pl.run()
chi_result.wait_until_finish()```
**Pcoll 1:**
[{'segment_id': '1', 'street': 'Western Ave', 'direction': 'EB', 'length': '0.5', 'cur_traffic': '24', 'county': 'Cook County', 'neighborhood': 'West Elsdon', 'zip_code': '60629', 'evnt_timestamp': '2019-04-01 20:50:20.0', 'traffic_rating': 'Heavy', 'time_delay': '0.15'}]
**Pcoll 2:**
('MILWAUKEE AVE', 1)
('CENTRAL AVE', 2)
('WESTERN AVE', 6)
**Expected:**
[{'segment_id': '1', 'street': 'Western Ave', 'direction': 'EB', 'length': '0.5', 'cur_traffic': '24', 'county': 'Cook County', 'neighborhood': 'West Elsdon', 'zip_code': '60629', 'evnt_timestamp': '2019-04-01 20:50:20.0', 'traffic_rating': 'Heavy', 'time_delay': '0.15', 'accident_count': '6'}]
**Actual Results:**
"from_runner_api raise ValueError('No producer for %s' % id)ValueError: No producer for ref_PCollection_PCollection_6