Мы реализуем задачу, в которой мы должны проанализировать набор данных.
Для некоторого фона, набор данных представляет собой большое количество обзоров в следующем формате:
"review_id", "user_id", "business_id", "review_text", "review_date"
, и это наша текущая искра conig
conf = SparkConf().setAppName(APP_NAME).setMaster("local")
conf.set("spark.driver.maxResultSize", "16g")
conf.set("spark.executor.memory", "16g")
sc = SparkContext(conf=conf)
Наш задача состоит в том, чтобы найти полярность (положительность / отрицательность) каждого обзора, используя Spark RDDs . Эта часть выполняется с помощью .map()
для создания кортежей в формате
(business_id, review_text)
, за которым следует .map()
, который преобразует каждый кортеж в
(business_id, polarity_score)
Когда это применяется к весь набор данных (RDD), мы можем взять образец и увидеть структуру данных на данный момент. Для нашего набора данных это около 870 000 строк.
[
('TvVX8dHRKOwYCCAZaqysXw', 3),
('wIGw7QrBwYdYeCfdn1qTLA', -2),
... 400 more
]
Для тестирования мы запускаем небольшие части набора данных (0,0005) со следующим кодом. Это дает нам подмножество около 400 отзывов.
subset_of_reviews_decoded = reviews_decoded.sample(False, 0.0005, None)
Если мы продолжим работу с логикой c, описанной выше, мы запустим следующий код и напечатаем 10 из них, чтобы проверить, работает ли он должным образом.
# Format = (business_id, value_of_words)
to_values = filtered_reviews.mapValues(lambda words: convert_to_values(words))
print(to_values.take(10))
[
('TvVX8dHRKOwYCCAZaqysXw', 3),
('-Kotg2VsVsU5CSjVx3tmmg', 7),
('tSrkbnxjwNTVqQ0l5BaDNw', 21),
... 7 more
]
Мы достигли этого шага всего за пару секунд времени выполнения, поэтому мы довольны этим. Следующим шагом будет .reduceByKey()
, так как мы хотим объединить значения для всех равных ключей. Все наши исследования показывают, что это правильный способ сделать это, и в документации для pyspark приведена точная проблема в качестве примера.
Итак, помня об этом, мы написали следующий код:
# Format = ( unique_business_id, sum_value_of_words)
reduced = to_values.reduceByKey(lambda x, y: x + y)
print(reduced.take(10))
Вот здесь и проблема арисов. Код делает правильные вещи, но время выполнения огромно. Как видно ниже, каждая задача исполнителя занимает приблизительно 750 мс , и в общей сложности 33 задачи. Это не так уж плохо, но это при работе на подмножестве размером 0,0005
20/03/29 21:17:54 INFO TaskSetManager: Finished task 21.0 in stage 3.0 (TID 24) in 749 ms on localhost (executor driver) (22/33)
Если мы увеличим подмножество до 0,0010 (double) == 750 мс
20/03/29 21:21:44 INFO TaskSetManager: Finished task 6.0 in stage 3.0 (TID 9) in 767 ms on localhost (executor driver) (7/33)
Если увеличить подмножество до 0,0020 (double) == 1100ms
20/03/29 21:22:27 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID 5) in 1196 ms on localhost (executor driver) (3/33)
Если мы увеличим подмножество до 0,0100 (пятерка) == 3400 мс
20/03/29 21:24:46 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID 5) in 3386 ms on localhost (executor driver) (3/33)
Мы наконец достигли 1% набора данных, а затем каждый шаг занимает 3400ms . Мы считаем, что наш набор данных из 870 000 строк довольно мал по сравнению с реальными большими данными, и мы не можем видеть, как наш простой метод reduceByKey()
с аккумулятором add
может использовать (3,4se c * 33 шага) 112 секунд при работе с 1% этого небольшого набора данных.
Мы делаем что-то ужасно неправильно, применяя здесь reduceByKey()
?
PS: использование Dataframes специально не разрешено