Использование spark_sklearn с кластером kubernetes - PullRequest
0 голосов
/ 07 ноября 2019

Я работаю над проектом машинного обучения. Первоначально я использовал библиотеку scikit-learn (sklearn). В процессе оптимизации модели я использую классический класс GridSearchCV от sklearn. В настоящее время он распараллеливается, используя все ресурсы хоста, на котором запущен API (библиотека JobLib). Здесь ниже у вас есть пример этого,

from sklearn                  import datasets
from sklearn.ensemble         import RandomForestClassifier
from sklearn.model_selection  import GridSearchCV
from datetime                 import datetime
import numpy as np

def count_trials(param_grid):
    total_trials = 0
    for k,v in param_grid.items():
        total_trials += len(v)

    return total_trials

# Load data
digits = datasets.load_digits()

X, y = digits.data, digits.target

print("")
print("Iris data set: ")
print("X:      {}".format(X.shape))
print("labels: {}".format(np.unique(y)))
print("")

param_grid = {"max_depth":         [3, None],
              "max_features":      ["auto"],
              "min_samples_split": [2, 3,4,5,10,20],
              "min_samples_leaf":  [1, 3,4,6,10,20],
              "bootstrap":         [True],
              "criterion":         ["entropy"],
              "n_estimators":      [40, 80],
              }
cv = 5

n_models = count_trials(param_grid)

print("trying {} models, with CV = {}. A total of {} fits.".format(n_models,cv,n_models*cv))

start_time = datetime.now()
print("Starting at {}".format(start_time))
gs = GridSearchCV(estimator = RandomForestClassifier(),
                  param_grid=param_grid,
                  cv=cv,
                  refit=True,
                  scoring="accuracy",
                  n_jobs = -1)
gs.fit(X, y)
end_time   = datetime.now()

print("Ending   at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))

Недавно я обнаружил, что он был расширен для использования ресурсов искрового кластера (библиотеки pyspark и spark_sklearn). Мне удалось настроить спарк-кластер с одним мастером и двумя рабочими. Приведенный ниже код выполняет ту же задачу, что и раньше, но с использованием ресурсов кластера искры.

from sklearn                  import datasets
from sklearn.ensemble         import RandomForestClassifier
from sklearn.model_selection  import GridSearchCV as SKGridSearchCV
from spark_sklearn            import GridSearchCV as SparkGridSearchCV
from pyspark                  import SparkConf, SparkContext
from datetime                 import datetime
import numpy as np

def get_context():
    sc_conf = SparkConf()
    sc_conf.setAppName("test-sklearn-spark-app")
    sc_conf.setMaster('spark://<master-IP>:7077')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print(sc_conf.getAll())

    return SparkContext(conf=sc_conf)

def count_trials(param_grid):
    total_trials = 0
    for k,v in param_grid.items():
        total_trials += len(v)

    return total_trials

# Load data
digits = datasets.load_digits()

X, y = digits.data, digits.target

print("")
print("Iris data set: ")
print("X:      {}".format(X.shape))
print("labels: {}".format(np.unique(y)))
print("")

param_grid = {"max_depth":         [3, None],
              "max_features":      ["auto"],
              "min_samples_split": [2, 3,4,5,10,20],
              "min_samples_leaf":  [1, 3,4,6,10,20],
              "bootstrap":         [True],
              "criterion":         ["entropy"],
              "n_estimators":      [40, 80],
              }
cv = 5

n_models = count_trials(param_grid)

print("trying {} models, with CV = {}. A total of {} fits.".format(n_models,cv,n_models*cv))

sc = get_context()
gs = SparkGridSearchCV(sc = sc,
                       estimator = RandomForestClassifier(),
                       param_grid=param_grid,
                       cv=cv,
                       refit=True,
                       scoring="accuracy",
                       n_jobs = -1)

start_time = datetime.now()
print("Starting at {}".format(start_time))
gs.fit(X, y)
end_time   = datetime.now()

print("Ending   at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))

Где master-IP - это IP-адрес главного узла. Код работает отлично, используя все ресурсы, доступные в кластере spark.

Затем я настроил кластер kubernetes с одним главным и одним подчиненными узлами. Затем я запустил тот же код, что и раньше, но заменил строку на

sc_conf.setMaster('spark://<master-IP>:7077')

на

sc_conf.setMaster('k8s://<master-IP>:<PORT>')

, где master-IP и PORT - это те, которые я получаю, запустив команду наглавный узел,

kubectl  cluster-info

Дело в том, что мой код больше не работает. Он отображает следующие сообщения об ошибках:

19/11/07 12:57:32 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
org.apache.spark.SparkException: Must specify the executor container image
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.<init>(BasicExecutorFeatureStep.scala:40)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$$anonfun$$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$$anonfun$$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
    at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:43)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$$onNewSnapshots$1.apply$mcVI$sp(ExecutorPodsAllocator.scala:133)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$$onNewSnapshots(ExecutorPodsAllocator.scala:126)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:102)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:99)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anonfun$addSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:71)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anon$1.run(ExecutorPodsSnapshotsStoreImpl.scala:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Кажется, что он говорит, что я должен указать образ докера, но я не знаю, как мне это сделать.

Кто-нибудьесть опыт с этим? Я искал в Интернете, но ответов нет.

Заранее спасибо,

Ale

1 Ответ

0 голосов
/ 07 ноября 2019

Я бы порекомендовал вам сначала прочитать doc .

Когда вы запускаете Spark на Kubernetes, ваши процессы передаются в кластер Kubernetes внутри контейнеров Docker, который должен быть известен во время работы. представление. Для SparkConf:

  • spark.kubernetes.container.image

или

  • spark.kubernetes.driver.container.image
  • spark.kubernetes.executor.container.image

Также убедитесь, что ваш кластер Kubernetes может извлекать эти образы, самый простой способ - отправить их в DockerHub. Следуйте руководству о том, как создавать образы Spark Docker.

Похоже, вы выполняете свою работу в режиме клиента, поэтому примите во внимание сетевые заметки . По сути, вы должны быть уверены, что ваш процесс Driver (который, вероятно, выполняется на вашем локальном компьютере) доступен из сети Kubernetes (в частности, для модулей-исполнителей), что может быть не так очевидно. Также ваш процесс Driver должен иметь сетевой доступ к модулю Executor. На самом деле очень сложно отправить Spark Jobs в режиме клиента на удаленный кластер Kubernetes с локальной рабочей станции, и я бы посоветовал вам сначала попробовать его в режиме кластера.

Если вы хотите отправить свои заданияв режиме кластера вы должны быть уверены, что ваш артефакт Job (скрипт Python в вашем случае) и его зависимости доступны из модулей Spark Driver и Executor (самый простой способ - поместить ваш скрипт со всеми зависимостями в образе Spark Docker на Sparkclasspath).

Вместо этого он должен работать для вас так же, как обычно.

Также вы можете сослаться на диаграмму Spark Helm в кластере Kubernetes , который включает интеграцию ноутбуков Jupyter, что упрощает запуск интерактивных сессий Spark в Kubernetes.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...