Я очень новичок как в Dask, так и в Featuretools, поэтому у меня возникло много трудностей при объединении их для распараллеливания разработки функций
Короткая версия : решение неотложной проблемы У меня есть задачаbag dfs
из pandas DataFrame
и хотите вывести их как csv с каждым файлом, имеющим раздел в качестве идентификатора. to_textfiles () выдало ошибку, и я не могу найти способ получить номер раздела для использования dfs.map(pd.to_csv, "[partition_num].csv")
.Есть ли способ сделать это?
>>> dfs
dask.bag<map-par..., npartitions=2>
>>> type(dfs.compute()[0])
pandas.core.frame.DataFrame
>>> dfs.to_textfiles('feature_matrices/calculate_matrix/*_test')
anaconda3/envs/featuretools/lib/python3.6/site-packages/dask/utils.py in ensure_unicode()
592 return s.decode()
593 msg = "Object %s is neither a bytes object nor has an encode method"
--> 594 raise TypeError(msg % s)
TypeError: ('Long error message', 'Object Age ArrivalMethod\nPAT_ENC_CSN_ID \n3223775624 33 Car\n3223776450 82 Medical Flight\n3223776487 65 Other\n3223776543 31 Ambulance\n3223835687 89 Ambulance\n3223838474 42 Public Transportation\n3223842283 11 Ambulance\n3223845045 60 A
Длинная версия : Для тех, кому интересно, почему у меня пакет данных с панками данных, я помещаю всю свою проблему здесь впоиск лучшего подхода.Я пытаюсь использовать featuretools для создания 2 миллионов объектов для набора данных из 22 тыс. Строк (для выбора объектов позже).Я пытаюсь следовать ссылкам ( этот пост и этот блокнот ).В записной книжке набор данных был огромным (45 миллионов строк) и намного больше, чем мой набор данных из 22 000 строк.
Тем не менее, я разбил шахту на разделы по 741 строке, поскольку передача entity set
полных данных в Calculate_feature_matrix содержала последовательный компонент, который занимал слишком много времени (вероятно, для распределения entity set
работникам).Это происходит, даже если я создаю только одну функцию со всем набором данных.Ни один из моих dask-workers
( LSFCluster ) не имел более 5% загрузки ЦП после 20 минут работы calculate_matrix
, и это привело к огромному отслеживанию ошибок:
ИспользованиеВесь набор данных с одной функцией:
...
File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
return fn(*args, **kwargs)
File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
File "/path/anaconda3/envs/featuretools/lib/python3.6/socket.py", line 205, in accept
OSError: [Errno 24] Too many open files
Exception in callback BaseAsyncIOLoop._handle_events(110, 1)
handle: <Handle BaseAsyncIOLoop._handle_events(110, 1)>
Traceback (most recent call last):
File "/path/anaconda3/envs/featuretools/lib/python3.6/asyncio/events.py", line 145, in _run
self._callback(*self._args)
File "/lab/corradin_data/FOR_AN/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 122, in _handle_events
handler_func(fileobj, events)
File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/stack_context.py", line 300, in null_wrapper
return fn(*args, **kwargs)
File "/path/anaconda3/envs/featuretools/lib/python3.6/site-packages/tornado/netutil.py", line 249, in accept_handler
File "path/anaconda3/envs/featuretools/lib/python3.6/socket.py", line 205, in accept
OSError: [Errno 24] Too many open files
В дополнение к разделению набора данных, я также разделяю по функциям, выполняя одну функцию за один раз.Теперь я хочу записать эту функцию на диск, но хочу объединить их в куски по 1 тыс. Вместо вывода 2 млн. CSV-файлов.Ниже приведен мой подход, который заканчивается тем, что dfs представляет собой dask bag
из pandas DataFrame
Для каждого раздела из 741 строки рассчитайте по одному объекту за раз :
from dask_jobqueue import LSFCluster
from dask.distributed import Client
cluster = LSFCluster(...)
client = Client(cluster)
# take a feature, return a feature matrix for a subset of data
def make_feature(feature):
feature_name = feature.generate_name()
try:
feature_matrix = ft.calculate_feature_matrix(feature, entityset=es, n_jobs= 1, verbose = 1) #es is one partition of dataset
print(f"Finished generating feature {feature_name}")
return feature_matrix
except:
print(f"Could not make feature: {feature_name}")
print("--------")
return None
import dask.bag as db
b = db.from_sequence(feature_list, partition_size=1000) # 1k feature per partition
b = b.map(make_feature)
#concatenate 1k dataframe (1 partition) to 1 df
def concat(partition):
series = [i for i in partition]
df = pd.concat(series,axis =1)
return [df]
dfs = b.map_partitions(concat) # dask bag of dataframes
overall_start = timer()
dfs.compute()
overall_end = timer()
print(f"Total Time Elapsed: {round(overall_end - overall_start, 2)} seconds.")
#ouput to disk here
???
Это мой первый ТАК вопрос, поэтому, пожалуйста, дайте мне знать, что исправить / добавить, чтобы мой вопрос был понятнее.Спасибо!