Есть ли способ параллельно выполнять манипуляции с многораздельными наборами искровых данных? - PullRequest
1 голос
/ 02 июля 2019

У меня есть список наборов данных, которые я хочу разделить по определенному ключу, который является общим для всех моих наборов данных, а затем выполнить несколько объединений / группировок, которые одинаковы для всех разделенных наборов данных.

Я пытаюсь разработать алгоритм таким образом, чтобы я использовал Spark's partitionBy для создания раздела по определенному ключу.

Теперь один из способов - запускать операции над каждым разделом в цикле, но это неэффективно.

Я хотел посмотреть, разделил ли я данные вручную, могу ли я параллельно выполнять операции с этими наборами данных.

Я только начал изучать Spark, так что прости меня, если это наивный вопрос.

Рассмотрим набор данных идентификаторов клиентов и их поведенческие данные, такие как просмотр / клики и т. Д. В разных наборах данных. Скажем, один для просмотра, другой для кликов. Сначала я думаю о разделении моих данных по идентификаторам клиентов, а затем для каждого раздела (клиента) присоединяюсь к какому-либо атрибуту, например, к браузеру или устройству, чтобы посмотреть, как ведет себя каждый клиент. В общем, это похоже на вложенное распараллеливание.

Это вообще возможно в Spark? Есть что-то очевидное, чего мне не хватает? К какой документации я могу обратиться?

1 Ответ

0 голосов
/ 02 июля 2019

Попробуйте это -

1. Create test dataset (Totol Record = 70000+) to perform parallel operation on each 

scala> ds.count
res137: Long = 70008

scala> ds.columns
res124: Array[String] = Array(awards, country)

2. Assume partition column as "country".

scala> ds.select("country").distinct.show(false)
+-------+
|country|
+-------+
|CANADA |
|CHINA  |
|USA    |
|EUROPE |
|UK     |
|RUSSIA |
|INDIA  |
+-------+

3. Get sum of records for each country [ **Without parallel process for each partition**]

scala> val countries = ds.select("country").distinct.collect
countries: Array[org.apache.spark.sql.Row] = Array([CANADA], [CHINA], [USA], [EUROPE], [UK], [RUSSIA], [INDIA])

scala> val startTime = System.currentTimeMillis()
startTime: Long = 1562047887130

scala> countries.foreach(country => ds.filter(ds("country") === country(0)).groupBy("country").count.show(false))
+-------+-----+
|country|count|
+-------+-----+
|CANADA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|CHINA  |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|USA    |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|EUROPE |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|UK     |10002|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|RUSSIA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|INDIA  |10001|
+-------+-----+


scala> val endTime = System.currentTimeMillis()
endTime: Long = 1562047896088

scala> println(s"Total Execution Time :  ${(endTime - startTime) / 1000} Seconds")
Total Execution Time :  **8 Seconds**

4. Get sum of records for each country [ **With parallel process for each partition**]

scala> val startTime = System.currentTimeMillis()
startTime: Long = 1562048057431

scala> countries.par.foreach(country => ds.filter(ds("country") === country(0)).groupBy("country").count.show(false))

+-------+-----+
|country|count|
+-------+-----+
|INDIA  |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|CANADA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|RUSSIA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|USA    |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|UK     |10002|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|CHINA  |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|EUROPE |10001|
+-------+-----+


scala> val endTime = System.currentTimeMillis()
endTime: Long = 1562048060273

scala> println(s"Total Execution Time :  ${(endTime - startTime) / 1000} Seconds")
Total Execution Time :  **2 Seconds**

Результат: -

With    parallel process on each partition, it took ~ **2 Seconds**
Without parallel process on each partition, it took ~ **8 Seconds**

Я проверил, чтобы проверить количество записей для каждой страны, вы можете выполнить любой процесс, например, запись в таблицу улья или файл hdfs и т. д.

Надеюсь, это полезно .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...