Правка: обновлённый вопрос перенесён в отдельную тему «dask-kubernetes: проблема создания пода, когда имя пользователя содержит символы в верхнем регистре».
Я изучаю dask-kubernetes в GKE и наткнулся на ошибку asyncio. Шаги для воспроизведения проблемы приведены ниже. Кроме того, приветствуются дополнительные рекомендации по использованию dask-kubernetes с удалённым кластером Kubernetes (замечу: я уже использовал helm, и опыт был положительным (см. здесь), но хочу попробовать нативный подход, так как не могу масштабировать подход с helm).
Создать кластер:
$ gcloud container clusters create --machine-type n1-standard-2 --num-nodes 2 --zone us-central1-b --cluster-version latest k8scluster
Установить пакет:
$ pip install dask-kubernetes
Аутентифицировать проект:
$ export GOOGLE_APPLICATION_CREDENTIALS="/Users/Ray/Downloads/daskk8s-f92d8191517a.json"
Запустить Jupyter Notebook:
$ jupyter notebook
Выполнить следующие команды, используя файл worker-spec.yml, приведённый здесь:
from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('worker-spec.yml')
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-2-fc3b1e8d8941> in <module>
----> 1 cluster = KubeCluster.from_yaml('worker-spec.yml')
/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/core.py in from_yaml(cls, yaml_path, **kwargs)
638 d = yaml.safe_load(f)
639 d = dask.config.expand_environment_variables(d)
--> 640 return cls.from_dict(d, **kwargs)
641
642 @property
/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/core.py in from_dict(cls, pod_spec, **kwargs)
600 KubeCluster.from_yaml
601 """
--> 602 return cls(make_pod_from_dict(pod_spec), **kwargs)
603
604 @classmethod
/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/core.py in __init__(self, pod_template, name, namespace, n_workers, host, port, env, auth, idle_timeout, deploy_mode, interface, protocol, dashboard_address, security, scheduler_service_wait_timeout, scheduler_pod_template, **kwargs)
414 self.auth = auth
415 self.kwargs = kwargs
--> 416 super().__init__(**self.kwargs)
417
418 def _get_pod_template(self, pod_template, pod_type):
/opt/anaconda3/lib/python3.7/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, security, silence_logs, name)
254 if not self.asynchronous:
255 self._loop_runner.start()
--> 256 self.sync(self._start)
257 self.sync(self._correct_state)
258
/opt/anaconda3/lib/python3.7/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
159 return future
160 else:
--> 161 return sync(self.loop, func, *args, **kwargs)
162
163 async def _get_logs(self, scheduler=True, workers=True):
/opt/anaconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
346 if error[0]:
347 typ, exc, tb = error[0]
--> 348 raise exc.with_traceback(tb)
349 else:
350 return result[0]
/opt/anaconda3/lib/python3.7/site-packages/distributed/utils.py in f()
330 if callback_timeout is not None:
331 future = asyncio.wait_for(future, callback_timeout)
--> 332 result[0] = yield future
333 except Exception as exc:
334 error[0] = sys.exc_info()
/opt/anaconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/core.py in _start(self)
512 )
513
--> 514 await ClusterAuth.load_first(self.auth)
515
516 self.core_api = kubernetes.client.CoreV1Api()
/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/auth.py in load_first(auth)
69 for auth_instance in auth:
70 try:
---> 71 await auth_instance.load()
72 except (
73 kubernetes_asyncio.config.ConfigException,
/opt/anaconda3/lib/python3.7/site-packages/dask_kubernetes/auth.py in load(self)
127 )
128 await kubernetes_asyncio.config.load_kube_config(
--> 129 self.config_file, self.context, None, self.persist_config
130 )
131
/opt/anaconda3/lib/python3.7/site-packages/kubernetes_asyncio/config/kube_config.py in load_kube_config(config_file, context, client_configuration, persist_config)
554 if client_configuration is None:
555 config = type.__call__(Configuration)
--> 556 await loader.load_and_set(config)
557 Configuration.set_default(config)
558 else:
/opt/anaconda3/lib/python3.7/site-packages/kubernetes_asyncio/config/kube_config.py in load_and_set(self, client_configuration)
369
370 async def load_and_set(self, client_configuration):
--> 371 await self._load_authentication()
372 self._load_cluster_info()
373 self._set_config(client_configuration)
/opt/anaconda3/lib/python3.7/site-packages/kubernetes_asyncio/config/kube_config.py in _load_authentication(self)
195
196 if self.provider == 'gcp':
--> 197 await self.load_gcp_token()
198 return
199
/opt/anaconda3/lib/python3.7/site-packages/kubernetes_asyncio/config/kube_config.py in load_gcp_token(self)
231 credentials = self._get_google_credentials()
232 else:
--> 233 credentials = await google_auth_credentials(config)
234 config.value['access-token'] = credentials.token
235 config.value['expiry'] = credentials.expiry
/opt/anaconda3/lib/python3.7/site-packages/kubernetes_asyncio/config/google_auth.py in google_auth_credentials(provider)
15 stdout=asyncio.subprocess.PIPE,
16 stderr=asyncio.subprocess.PIPE)
---> 17 proc = await cmd_exec
18
19 data = await proc.stdout.read()
/opt/anaconda3/lib/python3.7/asyncio/subprocess.py in create_subprocess_exec(program, stdin, stdout, stderr, loop, limit, *args, **kwds)
215 program, *args,
216 stdin=stdin, stdout=stdout,
--> 217 stderr=stderr, **kwds)
218 return Process(transport, protocol, loop)
/opt/anaconda3/lib/python3.7/asyncio/base_events.py in subprocess_exec(self, protocol_factory, program, stdin, stdout, stderr, universal_newlines, shell, bufsize, *args, **kwargs)
1538 transport = await self._make_subprocess_transport(
1539 protocol, popen_args, False, stdin, stdout, stderr,
-> 1540 bufsize, **kwargs)
1541 if self._debug and debug_log is not None:
1542 logger.info('%s: %r', debug_log, transport)
/opt/anaconda3/lib/python3.7/asyncio/unix_events.py in _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra, **kwargs)
191
192 watcher.add_child_handler(transp.get_pid(),
--> 193 self._child_watcher_callback, transp)
194 try:
195 await waiter
/opt/anaconda3/lib/python3.7/asyncio/unix_events.py in add_child_handler(self, pid, callback, *args)
939 if self._loop is None:
940 raise RuntimeError(
--> 941 "Cannot add child handler, "
942 "the child watcher does not have a loop attached")
943
RuntimeError: Cannot add child handler, the child watcher does not have a loop attached