Сделайте для l oop с фреймами данных быстрее pyspark - PullRequest
0 голосов
/ 09 мая 2020

Мне нужно выполнить итерацию по фрейму данных элементов - точнее по вершинам, но это не имеет особого значения - со следующим заголовком [vertexId, userName, communityId] - где communityId - это просто метка для данной вершины, выполняющая ряд операций с фреймом в текущей строке на каждом шаге:

verticesIdsList = [row['id'] for row in vertices.select('id').collect()]

for vertexId in verticesIds:

    allCommunitiesDf = vertices.select('communityId').distinct()
    vertices2CommunitiesDf = verticesHelper.select('id', 'communityId')
    verticesIdsDf = verticesHelper.select('id')

    kInnerDf = vertices2CommunitiesDf \
                .join(aij, [aij.dst == vertices2CommunitiesDf.id]) \
                .where(aij.src == vertexId) \
                .groupBy('src', 'communityId').sum('weight')

   # compute some other params similiar to kInnerDf
   # change the communityId label for the current vertex according to a formula 
   # based on the above computed params
   # take into account THAT kInnerDF AND THE OTHER PARAMS NEED TO KNOW THE UPDATED LABELS FOR EACH VERTEX

Хотя набор данных содержит не более 500 элементов (поэтому он действительно мал), и я не использую udfs - просто простые фреймы данных - процесс очень медленный - он занимает более 15 минут. Он работает хуже, чем непараллельный!

Когда я удаляю for l oop и использую вместо него udf - все это выполняется за секунды. Почему я не использую метод udf? Потому что мне нужно повторно вычислить мои параметры в соответствии со всеми метками communityId вершин, а метки communityId меняются по мере обработки каждой вершины.

Это конфигурация, которая дает наилучшие результаты (поскольку набор данных невелик, Я понял, что мне нужно меньше ресурсов, чтобы избежать накладных расходов):

customConfig = pyspark.SparkConf() \
.setAll([('spark.executor.memory', '2g'), \
    ('spark.sql.shuffle.partitions', '1'), \
    ('spark.default.parallelism', '1'), \
    ('spark.executor.cores', '1'), \
    ('spark.cores.max', '1'), \
    ('spark.driver.memory','50g'), \
    ('spark.sql.crossJoin.enabled', True)])

Как я могу заставить этот код работать быстрее? Почему так медленно?

...