Даска не начинающих работников - PullRequest
1 голос
/ 13 марта 2019

Я пытаюсь использовать Dask для выполнения групповой операции над кадром данных.Приведенный ниже код не работает, но кажется, что если я инициализирую Клиента с другой консоли, код работает, даже если я ничего не вижу на приборной панели (http://localhost:8787/status): я имею в виду, что есть приборная панель,но все цифры выглядят пустыми.Я на MacOS.Код:

from datetime import datetime
import numpy as np
import os
from dask import dataframe as dd
from dask.distributed import Client
import pandas as pd

client = Client()
# open http://localhost:8787/status

csv_path = 'chicago-complete.monthly.2018-07-01-to-2018-07-31/data.csv'
dir_destination = 'data'


df = dd.read_csv(csv_path,
     dtype = {
        'timestamp': str,
         'node_id': str,
         'subsystem': str,
         'sensor': str,
         'parameter': str, 
         'value_raw': str, 
         'value_hrf': str,
     },
     parse_dates=['timestamp'],
     date_parser=lambda x: pd.datetime.strptime(x, '%Y/%m/%d %H:%M:%S')
)


#%%
if not os.path.exists(dir_destination):
        os.makedirs(dir_destination)

def create_node_csv(df_node):
    # test function
    return len(df_node)

res = df.groupby('node_id').apply(create_node_csv, meta=int)

CSV-файл просто состоит из столбцов строки.Моя цель состоит в том, чтобы сгруппировать все строки, которые содержат определенное значение в столбце, и затем сохранить их как отдельный файл, используя create_node_csv (df_node) (даже если сейчас это фиктивная функция).Любой другой способ сделать это приветствуется, но я хотел бы понять, что здесь происходит.

Когда я запускаю его, консоль несколько раз выдает следующие ошибки: tornado.application - ERROR - Несколько исключений в yieldОтслеживание списка (последний вызов был последним):

 File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 208, in _start_worker
    yield w._start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 157, in _start
    response = yield self.instantiate()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 226, in instantiate
    self.process.start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/anaconda3/lib/python3.7/site-packages/distributed/nanny.py", line 370, in start
    yield self.process.start()
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 35, in _call_and_set_future
    res = func(*args, **kwargs)
  File "/anaconda3/lib/python3.7/site-packages/distributed/process.py", line 184, in _start
    process.start()
  File "/anaconda3/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/anaconda3/lib/python3.7/multiprocessing/context.py", line 291, in _Popen
    return Popen(process_obj)
  File "/anaconda3/lib/python3.7/multiprocessing/popen_forkserver.py", line 35, in __init__
    super().__init__(process_obj)
  File "/anaconda3/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/anaconda3/lib/python3.7/multiprocessing/popen_forkserver.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "/anaconda3/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

И:

distributed.nanny - WARNING - Worker process 1844 exited with status 1
distributed.nanny - WARNING - Restarting worker

И:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 217, in _start_worker
    raise gen.TimeoutError("Worker failed to start")
tornado.util.TimeoutError: Worker failed to start
tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 883, in callback
    result_list.append(f.result())
  File "/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/anaconda3/lib/python3.7/site-packages/distributed/deploy/local.py", line 217, in _start_worker

РЕДАКТИРОВАТЬ: на основе ответа: - Как сделатьЛи предотвратить создание нового Клиента, если я снова запустил программу?- Как я могу сделать следующее?

def create_node_csv(df_node):
    return len(df_node)

Это возвращает мне следующую ошибку, это связано с мета-параметром?

ValueError: cannot reindex from a duplicate axis

1 Ответ

1 голос
/ 13 марта 2019

Когда вы запускаете скрипт, Client() вызывает появление новых работников Dask, которые также получают копии переменных из исходного основного процесса. В некоторых случаях это включает в себя повторный импорт скрипта в каждого работника, каждый из которых, конечно, затем пытается создать Client и новый набор процессов.

Лучший ответ, как и в целом для всего, что выполняется в процессах, - это использовать функции и защищать основное выполнение. Следующий способ может быть использован для этого без изменения структуры с одним скриптом:

from datetime import datetime
import numpy as np
import os
from dask import dataframe as dd
from dask.distributed import Client
import pandas as pd

csv_path = 'chicago-complete.monthly.2018-07-01-to-2018-07-31/data.csv'
dir_destination = 'data'

def run():
    client = Client()

    df = dd.read_csv(csv_path, ...)
    if not os.path.exists(dir_destination):
            os.makedirs(dir_destination)

    def create_node_csv(df_node):
        # test function
        return len(df_node)

    res = df.groupby('node_id').apply(create_node_csv, meta=int)
    print(res.compute())

if __name__ == "__main__":
    run()

Как я могу предотвратить создание нового Клиента, если я снова запускаю программу?

В вызове Client() вы можете включить адрес существующего кластера, если знаете, что это будет. Кроме того, некоторые конкретные типы развертываний (а их несколько) могут иметь понятие «текущий кластер».

...