Запись пакета dask с фреймом данных на диск (создание 2 миллионов функций с помощью dask и featuretools) - PullRequest
0 голосов
/ 27 февраля 2019

Я очень новичок как в Dask, так и в Featuretools, поэтому у меня возникло много трудностей при объединении их для распараллеливания разработки функций

Короткая версия : решение неотложной проблемы У меня есть задачаbag dfs из pandas DataFrame и хотите вывести их как csv с каждым файлом, имеющим раздел в качестве идентификатора. to_textfiles () выдало ошибку, и я не могу найти способ получить номер раздела для использования dfs.map(pd.to_csv, "[partition_num].csv").Есть ли способ сделать это?

>>> dfs 
dask.bag<map-par..., npartitions=2>

>>> type(dfs.compute()[0])
pandas.core.frame.DataFrame

>>> dfs.to_textfiles('feature_matrices/calculate_matrix/*_test')

anaconda3/envs/featuretools/lib/python3.6/site-packages/dask/utils.py in ensure_unicode()
    592         return s.decode()
    593     msg = "Object %s is neither a bytes object nor has an encode method"
--> 594     raise TypeError(msg % s)

TypeError: ('Long error message', 'Object                 Age          ArrivalMethod\nPAT_ENC_CSN_ID                            \n3223775624       33                    Car\n3223776450       82         Medical Flight\n3223776487       65                  Other\n3223776543       31              Ambulance\n3223835687       89              Ambulance\n3223838474       42  Public Transportation\n3223842283       11              Ambulance\n3223845045       60              A

Длинная версия : Для тех, кому интересно, почему у меня пакет данных с панками данных, я помещаю всю свою проблему здесь впоиск лучшего подхода.Я пытаюсь использовать featuretools для создания 2 миллионов объектов для набора данных из 22 тыс. Строк (для выбора объектов позже).Я пытаюсь следовать ссылкам ( этот пост и этот блокнот ).В записной книжке набор данных был огромным (45 миллионов строк) и намного больше, чем мой набор данных из 22 000 строк.

Тем не менее, я разбил шахту на разделы по 741 строке, поскольку передача entity set полных данных в Calculate_feature_matrix содержала последовательный компонент, который занимал слишком много времени (вероятно, для распределения entity set работникам).Это происходит, даже если я создаю только одну функцию со всем набором данных.Ни один из моих dask-workers ( LSFCluster ) не имел более 5% загрузки ЦП после 20 минут работы calculate_matrix, и это привело к огромному отслеживанию ошибок:

ИспользованиеВесь набор данных с одной функцией:

...
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
  File "/path/anaconda3/envs/featuretools/lib/python3.6/socket.py", line 205, in accept
OSError: [Errno 24] Too many open files
Exception in callback BaseAsyncIOLoop._handle_events(110, 1)
handle: <Handle BaseAsyncIOLoop._handle_events(110, 1)>
Traceback (most recent call last):
  File "/path/anaconda3/envs/featuretools/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/lab/corradin_data/FOR_AN/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 122, in _handle_events
    handler_func(fileobj, events)
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
  File "path/anaconda3/envs/featuretools/lib/python3.6/socket.py", line 205, in accept
OSError: [Errno 24] Too many open files

В дополнение к разделению набора данных, я также разделяю по функциям, выполняя одну функцию за один раз.Теперь я хочу записать эту функцию на диск, но хочу объединить их в куски по 1 тыс. Вместо вывода 2 млн. CSV-файлов.Ниже приведен мой подход, который заканчивается тем, что dfs представляет собой dask bag из pandas DataFrame

Для каждого раздела из 741 строки рассчитайте по одному объекту за раз :

from dask_jobqueue import LSFCluster
from dask.distributed import Client
cluster = LSFCluster(...)
client = Client(cluster)

# take a feature, return a feature matrix for a subset of data
def make_feature(feature):
    feature_name = feature.generate_name()
    try:
        feature_matrix = ft.calculate_feature_matrix(feature, entityset=es, n_jobs= 1, verbose = 1) #es is one partition of dataset

        print(f"Finished generating feature {feature_name}")
        return feature_matrix
    except:
        print(f"Could not make feature: {feature_name}")
        print("--------")
        return None

import dask.bag as db
b = db.from_sequence(feature_list, partition_size=1000) # 1k feature per partition
b = b.map(make_feature) 

#concatenate 1k dataframe (1 partition) to 1 df 
def concat(partition):
    series = [i for i in partition]
    df = pd.concat(series,axis =1)
    return [df]

dfs = b.map_partitions(concat) # dask bag of dataframes

overall_start = timer()
dfs.compute()
overall_end = timer()

print(f"Total Time Elapsed: {round(overall_end - overall_start, 2)} seconds.")

#ouput to disk here
???

Это мой первый ТАК вопрос, поэтому, пожалуйста, дайте мне знать, что исправить / добавить, чтобы мой вопрос был понятнее.Спасибо!

...