Многопроцессорная обработка в наборе данных из pyspark возвращает ошибку JVM - PullRequest
0 голосов
/ 02 апреля 2019

Мне нужно запустить несколько алгоритмов кластеризации в ноутбуке Jupyter параллельно. Функция кластеризации, которую я хочу, чтобы параллельная работа работала при выполнении многопоточности или при индивидуальном запуске Тем не менее, он возвращает

повышение Py4JError ("{0} не существует в JVM" .format (name))

при попытке многопроцессорной обработки. У меня нет большого опыта работы с мультипроцессорами, что я могу делать не так?

Код для кластеризации:

def clustering(ID, df):
    pandas_df = df.select("row", "features", "type") \
    .where(df.type == ID).toPandas()

    print("process " + str(ID) + ": preparing data for clustering")
    feature_series = pandas_df["features"].apply(lambda x: x.toArray())
    objs = [pandas_df, pd.DataFrame(feature_series.tolist())]
    t_df = pd.concat(objs, axis=1)

    print("process " + str(ID) + ": initiating clustering")
    c= #clustering algo here
    print("process " + str(ID) + " DONE!")

    return

Код для многопроцессорной обработки:

import multiprocessing as mp

k = 4

if __name__ == '__main__':
    pl = []
    for i in range(0,k):
        print("sending process:", i)
        process = mp.Process(target=clustering, args=(i, df))
        jobs.append(process)
        process.start()

    for process in pl:
        print("waiting for join from process")
        process.join()

1 Ответ

0 голосов
/ 03 апреля 2019

Ошибка была вызвана тем, что подпроцессам не удалось получить доступ к той же памяти (в которой находился фрейм данных pyspark).

Решено путем предварительного разбиения набора данных путем помещения доступа к фрейму данных pyspark в другую функцию, напримерИтак:

    pandas_df = df.select("row", "features", "type") \
    .where(df.type == ID).toPandas()

И затем запустить кластеризацию на отдельных фреймах данных Pandas.

...