Я пытаюсь объединить в Beam с боковым вводом.
Объединение работает (с боковым вводом) и обновляет базовые данные сотрудника для общего ключа. В моем пользовательском требовании я также хочу добавить новый идентификатор сотрудника в существующий (обновленный набор данных) после объединения.
Оба PCollections имеют одинаковое расположение, emp_id является общим ключом. Моя работа выполняется в облачном потоке данных.
Функция процесса Apache Beam Остальное не работает должным образом, где я пытаюсь добавить новый empid, частьновый вход. Здесь клавиши различаются в этом наборе ввода.
Функция процесса:
def process_history_details(self, row, new_emp_details):
import traceback
result = row.copy()
try:
result.update(new_emp_details[row['emp_id']])
except KeyError as err:
pass
else:
result.update(new_emp_details[row['emp_id']])
for k in new_emp_details[k['emp_id']]:
if k not in result['emp_id']:
result.update(new_emp_details)
return result
Вызов этой функции:
history_data = (
p
| 'Read historical data from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(query=emp_hist_data, use_standard_sql=True))
|'Join Data with sideInput' >> beam.Map(datalakecomparison.process_history_details, AsDict(*new_emp_data*))
Словарь new_emp_data генерируется, как указано ниже:
new_emp_data = (
p
| 'Read base from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(query= new_emp_query, use_standard_sql=True))
|
'New Employee details' >> beam.Map(
lambda x:(
x['emp_id'], x))
)
Используя запрос belwo для извлечения данных, затем данные передаются (как упомянуто выше) с помощью лямбда-функции.
def new_emp_query(self):
new_emp_query = """
SELECT
emp_id,
emp_name,
emp_code,
emp_unit,
emp_sal
FROM snow.new_emp_data
"""
return new_emp_query
Текущий ввод:
Исторические данные
emp_id, emp_name, emp_code, emp_unit, emp_sal
1,A, 34,45,70000
2, B, 35,45,80000
3, C, 34,45,90000
данные о новых сотрудниках
emp_id, emp_name, emp_code, emp_unit, emp_sal
1, A, 34,45,1000000
6, F, 36,47,90000
Токовый выход:
emp_id, emp_name, emp_code, emp_unit, emp_sal
1, A, 34,45,1000000
2,B, 35,45,80000
3, C, 34,45,90000
Ожидаемый результат:
emp_id, emp_name, emp_code, emp_unit, emp_sal
1, A, 34,45,1000000
2, B, 35,45,80000
3, C, 34,45,90000
6, F, 36,47,90000
Emp_id 6 - это то, что я хочу добавить к результату