dask-kubernetes: RuntimeError: Невозможно добавить дочерний обработчик - PullRequest
0 голосов
/ 12 марта 2020

Правка: обновлённый вопрос перенесён в отдельную тему «dask-kubernetes: проблема создания пода при имени пользователя в верхнем регистре».

Я изучаю dask-kubernetes в GKE и наткнулся на ошибку asyncio. Смотрите шаги ниже для решения проблемы. Тем не менее, дополнительные рекомендации по использованию dask-kubernetes с удалённым кластером Kubernetes приветствуются (обратите внимание: у меня есть положительный опыт использования helm здесь, но я хочу попробовать нативный подход, так как не могу масштабировать подход с helm).

Создать кластер:

$ gcloud container clusters create --machine-type n1-standard-2 --num-nodes 2 --zone us-central1-b --cluster-version latest k8scluster

Установить пакет:

$ pip install dask-kubernetes

Аутентифицировать проект:

$ export GOOGLE_APPLICATION_CREDENTIALS="/Users/Ray/Downloads/daskk8s-f92d8191517a.json"

Запустить Jupyter Notebook:

$ jupyter notebook

Выполните следующие команды, используя файл worker-spec.yml, приведённый здесь:

from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('worker-spec.yml')
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-2-fc3b1e8d8941> in <module>
----> 1 cluster = KubeCluster.from_yaml('worker-spec.yml')

/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/core.py in from_yaml(cls, yaml_path, **kwargs)
    638             d = yaml.safe_load(f)
    639             d = dask.config.expand_environment_variables(d)
--> 640             return cls.from_dict(d, **kwargs)
    641 
    642     @property

/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/core.py in from_dict(cls, pod_spec, **kwargs)
    600         KubeCluster.from_yaml
    601         """
--> 602         return cls(make_pod_from_dict(pod_spec), **kwargs)
    603 
    604     @classmethod

/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/core.py in __init__(self, pod_template, name, namespace, n_workers, host, port, env, auth, idle_timeout, deploy_mode, interface, protocol, dashboard_address, security, scheduler_service_wait_timeout, scheduler_pod_template, **kwargs)
    414         self.auth = auth
    415         self.kwargs = kwargs
--> 416         super().__init__(**self.kwargs)
    417 
    418     def _get_pod_template(self, pod_template, pod_type):

/opt/anaconda3/lib/python3.7/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name)
    254         if not self.asynchronous:
    255             self._loop_runner.start()
--> 256             self.sync(self._start)
    257             self.sync(self._correct_state)
    258 

/opt/anaconda3/lib/python3.7/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    159             return future
    160         else:
--> 161             return sync(self.loop, func, *args, **kwargs)
    162 
    163     async def _get_logs(self, scheduler=True, workers=True):

/opt/anaconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

/opt/anaconda3/lib/python3.7/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

/opt/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/core.py in _start(self)
    512         )
    513 
--> 514         await ClusterAuth.load_first(self.auth)
    515 
    516         self.core_api = kubernetes.client.CoreV1Api()

/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/auth.py in load_first(auth)
     69         for auth_instance in auth:
     70             try:
---> 71                 await auth_instance.load()
     72             except (
     73                 kubernetes_asyncio.config.ConfigException,

/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/auth.py in load(self)
    127         )
    128         await kubernetes_asyncio.config.load_kube_config(
--> 129             self.config_file, self.context, None, self.persist_config
    130         )
    131 

/opt/anaconda3/lib/python3.7/site-packages/kubernetes_asyncio/config/kube_config.py in load_kube_config(config_file, context, client_configuration, persist_config)
    554     if client_configuration is None:
    555         config = type.__call__(Configuration)
--> 556         await loader.load_and_set(config)
    557         Configuration.set_default(config)
    558     else:

/opt/anaconda3/lib/python3.7/site-packages/kubernetes_asyncio/config/kube_config.py in load_and_set(self, client_configuration)
    369 
    370     async def load_and_set(self, client_configuration):
--> 371         await self._load_authentication()
    372         self._load_cluster_info()
    373         self._set_config(client_configuration)

/opt/anaconda3/lib/python3.7/site-packages/kubernetes_asyncio/config/kube_config.py in _load_authentication(self)
    195 
    196         if self.provider == 'gcp':
--> 197             await self.load_gcp_token()
    198             return
    199 

/opt/anaconda3/lib/python3.7/site-packages/kubernetes_asyncio/config/kube_config.py in load_gcp_token(self)
    231                     credentials = self._get_google_credentials()
    232             else:
--> 233                 credentials = await google_auth_credentials(config)
    234             config.value['access-token'] = credentials.token
    235             config.value['expiry'] = credentials.expiry

/opt/anaconda3/lib/python3.7/site-packages/kubernetes_asyncio/config/google_auth.py in google_auth_credentials(provider)
     15                                               stdout=asyncio.subprocess.PIPE,
     16                                               stderr=asyncio.subprocess.PIPE)
---> 17     proc = await cmd_exec
     18 
     19     data = await proc.stdout.read()

/opt/anaconda3/lib/python3.7/asyncio/subprocess.py in create_subprocess_exec(program, stdin, stdout, stderr, loop, limit, *args, **kwds)
    215         program, *args,
    216         stdin=stdin, stdout=stdout,
--> 217         stderr=stderr, **kwds)
    218     return Process(transport, protocol, loop)

/opt/anaconda3/lib/python3.7/asyncio/base_events.py in subprocess_exec(self, protocol_factory, program, stdin, stdout, stderr, universal_newlines, shell, bufsize, *args, **kwargs)
   1538         transport = await self._make_subprocess_transport(
   1539             protocol, popen_args, False, stdin, stdout, stderr,
-> 1540             bufsize, **kwargs)
   1541         if self._debug and debug_log is not None:
   1542             logger.info('%s: %r', debug_log, transport)

/opt/anaconda3/lib/python3.7/asyncio/unix_events.py in _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra, **kwargs)
    191 
    192             watcher.add_child_handler(transp.get_pid(),
--> 193                                       self._child_watcher_callback, transp)
    194             try:
    195                 await waiter

/opt/anaconda3/lib/python3.7/asyncio/unix_events.py in add_child_handler(self, pid, callback, *args)
    939         if self._loop is None:
    940             raise RuntimeError(
--> 941                 "Cannot add child handler, "
    942                 "the child watcher does not have a loop attached")
    943 

RuntimeError: Cannot add child handler, the child watcher does not have a loop attached
...