Кластеризация MultiGPU Kmeans с RAPID зависает - PullRequest
3 голосов
/ 06 марта 2020

Я новичок в Python и Rapids.AI и пытаюсь воссоздать SKLearn KMeans в многоузловом графическом процессоре (у меня есть 2 графических процессора) с использованием Dask и RAPID (я использую пороги с его docker, который монтирует Jupyter Notebook тоже).

Код, который я показываю ниже (также я показываю пример набора данных Iris), зависает, и ячейка ноутбука jupyter никогда не заканчивается. Я пытался использовать клавишу %debug magi c, а также приборную панель Dask, но я не сделал каких-либо четких выводов (я думаю, что это единственный вывод из-за device_m_csv.iloc, но я не уверен в этом). Другая вещь, которая может быть, это то, что я забыл некоторые wait() или compute() или persistent() (на самом деле, я не уверен, в каких случаях они должны использоваться правильно).

Я объясню код, для лучшего чтения:

  • Прежде всего, выполните необходимый импорт
  • Далее начинается с алгоритма KMeans (разделитель: ############## ######### ...)
  • Создайте кластер CUDA с 2 работниками, по одному на GPU (у меня 2 GPU) и 1 поток для работника (я читал, это рекомендуемое значение ) и запустить клиент
  • Считать набор данных из CSV, сделав 2 раздела (chunksize = '2kb')
  • Разделить предыдущий набор данных на данные (более известные как X) и метки ((более известные как y)
  • Создание cu_KСредства с использованием Dask
  • Подгонка модели
  • Прогноз значений
  • Проверка полученной оценки

Извините за то, что я не смог предложить больше данных, но я не смог их получить. Что бы ни понадобилось, чтобы разрешить сомнение, я буду рад предложить это.

Где или что Можете ли вы подумать, что проблема? ..

Заранее большое спасибо.

%%time

# Import libraries and show its versions
import numpy as np; print('NumPy Version:', np.__version__)
import pandas as pd; print('Pandas Version:', pd.__version__)
import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
import nvstrings, nvcategory
import cupy; print('cuPY Version:', cupy.__version__)
import cudf; print('cuDF Version:', cudf.__version__)
import cuml; print('cuML Version:', cuml.__version__)
import dask; print('Dask Version:', dask.__version__)
import dask_cuda; print('DaskCuda Version:', dask_cuda.__version__)
import dask_cudf; print('DaskCuDF Version:', dask_cudf.__version__)
import matplotlib; print('MatPlotLib Version:', matplotlib.__version__)
import seaborn as sns; print('SeaBorn Version:', sns.__version__)
#import timeimport warnings

from dask import delayed
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, wait
from dask_ml.cluster import KMeans as skmKMeans
from dask_cuda import LocalCUDACluster

from sklearn import metrics
from sklearn.cluster import KMeans as skKMeans
from sklearn.metrics import adjusted_rand_score as sk_adjusted_rand_score, silhouette_score as sk_silhouette_score
from cuml.cluster import KMeans as cuKMeans
from cuml.dask.cluster.kmeans import KMeans as cumKMeans
from cuml.metrics import adjusted_rand_score as cu_adjusted_rand_score

# Configure matplotlib library
import matplotlib.pyplot as plt
%matplotlib inline

# Configure seaborn library
sns.set()
#sns.set(style="white", color_codes=True)
%config InlineBackend.figure_format = 'svg'

# Configure warnings
#warnings.filterwarnings("ignore")


####################################### KMEANS #############################################################
# Create local cluster
cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)

# Identify number of workers
n_workers = len(client.has_what().keys())

# Read data in host memory
device_m_csv = dask_cudf.read_csv('./DataSet/iris.csv', header = 0, delimiter = ',', chunksize='2kB') # Get complete CSV. Chunksize is 2kb for getting 2 partitions
#x = host_data.iloc[:, [0,1,2,3]].values
device_m_data = device_m_csv.iloc[:, [0, 1, 2, 3]] # Get data columns
device_m_labels = device_m_csv.iloc[:, 4] # Get labels column

# Plot data
#sns.pairplot(device_csv.to_pandas(), hue='variety');

# Define variables
label_type = { 'Setosa': 1, 'Versicolor': 2, 'Virginica': 3 } # Dictionary of variables type

# Create KMeans
cu_m_kmeans = cumKMeans(init = 'k-means||',
                     n_clusters = len(device_m_labels.unique()),
                     oversampling_factor = 40,
                     random_state = 0)
# Fit data in KMeans
cu_m_kmeans.fit(device_m_data)

# Predict data
cu_m_kmeans_labels_predicted = cu_m_kmeans.predict(device_m_data).compute()

# Check score
#print('Cluster centers:\n',cu_m_kmeans.cluster_centers_)
#print('adjusted_rand_score: ', sk_adjusted_rand_score(device_m_labels, cu_m_kmeans.labels_))
#print('silhouette_score: ', sk_silhouette_score(device_m_data.to_pandas(), cu_m_kmeans_labels_predicted))

# Close local cluster
client.close()
cluster.close()

Пример набора данных Iris:

IrisDatasetExample

РЕДАКТИРОВАТЬ 1

@ Кори, это мой вывод, используя ваш код:

NumPy Version: 1.17.5
Pandas Version: 0.25.3
Scikit-Learn Version: 0.22.1
cuPY Version: 6.7.0
cuDF Version: 0.12.0
cuML Version: 0.12.0
Dask Version: 2.10.1
DaskCuda Version: 0+unknown
DaskCuDF Version: 0.12.0
MatPlotLib Version: 3.1.3
SeaBorn Version: 0.10.0
Cluster centers:
           0         1         2         3
0  5.006000  3.428000  1.462000  0.246000
1  5.901613  2.748387  4.393548  1.433871
2  6.850000  3.073684  5.742105  2.071053
adjusted_rand_score:  0.7302382722834697
silhouette_score:  0.5528190123564102

Ответы [ 2 ]

3 голосов
/ 06 марта 2020

Я немного изменил ваш воспроизводимый пример и смог произвести вывод для самой последней ночной RAPIDS.

Это вывод скрипта.

(cuml_dev_2) cjnolet@deeplearn ~ $ python ~/kmeans_mnmg_reproduce.py 
NumPy Version: 1.18.1
Pandas Version: 0.25.3
Scikit-Learn Version: 0.22.2.post1
cuPY Version: 7.2.0
cuDF Version: 0.13.0a+3237.g61e4d9c
cuML Version: 0.13.0a+891.g4f44f7f
Dask Version: 2.11.0+28.g10db6ba
DaskCuda Version: 0+unknown
DaskCuDF Version: 0.13.0a+3237.g61e4d9c
MatPlotLib Version: 3.2.0
SeaBorn Version: 0.10.0
/share/software/miniconda3/envs/cuml_dev_2/lib/python3.7/site-packages/dask/array/random.py:27: FutureWarning: dask.array.random.doc_wraps is deprecated and will be removed in a future version
  FutureWarning,
/share/software/miniconda3/envs/cuml_dev_2/lib/python3.7/site-packages/distributed/dashboard/core.py:79: UserWarning: 
Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
  warnings.warn("\n" + msg)
bokeh.server.util - WARNING - Host wildcard '*' will allow connections originating from multiple (or possibly all) hostnames or IPs. Use non-wildcard values to restrict access explicitly
Cluster centers:
           0         1         2         3
0  5.883607  2.740984  4.388525  1.434426
1  5.006000  3.428000  1.462000  0.246000
2  6.853846  3.076923  5.715385  2.053846
adjusted_rand_score:  0.7163421126838475
silhouette_score:  0.5511916046195927

А вот модифицированный скрипт, который выдает этот вывод:

    # Import libraries and show its versions
    import numpy as np; print('NumPy Version:', np.__version__)
    import pandas as pd; print('Pandas Version:', pd.__version__)
    import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
    import nvstrings, nvcategory
    import cupy; print('cuPY Version:', cupy.__version__)
    import cudf; print('cuDF Version:', cudf.__version__)
    import cuml; print('cuML Version:', cuml.__version__)
    import dask; print('Dask Version:', dask.__version__)
    import dask_cuda; print('DaskCuda Version:', dask_cuda.__version__)
    import dask_cudf; print('DaskCuDF Version:', dask_cudf.__version__)
    import matplotlib; print('MatPlotLib Version:', matplotlib.__version__)
    import seaborn as sns; print('SeaBorn Version:', sns.__version__)
    #import timeimport warnings

    from dask import delayed
    import dask.dataframe as dd
    from dask.distributed import Client, LocalCluster, wait
    from dask_ml.cluster import KMeans as skmKMeans
    from dask_cuda import LocalCUDACluster

    from sklearn import metrics
    from sklearn.cluster import KMeans as skKMeans
    from sklearn.metrics import adjusted_rand_score as sk_adjusted_rand_score, silhouette_score as sk_silhouette_score
    from cuml.cluster import KMeans as cuKMeans
    from cuml.dask.cluster.kmeans import KMeans as cumKMeans
    from cuml.metrics import adjusted_rand_score as cu_adjusted_rand_score
    # Configure matplotlib library
    import matplotlib.pyplot as plt

    # Configure seaborn library
    sns.set()
    #sns.set(style="white", color_codes=True)
    # Configure warnings
    #warnings.filterwarnings("ignore")


    ####################################### KMEANS #############################################################
    # Create local cluster
    cluster = LocalCUDACluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)

    # Identify number of workers
    n_workers = len(client.has_what().keys())

    # Read data in host memory
    from sklearn.datasets import load_iris

    loader = load_iris()

    #x = host_data.iloc[:, [0,1,2,3]].values
    device_m_data = dask_cudf.from_cudf(cudf.from_pandas(pd.DataFrame(loader.data)), npartitions=2) # Get data columns
    device_m_labels = dask_cudf.from_cudf(cudf.from_pandas(pd.DataFrame(loader.target)), npartitions=2)

    # Plot data
    #sns.pairplot(device_csv.to_pandas(), hue='variety');

    # Define variables
    label_type = { 'Setosa': 1, 'Versicolor': 2, 'Virginica': 3 } # Dictionary of variables type

    # Create KMeans
    cu_m_kmeans = cumKMeans(init = 'k-means||',
                     n_clusters = len(np.unique(loader.target)),
                     oversampling_factor = 40,
                     random_state = 0)
    # Fit data in KMeans
    cu_m_kmeans.fit(device_m_data)

    # Predict data
    cu_m_kmeans_labels_predicted = cu_m_kmeans.predict(device_m_data).compute()

    # Check score
    print('Cluster centers:\n',cu_m_kmeans.cluster_centers_)
    print('adjusted_rand_score: ', sk_adjusted_rand_score(loader.target, cu_m_kmeans_labels_predicted.values.get()))
    print('silhouette_score: ', sk_silhouette_score(device_m_data.compute().to_pandas(), cu_m_kmeans_labels_predicted))

    # Close local cluster
    client.close()
    cluster.close()

