Работник падает при простой агрегации - PullRequest
0 голосов
/ 28 декабря 2018

Я пытаюсь объединить различные столбцы в наборе данных 450 миллионов строк.Когда я использую встроенные агрегаты Dask, такие как 'min', 'max', 'std', 'mean', продолжайте сбивать работника в процессе.

Файл, который я использую, можно найти здесь: https://www.kaggle.com/c/PLAsTiCC-2018/data ищите test_set.csv

У меня есть кластер Google kubernetes, который состоит из 3 8-ядерных компьютеров с общим количеством22 ГБ ОЗУ.

Поскольку это всего лишь встроенные функции агрегирования, я не слишком много пробовал.

Он также не использует слишком много ОЗУ, он остается стабильным около 6 ГБ, и я невидел любые ошибки, которые указывали бы на ошибку нехватки памяти.

Ниже приведен мой основной код и журнал ошибок уволенного работника:

from dask.distributed import Client, progress
client = Client('google kubernetes cluster address')

test_df = dd.read_csv('gs://filepath/test_set.csv', blocksize=10000000)

def process_flux(df):
flux_ratio_sq = df.flux / df.flux_err
flux_by_flux_ratio_sq = (df.flux * flux_ratio_sq)
df_flux = dd.concat([df, flux_ratio_sq, flux_by_flux_ratio_sq], axis=1)
df_flux.columns = ['object_id', 'mjd', 'passband', 'flux', 'flux_err', 'detected', 'flux_ratio_sq', 'flux_by_flux_ratio_sq']
return df_flux

aggs = {
'flux': ['min', 'max', 'mean', 'std'],

'detected': ['mean'],
'flux_ratio_sq': ['sum'],
'flux_by_flux_ratio_sq': ['sum'],
'mjd' : ['max', 'min'],
}

def featurize(df):

start_df = process_flux(df)
agg_df = start_df.groupby(['object_id']).agg(aggs)
return agg_df

overall_start = timer()
final_df = featurize(test_df).compute()
overall_end = timer()

Журналы ошибок:

 distributed.core - INFO - Event loop was unresponsive in Worker for 74.42s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
 distributed.core - INFO - Event loop was unresponsive in Worker for 3.30s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
 distributed.core - INFO - Event loop was unresponsive in Worker for 3.75s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

Количество таких событий, а затем:

 distributed.core - INFO - Event loop was unresponsive in Worker for 65.16s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
 distributed.worker - ERROR - Worker stream died during communication: tcp://hidden address
 Traceback (most recent call last):
 File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 180, in read
n_frames = yield stream.read_bytes(8)
 File "/opt/conda/lib/python3.6/site-packages/tornado/iostream.py", line 441, in read_bytes
self._try_inline_read()
 File "/opt/conda/lib/python3.6/site-packages/tornado/iostream.py", line 911, in _try_inline_read
self._check_closed()
 File "/opt/conda/lib/python3.6/site-packages/tornado/iostream.py", line 1112, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

response = yield comm.read(deserializers=deserializers)
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
value = future.result()
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 326, in wrapper
yielded = next(result)
File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 201, in read
convert_stream_closed_error(self, e)
File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 127, in     convert_stream_closed_error
raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: TimeoutError: [Errno 110] Connection timed out

Он работает довольно быстро, и я просто хочу получить стабильную производительность без сбоев моих работников.

Спасибо!

...