Итерировать строки в фрейме данных df_a и обновлять фрейм данных df_b на основе значений df_a в Pyspark - PullRequest
0 голосов
/ 07 августа 2020

У меня есть фрейм данных df_b , который необходимо обновить на основе значений df_a фрейма данных

df_a

+-----+-----+------------+---------+
| id_1| id_2| header_oper| head_seq|
+-----+-----+------------+---------+
|  boy|    3|      insert|        1|
|  bat|    4|      delete|        3|
|  cat|    2|      insert|        1|
|  bat|    4|      update|        2|
|  bat|    5|   beforeimg|        1|
+-----+-----+------------+---------+

df_b (раньше)

+-----+-----+
| id_1| id_2|
+-----+-----+
|  boy|    4|
|  bat|    5|
|  cat|    1|
+-----+-----+

Метод, который я придумал :

  1. Сортировка df_a по 'head_seq'.
  2. Итерация df_a
  3. если 'header_oper'.isin (' insert ',' update '), затем добавьте эту строку в df_b
  4. если 'header_oper'.isin (' delete ',' beforeimg '), вычтите эту строку из df_b

Ожидается df_b (после):

+-----+-----+
| id_1| id_2|
+-----+-----+
|  boy|    4|
|  boy|    3|
|  cat|    2|
|  cat|    1|
+-----+-----+

Нужна помощь в том, как выполнять итерацию df_a и выполнять операции с df_b на основе значений df_a.

Ответы [ 3 ]

0 голосов
/ 08 августа 2020
ds= spark.createDataFrame([('boy',4),('bat',5),('cat',1)],['id_1','id_2'])
df_op=spark.createDataFrame([('boy',3,'insert',1),('bat',4,'delete',3),('cat',2,'insert',1),('bat',4,'update',2),('bat',5,'beforeimg',1)], ['id_1','id_2','eff_op','seq'])

effective_op=df_op.groupBy('id_1').agg(max('seq').alias('seq')).join(df_op,['id_1','seq'])

ds_insert=ds.union(effective_op.select('id_1','id_2').filter("eff_op in ('insert')").orderBy(asc('id_1')))

ds_delete=ds_insert.join(effective_op.filter("eff_op in ('delete')").select("*"),['id_1'],'left').select(ds_insert.id_1, ds_insert.id_2).filter("eff_op is null")

display(ds_delete)
0 голосов
/ 23 августа 2020

Хорошо, я понял это. Поскольку для каждого обновления есть beforeimg, порядок операций не имеет значения.

Мне просто нужно было добавить все вставки и обновления, а затем удалить Deletes и BeforeImgs

Разделение операций и отмена выбора столбцов заголовков

ins=df_a.where(df_a['header_oper']=='insert')
ins=ins.select(id_1,id_2)

upd=df_a.where(df_a['header_oper']=='update')
upd=upd.select(id_1,id_2)

dele=df_a.where(df_a['header_oper']=='delete')
dele=dele.select(id_1,id_2)

bimg=df_a.where(df_a['header_oper']=='delete')
bimg=bimg.select(id_1,id_2)

Добавление вставок и обновлений к df_b

df_b=df_b.union(ins)
df_b=df_b.union(upd)

Удаление удалений и BeforeImgs из df_b

df_b=df_b.subtract(dele)
df_b=df_b.subtract(bimg)
0 голосов
/ 07 августа 2020
import pandas as pd


dataA = [['boy', 3, 'insert', 1],['bat', 4, 'delete', 3], ['cat', 2, 'insert', 1], ['bat', 4, 'update', 2]]
df_a = pd.DataFrame(dataA, columns=['id_1', 'id_2','header_oper','head_seq'])

dataB = [['boy', 4],['bat', 5], ['cat', 1]]
df_b = pd.DataFrame(dataB, columns=['id_1', 'id_2'])

for index, row in df_a.iterrows():
    if row['header_oper'] is 'insert':
        pass # Add your Code here
    elif row['header_oper'] is 'delete':
        pass # Add your Code here
    elif row['header_oper'] is 'update':
        pass # Add your Code here
    else:
        raise Exception()
...