Custom Partitioner для перекошенных данных - PullRequest
0 голосов
/ 25 мая 2019

У меня есть целый мир данных некоторого варианта использования.Каждая страна содержит от 3 до 5 продуктов, которые собираются каждый час для каждого пользователя.Я хочу сделать начальную загрузку, чтобы рассчитать некоторые средние и другие коэффициенты для каждой страны на продукт в час.

input.rdd.map(
      row => (
        (country, product, hour),
        (country, product, hour, user, rating)
        )
    )
val groups = keyGroup.groupByKey()
val output = groups.flatMapValues(x => bootstrap(x)).toDF

Проблема в том, что в некоторых странах данные достаточно велики, что приводит к тому, что весь процесс занимает часы и все еще не завершен.Я пытаюсь получить размер, который примерно равен:

Partition:count ->Countries

0: 2044816 -> India,Turkey

1: 1466790 -> Turkey,India

2: 783772 -> India,Mexico,Japan,South Korea

3: 431538 -> Japan,Mexico,South Korea,India,Indonesia,Turkey,Brazil,Russian Federation

4: 319824 -> South Korea,Brazil,Russian Federation,India,Mexico,United States of America,Turkey,Japan,Bangladesh

5: 268698 -> Bangladesh,Nigeria,Russian Federation,United States of America

6: 264709 -> Russian Federation,United States of America,Germany,Bangladesh,Nigeria,South Africa

7: 227612 -> South Africa,United States of America,Russian Federation,Brazil,South Korea,Germany
...
...
167: 58 -> Mexico,Chile,Uganda,Thailand,Ivory Coast,Antigua and Barbuda,Palau,Luxembourg,United States of America,British Virgin Islands,Iceland,Andorra,Samoa,Vanuatu,Botswana,Saint Lucia,Kiribati,Greenland

168: 69 -> Greenland,Iceland,Chile,Zambia,Estonia,Vanuatu,Cyprus,Malta,Saudi Arabia,Japan,Uruguay,Qatar,United States of America,Luxembourg,Peru,Belize,Papua New Guinea,Samoa,South Sudan

169: 61 -> Myanmar,Belize,Chile,Somalia,Bhutan,Luxembourg,Liberia,Norway,United Kingdom,Burkina Faso,Lithuania,Macedonia,Belgium,Vanuatu,Burundi,DR Congo,Montenegro,Central African Republic,Bosnia and Herzegovina

170: 36 -> Mauritania,Sierra Leone,Hungary,Zambia,Somalia,Federated States of Micronesia,Serbia,Liberia,Nepal,Chile,Israel,Ukraine,Montenegro,Yemen,Croatia,Central African Republic,Armenia,Andorra,United Arab Emirates,Mauritius,Albania,Lebanon,Macedonia

171: 25 -> Spain,Comoros,Libya,Peru,Latvia,Montenegro,Egypt,Malaysia,Central African Republic,Faroe Islands,Tanzania,Palau,Chad,Guatemala,Kiribati,Burundi,Luxembourg,Equatorial Guinea,Barbados,Belgium

172: 14 -> Vietnam,Tanzania,Hungary,Egypt,Comoros,Equatorial Guinea,Guinea-Bissau,Moldova,Macedonia,Guyana,Federated States of Micronesia,New Zealand,Chad

Видно, что данные не разделены равномерно, и у них есть 173 разделов.Данные около 6 ГБ, которые содержат данные за неделю.Если я пытаюсь запустить единственную страну, выполнив перераспределение 1000 , это работает, но вместе это не работает.

Я думаю написать пользовательский разделитель , но я понятия не имею, как мне разбить эти данные для подсчета больших стран.Было бы здорово, если бы кто-нибудь смог мне помочь.

1 Ответ

0 голосов
/ 26 мая 2019

Прежде чем идти по пути написания собственного пользовательского разделителя, вы можете попробовать это: поскольку вы уже знаете, в каких странах имеются искаженные данные, вы можете создать составной ключ со случайно добавленными числами в некотором диапазоне (более широкий диапазон для более искаженных данных)для указанных стран.Вы можете агрегировать по указанному ключу, а затем отбрасывать составной ключ и агрегировать далее.

df.withColumn("composite_key", 
    when(isSkewDataCountryUDF(col("country")), concat(col("country"), randomNumberSuffix())
    .otherwise(col("country")))
.groupBy("composite_key")
.count
.drop("composite_key")
.groupBy("country")
.count

Также попробуйте установить намного более высокие значения spark.default.parallelism и spark.sql.shuffle.partitions

.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...