Мне нужно выполнить итерацию по фрейму данных элементов - точнее по вершинам, но это не имеет особого значения - со следующим заголовком [vertexId, userName, communityId]
- где communityId - это просто метка для данной вершины, выполняющая ряд операций с фреймом в текущей строке на каждом шаге:
verticesIdsList = [row['id'] for row in vertices.select('id').collect()]
for vertexId in verticesIds:
allCommunitiesDf = vertices.select('communityId').distinct()
vertices2CommunitiesDf = verticesHelper.select('id', 'communityId')
verticesIdsDf = verticesHelper.select('id')
kInnerDf = vertices2CommunitiesDf \
.join(aij, [aij.dst == vertices2CommunitiesDf.id]) \
.where(aij.src == vertexId) \
.groupBy('src', 'communityId').sum('weight')
# compute some other params similiar to kInnerDf
# change the communityId label for the current vertex according to a formula
# based on the above computed params
# take into account THAT kInnerDF AND THE OTHER PARAMS NEED TO KNOW THE UPDATED LABELS FOR EACH VERTEX
Хотя набор данных содержит не более 500 элементов (поэтому он действительно мал), и я не использую udfs - просто простые фреймы данных - процесс очень медленный - он занимает более 15 минут. Он работает хуже, чем непараллельный!
Когда я удаляю for l oop и использую вместо него udf - все это выполняется за секунды. Почему я не использую метод udf? Потому что мне нужно повторно вычислить мои параметры в соответствии со всеми метками communityId вершин, а метки communityId меняются по мере обработки каждой вершины.
Это конфигурация, которая дает наилучшие результаты (поскольку набор данных невелик, Я понял, что мне нужно меньше ресурсов, чтобы избежать накладных расходов):
customConfig = pyspark.SparkConf() \
.setAll([('spark.executor.memory', '2g'), \
('spark.sql.shuffle.partitions', '1'), \
('spark.default.parallelism', '1'), \
('spark.executor.cores', '1'), \
('spark.cores.max', '1'), \
('spark.driver.memory','50g'), \
('spark.sql.crossJoin.enabled', True)])
Как я могу заставить этот код работать быстрее? Почему так медленно?