Dask эквивалент pandas .DataFrame.update - PullRequest
3 голосов
/ 14 июля 2020

У меня есть несколько функций, которые используют метод pandas.DataFrame.update, и я пытаюсь перейти к использованию Dask вместо наборов данных, но API Dask Pandas не имеет реализованного метода update . Есть ли альтернативный способ получить тот же результат в Dask?

Вот методы, которые я использую update:

  1. Вперед заполняет данные последним известным значением

df.update(df.filter(like='/').mask(lambda x: x == 0).ffill(1))

вход

id .. .. ..(some cols) 1/1/20 1/2/20 1/3/20 1/4/20 1/5/20 1/6/20 ....
1                      10     20     0      40     0      50
2                      10     30     30     0      0      50
.
.

выход

id .. .. ..(some cols) 1/1/20 1/2/20 1/3/20 1/4/20 1/5/20 1/6/20 ....
1                      10     20     20     40     40      50
2                      10     30     30     30     30      50
.
.
Заменяет значения в кадре данных значениями из другого кадра данных на основе столбца идентификатора / индекса
def replace_names(df1, df2, idxCol = 'id', srcCol = 'name', dstCol = 'name'):
    df1 = df1.set_index(idxCol)
    df1[dstCol].update(df2.set_index(idxCol)[srcCol])
    return df1.reset_index()
df_new = replace_names(df1, df2)

ввод

df1

id    name  ...
123   city a
456   city b
789   city c
789   city c
456   city b
123   city a
.
.
.

df2

id    name  ...
123   City A
456   City B
789   City C
.
.
.

вывод

id    name  ...
123   City A
456   City B
789   City C
789   City C
456   City B
123   City A
.
.
.

1 Ответ

2 голосов
/ 15 июля 2020

Вопрос 2

Есть способ частично решить эту проблему. Я предполагаю, что df2 намного меньше, чем df1, и он действительно помещается в памяти, поэтому мы можем читать как pandas dataframe. В этом случае следующая функция работает, если df1 - это pandas или dask фрейм данных, но df2 должен быть pandas один.

import pandas as pd
import dask.dataframe as dd

def replace_names(df1, # can be pandas or dask dataframe
                  df2, # this should be pandas.
                  idxCol='id',
                  srcCol='name',
                  dstCol='name'):
    diz = df2[[idxCol, srcCol]].set_index(idxCol).to_dict()[srcCol]
    out = df1.copy()
    out[dstCol] = out[idxCol].map(diz)
    return out

Вопрос 1

Что касается первой проблемы, следующий код работает в pandas и dask

df = pd.DataFrame({'a': {0: 1, 1: 2},
 'b': {0: 3, 1: 4},
 '1/1/20': {0: 10, 1: 10},
 '1/2/20': {0: 20, 1: 30},
 '1/3/20': {0: 0, 1: 30},
 '1/4/20': {0: 40, 1: 0},
 '1/5/20': {0: 0, 1: 0},
 '1/6/20': {0: 50, 1: 50}})

# if you want to try with dask
# df = dd.from_pandas(df, npartitions=2)

cols = [col for col in df.columns if "/" in col]
df[cols] = df[cols].mask(lambda x: x==0).ffill(1) #.astype(int)

Удалите комментарий в последней строке, если вы хотите, чтобы результат был целочисленным.

ОБНОВЛЕНИЕ Вопрос 2 Если вам нужно решение только dask, вы можете попробовать следующее.

Данные

import numpy as np
import pandas as pd
import dask.dataframe as dd

df1 = pd.DataFrame({'id': {0: 123, 1: 456, 2: 789, 3: 789, 4: 456, 5: 123},
 'name': {0: 'city a',
  1: 'city b',
  2: 'city c',
  3: 'city c',
  4: 'city b',
  5: 'city a'}})

df2 = pd.DataFrame({'id': {0: 123, 1: 456, 2: 789},
 'name': {0: 'City A', 1: 'City B', 2: 'City C'}})

df1 = dd.from_pandas(df1, npartitions=2)
df2 = dd.from_pandas(df2, npartitions=2)

Случай 1

В в этом случае, если один id присутствует в df1, но отсутствует в df2, вы сохраняете имя в df1.

def replace_names_dask(df1, df2,
                       idxCol='id',
                       srcCol='name',
                       dstCol='name'):
    if srcCol == dstCol:
        df2 = df2.rename(columns={srcCol:f"{srcCol}_new"})
        srcCol = f"{srcCol}_new"
    
    def map_replace(x, srcCol, dstCol):
        x[dstCol] = np.where(x[srcCol].notnull(),
                             x[srcCol],
                             x[dstCol])
        return x
    
    df = dd.merge(df1, df2, on=idxCol, how="left")
    df = df.map_partitions(lambda x: map_replace(x, srcCol, dstCol))
    df = df.drop(srcCol, axis=1)
    return df

df = replace_names_dask(df1, df2)

Case 2

В этом случае, если один id присутствует в df1, но отсутствует в df2, тогда name на выходе df будет NaN (как в стандартном левом соединении)

def replace_names_dask(df1, df2,
                       idxCol='id',
                       srcCol='name',
                       dstCol='name'):
    df1 = df1.drop(dstCol, axis=1)
    df2 = df2.rename(columns={srcCol: dstCol})
    df = dd.merge(df1, df2, on=idxCol, how="left")
    return df

df = replace_names_dask(df1, df2)
...