Создать поднабор данных Dask на основе другого Dask - PullRequest
0 голосов
/ 08 января 2020

У меня есть два кадра данных Dask, df1 длиной 5000 и df2 длины 100000, оба со столбцами start_time и end_time. Я пытаюсь найти строки df1, где интервал start_time-end_time df2 меньше или равен интервалу start_time-end_time df1 (df1.start_time <= df2.start_time <= df2.end_time <= df2.end_time) . </p>

Я пробовал следующее, но безрезультатно:

df3 = dd.from_pandas(pd.DataFrame(), npartitions=1)
for _, df2_row in df2.iterrows():
    df3_chunk = df1.apply(lambda df1_row: df1_row.start_time <= df2_row.start_time <= df2_row.end_time <= df1_row.end_time, axis=1)
    df3 = dd.concat([df3, df3_chunk])
df3 = pd.DataFrame()
for _, df2_row in df2.iterrows():
    df3_chunk = df1.loc[df1.start_time <= df2_row.start_time <= df2_row.end_time <= df1.end_time]
    df3 = dd.concat([df3, df3_chunk])

Первый фрагмент работает вечно, а второй возвращает ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.

Пример входных данных (df1, df2) и выходных данных (df3):

# df1
start_time, end_time
2019-10-10 01:01:01, 2019-10-11 01:01:01
2001-01-01 00:00:01, 2001-02-02 03:04:05

# df2
start_time, end_time
2019-10-10 09:07:05, 2019-10-10 11:12:10

# df3
start_time, end_time
2019-10-10 01:01:01, 2019-10-11 01:01:01

Есть ли способ достичь этого?

1 Ответ

0 голосов
/ 08 января 2020

Начните с необходимого импорта:

import dask.dataframe as dd

Для целей тестирования я прочитал исходные (Pandas) кадры данных ( df1 и df2 ), а затем преобразовал их в Dask DataFrames:

dd1 = dd.from_pandas(df1, npartitions=2)
dd2 = dd.from_pandas(df2, npartitions=2)

В вашей программе (как я полагаю) вы будете читать их непосредственно из соответствующих входных файлов.

Затем определите следующую функцию:

def enclosesAny(row, other):
    return (other.start_time.ge(row.start_time) &
        other.end_time.le(row.end_time)).any()

И фактическое вычисление делится на 2 этапа:

  1. Вычисление encl столбец - результат применения вышеуказанной функции к каждой строке:

    dd1['encl'] = dd1.map_partitions(enclosesAny, other=dd2)
    
  2. Создание фактического результата. Он содержит строки из dd1 , для которых encl равно True . Чтобы не загромождать результат дополнительными столбцами, я удалил столбец encl :

    result = dd1[dd1.encl].drop('encl', axis=1).compute()
    
...