Spark: медленная реализация reduByKey () - PullRequest
0 голосов
/ 29 марта 2020

Мы реализуем задачу, в которой мы должны проанализировать набор данных.

Для некоторого фона, набор данных представляет собой большое количество обзоров в следующем формате:

"review_id", "user_id", "business_id", "review_text", "review_date"

, и это наша текущая искра conig

conf = SparkConf().setAppName(APP_NAME).setMaster("local")
conf.set("spark.driver.maxResultSize", "16g")
conf.set("spark.executor.memory", "16g")
sc = SparkContext(conf=conf)

Наш задача состоит в том, чтобы найти полярность (положительность / отрицательность) каждого обзора, используя Spark RDDs . Эта часть выполняется с помощью .map() для создания кортежей в формате

(business_id, review_text)

, за которым следует .map(), который преобразует каждый кортеж в

(business_id, polarity_score)

Когда это применяется к весь набор данных (RDD), мы можем взять образец и увидеть структуру данных на данный момент. Для нашего набора данных это около 870 000 строк.

[
   ('TvVX8dHRKOwYCCAZaqysXw', 3), 
   ('wIGw7QrBwYdYeCfdn1qTLA', -2),
   ... 400 more
]

Для тестирования мы запускаем небольшие части набора данных (0,0005) со следующим кодом. Это дает нам подмножество около 400 отзывов.

subset_of_reviews_decoded = reviews_decoded.sample(False, 0.0005, None)

Если мы продолжим работу с логикой c, описанной выше, мы запустим следующий код и напечатаем 10 из них, чтобы проверить, работает ли он должным образом.

# Format = (business_id, value_of_words)
  to_values = filtered_reviews.mapValues(lambda words: convert_to_values(words))
  print(to_values.take(10))
[
   ('TvVX8dHRKOwYCCAZaqysXw', 3), 
   ('-Kotg2VsVsU5CSjVx3tmmg', 7), 
   ('tSrkbnxjwNTVqQ0l5BaDNw', 21), 
   ... 7 more
]

Мы достигли этого шага всего за пару секунд времени выполнения, поэтому мы довольны этим. Следующим шагом будет .reduceByKey(), так как мы хотим объединить значения для всех равных ключей. Все наши исследования показывают, что это правильный способ сделать это, и в документации для pyspark приведена точная проблема в качестве примера.

Итак, помня об этом, мы написали следующий код:

# Format = ( unique_business_id, sum_value_of_words)
  reduced = to_values.reduceByKey(lambda x, y: x + y)
  print(reduced.take(10))

Вот здесь и проблема арисов. Код делает правильные вещи, но время выполнения огромно. Как видно ниже, каждая задача исполнителя занимает приблизительно 750 мс , и в общей сложности 33 задачи. Это не так уж плохо, но это при работе на подмножестве размером 0,0005

20/03/29 21:17:54 INFO TaskSetManager: Finished task 21.0 in stage 3.0 (TID 24) in 749 ms on localhost (executor driver) (22/33)

Если мы увеличим подмножество до 0,0010 (double) == 750 мс

20/03/29 21:21:44 INFO TaskSetManager: Finished task 6.0 in stage 3.0 (TID 9) in 767 ms on localhost (executor driver) (7/33)

Если увеличить подмножество до 0,0020 (double) == 1100ms

20/03/29 21:22:27 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID 5) in 1196 ms on localhost (executor driver) (3/33)

Если мы увеличим подмножество до 0,0100 (пятерка) == 3400 мс

20/03/29 21:24:46 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID 5) in 3386 ms on localhost (executor driver) (3/33)

Мы наконец достигли 1% набора данных, а затем каждый шаг занимает 3400ms . Мы считаем, что наш набор данных из 870 000 строк довольно мал по сравнению с реальными большими данными, и мы не можем видеть, как наш простой метод reduceByKey() с аккумулятором add может использовать (3,4se c * 33 шага) 112 секунд при работе с 1% этого небольшого набора данных.

Мы делаем что-то ужасно неправильно, применяя здесь reduceByKey()?

PS: использование Dataframes специально не разрешено

...