Я работаю над проектом машинного обучения. Первоначально я использовал библиотеку scikit-learn (sklearn). В процессе оптимизации модели я использую классический класс GridSearchCV от sklearn. В настоящее время он распараллеливается, используя все ресурсы хоста, на котором запущен API (библиотека JobLib). Здесь ниже у вас есть пример этого,
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from datetime import datetime
import numpy as np
def count_trials(param_grid):
total_trials = 0
for k,v in param_grid.items():
total_trials += len(v)
return total_trials
# Load data
digits = datasets.load_digits()
X, y = digits.data, digits.target
print("")
print("Iris data set: ")
print("X: {}".format(X.shape))
print("labels: {}".format(np.unique(y)))
print("")
param_grid = {"max_depth": [3, None],
"max_features": ["auto"],
"min_samples_split": [2, 3,4,5,10,20],
"min_samples_leaf": [1, 3,4,6,10,20],
"bootstrap": [True],
"criterion": ["entropy"],
"n_estimators": [40, 80],
}
cv = 5
n_models = count_trials(param_grid)
print("trying {} models, with CV = {}. A total of {} fits.".format(n_models,cv,n_models*cv))
start_time = datetime.now()
print("Starting at {}".format(start_time))
gs = GridSearchCV(estimator = RandomForestClassifier(),
param_grid=param_grid,
cv=cv,
refit=True,
scoring="accuracy",
n_jobs = -1)
gs.fit(X, y)
end_time = datetime.now()
print("Ending at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))
Недавно я обнаружил, что он был расширен для использования ресурсов искрового кластера (библиотеки pyspark и spark_sklearn). Мне удалось настроить спарк-кластер с одним мастером и двумя рабочими. Приведенный ниже код выполняет ту же задачу, что и раньше, но с использованием ресурсов кластера искры.
from sklearn import datasets
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV as SKGridSearchCV
from spark_sklearn import GridSearchCV as SparkGridSearchCV
from pyspark import SparkConf, SparkContext
from datetime import datetime
import numpy as np
def get_context():
sc_conf = SparkConf()
sc_conf.setAppName("test-sklearn-spark-app")
sc_conf.setMaster('spark://<master-IP>:7077')
sc_conf.set('spark.cores.max', '40')
sc_conf.set('spark.logConf', True)
print(sc_conf.getAll())
return SparkContext(conf=sc_conf)
def count_trials(param_grid):
total_trials = 0
for k,v in param_grid.items():
total_trials += len(v)
return total_trials
# Load data
digits = datasets.load_digits()
X, y = digits.data, digits.target
print("")
print("Iris data set: ")
print("X: {}".format(X.shape))
print("labels: {}".format(np.unique(y)))
print("")
param_grid = {"max_depth": [3, None],
"max_features": ["auto"],
"min_samples_split": [2, 3,4,5,10,20],
"min_samples_leaf": [1, 3,4,6,10,20],
"bootstrap": [True],
"criterion": ["entropy"],
"n_estimators": [40, 80],
}
cv = 5
n_models = count_trials(param_grid)
print("trying {} models, with CV = {}. A total of {} fits.".format(n_models,cv,n_models*cv))
sc = get_context()
gs = SparkGridSearchCV(sc = sc,
estimator = RandomForestClassifier(),
param_grid=param_grid,
cv=cv,
refit=True,
scoring="accuracy",
n_jobs = -1)
start_time = datetime.now()
print("Starting at {}".format(start_time))
gs.fit(X, y)
end_time = datetime.now()
print("Ending at {}".format(end_time))
print("\n total time = {}\n".format(end_time - start_time))
Где master-IP - это IP-адрес главного узла. Код работает отлично, используя все ресурсы, доступные в кластере spark.
Затем я настроил кластер kubernetes с одним главным и одним подчиненными узлами. Затем я запустил тот же код, что и раньше, но заменил строку на
sc_conf.setMaster('spark://<master-IP>:7077')
на
sc_conf.setMaster('k8s://<master-IP>:<PORT>')
, где master-IP и PORT - это те, которые я получаю, запустив команду наглавный узел,
kubectl cluster-info
Дело в том, что мой код больше не работает. Он отображает следующие сообщения об ошибках:
19/11/07 12:57:32 ERROR Utils: Uncaught exception in thread kubernetes-executor-snapshots-subscribers-1
org.apache.spark.SparkException: Must specify the executor container image
at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep$$anonfun$5.apply(BasicExecutorFeatureStep.scala:40)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.deploy.k8s.features.BasicExecutorFeatureStep.<init>(BasicExecutorFeatureStep.scala:40)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$$anonfun$$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder$$anonfun$$lessinit$greater$default$1$1.apply(KubernetesExecutorBuilder.scala:26)
at org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder.buildFromFeatures(KubernetesExecutorBuilder.scala:43)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$$onNewSnapshots$1.apply$mcVI$sp(ExecutorPodsAllocator.scala:133)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsAllocator$$onNewSnapshots(ExecutorPodsAllocator.scala:126)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$start$1.apply(ExecutorPodsAllocator.scala:68)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anonfun$org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:102)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$$callSubscriber(ExecutorPodsSnapshotsStoreImpl.scala:99)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anonfun$addSubscriber$1.apply$mcV$sp(ExecutorPodsSnapshotsStoreImpl.scala:71)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$$anon$1.run(ExecutorPodsSnapshotsStoreImpl.scala:107)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Кажется, что он говорит, что я должен указать образ докера, но я не знаю, как мне это сделать.
Кто-нибудьесть опыт с этим? Я искал в Интернете, но ответов нет.
Заранее спасибо,
Ale