Функция процесса для соединения (с боковым вводом) на балке Apache (Cloud DataFlow), не добавляющего новую запись в остальную часть - PullRequest
0 голосов
/ 23 декабря 2019

Я пытаюсь объединить в Beam с боковым вводом.

Объединение работает (с боковым вводом) и обновляет базовые данные сотрудника для общего ключа. В моем пользовательском требовании я также хочу добавить новый идентификатор сотрудника в существующий (обновленный набор данных) после объединения.

Оба PCollections имеют одинаковое расположение, emp_id является общим ключом. Моя работа выполняется в облачном потоке данных.

Функция процесса Apache Beam Остальное не работает должным образом, где я пытаюсь добавить новый empid, частьновый вход. Здесь клавиши различаются в этом наборе ввода.

Функция процесса:

def process_history_details(self, row, new_emp_details):
    import traceback
    result = row.copy()
    try:
        result.update(new_emp_details[row['emp_id']])
    except KeyError as err:
        pass
    else:
        result.update(new_emp_details[row['emp_id']]) 
        for k in new_emp_details[k['emp_id']]:
            if k not in result['emp_id']:
               result.update(new_emp_details)
    return result

Вызов этой функции:

history_data = (
        p
        | 'Read historical data from BigQuery ' >> beam.io.Read(
    beam.io.BigQuerySource(query=emp_hist_data, use_standard_sql=True))
        |'Join Data with sideInput' >> beam.Map(datalakecomparison.process_history_details, AsDict(*new_emp_data*))

Словарь new_emp_data генерируется, как указано ниже:

new_emp_data = (
            p
            | 'Read base from BigQuery ' >> beam.io.Read(
            beam.io.BigQuerySource(query= new_emp_query, use_standard_sql=True))
            |
            'New Employee details' >> beam.Map(
        lambda x:(
            x['emp_id'], x))
            )

Используя запрос belwo для извлечения данных, затем данные передаются (как упомянуто выше) с помощью лямбда-функции.

def new_emp_query(self):
    new_emp_query = """
    SELECT 
    emp_id, 
    emp_name,
    emp_code,
    emp_unit,
    emp_sal 
    FROM snow.new_emp_data 
    """
    return new_emp_query

Текущий ввод:

Исторические данные

emp_id, emp_name, emp_code, emp_unit, emp_sal

1,A, 34,45,70000

2, B, 35,45,80000

3, C, 34,45,90000

данные о новых сотрудниках

emp_id, emp_name, emp_code, emp_unit, emp_sal

1, A, 34,45,1000000

6, F, 36,47,90000

Токовый выход:

emp_id, emp_name, emp_code, emp_unit, emp_sal

1, A, 34,45,1000000

2,B, 35,45,80000

3, C, 34,45,90000

Ожидаемый результат:

emp_id, emp_name, emp_code, emp_unit, emp_sal

1, A, 34,45,1000000

2, B, 35,45,80000

3, C, 34,45,90000

6, F, 36,47,90000

Emp_id 6 - это то, что я хочу добавить к результату

...