Можете ли вы предоставить выходные данные для версий этих библиотек? Я бы порекомендовал также запустить модифицированный скрипт и посмотреть, будет ли он работать успешно для вас. Если нет, то мы можем углубиться, чтобы выяснить, связано ли это с Docker, версией RAPIDS или чем-то еще.

Если у вас есть доступ к командной строке, в которой запущена записная книжка Jupyter, может быть полезно включить ведение журнала, передавая verbose=True при создании объекта KMeans. Это может помочь нам определить, где что-то застревает.

1 голос
/ 12 марта 2020

Документация Dask действительно хороша и обширна, хотя я допускаю, что иногда гибкость и количество функций, если они предоставляются, могут быть немного подавляющими. Я думаю, что это помогает видеть Dask как API для распределенных вычислений, который дает пользователю контроль над несколькими различными уровнями выполнения, каждый уровень обеспечивает более детальный контроль.

compute(), wait() и persist() являются понятиями, которые исходят из способа, которым задачи, которые лежат в основе серии распределенных вычислений, планируются для ряда рабочих. Общим для всех этих вычислений является граф выполнения, представляющий удаленные задачи и их взаимозависимости. В какой-то момент этот график выполнения запланирован для нескольких рабочих. Dask предоставляет два API-интерфейса в зависимости от того, планируются ли задачи, лежащие в основе графика, немедленно (с нетерпением) или от необходимости выполнения вычислений вручную (лениво).

Оба эти API строят график выполнения по мере создания задач, которые зависят от результатов других задач. Первый использует dask.futures API для немедленного асинхронного выполнения, результаты которого иногда могут потребоваться для wait() перед выполнением других операций. API dask.delayed используется для отложенных исполнений и требует вызова методов, подобных compute() или persist(), чтобы начать вычисления.

Чаще всего пользователи библиотек, таких как RAPIDS, больше озабочены манипулированием своими данные и не так обеспокоены тем, как эти манипуляции запланированы на наборе работников. Объекты dask.dataframe и dask.array построены поверх API delayed и futures. Большинство пользователей взаимодействуют с этими структурами данных, а не взаимодействуют с объектами delayed и futures, но неплохо бы знать о них, если вам когда-нибудь понадобится выполнить некоторые преобразования данных, помимо распределенных dataframe и array объекты предоставляют.

dask.dataframe и dask.array по возможности строят ленивые графы выполнения и предоставляют метод compute() для материализации графа и возврата результата клиенту. Они оба также предоставляют persist() метод для асинхронного запуска вычислений в фоновом режиме. wait() полезно, если вы хотите начать вычисления в фоновом режиме, но не хотите возвращать результаты клиенту.

Надеюсь, это полезно.

...