Как Dask Process не завершится, пока Dask Diagnostics сообщит о его завершении? - PullRequest
0 голосов
/ 05 декабря 2018

У меня есть HDF-хранилище с одним фреймом данных 60089424 строк х 5 столбцов.Все столбцы являются категориальными, и все они содержат одинаковое сопоставление значений с кодами, которое имеет 3318655 категорий.HDF-хранилище сжимается с complevel=9 и complib='blosc:lz4'.Это написано с format='table'.Файл на диске 1.1 Gig.

Я упоминаю все это, потому что я не знаю, относится ли это к моему вопросу.

Теперь я хотел удалить строки из кадра данных, которые содержатзначения, которые имеют частоту ниже определенного порога, и я хотел сделать это в разумные сроки.Я задал вопрос об этом здесь .

Мне в конечном итоге удалось с помощью Dask создать код, который, по-видимому, справляется с работой за разумное время (если этот вопрос здесь может быть решен, я будуответь на мой вопрос там с этим кодом).Хотя есть одна большая проблема.Код не заканчивается.Он работает нормально до вызова compute().Он также заканчивает compute(), как указано завершенным индикатором выполнения.Но это так и есть.Я никогда не выполняю следующую строку (print(f'{time_to_display()}: dropped rows with low frequency words').

Теперь ... Я действительно в растерянности относительно того, почему это так. Я почти буквально вырываю свои волосы на самом деле. Можете ли вы объяснить мне, почемуcompute() вызов отображается как завершенный, но еще не завершенный?

import time
import dask.dataframe as ddf
import dask
import numpy as np
import pandas as pd


def time_to_display():
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))



hdfstore_filename =  '<the HDF file>'
threshold = 100

print(f'{time_to_display()}: filtering data for low frequency words')
print(f'file = {hdfstore_filename}.h5')

store = pd.HDFStore(f'{hdfstore_filename}.h5')
df = store.select(key='/ngrams')

print(f'{time_to_display()}: data loaded')

unique, counts = np.unique(df.values.ravel(), return_counts=True)
print(f'{time_to_display()}: gathered frequencies of words [1]')

d = dict(zip(unique, counts))
print(f'{time_to_display()}: gathered frequencies of words [2]')

to_remove = [k for k, v in d.items() if v < threshold]
print(f'{time_to_display()}: gathered indices of values to remove')


df_dask = ddf.from_pandas(df, chunksize=1000000) # results in 60 chunks for this data
print(f'{time_to_display()}: df_dask created')

mask = df_dask.isin(to_remove)
print(f'{time_to_display()}: mask created')

column_mask = (~mask).all(axis=1)
print(f'{time_to_display()}: column_mask created')

df_dask = df_dask[column_mask]
print(f'{time_to_display()}: df_dask filtered')

df_dask.visualize(filename='log/df_dask', format='pdf')
print(f'{time_to_display()}: computation graph rendered')

from dask.diagnostics import ProgressBar

with ProgressBar():
    df_out = dask.compute(df_dask)[0]
print(f'{time_to_display()}: dropped rows with low frequency words')

df_out.to_hdf(f'{hdfstore_filename}_filtered_complete.h5', 'ngrams', complevel=9,
          complib='blosc:lz4', format='table')
print(f'{time_to_display()}: store written')

store.close()

Я добавил сообщения о ходе выполнения, чтобы узнать, сколько времени занимают отдельные вызовы. Вывод:

2018-12-04 22:35:46: filtering data for low frequency words
file = data/5grams_wiki_00/5grams_wiki_00_cat_final.h5
2018-12-04 22:36:31: data loaded
2018-12-04 22:45:33: gathered frequencies of words [1]
2018-12-04 22:45:34: gathered frequencies of words [2]
2018-12-04 22:45:35: gathered indices of values to remove
2018-12-04 22:45:52: df_dask created
2018-12-04 22:46:12: mask created
2018-12-04 22:46:12: column_mask created
2018-12-04 22:46:12: df_dask filtered
2018-12-04 22:46:13: computation graph rendered
[########################################] | 100% Completed | 12min  2.4s

Я позволил программе работать еще 30 минут, но новые сообщения не появлялись.

edit

Я выполнил это сейчас с версией того же набора данных с одним отличием: у всех столбцов были своиотчетливая краткая кодировка. На этот раз разница во времени между завершением создания отчета о ходе выполнения и следующим распечатанным сообщением о прогрессе составила «всего» 15 минут. Так что суть - категориальная кодировка. Есть идеи почему?

2018-12-05 13:00:46: filtering data for low frequency words
file = data/5grams_wiki_00/5grams_wiki_00_cat.h5
2018-12-05 13:01:34: data loaded
2018-12-05 13:11:00: gathered frequencies of words [1]
2018-12-05 13:11:01: gathered frequencies of words [2]
2018-12-05 13:11:02: gathered indices of values to remove
2018-12-05 13:11:19: df_dask created
2018-12-05 13:11:31: mask created
2018-12-05 13:11:31: column_mask created
2018-12-05 13:11:31: df_dask filtered
2018-12-05 13:11:31: computation graph rendered
[########################################] | 100% Completed |  5min 52.3s
2018-12-05 13:28:49: dropped rows with low frequency words
2018-12-05 13:31:05: store written
...